Release of Open Liberty 19.0.0.9 Completes Support of MicroProfile Standalone Reactive Streams APIs

In keeping with IBM's commitment to their four-week release cycle of Open Liberty, version 19.0.0.9 was recently made available to the Java community with new features that include: support for reactive messaging; testing database connections with REST APIs; and support for OAuth 2.0 client secret and access token hashing.

First introduced in September 2017, Open Liberty is an open-source implementation of IBM's WebSphere Liberty application server to build microservices and cloud-native applications. Now compatible with Jakarta EE 8, Open Liberty's ongoing support for MicroProfile includes the new standalone reactive streams APIs: MicroProfile Reactive Streams Operators (version 19.0.0.4); MicroProfile Context Propagation (version 19.0.0.8); and MicroProfile Reactive Messaging (version 19.0.0.9).

Reactive Streams

The Reactive Streams initiative, based on the Reactive Manifesto, started in 2013 as a collaboration among Lightbend, Netflix and Pivotal. The initial specification, released in 2015, was ultimately adopted with the release of JDK 9 as the java.util.concurrent.Flow API.

MicroProfile Reactive Messaging

Open Liberty 19.0.0.9 provides a full implementation of the MicroProfile Reactive Messaging 1.0 API. Released in July 2019, Reactive Messaging provides asynchronous messaging support to send, receive, and process messages in a microservices-based architecture. Methods annotated with @Incoming consume messages from a named channel, and methods annotated with @Outgoing publish messages to one; Open Liberty connects the annotated methods through these channels. Consider the following diagram:

The publisher method, send(), sends the message "hello" to the processing method, process(), which appends " world" and passes the result to the subscribing method, receive(). This can be coded as follows:

    
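import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

// Illustrative CDI bean; the class name is not part of the original example.
@ApplicationScoped
public class HelloWorldBean {

    // Publishes "hello" to the "hello" channel.
    @Outgoing("hello")
    public String send() {
        return "hello";
    }

    // Consumes from "hello", appends " world", and publishes to "world".
    @Incoming("hello")
    @Outgoing("world")
    public String process(String input) {
        return input + " world";
    }

    // Consumes from "world"; the channel name must match the @Outgoing above.
    @Incoming("world")
    public void receive(String input) {
        System.out.println("Message received: " + input);
    }
}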
    

Reactive Messaging depends on two other APIs: MicroProfile Reactive Streams Operators and CDI (JSR 365).

MicroProfile Context Propagation

A full implementation of the MicroProfile Context Propagation 1.0 API was introduced in Open Liberty 19.0.0.8. Also released in July 2019, Context Propagation, formerly known as MicroProfile Concurrency, builds upon the Java SE CompletableFuture class and enables developers to create pipelines of dependent stages that run with predictable thread context on the Open Liberty global thread pool.

Interfaces specified in this API include ManagedExecutor and ThreadContext. Instances of each are created through their respective builders.

    
ThreadContext threadContext = ThreadContext.builder()
    .propagated(ThreadContext.SECURITY)
    .build();

ManagedExecutor executor = ManagedExecutor.builder()
    .maxAsync(5)
    .propagated(ThreadContext.CDI, ThreadContext.APPLICATION)
    .build();

CompletableFuture<Long> stage = CompletableFuture.supplyAsync(supplier1)
    .thenApplyAsync(function1)
    .thenApply(threadContext.contextualFunction(function2));
    

A completion stage that is not created by a managed executor may still run with a predictable thread context if its action is pre-contextualized with the contextualFunction() method defined in ThreadContext. In the example above, supplier1 and function1 run with non-deterministic thread context. However, the pre-contextualized action, function2, always runs with the context of the thread that invoked the contextualFunction() method.
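The idea of a pre-contextualized action can be sketched with the JDK alone. The example below is not the MicroProfile implementation: the ThreadLocal named CURRENT_USER stands in for real thread context, and the contextualFunction() helper here is a hypothetical, hand-written analogue of the ThreadContext method of the same name. It assumes that capturing a value when the wrapper is created and restoring it around each call is a fair approximation of the behavior described above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ContextSketch {

    // Stand-in for thread context such as a security identity (assumption for this sketch).
    static final ThreadLocal<String> CURRENT_USER = ThreadLocal.withInitial(() -> "anonymous");

    // Hypothetical helper: captures the caller's context now and restores it around each call.
    static <T, R> Function<T, R> contextualFunction(Function<T, R> action) {
        String captured = CURRENT_USER.get(); // context of the thread creating the wrapper
        return input -> {
            String previous = CURRENT_USER.get();
            CURRENT_USER.set(captured);
            try {
                return action.apply(input);
            } finally {
                CURRENT_USER.set(previous); // leave the executing thread as it was found
            }
        };
    }

    static String run() {
        CURRENT_USER.set("alice");
        try {
            Function<String, String> tagWithUser = s -> s + " seen by " + CURRENT_USER.get();
            Function<String, String> contextual = contextualFunction(tagWithUser);
            return CompletableFuture.supplyAsync(() -> "order-42") // runs on a pool thread
                    .thenApplyAsync(String::toUpperCase)           // runs on a pool thread
                    .thenApply(contextual)                         // thread varies, context does not
                    .join();
        } finally {
            CURRENT_USER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "ORDER-42 seen by alice"
    }
}
```

Whichever thread ends up executing the final stage, the wrapped function sees the value captured on the calling thread. Without the wrapper, the same stage could observe either "alice" or "anonymous", depending on where it happens to run.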

Managed executors in Context Propagation are fully compatible with the ManagedExecutorService interface from Java EE Concurrency.

MicroProfile Reactive Streams Operators

A full implementation of the MicroProfile Reactive Streams Operators 1.0 API was introduced in version 19.0.0.4. Released in January 2019, Reactive Streams Operators allows developers to create reactive streams, process the data transiting in those streams, and accumulate results.

Interfaces specified in this API include PublisherBuilder, SubscriberBuilder, and ProcessorBuilder. It's important to note that, because of the 40-50 individual requirements and approximately 100 tests in the TCK, the Reactive Streams Operators interfaces were not intended to be directly implemented by developers. As stated on the website:

The semantics defined by Reactive Streams are very strict, and are non trivial, particularly in the areas of thread safety, to implement correctly. Typically, application developers are expected to use third party libraries that provide the tools necessary to manipulate and control streams.

The ReactiveStreams class provides factory methods for publisher and processor builders. Consider the example marble diagram below where the of() method defined in ReactiveStreams creates an instance of PublisherBuilder that emits a given set of elements:

This can be coded and manipulated as follows:

    
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

public class Main {

    public static void main(String[] args) {
        // create a stream of numbers
        ReactiveStreams.of(3, 1, 5, 6)
                .filter(s -> s > 4)
                .forEach(number -> System.out.println(">> " + number))
                .run();
    }
}
    

With the applied filter, only the numbers 5 and 6 are displayed.
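For comparison, here is a rough sketch of the same filter written by hand against the JDK's java.util.concurrent.Flow interfaces, with no MicroProfile dependency. The class names FilterProcessor and CollectingSubscriber are made up for this illustration. The sketch relies on SubmissionPublisher to handle buffering and signal ordering, so it should not be read as a complete, specification-compliant implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class FlowFilterSketch {

    // Hand-written filter stage: re-publishes only the items that match the predicate.
    static final class FilterProcessor extends SubmissionPublisher<Integer>
            implements Flow.Processor<Integer, Integer> {
        private final Predicate<Integer> predicate;
        private Flow.Subscription upstream;

        FilterProcessor(Predicate<Integer> predicate) {
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1); // back pressure: ask for one item at a time
        }
        @Override public void onNext(Integer item) {
            if (predicate.test(item)) {
                submit(item);
            }
            upstream.request(1);
        }
        @Override public void onError(Throwable error) { closeExceptionally(error); }
        @Override public void onComplete() { close(); }
    }

    // Terminal stage: records every item and signals when the stream ends.
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription upstream;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1);
        }
        @Override public void onNext(Integer item) {
            received.add(item);
            upstream.request(1);
        }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<Integer> filterAndCollect(List<Integer> items, Predicate<Integer> predicate) {
        CollectingSubscriber sink = new CollectingSubscriber();
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        FilterProcessor filter = new FilterProcessor(predicate);
        filter.subscribe(sink);
        source.subscribe(filter);
        items.forEach(source::submit);
        source.close(); // completion travels source -> filter -> sink
        try {
            sink.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(sink.received);
    }

    public static void main(String[] args) {
        System.out.println(filterAndCollect(List.of(3, 1, 5, 6), n -> n > 4)); // prints [5, 6]
    }
}
```

Even with SubmissionPublisher doing most of the work, a single filter step takes two hand-written classes, which illustrates why application developers are pointed toward operator libraries rather than the raw interfaces.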

Support for all three MicroProfile reactive streams APIs is now complete with this latest Open Liberty release.
