Andrey Kozichev
Creating Camunda Airbyte Connector
Prerequisites
Today we will create an Airbyte connector to pull Camunda history events from the Camunda REST engine API.
Camunda is an open-source platform for workflow and decision automation. It provides tools for modeling and executing business processes, orchestrating workflows, and managing decisions.
Camunda uses its History API to expose details of events which happened in the past. This includes all automated and manual tasks, decisions, their start and finish times, and many other useful details describing the events.
There are several resources we will need to get going.
Camunda-based Java Application.
We will use a simple Spring Boot application simulating the workflow of a takeaway restaurant.
It takes a few minutes to start the Postgres database, the Java app and a simple shell script that generates some workload. You can watch it work by accessing the Camunda Cockpit web app with the username/password demo/demo:
http://localhost:8090/
Airbyte
Running it locally is the simplest way.
Once it has started you will be able to access the UI with user airbyte and password password:
http://localhost:8000
Warehouse DB(Postgres)
As the destination we will use a locally running Postgres. Simply start it as follows:
To get it ready for Airbyte, create the following database:
Camunda API Documentation
It's a good idea to have the details of the Camunda API in front of you:
https://docs.camunda.org/manual/7.18/reference/rest/history/activity-instance/
In this example we will only extract activity instance events, using the "Get Historic Activity Instances" REST call.
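If you want to see the raw payload before wiring it into Airbyte, the endpoint can be queried directly. Below is a minimal sketch using Python's standard library; the base URL matches the demo setup above, and the endpoint path comes from the linked documentation:

```python
from urllib.parse import urlencode

BASE_URL = "http://host.docker.internal:8090/engine-rest"

def activity_instances_url(base_url: str, **params: str) -> str:
    """Build a 'Get Historic Activity Instances' request URL."""
    query = urlencode(params)
    return f"{base_url}/history/activity-instance" + (f"?{query}" if query else "")

# Cap the page size, as we will do later in the builder:
url = activity_instances_url(BASE_URL, maxResults="100")
print(url)
# With the demo stack running, json.load(urllib.request.urlopen(url))
# returns a list of historic activity instance events.
```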
Airbyte Connector Builder
Creating New Connector
From the Builder tab in the Airbyte UI select the "Start from scratch" option.
Set the name of the connector to "Camunda" and the API Base URL to "http://host.docker.internal:8090/engine-rest"*
* Since all components of this demo run in Docker, we use the internal hostname "host.docker.internal" for any communication between containers.
By default Camunda doesn't require authentication, so we can ignore those settings.
The next step is to configure streams. In Airbyte, a stream is the atomic unit for reading data from a source. A connector must have at least one stream.
Add a new stream based on the Camunda API documentation.
There are several configuration blocks in the stream.
To reduce the load on the source system and to avoid potential out-of-memory problems, we will limit the number of records returned by the API by adding a query parameter:
We also need to enable pagination in case more than 100 events match our request.
We do this by enabling Pagination and injecting the offset into the outgoing HTTP request as a new query parameter.
Airbyte will handle the rest, incrementing the offset until the number of events returned by the API is less than 100.
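Conceptually, the offset pagination we just configured behaves like the loop below. This is a sketch with a stand-in `fetch_page` function instead of a real HTTP call; Airbyte generates the equivalent requests itself:

```python
PAGE_SIZE = 100

def read_all(fetch_page):
    """Drain a paginated API.

    fetch_page(offset, max_results) -> list of records for that page.
    """
    records, offset = [], 0
    while True:
        page = fetch_page(offset, PAGE_SIZE)
        records.extend(page)
        if len(page) < PAGE_SIZE:  # last page reached
            break
        offset += PAGE_SIZE
    return records

# Simulate a source with 250 events to show the loop terminates correctly:
events = [{"id": str(i)} for i in range(250)]
fake_fetch = lambda offset, max_results: events[offset:offset + max_results]
print(len(read_all(fake_fetch)))  # → 250
```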
Now we are ready to run our first test, by pressing Test in the top right corner of the Builder UI.
A few things happen: Airbyte runs the configured query against your test API, returns the payload and tries to detect the schema of the payload automatically.
!! Important: Airbyte does a good job of guessing the schema; however, if some fields are not returned by the API, Airbyte will not include them. If your data source later starts returning those fields, Airbyte will ignore them until you explicitly enable them.
For the best results, start with the automatically discovered schema, then add all the remaining fields the API can return, as per the documentation.
Incremental Sync
Airbyte implements an Incremental Sync feature to avoid fetching the same data multiple times.
To enable Incremental Sync for our connector we need to define a "Cursor Field". Airbyte makes a good guess at which field can be used as a cursor; usually it will be a timestamp field.
For Camunda events, the "startTime" field of the payload is a good candidate, which ensures we extract each event as soon as the activity starts.
Airbyte is also good at detecting "Cursor Datetime Formats", so the suggested option usually works fine.
"API Time Filtering Capabilities" can be set to "Start", indicating that we filter with a single parameter rather than a range.
To make sure our cursor field is used to query the data, we need to inject it by setting "Inject Start Time into outgoing HTTP Request" and adding a new query parameter,
as per the Camunda API documentation.
!!! Unlike the Python libraries used by Airbyte, which format the fractional-seconds field as a six-digit zero-padded number of microseconds, Camunda only uses three digits (milliseconds). To avoid issues with the Camunda API when querying the data, we need to set an "Outgoing Datetime Format".
We simply use a literal "000" for the fractional part instead of %f, which would produce 000000. This is a minor issue and won't have an impact, since Airbyte deduplicates events automatically. To enable deduplication, go to the top configuration block and define the "Primary Key" as "id", which is unique for every Camunda event.
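The difference between the two format strings is easy to see with Python's strftime, which is what the builder's datetime formats map onto. A small sketch (the surrounding format string is an assumption; only the %f-versus-"000" fractional part matters here):

```python
from datetime import datetime, timezone

ts = datetime(2023, 3, 1, 12, 30, 45, tzinfo=timezone.utc)

# %f pads microseconds to six digits -- a shape Camunda does not accept:
print(ts.strftime("%Y-%m-%dT%H:%M:%S.%f%z"))   # 2023-03-01T12:30:45.000000+0000

# A literal "000" gives the three-digit millisecond field Camunda expects:
print(ts.strftime("%Y-%m-%dT%H:%M:%S.000%z"))  # 2023-03-01T12:30:45.000+0000
```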
This is all we need for a basic Camunda connector to work. If we run Test again, we will be prompted to populate the "Start date" value to set a starting point for ingesting events.
After the first run, all subsequent connector runs will advance this value automatically, using the "startTime" field of the very last event in the payload.
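This cursor bookkeeping can be illustrated with a small sketch. `advance_cursor` and `dedupe_by_id` are hypothetical helpers mirroring what Airbyte does internally with the "startTime" cursor and the "id" primary key:

```python
def advance_cursor(records, current_cursor):
    """New cursor: startTime of the last record, or the old cursor if the sync was empty."""
    return records[-1]["startTime"] if records else current_cursor

def dedupe_by_id(records):
    """Keep one record per primary key -- mirrors the Append + Deduped sync mode."""
    return list({r["id"]: r for r in records}.values())

payload = [
    {"id": "a1", "startTime": "2023-03-01T12:00:00.000+0000"},
    {"id": "a2", "startTime": "2023-03-01T12:05:00.000+0000"},
    {"id": "a2", "startTime": "2023-03-01T12:05:00.000+0000"},  # re-fetched duplicate
]
print(advance_cursor(payload, "2023-03-01T00:00:00.000+0000"))  # → 2023-03-01T12:05:00.000+0000
print(len(dedupe_by_id(payload)))                               # → 2
```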
You can publish the connector to your workspace and create a new connection to test it.
Testing the Connector
Create Source
When creating a source, search for the "Camunda" connector type. To set up the source we only need to provide the initial "Start date".
Create Destination
When creating a destination of type Postgres, populate the following values based on our configuration of the Postgres database:
Now we can connect the two together, making sure:
at least one stream is enabled
the sync mode is set to Append + Deduped
Running sync
After the first sync completes, we can see the number of records fetched. Subsequent runs will show a much smaller number, since we only extract the delta since the last successful sync.
You can observe this by running a "select count(*)" statement on the destination Postgres database between sync jobs.
Advanced Configuration
Suppose something went wrong and you want to fetch some data again, but without doing a full re-sync.
You can do this by changing the cursor value in Airbyte: go to Connection Settings -> Advanced -> Connection State and edit the cursor value for the stream you want to change.
Summary
We have implemented a connector for Camunda that extracts activity instance events from the Camunda History REST API.
To make this connector production-ready, we need to address the problem of ingesting events which ended after the sync run, merge and deduplicate the datasets, and normalise the data using dbt so it can be used for reporting and analytics in the Postgres database.
We did this work and it is publicly available on GitHub: https://github.com/metaops-solutions/airbyte-camunda