Refactoring to Reactive - Anatomy of a JDBC migration

Key takeaways

  • Reactive programming comes with a learning curve, requiring experience, practice, and an open mind to fully grok.
  • It is possible to introduce reactive programming incrementally in any application.
  • Reactive programming and non-blocking libraries like Netty increase scalability and elasticity, and lower operating and development cost. (See the Reactive Manifesto for more context.)
  • Transforming streams of future values into other streams of future values is a powerful programming paradigm that, with practice, offers great rewards.
  • Composability is the hallmark of functional programming; in this article we will explore monadic composition, using the flatMap operator of the Observable.

 

Reactive programming is the new kid on the block, offering built-in solutions for some of the most difficult problems in programming, including concurrency management and flow control. But if you work on an application development team, there's a good chance you are not using reactive programming yet, and you might have questions: How do I get there? How do I test it? Can I introduce it in phases?

In this article we will transform a real (intentionally simple) legacy application, with a classic setup consisting of a web server and a database backend, to a reactive model, striving for a threefold benefit:

  1. Work in a functional style, allowing our code to be composed and thus reused, rendering it clearer and more readable thanks to a higher level of abstraction, immutability and fluent style. For example, the Function<T,Observable<U>> type is composable, because we can chain together multiple such functions using the flatMap operator of the Observable.
     
  2. Build a more resilient application, allowing our server to sustain a greater number of concurrent users than the actual number of available threads, while still allowing the code to be executed concurrently, and not enqueued in some thread pool.

    We will do this by having the application react as each chunk of information is received from the database, rather than having the thread wait for that information. This is a transformation from a pull mechanism (doing blocking I/O) to a push mechanism that continues execution when data becomes available or some event occurs, such as a timeout or an exception.
     
  3. Build a more responsive application, allowing our browser view to be updated even as the first bytes start coming back from the database. This is achievable with the Servlet 3.1 streaming API and will become available with Spring Reactor and the associated web extensions on the Spring stack.

The examples in this article may be downloaded from this link.

We want to endow our program with its new reactive makeover in small steps, using incremental refactoring.

Before we start, let's enrich our test suite for the legacy application, because we rely on such tests for the correctness of the refactored versions of the program on its journey to its new functional and reactive form.

Once the tests are in place, we can start with the first refactoring step: replacing method return types with Observable, Single, or Completable, all part of RxJava. Specifically, for each method that returns a value of type T we can return Single<T>, which emits exactly one value or an error. For List<T>, we will change the return type to Observable<T>, while each void method will return Completable, which can be converted to Single<Void>.

Rather than replace all method return types in all the layers at the same time, let's select one layer and start there. Converting a method that returns T into one that returns Single<T>, one returning List<T> into one returning Observable<T>, or a void method into one returning Completable does not require changing the method as its client code knows it. Instead, retain the method's original signature, but implement it as a delegating method that calls a new method containing the implementation, which now returns an Observable<T>. The delegating method (with the original signature) calls toBlocking on the observable to enforce the original synchronous contract before returning any values. Starting as small as possible is a great migration strategy: it helps you overcome the learning curve and deliver steady, incremental progress, and RxJava lends itself well to this kind of incremental refactoring.

Here is a concrete example. You can see all of the code and its history here, where I take a classic Spring application, tkssharma/Spring-JDBC, and convert it to RxJava in two ways: using the RxJDBC library (in the rx-jdbc branch) and using the pgasync library (in the pgasync branch).
Let's look at the following methods from the Spring-JDBC project:

List<Student> getStudents();
Student getStudent(int studentId);

Following the above migration strategy, we will retain these method signatures, but make a small change in their implementation:

public List<Student> getStudents() {
  return getStudentStream().toList().toBlocking().single();
}

We introduced an extra layer:

Observable<Student> getStudentStream();
Single<Student> getStudent(int studentId);

With the following implementation:

public Observable<Student> getStudentStream() {
  List<Student> students = getStudentsViaJDBC();
  return Observable.from(students);
}

Here, getStudentsViaJDBC is the original, JDBC-based implementation.

What we have effectively done is create a new reactive layer while retaining our original non-reactive signature, and then replace the original implementation with a call to our new reactive incarnation. We will make a few further iterations on this data access layer, and then make the application reactive upwards, toward the controller layer, with the final goal of making it reactive end to end.
The Observable.toBlocking method acts as a bridge between the classic and reactive worlds. It is what you use to plug reactive code (even if it is reactive only in its API) into code that is still classic at large scale, such as a servlet at one end and JDBC at the other. Until those ends are made reactive as well, we need this method.

Of course, the synchronous toBlocking call is anathema to the asynchronous reactive paradigm, so by the end of our refactoring we will want to remove these calls from our production code.

Let's say you have a method List<Student> getStudents(). You would create a new method Observable<Student> getStudentStream() and move the implementation to it, wrapping the resulting list into an Observable using Observable.from(Iterable). The original method calls the new one and then applies studentStream.toList().toBlocking().single() to convert the result back to a List. This is inherently blocking, but that is acceptable, since the existing implementation of getStudentStream is already blocking at this point.
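The delegation step can be sketched without RxJava, using only the JDK. This is an analogue, not the article's code: CompletableFuture<List<Student>> stands in for the Observable<Student> collected with toList(), and join() plays the role of toBlocking().single(). Student, StudentDao, and the in-memory getStudentsViaJdbc are hypothetical stand-ins.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical minimal entity.
class Student {
    final int id;
    final String name;
    Student(int id, String name) { this.id = id; this.name = name; }
}

class StudentDao {
    // Original synchronous signature, kept so that existing callers still compile.
    public List<Student> getStudents() {
        // Bridge back to the blocking world (analogue of toList().toBlocking().single()).
        return getStudentsAsync().join();
    }

    // New "reactive" layer. At this stage it is still blocking under the covers:
    // the query runs on the calling thread and the future is already completed.
    public CompletableFuture<List<Student>> getStudentsAsync() {
        return CompletableFuture.completedFuture(getStudentsViaJdbc());
    }

    // Hypothetical stand-in for the legacy JDBC implementation.
    List<Student> getStudentsViaJdbc() {
        return Arrays.asList(new Student(1, "Ada"), new Student(2, "Alan"));
    }
}
```

Callers of getStudents() are untouched, while new code can start depending on getStudentsAsync() right away.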

The biggest obstacle in the reactive learning curve is learning to think in terms of Observables. You may find it intuitive for a List to become a stream (which is, after all, exactly what an Observable is), but applying that notion to individual values is less intuitive. To conceptualize this, consider the traditional Java concept of a Future: although it holds a single value, it can be viewed as a particular case of a stream of future values that happens to contain just one value, or an error if no value can be successfully emitted.

Our first step, wrapping return types in Observable, does not change the nature of execution. It is still synchronous, doing blocking I/O just like JDBC does.

We have completed step one of our refactoring: changing signatures from List<Student> getStudents() to Observable<Student> getStudents(). Interestingly, even the method Student getStudent(), which returns just a single student, is refactored to Observable<Student> getStudent(), or potentially to Single<Student> getStudent(). Likewise, void methods are refactored to return Completable.
The reactive paradigm can also be applied from the top down: wrap large or small parts of the application in a reactive envelope (API), and then break each part down further wherever, for example, we need asynchronicity or non-blocking I/O.

To implement the new signature, instead of returning studentList we return Observable.from(studentList), which emits each student in turn. (Observable.just(studentList) would instead emit the whole list as a single item, yielding an Observable<List<Student>>.)

At this point we have introduced Observable in a few places, but besides simply wrapping and unwrapping a list, basically nothing has changed. It is still an important step, because it has made our code composable, and we are now ready to make the next move and start using some of the power behind Observable, namely lazy evaluation, which is also available in Java streams. Instead of returning Observable.from(studentList), let's return

Observable.defer(
   () -> Observable.from(actuallyProduceStudentsViaTripToDatabase()))

Notice that actuallyProduceStudentsViaTripToDatabase is in fact the legacy method we started from, the one that returned List<Student>. By wrapping it with Observable.defer (or with Observable.fromCallable followed by flatMapIterable), we obtain a lazy Observable, which only initiates the query to the database at the moment a subscriber subscribes to that data.
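To see why deferral matters, here is a JDK-only sketch that counts simulated database trips. A Supplier plays the role of Observable.defer: building it costs nothing, and each get() (the analogue of a subscription) runs the query again. LazyStudentDao and its trip counter are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyStudentDao {
    // Counts simulated trips to the database.
    final AtomicInteger databaseTrips = new AtomicInteger();

    // Eager: the query runs as soon as this method is called.
    CompletableFuture<List<String>> eagerStudents() {
        return CompletableFuture.completedFuture(actuallyProduceStudentsViaTripToDatabase());
    }

    // Lazy: nothing runs until someone calls get(), the analogue of subscribing
    // to an Observable built with Observable.defer.
    Supplier<CompletableFuture<List<String>>> deferredStudents() {
        return () -> CompletableFuture.completedFuture(actuallyProduceStudentsViaTripToDatabase());
    }

    // Hypothetical stand-in for the legacy query method.
    List<String> actuallyProduceStudentsViaTripToDatabase() {
        databaseTrips.incrementAndGet();
        return Arrays.asList("Ada", "Alan");
    }
}
```

As with a deferred Observable, every get() triggers a fresh query, so a caller that needs the data twice either calls it twice or caches the result explicitly.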

At this point only the data access layer API has been modified to return Observable; the controller methods are as yet unchanged, so they must consume (subscribe to) the observable and wait for its results in the same thread, i.e., blocking I/O. But our goal in this article is end-to-end asynchronous processing. Instead of controller methods returning an already populated Result (with data already available for template rendering), we want to end up with asynchronous Spring MVC, supported by DeferredResult, the async object provided by Spring MVC. (Spring plans to support streaming in the upcoming Spring Reactive Web, powered by the Spring Reactor ecosystem.) With this approach, the controller method returns not a completed Result, but a promise: when the result becomes available, it will be set on the previously returned DeferredResult. Simply modifying the controller methods that return Result to return DeferredResult is in itself sufficient to provide a degree of asynchronicity.

@RequestMapping(value = "/student.html", method = RequestMethod.GET)
public DeferredResult<ModelAndView> showStudents(Model model) {
   Observable<ModelAndView> observable = studentDAO.getAllStudents()
           .toList()
           .map(students -> {
               ModelAndView modelAndView = new ModelAndView("home");
               modelAndView.addObject("students", students);
               return modelAndView;
           });
   // The result (or error) is set on the DeferredResult whenever the observable emits it.
   DeferredResult<ModelAndView> deferredResult = new DeferredResult<>();
   observable.subscribe(result -> deferredResult.setResult(result),
                        e -> deferredResult.setErrorResult(e));
   return deferredResult;
}

We have made an important step toward asynchronicity, but, surprisingly, this method still waits for the results to come from the database, i.e., it is still blocking. Why? Recall that so far the Observable returned by the data access layer executes its subscription on the calling thread, so despite using DeferredResult, the method blocks until the Observable delivers the data, consuming thread resources.
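The pitfall can be reproduced with a small JDK-only sketch, assuming a CompletableFuture as a stand-in for DeferredResult and a hypothetical runQuery for the database call. Because the source is synchronous, the query runs on the caller's thread before showStudents returns, even though the method's return type looks asynchronous.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class BlockingDespiteCallback {
    // Records which thread ran the (simulated) database query.
    static final AtomicReference<String> queryThread = new AtomicReference<>();

    // Hypothetical controller method: it returns a future, but its source is synchronous.
    static CompletableFuture<String> showStudents() {
        CompletableFuture<String> deferredResult = new CompletableFuture<>(); // DeferredResult stand-in
        CompletableFuture.completedFuture(runQuery())   // the "subscribe" executes the query right here
                .thenAccept(deferredResult::complete);
        return deferredResult;                          // already complete when returned
    }

    // Hypothetical stand-in for a blocking JDBC query.
    static String runQuery() {
        queryThread.set(Thread.currentThread().getName());
        return "home";
    }
}
```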

The next step is to change the Observable so that it does not block the current thread on the subscribe call. There are two ways to do this: one is to use natively reactive libraries; the other is to use Observable.subscribeOn(scheduler) and observeOn(scheduler), executing the subscription and the observation on different schedulers (think of schedulers as thread pools).

The observe methods are the operators applied downstream, such as map, flatMap, and filter, which transform one observable into another, as well as methods such as doOnNext, which execute an action each time a new element is emitted in the stream. The second approach (using subscribeOn) is a small intermediate step toward the goal of fully non-blocking libraries. It simply moves the subscribe and observe actions to different threads: these actions still block until the results are available (only now they block other threads), after which they push the results to the subscriber, which in turn pushes them to a DeferredResult. There are libraries that implement RxJava on top of JDBC that block threads in this manner (either the calling thread or other threads, as configured). This approach is currently required for JDBC, since JDBC is a blocking API.

In general, this approach can serve as an intermediate step, but ultimately the natively reactive approach is the goal, because it boosts scalability, allowing you to support more truly concurrent user operations (aka flows) than there are available threads.
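A JDK-only sketch of the subscribeOn idea: the blocking call is handed to a dedicated executor (the analogue of a Scheduler), so the caller gets a pending future back immediately. The pool size, thread names, and the latch that simulates the database response are assumptions for illustration; the call still blocks a thread, just not the caller's.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class OffloadedDao {
    private final AtomicInteger threadCount = new AtomicInteger();
    // Dedicated pool for blocking JDBC-style calls (analogue of a Scheduler passed to subscribeOn).
    final ExecutorService jdbcPool = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "jdbc-worker-" + threadCount.incrementAndGet());
        t.setDaemon(true);
        return t;
    });
    final AtomicReference<String> queryThread = new AtomicReference<>();
    // Lets the caller decide when the simulated database "responds".
    final CountDownLatch databaseResponds = new CountDownLatch(1);

    CompletableFuture<String> getStudentsOffloaded() {
        // The query still blocks, but it blocks a pool thread rather than the caller.
        return CompletableFuture.supplyAsync(this::blockingQuery, jdbcPool);
    }

    private String blockingQuery() {
        queryThread.set(Thread.currentThread().getName());
        try {
            databaseResponds.await();   // simulates waiting on JDBC I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "students";
    }
}
```

With four pool threads, a fifth concurrent query would queue behind the others, which is exactly the limitation that a natively non-blocking driver removes.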

Here is the getStudents implementation using the RxJDBC library:

public Observable<Student> getStudents() {
   Class<String> stringClass = String.class;
   return database
           .select("select id,name from student")
           .getAs(Integer.class, stringClass)
           .map(row->{
                   Student student = new Student();
                   student.setId(row._1());
                   student.setName(String.valueOf(row._2()));
                   return student;
               });
}

In order to get the RxJDBC library, add this dependency in the Maven project:

<dependency>
   <groupId>com.github.davidmoten</groupId>
   <artifactId>rxjava-jdbc</artifactId>
   <version>0.7.2</version>
</dependency>

The third step is to introduce a truly reactive library. There are a few, even for relational databases, and you will find more if you focus on a specific database such as Postgres. This is because a non-blocking database access library must implement the low-level wire protocol of each specific database. Here we use the postgres-async-driver project, which itself uses RxJava.

Here is the getStudents implementation again, this time with the pgasync library:

public Observable<Student> getStudents() {
       return database
       .queryRows("select id,name from student")
       .map(row -> {
           Student student = new Student();
           int idx = 0;
           student.setId(row.getLong(idx++));
           student.setName(row.getString(idx++));
           return student;
       });
   }

To use the pgasync library, add this Maven dependency:

<dependency>
    <groupId>com.github.alaisi.pgasync</groupId>
    <artifactId>postgres-async-driver</artifactId>
    <version>0.9</version>
</dependency>

At this point we have a truly reactive (asynchronous, event-driven, non-blocking) backend implementation. We also have an end-to-end asynchronous solution that allows us to process more concurrent user actions (at the I/O flow level) than there are threads in the JVM.

Next, let's work on transactions. We will take a scenario where we want to modify data using DML (data manipulation language) operations such as INSERT or UPDATE. Even for the simplest transaction, consisting of a single DML statement, introducing asynchronicity is complicated, since we are so used to transactions that block threads. This is all the more so for realistic transactions, which typically contain multiple statements.

Here is how a transaction would look:

public class T {
   private Observable<Long> dml(String query, Object... params) {
      return database.begin()
              .flatMap(transaction ->
                      executeDmlWithin(transaction, query, params)
                              // rollback() returns an Observable; assuming it is lazy like most
                              // RxJava Observables, it must be subscribed to for the ROLLBACK to be sent
                              .doOnError(e -> transaction.rollback()
                                      .subscribe(ignored -> { }, rollbackError -> { })));
   }

   private Observable<Long> executeDmlWithin(
           Transaction transaction, String query, Object[] params) {
      return transaction.querySet(query, params)
              .flatMap(resultSet -> {
                 Long updatedRowsCount = resultSet.iterator().next().getLong(0);
                 return commitAndReturnUpdateCount(transaction, updatedRowsCount);
              });
   }

   private Observable<Long> commitAndReturnUpdateCount(
           Transaction transaction, Long updatedRowsCount) {
      return transaction.commit()
              .map(__ -> updatedRowsCount);
   }
}

This is a single-statement transaction, but it illustrates how you can do transactions in an async reactive API. Transaction begin, commit, and rollback are all monadic functions: they return an Observable and can be chained with flatMap.

Let's trace through the example above, starting with the signature. The dml function takes a data manipulation language (DML) statement, like UPDATE or INSERT, along with any parameters, and "schedules" it for execution. Notice that database.begin returns Observable<Transaction>. The transaction is not created right away, because creating it involves I/O with the database. This is an asynchronous operation: when it completes, it emits a Transaction object on which SQL queries, followed by commit or rollback, can subsequently be called as required. This Transaction object is passed from Java closure to Java closure, as we see above: first, transaction is available as an argument to the lambda passed to the flatMap operator. There it is used in three spots:

  • First, it is used to launch the DML statement within the transaction. The result of the querySet operation that executes the DML is also an Observable that holds the result of the DML (generally a Row with updated row counts), and it is further transformed with flatMap into another Observable.
  • The second flatMap then uses our transaction object to commit the transaction. There, the transaction variable is captured by a lambda expression that is provided as an argument to this second flatMap. This is one way to send data from one part of an async flow to another: use a variable from the lexical scope in a lambda expression that is created at one time but executed later, potentially on a different thread. This is the significance of lambda expressions being Java closures: they capture the variables used in their bodies. You can send data like this using any Java closure (an anonymous inner class, for example), not just lambdas.
  • The third usage of the transaction variable is in doOnError, where the transaction is rolled back. Again, note how the transaction variable reaches all three places via ordinary Java lexical scoping, even though some pieces of the code execute synchronously (as part of the method execution, on the calling thread), while others execute later, asynchronously and on different threads, when some event happens, i.e., when a response comes from the database. The transaction value is nonetheless available in all these contexts. Ideally, shared values should be immutable, stateless, or thread safe. Java only requires captured variables to be effectively final, but this is not enough for non-primitive values, whose internal state can still change.

If successful, the transaction commit result is translated (mapped) to an update count, which can be used by callers. To transmit the number of updated or inserted rows to an outside caller of the transactional method, we cannot capture the result count in a Java closure, since the callee is not in the same lexical scope as the caller. Instead, we need to encapsulate the result in the element type of the resulting observable. If we need to carry multiple results, we can resort to immutable Java classes, arrays, or unmodifiable Java collections. On error, rollback is called on the transaction. The error then propagates along the observable chain (not the call stack), unless it is stopped by specific Observable operators, such as onErrorResumeNext or retry, that say "when this Observable has an error, use this other Observable, or perhaps try the same one again".

This transactional update is a first example of flatMap chaining: we pipe multiple steps into one another in an event-driven manner. When the transaction is started, a query can be issued; when the query result is available, it can be parsed and a commit issued; when the commit completes, its result (which carries no information) is replaced with the result we care about, here the update count. If commit had returned Observable<T> rather than Observable<Void>, we could have packaged T together with our Long result into a data transfer object.
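The same begin, execute, commit chain can be sketched with CompletableFuture, where thenCompose plays the role of flatMap and thenApply the role of map. FakeTransaction is a hypothetical in-memory stand-in, not pgasync's Transaction; unlike the doOnError version above, this sketch waits for the rollback to finish before re-raising the original error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical in-memory transaction (NOT the pgasync API). Every operation returns
// a future, so the steps can be chained like monadic functions.
class FakeTransaction {
    boolean committed;
    boolean rolledBack;
    private final boolean failDml;

    FakeTransaction(boolean failDml) { this.failDml = failDml; }

    CompletableFuture<Long> executeDml(String query) {
        if (failDml) {
            return AsyncTransactions.failed(new IllegalStateException("constraint violation"));
        }
        return CompletableFuture.completedFuture(3L);   // pretend 3 rows were updated
    }

    CompletableFuture<Void> commit() {
        committed = true;
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> rollback() {
        rolledBack = true;
        return CompletableFuture.completedFuture(null);
    }
}

class AsyncTransactions {
    // begin -> execute -> commit -> map to update count; roll back and re-raise on error.
    static CompletableFuture<Long> dml(CompletableFuture<FakeTransaction> begin, String query) {
        return begin.thenCompose(tx ->                                  // flatMap on begin()
                tx.executeDml(query)
                        .thenCompose(count ->                           // flatMap: commit...
                                tx.commit().thenApply(ignored -> count)) // ...then map to the count
                        .<CompletableFuture<Long>>handle((count, error) -> {
                            if (error == null) {
                                return CompletableFuture.completedFuture(count);
                            }
                            Throwable cause = error instanceof CompletionException && error.getCause() != null
                                    ? error.getCause() : error;
                            // Wait for the rollback, then propagate the original error.
                            return tx.rollback().thenCompose(ignored -> AsyncTransactions.<Long>failed(cause));
                        })
                        .thenCompose(stage -> stage));                  // flatten the nested future
    }

    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }
}
```

The update count travels to the caller as the future's value, not through a captured variable, for the reason given above: the caller is not in the callee's lexical scope.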

In the reactive world we aim to bring a blocking application to a non-blocking state. (A blocking application is one that blocks when performing I/O operations such as opening TCP connections.) Most legacy Java APIs for opening sockets, talking to databases (JDBC), and working with files and input/output streams are blocking APIs. The same is true of the early implementations of the Servlet API and many other Java constructs.

Over time, non-blocking counterparts appeared; for example, Servlet 3.x integrated a few concepts like async processing and streaming. But in a typical J2EE application one still finds blocking calls, which is not always a bad thing: blocking semantics are easier to understand than explicit async APIs. Some languages, like C#, Scala, and Haskell, have constructs that transparently generate non-blocking implementations from blocking-style code, for example async/await in C# and the async/await macros of Scala's scala-async library. In Java, to my knowledge, the most robust way to perform non-blocking operations is to use Reactive Streams or RxJava, or non-blocking libraries such as Netty. However, things remain pretty explicit, so the entry barrier can be high. Still, when you need to support more concurrent users than you have threads, or when your application is I/O-bound and you want to minimize costs, going non-blocking can get you an extra order of magnitude in scalability, elasticity, and cost reduction.

When discussing elasticity or robustness, it is helpful to consider the moment when all threads are waiting for I/O. For example, let's assume a modern JVM can support 5000 threads. This means that when 5000 calls to various web services are waiting on their respective threads in a blocking application, no more user requests can be processed at that point (they can only be enqueued for later processing by specialized threads that do nothing but enqueue). That might be fine in a controlled context, such as a corporate intranet, but it is certainly not what a startup needs when a sudden 10X burst of users decides to check out its product.

Of course, one solution to traffic spikes is horizontal scalability, bringing up more servers, but that is not elastic enough, not to mention the cost. Again, it all depends on the kind of I/O an application does. But even if HTTP is the only potentially slow I/O an internet service is exposed to, and all other I/O operations go to internal databases and services that are highly available (HA) and low latency, HTTP will still move bytes slowly whenever a slow client sits on the other side of the planet.

It is true that this problem is the remit of professional load balancers, but you never know when the most "highly available" internal or external service will go down, or when the most "low latency" service, which is actually just "near real time" rather than hard real time, will respond slowly because of a garbage collection pause. Then, if you block in only parts of your stack, you get a blocking bubble: threads start blocking on the slowest blocking I/O and bring resources to a halt, all because of a single slow blocking access that is requested by 5% of the traffic and has low business importance.

Hopefully I have convinced you that making an application non-blocking adds value in many situations, so let's come back to our legacy application. It is blocking in all its layers, HTTP and database access alike, so let's start from there. Unless all layers on a vertical (here, HTTP and database access) are made async, the full flow cannot be async.

There is also a difference between async and non-blocking: non-blocking implies async (unless the language provides constructs that hide it), whereas async can always be achieved for a blocking call by simply "moving" it to a different thread. This has some of the same issues as the initial blocking solution, but it can be a step toward the end goal in an incremental approach. For the HTTP side, we are already partially covered by the current state of the Servlet spec and Spring MVC, which give us async behavior, but not streaming.

Async implies that when the database finishes responding, processing kicks in. When processing completes, the web layer starts rendering. When the web page (or the JSON payload) is rendered, the HTTP layer is called with "here's your full response payload".

The next step would be streaming: when the database tells the processing layer "here's some more data for you", the processing layer accepts it. This acceptance does not necessarily require a dedicated thread; with NIO or Linux epoll, for example, it is non-blocking. The idea is that a single thread asks the OS about, say, 100K connections at once: "is there anything new on any of these 100K connections?" The processing layer can then perform a transformation that outputs more semantic units, like "students". When the data from the database represents just part of a student, it can be useful to keep the partial info in the processing layer's buffers. When a bulk fetch from the database has finally obtained all of the data on that student, the student can be "closed" and sent to the upper layer for rendering. In such a pipeline, any component can stream at any granularity: some will just copy bytes from left to right; others will send full student instances or even batches of them; while others, like Spring MVC's DeferredResult, need the whole result before starting to write an HTTP response.
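The buffering described above can be sketched as a small push-based stage. It assumes a hypothetical row shape, one row per (student, course) ordered by student id; the stage keeps partial data until the student id changes or the stream completes, then emits one immutable StudentCourses downstream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical row shape: one row per (student, course), ordered by studentId.
class Row {
    final int studentId;
    final String course;
    Row(int studentId, String course) { this.studentId = studentId; this.course = course; }
}

// The "semantic unit" sent to the upper layer; immutable once built.
class StudentCourses {
    final int studentId;
    final List<String> courses;
    StudentCourses(int studentId, List<String> courses) {
        this.studentId = studentId;
        this.courses = Collections.unmodifiableList(new ArrayList<>(courses));
    }
}

// Push-based stage: buffers rows until a student is complete, then emits it downstream.
class StudentAssembler {
    private final Consumer<StudentCourses> downstream;
    private final List<String> buffer = new ArrayList<>();
    private Integer currentId;   // student whose rows are currently buffered, if any

    StudentAssembler(Consumer<StudentCourses> downstream) { this.downstream = downstream; }

    void onNext(Row row) {
        if (currentId != null && row.studentId != currentId) {
            flush();             // a new id means the previous student is complete
        }
        currentId = row.studentId;
        buffer.add(row.course);
    }

    void onComplete() {
        if (currentId != null) flush();
    }

    private void flush() {
        downstream.accept(new StudentCourses(currentId, buffer));
        buffer.clear();
        currentId = null;
    }
}
```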

So back to the steps of refactoring:

  1. Put Observable in signatures
  2. Use observable.subscribeOn(scheduler) and observable.observeOn(scheduler) to move blocking computations (e.g. JDBC calls) to a different thread pool
  3. Make it async: use Spring MVC async (DeferredResult)
  4. Make the backend non-blocking: use a specialized library for your database that implements a non-blocking alternate implementation
  5. Wrap that non-blocking implementation in RxJava or your preferred reactive framework, if not already wrapped in it, like in our case
  6. Make it streaming: use Vert.x
  7. Do the writes
  8. Do the (multi-statement) transactions
  9. Verify your error handling

To run the app:
We use the PostgreSQL database. You can install it and set the postgres user password to "mysecretpassword". Or, simpler, install Docker and run:

sudo docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword postgres

Now, execute student.sql to create the table and insert some sample rows.
Then run mvn install and deploy the resulting war file in Tomcat or Jetty.
The URL is here; click on "students".

More on composability

We have spoken a lot about composability, and it pays to understand its semantics. In the particular context of reactive programming, I will highlight one kind of composition, which we can call "sequencing". It means that we have a processing pipeline that, given some input, produces some output in a series of steps. Each step can be synchronous (for example, applying some computation to transform a value) or asynchronous (like going to a web service or a database to get more information). The pipeline can be pulled by its consumers or pushed by its producers.

Let's consider another example. Suppose we are building a non-blocking web server that needs to send data to a backend system for processing, and then return the result in the response. It also needs to authenticate the user making the request and apply authorization. So a few processing steps are already emerging in what we can call a request-processing pipeline, leading to a state change in the backend (and other systems) and a response to the end user.

In composing such a pipeline, it would be ideal not to need to know the details of any particular step: whether it is synchronous or asynchronous, how many retries it performs (if any), and, if it is asynchronous, which thread pool it draws from or which non-blocking framework it uses.

Conversely, when I need to change a processing step from a synchronous form to asynchronous, I should only need to modify the internal implementation of the monadic function, nothing outside.

To illustrate, let's say we want to have a step for validating a JWT token from inside a resource server (an app server). This can be done with a library that checks data on the token payload. Or it can be done with a network call to an identity provider (IdP) to verify even more things, like whether the user is still valid.

Let's define this monadic function (its return type is a monad, a type that has flatMap on it):

Observable<Boolean> isValid(String token)

Now we can implement it in memory, as a CPU-intensive operation that uses a token library to validate the signature and some of the information in the token, like the expiration date and some IDs.

Or we can add a trip to Google, if we use it as an IdP server.

In both cases, the world outside this function, including the pipeline itself, is not aware of how the Observable<Boolean> is implemented beneath the covers. Is it just calling its subscriber on the same thread, like the in-memory version, in which case it is equivalent to a function boolean isValid(token)? Could be. Or is it actually doing I/O with Google, parsing the response when it returns, and eventually reaching a boolean conclusion? Could also be. The design is agnostic to the implementation.
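A JDK-only sketch of this agnosticism, with CompletableFuture<Boolean> in place of Observable<Boolean>: one validator completes immediately on the calling thread, the other completes later on an executor thread, as a remote IdP call would. The startsWith("signed:") check is a placeholder for real signature validation, not a JWT implementation, and TokenValidators is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

class TokenValidators {
    // In-memory validator: completes on the calling thread (like a synchronous Observable).
    // The check is a placeholder for real signature and expiry validation.
    static CompletableFuture<Boolean> isValidLocally(String token) {
        return CompletableFuture.completedFuture(token.startsWith("signed:"));
    }

    // "Remote" validator: completes later on another thread, standing in for an IdP call.
    static Function<String, CompletableFuture<Boolean>> remoteValidator(ExecutorService io) {
        return token -> CompletableFuture.supplyAsync(() -> token.startsWith("signed:"), io);
    }

    // The pipeline depends only on the function type, not on how it is implemented.
    static CompletableFuture<String> authorize(String token,
                                               Function<String, CompletableFuture<Boolean>> isValid) {
        return isValid.apply(token).thenApply(valid -> valid ? "authorized" : "rejected");
    }
}
```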

We could also wrap such a function in another one with the same signature (String -> Observable<Boolean>) that adds a retry mechanism on top of this validation (which would make sense for a Google trip, if an HTTP request happens to be lost in traffic or has high latency). Or the wrapper could add graceful degradation: if I cannot use the network to reach sites like Google that are outside the data center, I can just verify signatures with my library and get on with it.

All these alternative solutions, or decorators, can be added, and each of them would still be a function from String to Observable<Boolean>.
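Retry and fallback decorators can be sketched the same way: each takes a String -> CompletableFuture<Boolean> function and returns another function of that same type, so the pipeline cannot tell them apart. This JDK-only sketch is an assumption-based analogue of the RxJava operators retry and onErrorResumeNext, not their implementation; ValidatorDecorators is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class ValidatorDecorators {
    // Retry decorator: same type in, same type out. Re-invokes the validator on failure.
    static Function<String, CompletableFuture<Boolean>> withRetry(
            Function<String, CompletableFuture<Boolean>> validator, int maxAttempts) {
        return token -> attempt(validator, token, maxAttempts);
    }

    private static CompletableFuture<Boolean> attempt(
            Function<String, CompletableFuture<Boolean>> validator, String token, int remaining) {
        return validator.apply(token)
                .<CompletableFuture<Boolean>>handle((valid, error) -> {
                    if (error == null) return CompletableFuture.completedFuture(valid);
                    if (remaining <= 1) return ValidatorDecorators.<Boolean>failed(error);
                    return attempt(validator, token, remaining - 1);   // try again
                })
                .thenCompose(stage -> stage);                          // flatten
    }

    // Fallback decorator: if the primary validator fails (e.g. network down), use the secondary one.
    static Function<String, CompletableFuture<Boolean>> withFallback(
            Function<String, CompletableFuture<Boolean>> primary,
            Function<String, CompletableFuture<Boolean>> fallback) {
        return token -> primary.apply(token)
                .<CompletableFuture<Boolean>>handle((valid, error) ->
                        error == null ? CompletableFuture.completedFuture(valid) : fallback.apply(token))
                .thenCompose(stage -> stage);
    }

    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }
}
```

Because both decorators return the same function type they accept, they stack freely, for example withFallback(withRetry(remote, 3), local).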

So we have low coupling, because changing from sync to async and back does not affect the API.

Moreover, unlike the plain java.util.concurrent.Future, the Observable type is composable: let's say that in the "token valid" case we call a function that returns a standard Response; otherwise, we return an ErrorResponse.

Let's say we have an Observable<String> (which does not imply that we are waiting for multiple tokens; we can wait for just one, which makes it a form of Future<String>). On this "token observable" we apply flatMap with the isValid function and obtain a "boolean observable". On that one we apply flatMap with a lambda containing an "if": if the token is valid, return the Observable<Response>; otherwise, return an Observable emitting an ErrorResponse (for both branches to type-check, ErrorResponse should be a subtype of Response).

That could look like this:

Observable<Response> responseObservable = tokenObservable
        .flatMap(token -> isValid(token)
                .flatMap(valid -> valid
                        ? process(request)
                        : Observable.<Response>just(new ErrorResponse("invalid"))));

Notice that with every flatMap we start with a value of type Observable<T> and obtain another Observable<U>, where T and U can be the same or different type parameters.
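The same composition can be written against CompletableFuture, where thenCompose plays the role of flatMap. Response, ErrorResponse, isValid, and process are hypothetical; declaring ErrorResponse as a subtype of Response is what lets both branches of the conditional share one type.

```java
import java.util.concurrent.CompletableFuture;

class Response {
    final int status;
    final String body;
    Response(int status, String body) { this.status = status; this.body = body; }
}

// Subtype, so both branches of the conditional share the type CompletableFuture<Response>.
class ErrorResponse extends Response {
    ErrorResponse(String message) { super(401, message); }
}

class TokenPipeline {
    // Hypothetical validator: in-memory here, but the pipeline would not notice a remote call.
    static CompletableFuture<Boolean> isValid(String token) {
        return CompletableFuture.completedFuture(token.startsWith("signed:"));
    }

    // Hypothetical business step.
    static CompletableFuture<Response> process(String request) {
        return CompletableFuture.completedFuture(new Response(200, "processed " + request));
    }

    static CompletableFuture<Response> handle(CompletableFuture<String> tokenFuture, String request) {
        return tokenFuture.thenCompose(token -> isValid(token)        // flatMap
                .thenCompose(valid -> valid                           // flatMap
                        ? process(request)
                        : CompletableFuture.<Response>completedFuture(new ErrorResponse("invalid"))));
    }
}
```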

This composition, then, is an important property: composing, from small components of a certain shape, larger ones of that same shape. But what is that shape?

In the case of the monad, we can model it as a type that has a type parameter T and two functions: flatMap and lift. The latter is easy: it converts an instance of type T into an instance of the monad. It is Observable.just(value), or Optional.of(value), to give examples for two monads. (This lift, also known as unit or return, is not to be confused with RxJava's Observable.lift operator.)
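java.util.Optional is a simpler monad on which this shape is easy to check: Optional.of plays the role of lift, and flatMap chains functions of shape T -> Optional<U>. The parse and reciprocalPercent functions are hypothetical examples; the left and right identity laws are checked directly.

```java
import java.util.Optional;
import java.util.function.Function;

class OptionalMonad {
    // Hypothetical monadic functions of shape T -> Optional<U>.
    static Optional<Integer> parse(String s) {
        try {
            return Optional.of(Integer.parseInt(s.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    static Optional<Integer> reciprocalPercent(int n) {
        return n == 0 ? Optional.empty() : Optional.of(100 / n);
    }

    // Composition: chaining two monadic functions yields another monadic function.
    static final Function<String, Optional<Integer>> composed =
            s -> parse(s).flatMap(OptionalMonad::reciprocalPercent);

    // Left identity: lift(x).flatMap(f) equals f(x).
    static boolean leftIdentity(String x) {
        return Optional.of(x).flatMap(OptionalMonad::parse).equals(parse(x));
    }

    // Right identity: m.flatMap(lift) equals m.
    static boolean rightIdentity(Optional<Integer> m) {
        return m.flatMap(v -> Optional.of(v)).equals(m);
    }
}
```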

How about flatMap? It is a higher-order function: given an Observable<T> instance called sourceObservable and a monadic function f of type T -> Observable<U>, the expression newObservable = sourceObservable.flatMap(t -> f(t)) is of type Observable<U>. For Observable, this means that whenever an element of type T is available on sourceObservable, f is called on it, resulting in a new Observable<U> for every such element; when result elements start to appear on these resulting Observables, they are also emitted on newObservable, in their order of appearance.

Why several resulting Observables? Because if sourceObservable emits three elements, then applying f to each of them generates three Observables in total. These can be merged or concatenated. Merging means that all elements from all three Observables are emitted on newObservable as soon as they appear; this is what flatMap does. The alternative is to first emit all the elements of the first resulting Observable, then those of the second, and so on; this is what concatMap does: it concatenates the resulting Observables.
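The difference between merging and concatenating becomes visible with inner Observables that complete in reverse order. This sketch assumes RxJava 3 and uses its TestScheduler to advance virtual time deterministically; the delays are arbitrary. Element n is emitted after (4 - n) * 10 virtual milliseconds, so flatMap emits results in order of appearance, while concatMap waits for each inner Observable to complete before subscribing to the next.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MergeVsConcat {

    // Applies f to 1, 2, 3 with either flatMap (merge) or concatMap (concatenate)
    // and returns the elements in the order they were emitted downstream.
    static List<Integer> emitted(boolean concat) {
        TestScheduler scheduler = new TestScheduler(); // virtual clock, starts at 0
        // Inner Observable for n emits n after (4 - n) * 10 ms, so 3 is ready first.
        Function<Integer, Observable<Integer>> f =
                n -> Observable.just(n).delay((4 - n) * 10L, TimeUnit.MILLISECONDS, scheduler);
        Observable<Integer> source = Observable.just(1, 2, 3);
        Observable<Integer> result = concat ? source.concatMap(f) : source.flatMap(f);

        List<Integer> out = new ArrayList<>();
        result.subscribe(out::add);
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS); // runs every scheduled emission
        return out;
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + emitted(false)); // [3, 2, 1]
        System.out.println("concatMap: " + emitted(true));  // [1, 2, 3]
    }
}
```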

This property of the Observable type, that from one Observable value I can derive new Observable values with augmented functionality, whether more processing steps or decorators such as retry and fallback mechanisms, is a large part of what I call composability.

Non-blocking under the covers

Earlier I mentioned that it is possible to have more ongoing flows than available threads by using non-blocking async I/O libraries, and you may wonder how that is possible. So let's take a closer look at how libraries like Netty work (Netty is the non-blocking I/O workhorse used by Vert.x and PgAsync).

Java has an API called NIO, which aims at serving many connections with fewer threads. Under the covers it relies on OS system calls for readiness notification (on Linux, epoll or poll). For example, let's assume we have 1000 open connections. A thread calls the NIO method selector.select(), which blocks until at least one connection is ready; selector.selectedKeys() then yields, say, 10 connections that have pending events such as "more data available" or "connection closed" since the previous call. The polling thread either processes these events itself or dispatches them to other threads so that it can quickly get back to polling; in effect, it runs an infinite loop querying for events on the open connections. In Netty, a small, bounded group of event-loop threads (by default twice the number of available CPU cores) each runs its own selector and handles the events of its connections on that same thread. Event processing should therefore be CPU-bound and never block; any further I/O is handed back to NIO rather than performed with blocking calls.
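The selector loop described above can be tried with nothing but the JDK. This minimal sketch uses in-process Pipe channels instead of network connections so that it runs without sockets (a real server would register SocketChannels accepted from a ServerSocketChannel); the class and method names are hypothetical, and 100 pipes stand in for the 1000 connections to stay well below typical file-descriptor limits. One thread registers every read end with a single Selector, data is written to three of them, and a single select() call should report those three as ready.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SelectorDemo {

    // Registers `channels` pipe read ends with ONE selector, writes to the pipes listed in
    // `writeTo`, then lets a single thread find out which read ends have data.
    static List<Integer> readyChannels(int channels, int... writeTo) {
        List<Integer> ready = new ArrayList<>();
        List<Pipe> pipes = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < channels; i++) {
                Pipe pipe = Pipe.open();
                pipes.add(pipe);
                pipe.source().configureBlocking(false); // selectable channels must be non-blocking
                pipe.source().register(selector, SelectionKey.OP_READ, i); // attach the index
            }
            for (int index : writeTo) {
                pipes.get(index).sink().write(
                        ByteBuffer.wrap(("event " + index).getBytes(StandardCharsets.UTF_8)));
            }
            selector.select(); // blocks until at least one registered channel is readable
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove(); // the selected-key set is not cleared automatically
                ByteBuffer buffer = ByteBuffer.allocate(64);
                ((Pipe.SourceChannel) key.channel()).read(buffer); // non-blocking read
                ready.add((Integer) key.attachment());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Pipe pipe : pipes) { // release the file descriptors
                try {
                    pipe.source().close();
                    pipe.sink().close();
                } catch (IOException ignored) {
                    // best-effort cleanup in a demo
                }
            }
        }
        ready.sort(null); // selected keys come back in no particular order
        return ready;
    }

    public static void main(String[] args) {
        System.out.println(readyChannels(100, 3, 42, 99));
    }
}
```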

A great resource that covers all of these techniques in depth is the classic Reactive Programming with RxJava: Creating Asynchronous, Event-Based Applications by Tomasz Nurkiewicz and Ben Christensen.

About the Author

Nicolae Marasoiu is a passionate software developer with years of experience building high performance server-side applications for product and outsourcing companies, from startups to well-established firms. He enjoys contributing in many areas of product development and inspiring teams in their technical adventures.

Community comments

  • Hope you enjoy it! Feedbacks to improve welcomed

    by Nicolae Marasoiu,

    Happy reading! If you would like to see the Vert.x streaming solution and others in a follow-up article, or have any questions, let me know. I am at nicolae.marasoiu@gmail.com, www.quora.com/profile/Nicolae-Marasoiu, www.linkedin.com/in/dumitru-nicolae-marasoiu-a1...

  • Re: Hope you enjoy it! Feedbacks to improve welcomed

    by Fanime Fartoon,

    Hi, I'd love to see an example with Vert.x

    Thanks for this one. =)

  • About database connections

    by Michał Gryglicki,

    Hi,
    I really like that you touched on so many fundamental subjects: thread pools, JDBC, and the requirement of being non-blocking top-down to gain the most from it. Thanks for this.
    One thing that could be clarified is the DB connections. Correct me if I'm wrong, but I think that even when using the pgasync library you are only freeing the link between the thread that sends the query to the DBMS and the connection itself. So one thread can send the query and another thread can process the results sent by the DBMS. But you still need to block the connection itself for the duration of the query execution, or even of the whole transaction (if transactions spanning more than a single statement are used).
    So, as I understand it, with DB-intensive applications you are still limited by the connection pool size. And you also need to make this connection pool reactive so as not to block on acquiring a connection.
    Nevertheless, a great article that explains a lot of concepts in simple words and with simple examples.

  • Re: About database connections

    by Michał Gryglicki,

    To explain a little more: as far as I know, it's not possible to execute 2 queries concurrently using the same TCP connection to Postgres. Or am I wrong?

  • Re: About database connections

    by Nicolae Marasoiu,

    Hi, and indeed, as you well observed, at the TCP level the connection itself remains allocated for the duration of the query conversation or the transaction; this is how relational databases have been built so far. We can imagine a possible solution, which would need to be implemented in the database servers: a way to hold a transaction via an ID across connections, which could then be short-lived, or to multiplex conversations from multiple transactions over them. In fact, that reminds me that Oracle supports multiplexing logical connections on top of physical ones. Strictly at the TCP level, having 300K concurrent connections toward a single database server is possible, but each web server would hold at most 50K transactions at the same time (since each TCP connection occupies a local ephemeral port from a limited pool of fewer than 64K). In any case, when we need this many concurrent transactions, we usually consider databases that have different scaling properties, e.g. HBase has horizontally scalable writes, something that is uncommon in relational databases. Considering again a limit of, say, 30K concurrent outbound connections from a Jetty server to a database server, this is much higher than the number of threads the JVM can support (which is about 10K), and much easier on resources like memory and context switching. So the bottleneck is raised quite a bit. However, the actual limitation is at the database server. This touches on the expected limitation of the example. What I wanted to demonstrate is that you can take an existing application and make it reactive. The added benefits will be much smaller as long as the database remains relational, as I mentioned at one point in the article. The real benefit comes when you move to distributed databases and reactive front-ends like Vert.x; there, all the bottlenecks go higher in all layers. Of course, there we do not have traditional transactions anymore.
    But if there were (as in NewSQL systems), they would probably have to support multiplexing transactions on TCP connections, since that would become relevant to performance in that scenario. A great chance to bring all that clarification :) Please let me know if you see more chance for discussion on this really interesting topic.

  • Re: Hope you enjoy it! Feedbacks to improve welcomed

    by Nicolae Marasoiu,

    Great that you enjoyed it and yes! I think a Vert.x one will come:) Thanks again:)

  • concatMap may be better suited for the "monad" "bind" function than flatMap

    by Nicolae Marasoiu,

    By the way, the Observable is a monad, but the "bind" function can be concatMap rather than flatMap.

  • Transactionality re-worked

    by Nicolae Marasoiu,

    Hello everyone, the rollback method was not correct (transaction.rollback() returned an Observable which was not consumed/waited for). I updated the code with more correct transactionality, which can be applied as a higher-order function (HOF) to decorate an action (a query or a series of queries). An action is a monadic Function<Transaction, Observable<T>> or Function<Transaction, Completable>, and one can compose such actions. There are 2 HOFs that, given an action, return an action with a transaction around it: one for Observable-returning actions and one for Completable-returning ones.

  • Spring 5 Reactor, WebFlux, reactive Spring Data

    by Nicolae Marasoiu,

    Hello everyone,
    I am pleased to announce that Spring 5 comes with its own Reactive Streams implementation, Reactor, whose core type, Flux, is cleaner and backpressure-capable by default compared to RxJava 1 or 2.
    Further, Spring MVC supports controllers returning Flux<T>, so it is now fully reactive / streaming capable (which was not the case when the article was written).
    Also, Spring Data supports the reactive paradigm: the ReactiveCrudRepository, for instance, returns reactive types, e.g. Flux<T>, with a non-blocking implementation underneath that uses the non-blocking drivers of Cassandra, Redis, Mongo, etc.
    So currently, using the Spring 5 stack, one can build a modern end-to-end reactive application.
    Reactive HTTP support also covers the client side, and in the github.com/nmarasoiu/sample-webflux-annot-cassa... repo I demonstrate all of the above, including a mock booking "server" that calls Google with the non-blocking, reactive WebClient and parses the result in a certain way to pretend the booking was OK or not OK. The repo is based on an existing one; I just added a bookHotel functionality that demonstrates how to reactively get hotels and try to book each one in turn, using the reactive HTTP client in the process. Hope you enjoy it!

  • Re: Spring 5 Reactor, WebFlux, reactive Spring Data

    by Bai Hantsy,

    Hi Nicolae, I also jumped into Spring 5 Reactive feature and the reactive support in the latest Spring Data, but it seems it only included limited support for Mongo, Cassandra, Redis.

    github.com/hantsy/spring-reactive-sample

    JDBC and JPA do not get reactive support because the official JDBC spec was designed to be blocking.

  • Re: Spring 5 Reactor, WebFlux, reactive Spring Data

    by Nicolae Marasoiu,

    Hi Bai,
    Recently, a reactive JVM database connectivity library called R2DBC has sprung up, and it is already included in Spring Data (all versions are still snapshots).
    Here is a demo with Spring WebFlux and R2DBC, a Reactor based solution end to end: dzone.com/articles/introduction-to-reactive-api...
