BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Increasing the Quality of Patient Care through Stream Processing

Increasing the Quality of Patient Care through Stream Processing

Leia em Português

Bookmarks

Key Takeaways

  • Technology adoption has improved quality of care throughout the healthcare industry. This has lead care organizations and practitioners to invest more in technology platforms
  • As different care platforms specialized in their own fields and employed across the industry, care organizations are seeing disaggregation in these technology platforms creating siloed data streams
  • Care providers today shift from one system to another to grasp a holistic view of a patient's health record and history. On the other hand, patients are kept in the dark in terms of their medical history, medical record portability and correlated care coordination
  • This paper discusses a solution to aggregate health data streams from different sources, clean them, normalize and enrich them for pattern recognition and complex event processing
  • The paper also presents a proof of concept with an open source technology selection

Building a patient-centric healthcare platform by connecting disparate information systems

Technology adoption in the healthcare industry is on the rise. Healthcare providers and provider institutions are pressured by governments to build efficient care platforms. These efforts have improved the quality of care, reducing inefficiencies in care coordination. On the other hand, the expectation of a modern healthcare system is greatly increased. Patients demand prompt feedback, and care providers require correlation among multiple diagnoses to provide more personalized care. With that shift in expectation, providers worry about doing too little than of doing too much. Patients feel the same, they are likely to be grateful for the extra test done in the name of “being thorough”. This phenomenon has lead to a global epidemic of overtesting and is contributing massively towards rising healthcare costs. To meet the stakeholder expectation, to solve the problem of overtesting, and to ultimately improve the quality of care, the healthcare practitioners can take a data-driven approach. A data-driven approach such that decisions were made through observing and subscribing to patient’s health data streams.

Following solution is an attempt to capture, correlate and process those disaggregated health data streams for analysis and decision making.   

Diagram 1: Individual Healthcare Chart
 

Problem statement & analysis

Healthcare services are often time and mission-critical by nature.  Regardless of the complexity and thoroughness, if the diagnosis cannot reach the physician or the patient on time, the consequences can be dire. The need for electronic medical records has resulted in provider institutions to invest in Electronic Medical Record (EMR) systems and medical testing specific reporting platforms. However, this effort has created siloed, disaggregated information streams. These siloed streams alone contain all sorts of information which should be integrated at different levels to generate useful insights and ultimately help the physician to make better decisions.

There are two major concerns with disaggregated information streams.

  1. Lack of correlation
  2. lack of availability in a meaningfully consumable form

By providing a layer of correlated data points, physicians and care providers can make much more sense of all the medical tests being done on a patient. This will improve decision making and will cut down over testing. Similarly, by improving the availability of the data (raw and correlated), the healthcare institutions can enable and empower a connected healthcare ecosystem. Improved availability will create an app economy within the network of healthcare platforms.  

Today, consumer technology platforms are diverse and complex. People actively use wearable connected devices like smartwatches, fitness trackers and health monitors. These devices can provide enough information about a specific person’s daily activities and routines. These information streams can then combine with clinical information which can paint a health graph of a given individual that updates near real time. And that correlated information chart can then be used for healthcare decision making, care planning, appointment scheduling, and improved outpatient care. Unfortunately, these data streams are often ignored.

Overtesting is also identified as a major problem in the healthcare industry. It is one of the factors contributing to rising healthcare costs. If physicians can have a consolidated view of the basic test results, that itself should be able to provide an implicit view of causes for certain medical conditions avoiding over prescription of further testing.

In summary, these problems of the healthcare industry have a direct correlation to inefficiencies in how healthcare data is stored and processed. The expectation of physicians today is to have a holistic view of the patients' health background. This should include complete medical history, daily activities (monitored with wearables), interactions with other physicians and caregivers and their diagnosis and possibly machine learning predictions of correlated data points. The expectation of the patient today is to have a portable self-medical view with the history of acute and chronic conditions, health progression and the effects of treatment.

Proposed Solution

This paper discusses a layered approach to address data correlation and availability challenges identified in the healthcare domain. We commonly see that in the healthcare, the data sits with and stream from EMR platforms and medical devices. In addition to the data owned by providers and institutions, citizens own a portion of day-to-day activity based health data. The solution proposes data access and an integration layer to connect different data streams from different sources which then will consolidate and feed into a stream processing layer. The decisions and KPIs derived from the stream processing layer is then personalized and enriched for targeted patients and physicians and are exposed as an API layer for the subscription.  

Diagram 2: High-Level solutions architecture (L0)
 

Data Access Layer

Most modern EMR platforms offer rich REST APIs (Cerner, EPIC, OpenMRS etc.). They are often standards-based (HL7, FHIR) and easy to subscribe and consume. On the other hand, data access in wearables can differ from product to product. Some expose the accumulated data through a cloud storage API and some through SDKs. For instance, Fitbit provides a REST APIs whereas MS Health and Jawbone provide an SDK. Most sensor based SDKs support protocols such as CoAP, MQTT and HTTP. The devices commonly require an intermediary layer such as a mobile application, which would eventually transfer the data to respective third party applications.

Data Integration Layer

The integration layer will normalize these data streams. It will connect to multiple data sources and data dictionaries to make raw data meaningful. Patient health data in the form of medical tests results need references, the integration layer can call dictionary services and enrich the raw form before fed into further processing. The integration layer is also responsible for building a standard unification model. Different data sources can have different formats (JSON, XML, EDI) and vendor vocabularies it is important to create a standard unification model for ease of processing. The integration layer employs a fair amount of enterprise data integration patterns like transformation, payload construction, message filtering, and scatter-gather. This layer also incorporates scheduling to pull data at intervals and at a given (scheduled) time. Finally, the normalized data is persisted for further processing.

Stream processing Layer

Stream processing layer is the templated intelligence layer. This layer typically forms a lambda type architecture. It will process normalized health data that is persisted and will also correlate to real-time event streams that are directly fed in through other systems. Stream processing layer is configurable by nature. Rules have to be fed in for the expected result. A typical job type of this layer would be to correlate a patient’s daily activity in a fitbit stream with the indicators of a wearable blood pressure monitor and compare/contrast with historical personal health records. That view will increase the physician's decision-making ability, make further investigation recommendations streamlined and will reduce medical errors due to the lack of a holistic view. As such this layer is equipped to do rule-based event processing, batch, real-time and predictive stream processing.

Personalization / Enrichment Layer

Each EMR and device can only provide a localized view of the data it owns. However, a comprehensive patient-centric healthcare view is an aggregation of many systems. This aggregation happens at the integration layer and a large amount of intelligence is added at the stream processing layer. The personalization layer adds the patient-centric context to the data. It adds the patient personal information as an enrichment so the providers and patients can view the data as meaningful information. This layer also models data entitlement policies. The rules necessary to expose patient data to relevant parties based on their needs and privileges. 

Subscription Layer

Personalized and enriched data and indicators then need to be delivered to patients, physicians, and other healthcare providers. The subscription layer provides a multi-medium platform for consumers to subscribe to these data streams. Provider applications can consume this data as APIs or as subscriptions in pub/sub-mode. Subscription layer maintains security handshake with the client platform, typically with standard security protocols like OAuth/mTLS etc.

Solution Implementation

Diagram 3: Detailed solutions architecture (L1)
 

As the paper discusses multiple layers in the solution, there can be different technology choices for each of those. Making the solution more generic and widely accessible we turn towards open source software and framework choices. For the implementation of the data access and data integration layers, we have options like Apache Camel, Apache Synapse, WSO2 Data Services (Apache 2.0) and Spring integration. For the stream processing layer, we can use frameworks like Siddhi (Apache 2.0), Apache Spark, Apache Storm or Apache Flink. Personalization and enrichment are also services by the same tools we use for data integration and normalization. Tools like Synapse and Camel shine in this space. The subscription layer is about exposing the endpoints securely for external consumption and integration. Open source API management tools like WSO2 API Manager (Apache 2.0), Kong API Manager (Apache 2.0) and Tyk (MPL) fill up this space.

To access data from wearables, healthcare devices and vendor specific EMR platforms (Cerner, Epic etc), the implementation can use connector extensions in different tools. In Apache Camel these are called Camel components, in Apache Synapse these are synapse extensions. These connectors allow you to interact with the functionality and pull data of a third-party product or platform from the data access layer. These tools have a wide range of connectors to different platforms that can be installed on demand, that will enable access to different healthcare platform functions (i.e. Cerner EMR, Epic EMR, Fitbit etc). The connectors encapsulate the vendor protocol, authentication mechanism and data access including serialization and deserialization.

Diagram 4: Connecting data sources
 
<property description="ID" expression="//patientId" name="id" scope="default" type="STRING"/>

 <!-- EPIC CONNECTOR INITIALIZATION -->
<sequence key="EPIC_CONNECTOR_INITIALIZATION"/>

<!-- Search Observation EPIC-Connector Operation -->
 <epic.searchObservation>
    <patient>{$ctx:id}</patient>
    <code>718-7</code>
 </epic.searchObservation>
   ...
Code block 1: Connecting to Epic EMR with Apache Synapse

 

Tools like Synapse and Camel also performs the role of the integration layer in the discussed solution. Once the data is pulled from different sources, through connectors and JDBC links, the data is then canonicalized to a common format with aid of healthcare data dictionaries and vendor-specific vocabularies. Both Camel and Synapse can act as an ESB in this layer and perform multiple enterprise integration patterns like message transformation, scatter-gather, payload factory etc. Finally, the normalized data stream is written to a persistent storage or to an event queue/source.  

Diagram 5: Transformation and normalization
 
<!-- EPIC Error code 4101 = Resource request returns no results -->
<!-- Capture Issue from EPIC’s predefined Error Response  -->
<filter description="Compares EPIC error code with Response" regex="4101" source="get-property('uri.var.issue')">
    <then>
      <log level="custom">
        <property name="EPIC_ERROR_CODE" value="4101"/>
      </log>
      <drop/>
    </then>
    <else>
       <!-- This property will be used only when Reports are updated -->
       <property name="updatedReportCount" scope="default" type="DOUBLE" value="1.0"/>
       <!-- Use EPIC_REPORT_FILTER Sequence for Filter According to Date -->
      <filter xpath="$ctx:updatedReportCount >  0 ">
        <then>
        <log level="custom">
        <property expression="get-property('updatedReportCount')" name="Updated Report Count"/>
Code block 2: Filter and normalize with Apache Synapse
 

The normalized data is then fetched by the stream processing layer as per the discussed solution. During implementation, we chose “Siddhi”: the open source Stream Processing framework for this task. Siddhi will take the normalized events and run the defined rules on them to generate summarizations, alerts and predictions. The type of queries and rules that can be applied to the data is templated and parameterized. Based on scenarios and the type of deployment (Provider oriented or patient-oriented) these templates can be materialized to actual rules books that Siddhi engine will apply on the data. The summarizations can then be published to topics, to persistent storages for further processing/optimization.

Above implementation will be the same with other such frameworks like Apache Spark, Storm or Flink. All of these tools are rich with stream processing capabilities, in-memory over the wire as well as correlating with historical data, implementing lambda type patterns.

Diagram 6: Stream processing
 
-- Siddhi App for Analyzing Observation Records
@App:name("DiagnosticReportApp")
@App:description("Analyze Diagnostic Reports and alert generating Test App")

-- Fetch Observation Reports from Kafka Stream and publish them to Siddhi Stream
@info(name = 'inputStream') 
@source(
type='kafka', 
topic.list='hemoglobin-epic', 
group.id='test-consumer-group', 
threading.option='single.thread', 
bootstrap.servers='localhost:9092', 
partition.no.list = '0',
@map(type='json' , @attributes(entry="$.entry")))
define stream DiagnosticReportDataStream (entry string);

@info(name='Keeps splitted test results') 
define stream SplittedDiagnosticReportStream(entry string, jsonElement string);

@info(name='Keeps Final Analyzed Report values') 
define stream ReportStream(patientId string ,title string, value double, low double, high double, reportdate string,reportId string,unit string);

-- Intermediate Stream used for internal communication between apps  
@sink(type = 'inMemory' , topic = 'DIAGNOSTIC_ALERT',
	@map(type = 'passThrough'))
define stream AlertStream(patientId string , report string, result string,reportdate string,reportId string , highValue string , lowValue string , myValue string );

-- $.entry array contains multiple test Results. Split those into separate events for further processing
@info(name = 'split_entry_query')
from DiagnosticReportDataStream#json:tokenize(entry,'$')
select *
insert into SplittedDiagnosticReportStream;

-- Extract necessary fields for analyzing
@info(name='extract_report_fields_query')
from SplittedDiagnosticReportStream
select str:split(json:getString(jsonElement,'$.resource.subject.reference'), "/" , 8) as patientId , json:getString(jsonElement,'$.resource.code.text') as title , json:getDouble(jsonElement, '$.resource.valueQuantity.value') as value , json:getDouble(jsonElement, '$.resource.referenceRange[0].low.value') as low , json:getDouble(jsonElement, '$.resource.referenceRange[0].high.value') as high,json:getString(jsonElement, '$.resource.effectiveDateTime') as reportdate, json:getString(jsonElement, '$.resource.id') as reportId , json:getString(jsonElement,'$.resource.valueQuantity.unit') as unit
insert into ReportStream;


-- If the value is out of the reference range, alert the alert recipients.
@info(name = 'abnormal_filtering_query')
from ReportStream[value < low or value > high]
select patientId as patientId , title as report, 'ABNORMAL' as result , reportdate as reportdate , reportId as reportId , str:concat(high , unit) as highValue , str:concat(low , unit) as lowValue , str:concat(value , unit) as myValue 


Code block 3: Stream processing with Siddhi stream processing engine
 

For personalization and enrichment, we can reuse EI from WSO2’s integration stack. EI will fetch summarizations, alert information, predictions and co-relate that data with patient personal identification information, provider identification information in order to provide context and as preparation for end-user consumption.

Diagram 7: Personalization and entitlement
 
<!-- EPIC CONNECTOR INITIALIZATION -->
<sequence key="EPIC_CONNECTOR_INITIALIZATION"/>

<!--Get Patient Data From EPIC-->
<epic.searchPatient>
   <id>{$ctx:uri.var.patientid}</id>
</epic.searchPatient>

<!--Persist for alert enrichment-->
<property description="Patient Name" expression="json-eval($.name[0].text)" name="uri.var.patientName" scope="default" type="STRING"/>
<property description="Home Address Line" expression="json-eval($.address[0].line[0])" name="uri.var.patientAddress" scope="default" type="STRING"/>
...
<!--Get Physician Data from EPIC-->
<epic.readById>
   <type>Practitioner</type>
   <id>{$ctx:uri.var.prescriberId}</id>
</epic.readById>

<!--Persist for alert enrichment-->
<property description="Practitioner Name" expression="json-eval($.name.text)" name="uri.var.pratitionerName" scope="default" type="STRING"/>
...
Code block 4: Personalization with Synapse flows
 

For the subscriptions layer, we have used WSO’2 open source API Management product (API-M) from a choice of open source API Manager products. The enriched event streams are then exposed as topics and APIs and are listed in the developer portal of the API Manager. The developer portal maintains the client subscriptions, security with the clients and key issuing.  The API gateway will tunnel the requests to the enterprise integration layer while validating security, throttling and access management policies.

Diagram 8: Subscription & presentation
 

Diagram 9: APIs for subscription
 

Summary

This paper discusses a solution for provider centric disaggregated, siloed data and events stream problem in healthcare. The solution provides a patient-centric consolidated view of the data by tapping into different sources spread across care organizations. The paper also provides a reference implementation with a technology selection providing the reader a choice for tools that have capabilities for ETL, message-oriented integration, stream processing and API Management.

About the Authors

Nuwan Bandara is a solutions architect working closely with Fortune 1000 global enterprises on integration and messaging software projects. He is a Director at WSO2 and is leading solutions engineering teams in North America. Nuwan has worked on interesting projects in healthcare, government, and financial industries in making systems integration simple. He is a programmer, a speaker, and an author.

Himasha Guruge is a solutions engineer focused on API Management, workflows management and Integration. She is a senior engineer at WSO2 working closely on enterprise solutions in Europe.

 

Nadeeshan Gimhana is final year computer science and engineering student at University of Moratuwa, Sri Lanka. Nadeeshan is the core developer of the reference implementation of the solution discussed in this paper.

Rate this Article

Adoption
Style

BT