
Stream API Evolution: a Closer Look at JEP 461's Stream Gatherers

After its review concluded, JEP 461, Stream Gatherers (Preview), has been Completed for JDK 22. This JEP proposes to enhance the Stream API to support custom intermediate operations. "This will allow stream pipelines to transform data in ways that are not easily achievable with the existing built-in intermediate operations." Further details on this JEP may be found in the original design document written by Viktor Klang, software architect, Java Platform Group at Oracle.

At the core of this JEP is the interface Gatherer<T, A, R>, which represents an intermediate operation in Java's Stream API. A gatherer consumes input elements of type T, optionally updates a private intermediate state of type A, and emits output elements of type R, which may differ from T. It is defined by four functions. The first is an optional initializer, which creates the private state used while processing stream elements. The second is an integrator, which integrates each new element from the input stream; it may emit elements into the output stream or terminate processing early. The third is an optional combiner, which merges the states of separately processed segments so that the operation can be evaluated in parallel. The fourth is an optional finisher, which is invoked when there are no more input elements and may emit additional output elements. Without a combiner, a gatherer can still appear in a parallel pipeline, but its own stage is evaluated sequentially.
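
To make the four roles concrete, here is a minimal sketch of a custom gatherer built with the Gatherer.of factory, which accepts all four functions. The summing gatherer and the class and method names are hypothetical illustrations, not part of the JEP; on JDK 22 the API is a preview feature, so the code needs to be compiled and run with --enable-preview.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

class SummingGatherer {

    // Hypothetical gatherer: sums Integer elements and emits a single Long at the end
    static Gatherer<Integer, long[], Long> summing() {
        return Gatherer.of(
                // 1. initializer: a fresh running total for each evaluation (or parallel segment)
                () -> new long[1],
                // 2. integrator: folds each element into the state; true means "send more input"
                (state, element, downstream) -> {
                    state[0] += element;
                    return true;
                },
                // 3. combiner: merges the totals of two parallel segments
                (left, right) -> {
                    left[0] += right[0];
                    return left;
                },
                // 4. finisher: runs once the input is exhausted and emits the total downstream
                (state, downstream) -> downstream.push(state[0]));
    }

    static List<Long> sumOneTo(int n, boolean parallel) {
        IntStream ints = IntStream.rangeClosed(1, n);
        var boxed = parallel ? ints.boxed().parallel() : ints.boxed();
        return boxed.gather(summing()).toList();
    }

    public static void main(String[] args) {
        System.out.println(sumOneTo(100, false)); // [5050]
        System.out.println(sumOneTo(100, true));  // [5050]
    }
}
```

Because a combiner is supplied, the same gatherer works in a parallel stream: each segment starts from its own initializer state, and the combiner merges the partial totals before the finisher emits the result.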

The motivation behind JEP 461 stems from the limitations of the existing Stream API, which was introduced in Java 8. While the API provides a rich set of intermediate and terminal operations, such as mapping, filtering, reduction, and sorting, it lacks the flexibility to handle more complex tasks that do not fit neatly into these predefined operations.

For instance, creating a stream of strings that are distinct by length rather than by content is not straightforward with the current API. One workaround is to define a class whose equality is based on string length, wrap each string in an instance of that class, and apply distinct to the wrappers, as in the following snippet:

record DistinctByLength(String str) {

    @Override 
    public boolean equals(Object obj) {
        return obj instanceof DistinctByLength(String other)
               && str.length() == other.length();
    }

    @Override 
    public int hashCode() {
        return str == null ? 0 : Integer.hashCode(str.length());
    }
}

var result = Stream.of("foo", "bar", "baz", "quux")
                   .map(DistinctByLength::new)
                   .distinct()
                   .map(DistinctByLength::str)
                   .toList();

// result ==> [foo, quux]

While this approach works, the wrapper class makes the code less intuitive and harder to maintain, complicating what should be a straightforward task.
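
For comparison, here is a sketch of the same distinct-by-length operation written as a custom gatherer and applied with the Stream.gather operation that JEP 461 adds. The distinctByLength method is a hypothetical helper built on Gatherer.ofSequential; the set of lengths seen so far is the gatherer's private state, so no wrapper class is needed.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class DistinctByLengthGatherer {

    // Hypothetical helper: emits a string only if no earlier string had the same length
    static Gatherer<String, Set<Integer>, String> distinctByLength() {
        return Gatherer.ofSequential(
                // initializer: the lengths seen so far
                HashSet::new,
                (seenLengths, str, downstream) -> {
                    // Set.add returns false for a length already seen, so that string is dropped
                    if (seenLengths.add(str.length())) {
                        return downstream.push(str);
                    }
                    return true; // keep consuming input
                });
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("foo", "bar", "baz", "quux")
                                    .gather(distinctByLength())
                                    .toList();
        System.out.println(result); // [foo, quux]
    }
}
```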

To address the limitations, JEP 461 introduces Stream.gather(Gatherer), a new intermediate operation that processes stream elements using a user-defined entity called a gatherer. This operation is analogous to Stream.collect(Collector) but for intermediate operations. Gatherers can transform stream elements in various ways, including one-to-one, one-to-many, many-to-one, or many-to-many transformations. They can also track previously seen elements to influence the transformation of later elements, short-circuit to transform infinite streams into finite ones, and enable parallel execution.
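
The ability to short-circuit can be sketched with a small custom gatherer that passes through the first n elements and then signals, by returning false from its integrator, that it wants no more input. The takeFirst name is hypothetical (the existing limit operation already covers this case); the point is only to show how a gatherer can turn an infinite stream into a finite one.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class TakeFirstGatherer {

    // Hypothetical gatherer: passes through the first n elements, then short-circuits
    static <T> Gatherer<T, int[], T> takeFirst(int n) {
        return Gatherer.ofSequential(
                () -> new int[1], // state: how many elements have been emitted so far
                (count, element, downstream) -> {
                    if (count[0] >= n) {
                        return false; // nothing more wanted: stop pulling from upstream
                    }
                    count[0]++;
                    // Stop once the n-th element is pushed, or earlier if downstream rejects input
                    return downstream.push(element) && count[0] < n;
                });
    }

    public static void main(String[] args) {
        // Stream.iterate without a predicate is infinite; the gatherer makes it finite
        List<Integer> firstThree = Stream.iterate(1, i -> i + 1)
                                         .gather(takeFirst(3))
                                         .toList();
        System.out.println(firstThree); // [1, 2, 3]
    }
}
```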

As another example, consider a stream of temporally ordered temperature readings, each represented by a Reading record holding a timestamp and a temperature in kelvins. A common task is to detect suspicious changes, defined as a change of more than 30 kelvins between two consecutive readings taken within five seconds of each other.

Traditionally, without a suitable intermediate operation in the Stream API, developers would collect the stream into a list and then run their analysis logic in a loop. This imperative approach works, but it is neither the most expressive nor the most efficient way to handle such tasks in Java, because it requires manually managing state and iteration, as the following findSuspicious method illustrates (isSuspicious is a helper that applies the criterion above).

record Reading(Instant obtainedAt, int kelvins) {}

static List<List<Reading>> findSuspicious(Stream<Reading> source) {
    var suspicious = new ArrayList<List<Reading>>();
    Reading previous = null;
    boolean hasPrevious = false;
    for (Reading next : source.toList()) {
        if (!hasPrevious) {
            hasPrevious = true;
            previous = next;
        } else {
            if (isSuspicious(previous, next))
                suspicious.add(List.of(previous, next));
            previous = next;
        }
    }
    return suspicious;
}
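
Neither the article nor this snippet defines isSuspicious. A minimal sketch of that helper, assuming that "within five seconds" means the two readings are at most five seconds apart, could look like this; the class name and the sample readings are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

class SuspiciousReadings {

    record Reading(Instant obtainedAt, int kelvins) {}

    // Hypothetical helper. Assumption: "suspicious" means the readings are at most
    // five seconds apart and their temperatures differ by more than 30 kelvins.
    static boolean isSuspicious(Reading previous, Reading next) {
        Duration gap = Duration.between(previous.obtainedAt(), next.obtainedAt());
        int change = Math.abs(next.kelvins() - previous.kelvins());
        return gap.compareTo(Duration.ofSeconds(5)) <= 0 && change > 30;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        Reading a = new Reading(t0, 290);
        Reading b = new Reading(t0.plusSeconds(3), 325);  // +35 K after 3 s
        Reading c = new Reading(t0.plusSeconds(60), 380); // +55 K, but 57 s later
        System.out.println(isSuspicious(a, b)); // true
        System.out.println(isSuspicious(b, c)); // false
    }
}
```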

With the gatherers introduced by JEP 461, developers can express this logic more succinctly and declaratively. The windowSliding gatherer, for instance, groups consecutive elements into overlapping windows of a given size, which can then be filtered using the criterion for suspicious changes. The revised findSuspicious method using a gatherer looks like this:

List<List<Reading>> findSuspicious(Stream<Reading> source) {
    return source.gather(Gatherers.windowSliding(2))
                 .filter(window -> (window.size() == 2
                                    && isSuspicious(window.get(0),
                                                    window.get(1))))
                 .toList();
}
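
The window.size() == 2 check matters because of how windowSliding treats short inputs. The following sketch (the pairs helper is hypothetical) shows the windows it produces: overlapping pairs for a longer stream, a single shorter window when the stream has fewer elements than the window size, and no windows at all for an empty stream.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class SlidingWindows {

    // Hypothetical helper: all sliding windows of size 2 over the source
    static List<List<Integer>> pairs(Stream<Integer> source) {
        return source.gather(Gatherers.windowSliding(2)).toList();
    }

    public static void main(String[] args) {
        // Each window overlaps the previous one by one element
        System.out.println(pairs(Stream.of(1, 2, 3, 4))); // [[1, 2], [2, 3], [3, 4]]
        // Fewer elements than the window size: one shorter window
        System.out.println(pairs(Stream.of(1)));          // [[1]]
        // Empty input: no windows
        System.out.println(pairs(Stream.empty()));        // []
    }
}
```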

JEP 461 also introduces built-in gatherers, provided by the java.util.stream.Gatherers class: fold, mapConcurrent, scan, windowFixed, and windowSliding. They implement common operations and can be used directly or as building blocks for more complex transformations.
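
The article demonstrates windowFixed and fold; as a rough sketch, scan and mapConcurrent can be used as follows. The helper method names are hypothetical, and the concurrency limit of 4 is an arbitrary choice for illustration.

```java
import java.util.List;
import java.util.stream.Gatherers;

class MoreBuiltInGatherers {

    // scan: a prefix scan that emits every intermediate accumulation, not just the last one
    static List<Integer> runningTotals(List<Integer> numbers) {
        return numbers.stream()
                      .gather(Gatherers.scan(() -> 0, (total, n) -> total + n))
                      .toList();
    }

    // mapConcurrent: runs the mapper on virtual threads with at most 4 calls in flight,
    // while keeping the results in the stream's encounter order
    static List<Integer> squares(List<Integer> numbers) {
        return numbers.stream()
                      .gather(Gatherers.mapConcurrent(4, n -> n * n))
                      .toList();
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        System.out.println(runningTotals(input)); // [1, 3, 6, 10, 15]
        System.out.println(squares(input));       // [1, 4, 9, 16, 25]
    }
}
```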

Consider the following code example:

List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
                                    .gather(Gatherers.windowFixed(3))
                                    .toList();

This snippet uses the windowFixed gatherer to group elements into non-overlapping windows of a fixed size. The stream of integers is divided into sublists of three elements each, except for the last, which holds the two remaining elements. The resulting list, windows, is [[1, 2, 3], [4, 5, 6], [7, 8]].

Consider another example:

Optional<String> numberString 
        = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .gather(Gatherers.fold(() -> "", (string, number) -> string + number))
                .findFirst();

In this example, the fold gatherer concatenates a stream of integers into a single string. It starts with an empty string, appends each number to it, and emits the accumulated string as a single element once the input is exhausted, so findFirst returns Optional[123456789].

The proposal emphasizes that while gatherers are similar to collectors in design, they serve different purposes. Gatherers are used for transforming and manipulating stream elements in intermediate operations, whereas collectors are used for summarizing the output of a stream pipeline in terminal operations.
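
The difference shows up in what can follow each operation. In this sketch (method names hypothetical), the collector ends the pipeline with a String, while the fold gatherer emits its result as a stream element that later stages, here map, can keep transforming.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class GathererVsCollector {

    // Collector: a terminal operation, so the pipeline ends with the summary value
    static String joinedWithCollector() {
        return Stream.of(1, 2, 3)
                     .map(String::valueOf)
                     .collect(Collectors.joining());
    }

    // Gatherer: an intermediate operation, so further stages can follow it
    static List<Integer> joinedLengthWithGatherer() {
        return Stream.of(1, 2, 3)
                     .gather(Gatherers.fold(() -> "", (string, number) -> string + number))
                     .map(String::length)
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(joinedWithCollector());      // 123
        System.out.println(joinedLengthWithGatherer()); // [3]
    }
}
```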

In conclusion, JEP 461 introduces stream gatherers to the Java Stream API as a preview feature in JDK 22. Gatherers let Java developers define their own intermediate operations, potentially making stream pipelines more expressive and flexible for tasks that the built-in operations do not handle well.
