Event-Driven Integrations. Using The Webhook API To Create Custom Business Processes

This article is based on the talk of the same name given to the 2023 Preservica User Group in Oxford.

Introduction to webhooks and APIs

Webhooks are a type of API which allow apps to stay up-to-date with real-time information. Webhooks are also called event-driven APIs and are typically used to provide other applications with real-time data.

Using webhooks, applications can send data automatically to third-party systems when certain events are triggered within an application. Unlike the traditional process of “polling”, in which a client repeatedly asks the application if anything has changed, webhooks automatically send out information to subscribed systems when certain events have happened.


Preservica Webhooks

Webhooks are a new API introduced in Preservica v6.8. They are based on a publisher-subscriber pattern.

Preservica is the publisher and will send messages to all registered subscribers when certain events are triggered within the system.
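
The publisher-subscriber pattern can be illustrated with a small in-process sketch. The `Publisher` class and the event name used here are hypothetical and are not part of Preservica or pyPreservica; in the real system the subscribers are remote HTTP endpoints rather than Python callables.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class Publisher:
    """Toy publisher that keeps a list of callbacks per event type (illustration only)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, callback: Callable[[dict], None]) -> None:
        # Register a callback to be invoked whenever event_type is published
        self._subscribers[event_type].append(callback)

    def publish(self, event_type: str, payload: dict) -> int:
        """Push the payload to every subscriber of event_type; return how many were notified."""
        for callback in self._subscribers[event_type]:
            callback(payload)
        return len(self._subscribers[event_type])


received = []
publisher = Publisher()
publisher.subscribe("INDEXED", received.append)
notified = publisher.publish("INDEXED", {"entityRef": "example-ref"})
```

The subscriber never asks whether anything has changed; it is simply called when the publisher has something to report.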

The key difference between webhooks and traditional APIs is who triggers the process. With traditional APIs the trigger is an event outside Preservica, and with webhooks the trigger is an event inside Preservica. The result is that you do not have to continually poll Preservica to get new information.

This allows users to build custom business processes and workflows which are triggered based on events within Preservica.

Webhooks work best when you need to take some action when something new has happened within Preservica.

Preservica has provided two events as part of the first webhook release with Preservica v6.8. More events will be added in future versions.

Webhook Documentation

The webhook API is documented in the official Preservica Swagger pages and in the third-party pyPreservica Python SDK.


The examples below use the pyPreservica SDK.


Subscribing

Before a system can receive notifications from Preservica, it must subscribe to a notification trigger.

When creating a new subscription, you need to generate a shared secret and pass it as an argument to the subscribe method. The subscriber then uses it to verify that the messages sent by Preservica (the publisher) are genuine, which prevents spoofing attacks. It is known as a shared secret because it is held by both the publisher (Preservica) and the subscriber (the web hook server).
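
One way to generate such a secret is with Python's standard `secrets` module. This is a minimal sketch; the helper name `new_shared_secret` and the 32-byte length are assumptions made for illustration, not a Preservica requirement.

```python
import secrets


def new_shared_secret(num_bytes: int = 32) -> str:
    """Return a URL-safe random string built from num_bytes of OS-level randomness."""
    return secrets.token_urlsafe(num_bytes)


shared_secret = new_shared_secret()
```

The result contains only ASCII characters, so it can be stored in a properties file or an environment variable without escaping.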

When using the pyPreservica SDK to create a new subscription for ingest events, you pass the address of a web service which can receive HTTP POST requests. This could be a web server or an API gateway service. A web server must provide a publicly accessible endpoint, so a local server used for testing will need some kind of ingress service to make it publicly accessible.

Along with the URL and the shared secret, you need to pass an argument which specifies which type of event you are interested in. The INDEXED event is triggered at the end of the ingest process, after the content has been full-text indexed and thumbnails are available.

Using the Preservica Webhook API requires the user making the API call to have at least the repository manager role, ROLE_SDB_MANAGER_USER.

The following pyPreservica Python script creates a new subscription:

from pyPreservica import WebHooksAPI, TriggerType

webhook = WebHooksAPI()

subscription = webhook.subscribe("http://my-preservica-webhook.com:8080", TriggerType.INDEXED, "my secret key!")

where http://my-preservica-webhook.com:8080 is the web hook endpoint URL.

During the subscription process, Preservica will send a challenge message to the specified endpoint URL to verify that it exists and that it is publicly accessible.
Preservica sends a POST request to the URL with a challengeCode query parameter. The server must respond with the expected challenge response or the subscription will fail.

The response sent back to Preservica takes the form of a simple JSON document which includes the original challenge code and a challengeResponse value (shown as hexHmac256Response below). This value is the hexadecimal-encoded HMAC-SHA256 of the challenge code, using the shared secret as the HMAC key.


{
  "challengeCode": "challengeCode",
  "challengeResponse": "hexHmac256Response"
}

Therefore, the web hook process will need a copy of the secret key to verify requests.
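
The handshake reply can be sketched with the standard library alone. The function name `challenge_response` and the sample values are hypothetical; the latin-1 encoding mirrors the SDK handler listed later in this article.

```python
import hashlib
import hmac
import json


def challenge_response(secret_key: str, challenge_code: str) -> str:
    """Build the JSON handshake reply: the code plus its hex-encoded HMAC-SHA256."""
    digest = hmac.new(
        secret_key.encode("latin-1"),
        challenge_code.encode("latin-1"),
        hashlib.sha256,
    ).hexdigest()
    return json.dumps({"challengeCode": challenge_code, "challengeResponse": digest})


# Example values only; a real challenge code is chosen by Preservica
reply = json.loads(challenge_response("my secret key!", "abc123"))
```

Because the digest depends on the secret, an endpoint that does not hold the same key cannot produce a valid reply.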

Receiving Events

To receive web hook notifications pyPreservica provides a reference web server implementation which includes support for the negotiation of the challenge request handshake during the subscription request and verification of each subsequent webhook event.

To implement the web server, extend the base class WebHookHandler from the SDK and implement a single method, do_WORK(). This method is called every time Preservica calls the web hook, so it is where any processing takes place. It is passed a Python dict containing information about the event, such as Asset references.


from pyPreservica import WebHookHandler


class MyWebhook(WebHookHandler):
    def do_WORK(self, event):
        # Do something useful with the event dict
        pass

The handler can then be used to create a web server. The web server should be run from the same directory as a pyPreservica credentials.properties file containing the shared secret which was used earlier to create the web hook subscription.
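
As a rough sketch of what the relevant part of such a file might contain, the snippet below parses an in-memory example. The `[credentials]` section and the `secret.key` property name are taken from the example server later in this article; the value is a placeholder, and the SDK's own connection settings are left out.

```python
import configparser

# Hypothetical file contents; the SDK's own connection settings are omitted here
EXAMPLE_PROPERTIES = """
[credentials]
secret.key = my secret key!
"""

# interpolation=None keeps characters such as % in the secret from being interpreted
config = configparser.ConfigParser(interpolation=None)
config.read_string(EXAMPLE_PROPERTIES)
secret_key = config["credentials"]["secret.key"]
```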

The WebHookHandler extends the standard Python BaseHTTPRequestHandler base class and will provide the handshake and message verification automatically. Only verified messages are actioned. This class is part of the pyPreservica SDK.



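import hashlib
import hmac
import json
from http.server import BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs


class WebHookHandler(BaseHTTPRequestHandler):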
    """
    A sample web hook web server which provides handshake verification
    The shared secret key is passed in via the HTTPServer

    Extend the class and implement do_WORK() method
    The JSON document is passed into do_WORK()

    """

    def hmac(self, key, message):
        return hmac.new(key=bytes(key, 'latin-1'), msg=bytes(message, 'latin-1'), digestmod=hashlib.sha256).hexdigest()

    def do_POST(self):
        result = urlparse(self.path)
        q = parse_qs(result.query)
        if 'challengeCode' in q:
            code = q['challengeCode'][0]
            signature = self.hmac(self.server.secret_key, code)
            response = f'{{ "challengeCode": "{code}", "challengeResponse": "{signature}" }}'
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(bytes(response.encode('utf-8')))
            self.log_message(f"Handshake Completed. {response.encode('utf-8')}")
        else:
            verif_sig = self.headers.get("Preservica-Signature", None)
            if "chunked" in self.headers.get("Transfer-Encoding", "") and (verif_sig is not None):
                payload = ""
                while True:
                    line = self.rfile.readline().strip()
                    chunk_length = int(line, 16)
                    if chunk_length != 0:
                        chunk = self.rfile.read(chunk_length)
                        payload = payload + chunk.decode("utf-8")
                    self.rfile.readline()
                    if chunk_length == 0:
                        verify_body = f"preservica-webhook-auth{payload}"
                        signature = self.hmac(self.server.secret_key, verify_body)
                        if signature == verif_sig:
                            self.log_message("Signature Verified. Doing Work...")
                            self.log_message(payload)
                            self.send_response(200)
                            self.end_headers()
                            self.do_WORK(json.loads(payload))
                        break
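
The verification step in the handler above can be pulled out into a standalone sketch. The function names are hypothetical; the `preservica-webhook-auth` prefix and the latin-1 encoding are copied from the handler. Unlike the handler, this version compares the signatures with `hmac.compare_digest`, a constant-time comparison, which is a substitution made here rather than what the SDK does.

```python
import hashlib
import hmac
from typing import Optional

SIGNATURE_PREFIX = "preservica-webhook-auth"  # prefix used by the handler above


def sign_event(secret_key: str, payload: str) -> str:
    """Hex-encoded HMAC-SHA256 over the prefix followed by the raw request body."""
    message = (SIGNATURE_PREFIX + payload).encode("latin-1")
    return hmac.new(secret_key.encode("latin-1"), message, hashlib.sha256).hexdigest()


def is_genuine(secret_key: str, payload: str, received_signature: Optional[str]) -> bool:
    """Compare the expected and received signatures in constant time."""
    expected = sign_event(secret_key, payload).encode("utf-8")
    received = (received_signature or "").encode("utf-8")
    return hmac.compare_digest(expected, received)
```

The signature must be computed over the body exactly as received, before any JSON parsing or re-serialisation, otherwise whitespace differences will cause the check to fail.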


The following is an example Python web server which can respond to Preservica web hooks. In this example the web hook server downloads the thumbnail image for every ingested asset.

import configparser
from http.server import HTTPServer
from sys import argv
from pyPreservica import *

BIND_HOST = '0.0.0.0'
PORT = 8080


class MyWebHook(WebHookHandler):
    def do_WORK(self, event):
        for reference in list(event['events']):
            ref = reference['entityRef']
            asset = self.server.client.asset(ref)
            self.server.client.thumbnail(asset, f"{ref}.jpg")


if __name__ == '__main__':

    config = configparser.ConfigParser(interpolation=configparser.Interpolation())
    config.read('credentials.properties', encoding='utf-8')
    secret_key = config['credentials']['secret.key']

    if len(argv) > 1:
        arg = argv[1].split(':')
        BIND_HOST = arg[0]
        PORT = int(arg[1])

    print(f'Listening on http://{BIND_HOST}:{PORT}\n')

    httpd = HTTPServer((BIND_HOST, PORT), MyWebHook)
    httpd.secret_key = secret_key
    httpd.client = EntityAPI()
    httpd.serve_forever()

The web server can then be started from the command line using:


  $ python3 server.py 0.0.0.0:8000

Use Cases

The typical use case for web hooks is when you want to take some action after new material has been ingested into Preservica. For example, you may wish to add additional descriptive metadata to an Asset from an external metadata management system. The example below shows a case where a web hook may be more appropriate than a traditional polling API.

Microsoft Teams Integration

This use case shows how webhooks can be chained together to allow third-party systems to communicate with each other. We are going to use Microsoft Teams as the third-party system, as Teams provides incoming webhook functionality. We can connect an outgoing Preservica webhook to an incoming Teams webhook to transfer information into Teams.


The end result is that every Asset ingested into Preservica will automatically send a notification into a Microsoft Teams channel.

We will use the Python library pymsteams to help create the messages.

Our web hook server now looks like:

import configparser
from http.server import HTTPServer
from sys import argv
from pyPreservica import *
import pymsteams

BIND_HOST = '0.0.0.0'
PORT = 8080

class MyWebHook(WebHookHandler):
    
    def teams_message(self, asset):
        myTeamsMessage = pymsteams.connectorcard(self.server.teams_url)
        myTeamsMessage.color("red")
        # create the section
        myMessageSection = pymsteams.cardsection()
        myTeamsMessage.title("Preservica has ingested a new asset")

        # Section Title
        myMessageSection.title(asset.title)
        # Facts are key value pairs displayed in a list.
        myMessageSection.addFact("Asset Title", asset.title)
        myMessageSection.addFact("Asset Description", asset.description)

        for bitstream in self.server.client.bitstreams_for_asset(asset):
            myMessageSection.addFact("BitStream Name",  bitstream.filename)
            myMessageSection.addFact("BitStream Length", f"{bitstream.length} Bytes")

        # Section Text
        myMessageSection.text("Asset Metadata")
        # Section Images
        myMessageSection.addImage(f"https://ua.access.preservica.com/download/thumbnail/IO_{asset.reference}")
        # Add your section to the connector card object before sending
        myTeamsMessage.addSection(myMessageSection)
        myTeamsMessage.addLinkButton("View the Asset in UA", f"https://ua.access.preservica.com/uncategorized/IO_{asset.reference}")
        myTeamsMessage.summary("Test Message")
         
        myTeamsMessage.send()
    
    
    def do_WORK(self, event):
        for reference in list(event['events']):
            ref = reference['entityRef']
            asset = self.server.client.asset(ref)
            self.teams_message(asset)

if __name__ == '__main__':

    config = configparser.ConfigParser(interpolation=configparser.Interpolation())
    config.read('credentials.properties', encoding='utf-8')
    secret_key = config['credentials']['secret.key']

    if len(argv) > 1:
        arg = argv[1].split(':')
        BIND_HOST = arg[0]
        PORT = int(arg[1])

    print(f'Listening on http://{BIND_HOST}:{PORT}\n')

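    httpd = HTTPServer((BIND_HOST, PORT), MyWebHook)
    httpd.secret_key = secret_key
    # Assumption: the Teams incoming webhook URL is stored under a 'teams.url'
    # property next to the shared secret; read it from wherever you keep it.
    httpd.teams_url = config['credentials']['teams.url']
    httpd.client = EntityAPI()
    httpd.serve_forever()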

Every ingest now creates a new Teams message containing information about the Asset.

Event Driven Serverless Architecture

In the examples above, a web server is used to receive the web hook notifications. This can be inefficient, as the web server needs to be running even when no notifications are being sent.

It turns out that webhooks are especially well suited to modern serverless architectures such as AWS Lambda and Azure Functions, which remove the need for a dedicated web server.

AWS Lambda and Azure Functions are code fragments which are triggered to run from external events such as web hooks. This is a simple and cost-effective approach to processing events.

AWS has a dedicated set of infrastructure based on the API Gateway service which allows public API endpoints to be created. These API endpoints act as the “front door” for webhook applications, handling traffic management, CORS support, authorization and access control, throttling, and monitoring. For the Preservica webhook use case the API gateway only needs to accept HTTP POST requests.


When the API gateway receives a webhook notification it will pass the message payload from Preservica to the AWS Lambda function which will carry out the appropriate action.

AWS Lambda supports multiple languages such as Java, Go, PowerShell, Node.js, C#, Python, and Ruby code, so you can build your applications in the language of your choice.

A basic AWS Lambda function for Preservica web hooks in Python would look something like:

    import json
    import os
    import hmac
    import hashlib


    def sign(secret_key, message):
        # Hex-encoded HMAC-SHA256 of the message, keyed with the shared secret
        return hmac.new(key=bytes(secret_key, 'latin-1'), msg=bytes(message, 'latin-1'),
                        digestmod=hashlib.sha256).hexdigest()


    def lambda_handler(event, context):
        secret_key = os.environ.get('PRES_SECRET_KEY')

        # queryStringParameters may be missing or None when the request has no query string
        params = event.get('queryStringParameters') or {}
        if 'challengeCode' in params:
            # Subscription handshake: echo the challenge code together with its HMAC
            message = params['challengeCode']
            return {
                "statusCode": 200,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"challengeCode": message,
                                    "challengeResponse": sign(secret_key, message)})
            }

        # Header names can arrive in a different case, so look them up lower-cased
        headers = {k.lower(): v for k, v in (event.get('headers') or {}).items()}
        body = event.get('body') or ""
        expected = sign(secret_key, f"preservica-webhook-auth{body}")
        if hmac.compare_digest(expected.encode(), headers.get('preservica-signature', '').encode()):
            doc = json.loads(body)
            for reference in doc['events']:
                ref = reference['entityRef']
                ## DO WORK HERE
            return {
                "statusCode": 200,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps(body)
            }

        # Anything that is neither a handshake nor a correctly signed event is rejected
        return {"statusCode": 401, "body": json.dumps({"message": "invalid signature"})}

Here the shared secret is read from the PRES_SECRET_KEY environment variable.
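
To try the function locally without deploying it, you can build a dict that resembles what the handler reads. This is only a sketch: the helper names are hypothetical, and the event contains just the keys used by the handler above (`queryStringParameters`, `headers` and `body`), not everything a real API Gateway event carries.

```python
import hashlib
import hmac
import json


def make_signed_event(secret_key: str, body: dict) -> dict:
    """Build a fake gateway event carrying a correctly signed body (test helper)."""
    raw_body = json.dumps(body)
    signature = hmac.new(
        secret_key.encode("latin-1"),
        f"preservica-webhook-auth{raw_body}".encode("latin-1"),
        hashlib.sha256,
    ).hexdigest()
    # No query string is attached, so the handler treats this as a notification
    return {"headers": {"Preservica-Signature": signature}, "body": raw_body}


def make_challenge_event(code: str) -> dict:
    """Build a fake event that looks like the subscription handshake request."""
    return {"queryStringParameters": {"challengeCode": code}, "headers": {}, "body": None}


test_event = make_signed_event("my secret key!", {"events": [{"entityRef": "example-ref"}]})
```

With PRES_SECRET_KEY set to the same secret, calling `lambda_handler(test_event, None)` should return a response with status code 200.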

The event information sent to the Lambda function comprises a JSON document with a body field which looks like:

    {
        "subscriptionId": "9495ddbf7d4a0b9f030a3411c87f967d",
        "timestamp": "2023-10-11T10:52:42.469",
        "tenant": "BRAMA",
        "version": "6.10",
        "events": [
            {
                "event": "FullTextIndexed",
                "entityType": "IO",
                "entityRef": "7fdea7e0-1e72-4fb1-aadb-d7f9ee6ed8c2",
                "identifiers": []
            }
        ],
        "trigger": "FULL_TEXT_INDEXED"
    }

From this JSON document the entity type and entity reference of the ingested Asset can be extracted.
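
A minimal sketch of that extraction, using the sample body shown above. The function name `entity_refs` is hypothetical, and the sketch assumes every entry in `events` carries both `entityType` and `entityRef`, as in the sample.

```python
import json
from typing import List, Tuple

SAMPLE_BODY = """
{
    "subscriptionId": "9495ddbf7d4a0b9f030a3411c87f967d",
    "timestamp": "2023-10-11T10:52:42.469",
    "tenant": "BRAMA",
    "version": "6.10",
    "events": [
        {
            "event": "FullTextIndexed",
            "entityType": "IO",
            "entityRef": "7fdea7e0-1e72-4fb1-aadb-d7f9ee6ed8c2",
            "identifiers": []
        }
    ],
    "trigger": "FULL_TEXT_INDEXED"
}
"""


def entity_refs(body: str) -> List[Tuple[str, str]]:
    """Return (entityType, entityRef) pairs for every event in the message body."""
    doc = json.loads(body)
    return [(e["entityType"], e["entityRef"]) for e in doc.get("events", [])]


refs = entity_refs(SAMPLE_BODY)
```

A single notification can carry several events, which is why the result is a list rather than a single pair.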