Event-Driven Integrations. Using The Webhook API To Create Custom Business Processes
10 Jun 2023This article is a based on the talk of the same name given to the 2023 Preservica User Group in Oxford.
Introduction to webhooks and APIs
Webhooks are a type of API which allow apps to stay up-to-date with real-time information. Webhooks are also called event-driven APIs and are typically used to provide other applications with real-time data.
Using webhooks, applications can send data automatically to 3rd party systems when certain events are triggered within an application. Unlike the traditional process of “polling” in which a client asks the application if anything has changed, web hooks automatically send out information to subscribed systems when certain events have happened.
Preservica Webhooks
Webhooks are a new API introduced in Preservica v6.8. They are based on a publisher-subscriber pattern.
Preservica is the publisher and will send messages to all registered subscribers when certain events are triggered within the system.
The key difference between webhooks and traditional APIs is Who triggers the process. With traditional APIs the trigger is an event outside Preservica and with webhooks the trigger is an event inside Preservica. The result is that you do not have to continually poll Preservica to get new information.
This allows users to build custom business processes and workflows which are triggered based on events within Preservica.
Webhooks work best when you need to take some action when something new has happened within Preservica.
Preservica has provided two events as part of the first webhook release with Preservica v6.8. More events will be added in future versions.
- Ingesting Assets
- Moving Assets
Webhook Documentation
The webhook API is documented in the official Preservica Swagger pages and the 3rd party pyPreservica Python SDK
The following examples below will use the Preservica SDK pyPreservica
Subscribing
Before a system can receive notifications from Preservica, it must subscribe to a notification trigger.
When creating a new subscription service, you need to generate a shared secret and pass it as an argument to the subscribe method. This is then used by the subscriber to verify the messages sent by Preservica (publisher) are genuine (to prevent spoofing attacks). Its known as a shared secret because its held by both the publisher (Preservica) and the subscriber (web hook server).
Using the PyPreservica SDK to create a new subscription for Ingested events, you pass the address of a web service which can receive HTTP POST requests. This could be a web server or some API gateway service. If you use a web server it must provide a publicly accessible endpoint. If you are running a local server for testing purposes it will need to use some kind of ingress service to make it publicly accessible.
Along with the URL and the shared secret, you need to pass an argument which specifies which type of even you are interested in. The INDEXED event is triggered at the end of the ingest process after the content has been full text indexed and thumbnails are available.
To use the Preservica Webhook API requires the user making the API call
to have at least the repository manager role, ROLE_SDB_MANAGER_USER
The following pyPreservica python script for creating a new subscription is
from pyPreservica import WebHooksAPI
webhook = WebHooksAPI()
subscription = webhook.subscribe("http://my-preservica-webhook.com:8080", TriggerType.INDEXED, "my secret key!")
where http://my-preservica-webhook.com:8080 is the web hook endpoint URL.
During the subscription process, Preservica will send a challenge response message to the specified
endpoint URL to verify that it exists and its publicly accessible.
Preservica sends a POST request to the URL with a challengeCode query parameter.
The server must respond with the expected challenge response or the subscription will fail.
The response sent back to Preservica takes the form of a simple json document which includes the original challenge code and a hexHmac256Response which is a hexadecimal encoded hmac256 of the challenge Code using the shared secret as the hmac key.
{
"challengeCode": "challengeCode",
"challengeResponse": "hexHmac256Response"
}
Therefore, the web hook process will need a copy of the secret key to verify requests.
Receiving Events
To receive web hook notifications pyPreservica provides a reference web server implementation which includes support for the negotiation of the challenge request handshake during the subscription request and verification of each subsequent webhook event.
To implement the web server, extend the base class WebHookHandler from the SDK and implement a single method do_WORK() this method is called every time Preservica calls the web hook. This method is therefore where any processing takes place. This method is passed a python dict object containing information about the event such as Asset references etc.
class MyWebhook(WebHookHandler):
def do_WORK(event):
# Do something useful
The handler can then be used to create a web server, the web server should be run from the same directory as a pyPreservica credential.properties file containing the shared secret which was used earlier to create the web hook subscription.
The WebHookHandler extends the standard Python BaseHTTPRequestHandler base class and will provide the handshake and message verification automatically. Only verified messages are actioned. This class is part of the pyPreservica SDK.
class WebHookHandler(BaseHTTPRequestHandler):
"""
A sample web hook web server which provides handshake verification
The shared secret key is passed in via the HTTPServer
Extend the class and implement do_WORK() method
The JSON document is passed into do_WORK()
"""
def hmac(self, key, message):
return hmac.new(key=bytes(key, 'latin-1'), msg=bytes(message, 'latin-1'), digestmod=hashlib.sha256).hexdigest()
def do_POST(self):
result = urlparse(self.path)
q = parse_qs(result.query)
if 'challengeCode' in q:
code = q['challengeCode'][0]
signature = self.hmac(self.server.secret_key, code)
response = f'{{ "challengeCode": "{code}", "challengeResponse": "{signature}" }}'
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(bytes(response.encode('utf-8')))
self.log_message(f"Handshake Completed. {response.encode('utf-8')}")
else:
verif_sig = self.headers.get("Preservica-Signature", None)
if "chunked" in self.headers.get("Transfer-Encoding", "") and (verif_sig is not None):
payload = ""
while True:
line = self.rfile.readline().strip()
chunk_length = int(line, 16)
if chunk_length != 0:
chunk = self.rfile.read(chunk_length)
payload = payload + chunk.decode("utf-8")
self.rfile.readline()
if chunk_length == 0:
verify_body = f"preservica-webhook-auth{payload}"
signature = self.hmac(self.server.secret_key, verify_body)
if signature == verif_sig:
self.log_message("Signature Verified. Doing Work...")
self.log_message(payload)
self.send_response(200)
self.end_headers()
self.do_WORK(json.loads(payload))
break
The following is an example Python web server which can respond to Preservica web hooks. In this example the web hook server downloads the thumbnail image for every ingested asset.
from http.server import HTTPServer
from sys import argv
from pyPreservica import *
BIND_HOST = '0.0.0.0'
PORT = 8080
class MyWebHook(WebHookHandler):
def do_WORK(self, event):
for reference in list(event['events']):
ref = reference['entityRef']
asset = self.server.client.asset(ref)
self.server.client.thumbnail(asset, f"{ref}.jpg")
if __name__ == '__main__':
config = configparser.ConfigParser(interpolation=configparser.Interpolation())
config.read('credentials.properties', encoding='utf-8')
secret_key = config['credentials']['secret.key']
if len(argv) > 1:
arg = argv[1].split(':')
BIND_HOST = arg[0]
PORT = int(arg[1])
print(f'Listening on http://{BIND_HOST}:{PORT}\n')
httpd = HTTPServer((BIND_HOST, PORT), MyWebHook)
httpd.secret_key = secret_key
httpd.client = EntityAPI()
httpd.serve_forever()
The web server can be then be started from the command line using.
$ python3 server.py 0.0.0.0:8000
Use Cases
The typical use case for web hooks is when you want to take some action after new material has been ingested into Preservica. For example, you may wish to add additional descriptive metadata to an Asset from an external metadata management system. The list below provides a few examples of where a web hook may be more appropriate than a traditional polling API.
- Catalogue lookup after ingest
- Transcribe video or audio after ingesting AV material
- Send an automatically created access version of an Asset to a discovery platform
Microsoft Teams Integration
This use case shows how we can chain webhooks together to allow 3rd party systems to communicate with each other. We are going to use Microsoft Teams as the 3rd party system as Teams provides incoming webhook functionality. We can connect an outgoing Preservica webhook to an incoming teams webhook to transfer information into Teams.
The end result is that every Asset ingested into Preservica will automatically send a notification into a Microsoft Teams channel.
We will use the python library pymsteams to help create the messages
Our web hook server now looks like:
from http.server import HTTPServer
from sys import argv
from pyPreservica import *
import pymsteams
BIND_HOST = '0.0.0.0'
PORT = 8080
class MyWebHook(WebHookHandler):
def teams_message(self, asset):
myTeamsMessage = pymsteams.connectorcard(self.server.teams_url)
myTeamsMessage.color("red")
# create the section
myMessageSection = pymsteams.cardsection()
myTeamsMessage.title("Preservica has ingested a new asset")
# Section Title
myMessageSection.title(asset.title)
# Facts are key value pairs displayed in a list.
myMessageSection.addFact("Asset Title", asset.title)
myMessageSection.addFact("Asset Description", asset.description)
for bitstream in self.server.client.bitstreams_for_asset(asset):
myMessageSection.addFact("BitStream Name", bitstream.filename)
myMessageSection.addFact("BitStream Length", f"{bitstream.length} Bytes")
# Section Text
myMessageSection.text("Asset Metadata")
# Section Images
myMessageSection.addImage(f"https://ua.access.preservica.com/download/thumbnail/IO_{asset.reference}")
# Add your section to the connector card object before sending
myTeamsMessage.addSection(myMessageSection)
myTeamsMessage.addLinkButton("View the Asset in UA", f"https://ua.access.preservica.com/uncategorized/IO_{asset.reference}")
myTeamsMessage.summary("Test Message")
myTeamsMessage.send()
def do_WORK(self, event):
for reference in list(event['events']):
ref = reference['entityRef']
asset = self.server.client.asset(ref)
self.teams_message(asset)
if __name__ == '__main__':
config = configparser.ConfigParser(interpolation=configparser.Interpolation())
config.read('credentials.properties', encoding='utf-8')
secret_key = config['credentials']['secret.key']
if len(argv) > 1:
arg = argv[1].split(':')
BIND_HOST = arg[0]
PORT = int(arg[1])
print(f'Listening on http://{BIND_HOST}:{PORT}\n')
httpd = HTTPServer((BIND_HOST, PORT), MyWebHook)
httpd.secret_key = secret_key
httpd.client = EntityAPI()
httpd.serve_forever()
Every ingest now creates a new Teams message containing information about the Asset, for example
Event Driven Serverless Architecture
In the examples above, a web server is used to receive the web hook notifications. This can turn out to be inefficient as the web server needs to be running on a server even when no notifications are being sent.
In turns out that webhooks are especially well suited to modern serverless architecture such as Amazon Lambda and Azure Functions. Running a dedicated web server can be inefficient and unnecessary.
AWS Lambda and Azure Functions are code fragments which are triggered to run from external events such as web hooks. This is a simple and cost-effective approach to processing events.
- No application server provisioning and maintenance required
- Web server infrastructure is provided for you through the API Gateway
- Only pay for the milliseconds used
- Highly Scalable
- Secure
AWS has a dedicated set of infrastructure based on the API Gateway service which allows public API endpoints to be created. These API endpoints act as the “front door” for the webhook applications managing traffic management, CORS support, authorization and access control, throttling, and monitoring etc. For the Preservica webhook use case the API gateway only needs to accept HTTP POST requests.
When the API gateway receives a webhook notification it will pass the message payload from Preservica to the AWS Lambda function which will carry out the appropriate action.
AWS Lambda supports multiple languages such as Java, Go, PowerShell, Node.js, C#, Python, and Ruby code, so you can build your applications in the language of your choice.
A basic AWS Lambda function for Preservica web hooks in Python would look something like
import json
import os
import hmac
import hashlib
def lambda_handler(event, context):
secret_key = os.environ.get('PRES_SECRET_KEY')
if 'queryStringParameters' in event:
if event['queryStringParameters'] is not None:
if 'challengeCode' in event['queryStringParameters']:
message = event['queryStringParameters']['challengeCode']
signature = hmac.new(key=bytes(secret_key, 'latin-1'), msg=bytes(message, 'latin-1'),
digestmod=hashlib.sha256).hexdigest()
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json"
},
"body": json.dumps({
"challengeCode": f"{message}", "challengeResponse": f"{signature}"})
}
else:
if 'Preservica-Signature' in event['headers']:
verify_body = f"preservica-webhook-auth{event['body']}"
signature = hmac.new(key=bytes(secret_key, 'latin-1'), msg=bytes(verify_body, 'latin-1'),
digestmod=hashlib.sha256).hexdigest()
doc = json.loads(event['body'])
if signature == event['headers']['Preservica-Signature']:
for reference in list(doc['events']):
ref = reference['entityRef']
## DO WORK HERE
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json"
},
"body": json.dumps(event['body'])
}
Where we are fetching the shared key from the environment variables.
The event information sent to the lambda function comprises a json document with a body field which looks like
{
"subscriptionId":"9495ddbf7d4a0b9f030a3411c87f967d",
"timestamp":"2023-10-11T10:52:42.469",
"tenant":"BRAMA",
"version":"6.10",
"events":[{
"event":"FullTextIndexed",
"entityType":"IO",
"entityRef":"7fdea7e0-1e72-4fb1-aadb-d7f9ee6ed8c2",
"identifiers":[]}],
"trigger":"FULL_TEXT_INDEXED"
}
From this json document the entity type and entity reference of the ingested Asset can be extracted.