Data Engineering in NHS: processing HL7 with Airbyte and dbt

HL7 in Healthcare and Data Analytics

Health Level Seven (HL7) messages are widely used in the healthcare industry for exchanging information between different systems. They facilitate interoperability between various healthcare information systems, such as Electronic Health Records (EHRs), laboratory information systems (LIS), radiology information systems (RIS), medical devices, and more.

HL7 has become a de facto standard for system-to-system communications within the NHS.

The use of HL7 messages in data analytics offers a wealth of possibilities in the healthcare domain. It enables in-depth analysis of clinical information, patient demographics, diagnoses, medications, and treatment plans. By aggregating data extracted from HL7 messages, healthcare professionals can conduct trend analysis, identify patterns, and derive valuable insights.

Moreover, HL7 often contains essential information for regulatory reporting and compliance, contributing to key performance indicators (KPIs) in healthcare analytics and ensuring data quality assurance. The structured nature of these messages enhances the efficiency and accuracy of data analytics processes, making it a cornerstone for informed decision-making in the healthcare industry.


Ingesting HL7 into a Data Warehouse

To process an HL7 message, we first need to load it into a Data Warehouse. We will use Airbyte to ingest an HL7 message from an Azure Storage Account into Postgres.

HL7 v2.x messages use a non-XML encoding syntax based on segments (lines) and one-character delimiters. Each segment contains fields separated by a delimiter. Each field can be split into multiple sub-fields separated by another delimiter, and so on, creating complex nested structures.

Example:

MSH|^~\&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5
EVN||200605290901||||
PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ|260 GOODWIN CREST DRIVE^^BIRMINGHAM^AL^35209^^M~NICKELL’S PICKLES^10000 W 100TH AVE^BIRMINGHAM^AL^35200^^O|||||||0105I30001^^^99DEF^AN
PV1||I|W^389^1^UABH^^^^3||||12345^MORGAN^REX^J^^^MD^0010^UAMC^L||67890^GRAINGER^LUCY^X^^^MD^0010^UAMC^L|MED|||||A0||13579^POTTER^SHERMAN^T^^^MD^0010^UAMC^L|||||||||||||||||||||||||||200605290900
OBX|1|NM|^Body Height||1.80|m^Meter^ISO+|||||F
OBX|2|NM|^Body Weight||79|kg^Kilogram^ISO+|||||F
AL1|1||^ASPIRIN
DG1|1||786.50^CHEST PAIN, UNSPECIFIED^I9|||A
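To make the nesting concrete, here is a short Python sketch (illustrative only, not part of the pipeline) that splits a segment into fields and components using the recommended default delimiters:

```python
# Illustrative sketch: split one HL7 v2 segment into fields and components.
# '|' and '^' are the recommended defaults; a robust parser should read the
# actual delimiters from the MSH header rather than hard-coding them.
def parse_segment(segment: str) -> list[list[str]]:
    fields = segment.split("|")
    return [field.split("^") for field in fields]

pid = "PID|||56782445^^^UAReg^PI"
parsed = parse_segment(pid)
print(parsed[0])  # ['PID'] - the segment ID
print(parsed[3])  # ['56782445', '', '', 'UAReg', 'PI'] - nested sub-fields
```

Note how empty fields between consecutive delimiters are preserved as empty strings, so field positions stay stable even when values are missing.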

There are a few connectors in Airbyte we can use to ingest such messages:

  • "File" source connector for single-file ingestion

  • One of the platform-specific storage source connectors, i.e. S3, Azure Storage Account or Google Cloud Storage (GCS), for bulk operations

Configure a Source Connector to pull data from an Azure Storage Account using the "File Source" type.

Source Name:       azure-blob-hl7-example
Dataset Name:      sample.hl7
File Format:       csv
Storage Provider:  AzBlob: Azure Blob Storage
Storage Account:   hl7examples
SAS Token:         _provide_your_token
Shared Key:        _or_provide_key
URL:               extract/sample.hl7
Reader Options:    { "sep" : "None", "names" : ["col0"] }

Important! Note the “Reader Options” configuration. By setting the separator to “None”, we tell Airbyte not to parse each line into fields but to ingest it as a single string. By providing the one-element array “names”, we configure a single column called “col0” in the destination dataset for our strings.
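The effect of these options can be sketched in a few lines of Python (a conceptual illustration, not Airbyte's actual implementation): every line of the file lands unparsed, as one row in a single "col0" column.

```python
# Conceptual sketch of the Reader Options: with the separator disabled,
# each raw line becomes one row with a single column named "col0".
def read_as_single_column(lines: list[str]) -> list[dict]:
    return [{"col0": line.rstrip("\r\n")} for line in lines]

raw = ["MSH|^~\\&|MegaReg|XYZHospC\n", "EVN||200605290901||||\n"]
rows = read_as_single_column(raw)
print(rows[0])
```

Deferring all parsing to the warehouse like this keeps the ingestion layer trivially simple and lets dbt own the HL7-specific logic.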

Configure the Destination as a Postgres Database and set up a connection. We can then sync the messages into Postgres.

After the initial sync has completed, we can see the results in the Postgres Database:

warehouse=# \d
                 List of relations
 Schema |          Name           | Type  |  Owner  
--------+-------------------------+-------+---------
 public | _airbyte_raw_sample_hl7 | table | airbyte
 public | sample_hl7              | table | airbyte
(2 rows)

warehouse=# \d sample_hl7 
                               Table "public.sample_hl7"
           Column           |           Type           | Collation | Nullable | Default 
----------------------------+--------------------------+-----------+----------+---------
 col0                       | text                     |           |          | 
 _airbyte_ab_id             | character varying        |           |          | 
 _airbyte_emitted_at        | timestamp with time zone |           |          | 
 _airbyte_normalized_at     | timestamp with time zone |           |          | 
 _airbyte_sample_hl7_hashid | text                     |           |          | 
Indexes:
    "fd76249025c82ded0a90b8d5942fed7a" btree (_airbyte_emitted_at)

Each line of the HL7 message is stored as one row:

warehouse=# select col0 from sample_hl7;
                                                                                                                col0                                                                                                                
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 MSH|^~\&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5
 EVN||200605290901||||
 PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ|260 GOODWIN CREST DRIVE^^BIRMINGHAM^AL^35209^^M~NICKELL’S PICKLES^10000 W 100TH AVE^BIRMINGHAM^AL^35200^^O|||||||0105I30001^^^99DEF^AN
 PV1||I|W^389^1^UABH^^^^3||||12345^MORGAN^REX^J^^^MD^0010^UAMC^L||67890^GRAINGER^LUCY^X^^^MD^0010^UAMC^L|MED|||||A0||13579^POTTER^SHERMAN^T^^^MD^0010^UAMC^L|||||||||||||||||||||||||||200605290900
 OBX|1|NM|^Body Height||1.80|m^Meter^ISO+|||||F
 OBX|2|NM|^Body Weight||79|kg^Kilogram^ISO+|||||F
 AL1|1||^ASPIRIN
 DG1|1||786.50^CHEST PAIN, UNSPECIFIED^I9|||A
(8 rows)

Time: 8.136 ms


Processing HL7 message with dbt

We use dbt to parse HL7 messages in the Data Warehouse.

Every HL7 message has a header: the segment with code “MSH”. The data in this first line defines how to interpret the rest of the message.

Let’s have a closer look at this segment and see how we can process it.

MSH|^~\&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5

‘|’ is the recommended field delimiter within a segment, but technically it can be any single character: the character immediately following “MSH” defines the field separator for the whole message.
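Because the delimiters are declared by the message itself, a robust parser reads them from the MSH line rather than hard-coding them. A hedged Python sketch (not part of the dbt project) of that convention:

```python
# The field separator is the character immediately after "MSH" (index 3),
# and MSH-2 declares the component, repetition, escape and sub-component
# delimiters ("^~\&" by convention).
def read_delimiters(msh_line: str) -> dict:
    field_sep = msh_line[3]
    encoding = msh_line.split(field_sep)[1]  # e.g. "^~\&"
    return {
        "field": field_sep,
        "component": encoding[0],
        "repetition": encoding[1],
        "escape": encoding[2],
        "subcomponent": encoding[3],
    }

msh = r"MSH|^~\&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500"
print(read_delimiters(msh))
```

In the dbt models below we assume the recommended defaults, which is reasonable for a single known feed but worth validating when ingesting messages from multiple senders.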

We can parse this header with the following dbt code:

Example: hl7_parsed.sql

SELECT
{% for i in range(1, 37) %}
  SPLIT_PART(col0, '|', {{ i }}) AS col{{ i }}{% if not loop.last %},{% endif %}
{% endfor %}
FROM {{ source('unparsed_hl7', 'sample_hl7') }}

This model will form a basis for parsing of all HL7 segments.
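Postgres's SPLIT_PART is 1-indexed and returns an empty string when the requested field does not exist, which is exactly what we want for short segments like EVN. A small Python equivalent (for illustration only) makes those semantics explicit:

```python
# Python equivalent of Postgres SPLIT_PART: 1-indexed, and an out-of-range
# field yields '' rather than an error - so short segments parse cleanly
# through the same 36-column model as long ones.
def split_part(text: str, delimiter: str, n: int) -> str:
    parts = text.split(delimiter)
    return parts[n - 1] if 1 <= n <= len(parts) else ""

evn = "EVN||200605290901||||"
print(split_part(evn, "|", 1))   # 'EVN'
print(split_part(evn, "|", 3))   # '200605290901'
print(split_part(evn, "|", 30))  # '' - no error for missing fields
```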

The second dbt model to parse MSH segments:

Example: hl7_segment_msh.sql

SELECT
  col1 as SEGMENT_ID,
  col2 as ENCODING_CHARACTERS,
  col3 as SENDING_APPLICATION,
  col4 as SENDING_FACILITY,
  col5 as RECEIVING_APPLICATION,
  col6 as RECEIVING_FACILITY,
  col7 as DATE_TIME_OF_MESSAGE,
  col8 as SECURITY_TYPE,
  col9 as MESSAGE_TYPE,
  col10 as MESSAGE_CONTROL_ID,
  col11 as PROCESSING_ID,
  col12 as VERSION_ID
FROM {{ ref('hl7_parsed') }} as hl7_parsed
WHERE col1 = 'MSH'

and finally, the dbt model for the MSH message type:

Example: hl7_msh_mesg_type.sql

SELECT 
  SPLIT_PART(MESSAGE_TYPE,'^',1) as MESSAGE_CODE,
  SPLIT_PART(MESSAGE_TYPE,'^',2) as TRIGGER_EVENT,
  SPLIT_PART(MESSAGE_TYPE,'^',3) as MESSAGE_STRUCTURE
FROM {{ ref('hl7_segment_msh') }}

After running all 3 models, we get the following results in the Database:

warehouse=# \d
                 List of relations
 Schema |          Name           | Type  |  Owner  
--------+-------------------------+-------+---------
 public | _airbyte_raw_sample_hl7 | table | airbyte
 public | hl7_msh_mesg_type       | view  | airbyte
 public | hl7_parsed              | view  | airbyte
 public | hl7_segment_msh         | view  | airbyte
 public | sample_hl7              | table | airbyte
(5 rows)

warehouse=# select * from hl7_segment_msh ;
 segment_id | encoding_characters | sending_application | sending_facility | receiving_application | receiving_facility | date_time_of_message | security_type |  message_type   | message_control_id | processing_id | version_id 
------------+---------------------+---------------------+------------------+-----------------------+--------------------+----------------------+---------------+-----------------+--------------------+---------------+------------
 MSH        | ^~\&                | MegaReg             | XYZHospC         | SuperOE               | XYZImgCtr          | 20060529090131-0500  |               | ADT^A01^ADT_A01 | 01052901           | P             | 2.5
(1 row)

Time: 22.909 ms
warehouse=# select * from hl7_msh_mesg_type;
 message_code | trigger_event | message_structure 
--------------+---------------+-------------------
 ADT          | A01           | ADT_A01
(1 row)

Time: 13.506 ms

By iterating on this approach, we can extend the dbt models to support more segment types, and incorporate bulk ingestion with Airbyte by using the vendor-specific source connectors.
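Extending the pattern to more segment types is mostly a matter of maintaining a mapping from segment ID to field names. The helper below is a hypothetical Python sketch of that idea (the field-name lists are abridged from the HL7 v2 definitions; it is not part of the repo):

```python
# Hypothetical sketch: a per-segment field-name mapping drives one set of
# named fields (or one dbt model) per segment type. Lists abridged.
SEGMENT_FIELDS = {
    "MSH": ["SEGMENT_ID", "ENCODING_CHARACTERS", "SENDING_APPLICATION"],
    "EVN": ["SEGMENT_ID", "EVENT_TYPE_CODE", "RECORDED_DATE_TIME"],
}

def name_fields(segment: str) -> dict:
    parts = segment.split("|")
    names = SEGMENT_FIELDS.get(parts[0], [])
    return {name: (parts[i] if i < len(parts) else "")
            for i, name in enumerate(names)}

print(name_fields("EVN||200605290901||||"))
```

Each new segment type then needs only a new entry in the mapping (or, in dbt, a new model filtering on its segment ID), not new parsing logic.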

The same approach, with minor differences, can be applied when ingesting into Snowflake, Databricks or BigQuery.


References

https://github.com/metaops-solutions/hl7-airbyte-dbt

https://www.hl7.org/

https://www.hl7.org.uk/


Leveraging HL7 for NHS data analytics allows for in-depth analysis of clinical information, encompassing patient demographics, diagnoses, medications, and treatment plans. By aggregating data extracted from HL7 messages, healthcare professionals can conduct trend analysis, identify patterns, and derive valuable insights.


Andrey Kozichev



© MetaOps 2024
