Creating a Camunda Airbyte Connector

Prerequisites

Today we will create an Airbyte connector to pull Camunda history events from the Camunda engine-rest API.

Camunda is an open-source platform for workflow and decision automation. It provides tools for modeling and executing business processes, orchestrating workflows, and managing decisions.

Camunda uses its History API to expose details of events which happened in the past. This includes all automated and manual tasks, decisions, their start and finish times, and many other useful details describing each event.

There are several resources we will need to get going.

Camunda-based Java Application

We will use a simple Spring Boot application simulating the workflow of a takeaway restaurant:

git clone git@github.com:metaops-solutions/camunda-diy.git
cd camunda-diy
docker compose up

It will take a few minutes to start the Postgres database, the Java app and a simple shell script which generates some workload. You can watch it in action by accessing the Camunda Cockpit web app with the username/password demo/demo:

http://localhost:8090/
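
Before moving on, you can check that the Camunda REST API is reachable. A quick sketch using curl (assuming engine-rest is served on the same port 8090 as the Cockpit, as in this demo):

# List the available process engines - should return something like [{"name":"default"}]
curl -s http://localhost:8090/engine-rest/engine

# Check that the workload script has already produced some history
curl -s http://localhost:8090/engine-rest/history/activity-instance/count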

Airbyte

Running it locally is the simplest way:

git clone git@github.com:airbytehq/airbyte.git
cd airbyte
./run-ab-platform.sh

Once it has started you will be able to access the UI with the user airbyte and password password:

http://localhost:8000

Warehouse DB (Postgres)

As the destination we will use a locally running Postgres. Simply start it as follows:

docker run --name postgreswarehouse -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres

To get it ready for Airbyte, create the warehouse database and an airbyte role:

psql -U postgres -h localhost -p 5432 
password: mysecretpassword

CREATE DATABASE warehouse;
CREATE ROLE airbyte WITH LOGIN PASSWORD 'password';
GRANT ALL PRIVILEGES on DATABASE warehouse to airbyte;
\c warehouse
GRANT ALL ON SCHEMA public TO airbyte;
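
As an optional sanity check, you can verify that the new airbyte role can log in and sees the warehouse database:

# Connect as the airbyte user; the output should show airbyte and warehouse
PGPASSWORD=password psql -U airbyte -h localhost -p 5432 -d warehouse \
  -c "SELECT current_user, current_database();"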

Camunda API Documentation

It is a good idea to have the Camunda API documentation in front of you:

https://docs.camunda.org/manual/7.18/reference/rest/history/activity-instance/

In this example we will only extract activity instance events using the "Get Historic Activity Instances" REST call.
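
You can try the same call by hand before configuring anything in Airbyte. A minimal sketch with curl and jq (from your machine localhost:8090 works; host.docker.internal is only needed between containers):

# Fetch one historic activity instance from the demo engine and pretty-print it
curl -s "http://localhost:8090/engine-rest/history/activity-instance?maxResults=1" | jq '.[0]'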


Airbyte Connector Builder

Creating a New Connector

From the Builder tab in the Airbyte UI select the "Start from scratch" option.

Set the name of the connector to "Camunda" and the API Base URL to "http://host.docker.internal:8090/engine-rest"*

* Since all components of this demo run in Docker, we use the internal hostname "host.docker.internal" for any communication between containers.

By default Camunda doesn't require authentication, so we can skip those settings.

The next step is to configure streams. In Airbyte, a stream is the atomic unit for reading data from a source; a connector must define at least one stream.

Add a new stream based on the Camunda API documentation:

Name: "activity-instance"
URL path: "/history/activity-instance"

There are several configuration blocks in the stream.

To reduce the load on the source system and avoid potential out-of-memory problems, we will limit the number of records returned by the API by adding a query parameter:

maxResults: 100

We also need to enable pagination in case there are more than 100 events matching our request.

We do this by enabling Pagination and injecting the offset into the outgoing HTTP request as a new query parameter:

Parameter Name: firstResult

Airbyte will handle the rest, incrementing the offset in this query parameter until the number of events returned by the API is less than 100.
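
Outside of Airbyte, the same offset pagination would look roughly like this (a sketch against the local demo instance, assuming jq is installed):

# Page through historic activity instances 100 records at a time,
# mirroring Airbyte's firstResult/maxResults offset pagination
BASE="http://localhost:8090/engine-rest"
OFFSET=0
while true; do
  PAGE=$(curl -s "$BASE/history/activity-instance?firstResult=$OFFSET&maxResults=100")
  COUNT=$(echo "$PAGE" | jq 'length')
  echo "fetched $COUNT records starting at offset $OFFSET"
  # Stop when a page comes back with fewer than 100 records
  [ "$COUNT" -lt 100 ] && break
  OFFSET=$((OFFSET + 100))
done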

Now we are ready to run our first test by pressing Test in the top right corner of the Builder UI.

A few things happen: Airbyte runs the configured query against your test API, returns the payload and tries to automatically detect its schema.

!! Important: Airbyte does a good job of guessing the schema; however, if some fields are not returned by the API, Airbyte will not include them. If your data source later starts returning those fields, Airbyte will ignore them until you explicitly add them to the schema.

For the best results you can start with the automatically discovered schema, then add any additional fields the API can return as per the documentation.
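
To see which fields your instance actually returns (and therefore what the auto-detected schema may be missing), you can list the keys of a single record, for example:

# List the field names present in one historic activity instance record
curl -s "http://localhost:8090/engine-rest/history/activity-instance?maxResults=1" | jq '.[0] | keys'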

Incremental Sync

Airbyte implements an Incremental Sync feature to avoid fetching the same data multiple times.

To enable Incremental Sync for our connector we need to define a "Cursor Field". Airbyte makes a good guess at which field can be used as a cursor; usually it is a timestamp field.

For Camunda events the "startTime" field of the payload is a good candidate, which ensures we extract each event as soon as its activity has started.

Airbyte also does a good job of detecting "Cursor Datetime Formats", so the suggested option usually works fine:

Cursor Field: startTime
Cursor Datetime Formats: %Y-%m-%dT%H:%M:%S.%f%z

API Time Filtering Capabilities can be set to "Start", indicating that we filter with a single start parameter rather than a date range.

API Time Filtering Capabilities: Start

To make sure our cursor field is used when querying the data, we need to inject it by setting "Inject Start Time into outgoing HTTP Request" and adding a new query parameter

Parameter Name: startedAfter

as per the Camunda API documentation.

!!! Unlike the Python libraries used by Airbyte, which format fractional seconds (%f) as a six-digit zero-padded microseconds value, Camunda only accepts three digits (milliseconds). To avoid issues with the Camunda API when querying the data we need to set an "Outgoing Datetime Format":

Outgoing Datetime Format:  %Y-%m-%dT%H:%M:%S.000%z

We simply use "000" for the fractional seconds instead of %f, which would produce 000000. Truncating the fraction means an incremental query can re-fetch events around the cursor boundary, but this is a minor issue and won't have an impact, since Airbyte deduplicates events automatically. To enable deduplication we need to go to the top configuration block and define the "Primary Key" as "id", which is unique for every Camunda event.

Primary Key: id
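
Put together, an incremental request made by the connector looks roughly like this (a sketch; the cursor value shown is illustrative):

# The cursor is sent with 3-digit fractional seconds, e.g. 2024-01-01T00:00:00.000+0000
# The '+' in the timezone offset must be URL-encoded as %2B when testing with curl
CURSOR="2024-01-01T00:00:00.000%2B0000"
curl -s "http://localhost:8090/engine-rest/history/activity-instance?startedAfter=$CURSOR&firstResult=0&maxResults=100" | jq 'length'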

This is all we need for a basic Camunda connector to work. If we run Test again we will be prompted to populate the "Start date" value, which sets the starting point for ingesting events.

After the first run, all subsequent runs of the connector will advance this value automatically using the "startTime" field of the very last event in the payload.

You can publish the connector to your workspace and create a new connection to test it.


Testing the Connector

Create Source

When creating a source we can search for the "Camunda" connector type. To set up the source we only need to provide the initial "Start date".

Create Destination

Creating a destination of type Postgres, we can populate the following values based on our configuration of the Postgres database:

Host: host.docker.internal
Port: 5432
DB Name: warehouse
User: airbyte
Password: password

Now we can connect the two together, making sure:

  • at least one stream is enabled

  • Sync mode set to Append + Deduped

Running a Sync

After the first sync has completed we can see the number of records fetched. Subsequent runs will show a much smaller number, since we only extract the delta from the last successful sync.

You can observe this by running a "select count(*)" statement against the destination Postgres database between sync jobs.
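
For example (the exact table name created by Airbyte depends on your Airbyte version and namespace settings, so check with \dt first):

# List the tables Airbyte created, then count the synced records
PGPASSWORD=password psql -U airbyte -h localhost -p 5432 -d warehouse -c '\dt'
# Replace activity_instance below with the table name shown by \dt
PGPASSWORD=password psql -U airbyte -h localhost -p 5432 -d warehouse -c 'SELECT count(*) FROM activity_instance;'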

Advanced Configuration

Suppose something went wrong and you want to fetch some data again, but you don't want to do a full re-sync.

You can do this by changing the cursor value in Airbyte: go to Connection Settings -> Advanced -> Connection State and edit the cursor value for the stream you want to change.


Summary

We have implemented a connector for Camunda that extracts activity instance events from the Camunda History REST API.

To make this connector production ready we need to address the problem of ingesting events which ended after the sync run, merge and de-duplicate datasets, and normalise the data using dbt so it can be used for reporting and analytics in the Postgres database.

We did this work and it is publicly available on GitHub: https://github.com/metaops-solutions/airbyte-camunda

Beware: Neglecting the importance of correct timestamp format in incremental sync can lead to data chaos, system errors, and operational disruptions!

Andrey Kozichev
