Reactive Programming at Picnic: Our findings thus far

Philip Leonard
Picnic Engineering
Published in
8 min readMar 8, 2018

--

Here at Picnic we are concerned with creating the smoothest and most satisfying online grocery shopping experience for our customers. From a backend perspective this means creating systems that are Resilient, Responsive and Reliable. New features should be easy to implement and satisfy these three R’s from the get-go. Whilst migrating to microservices is a stepping stone to building such a platform, reactive programming is becoming the tool that developers are adopting in the pursuit of responsive, resilient, elastic and message driven services as underlined in the Reactive Manifesto.

Image courtesy of projectreactor.io

There are a number of reactive programming frameworks out in the wild that attempt to address this problem in an elegant manner. At Picnic, we have opted for the Reactive Extensions pattern and opt for RxJava (2.x) and its Spring’s successor; Reactor, given their prevalence in other third party libraries and frameworks, and the extensiveness and fluency of their respective APIs. These are both, in part, implementations of the Reactive Streams API that describes the following simple set of interfaces:

  • Publisher: a producer of an unbounded number of sequenced elements,
    publishing them according to the demand of the Subscriber.
  • Subscriber: a consumer of events from the publisher sequence.
  • Subscription: represents a one-to-one lifecycle of a Subscriber
    subscribing to a Publisher.
  • Processor: a composite of Publisher and Subscriber.

The implementation of Publisher that we are concerned with in RxJava is Flowable<T>. We have opted to use this over Observable<T> as it allows us to be more agnostic to the implementation of reactive streams and also encourages developers to be proactive in handling backpressure.

With these simple yet powerful tools, we are able to compose complex concurrent operations into simple, readable chains of reactive operations.

We currently use RxJava in a number of our backend systems, notably for the parallelisation of purchase order generation — being the orders that we place to our wholesalers — and also in our real-time dashboard platform.

RxJava use case at Picnic

Demand prediction was previously a single-threaded sequential process where demand for products belonging to certain suppliers and certain categories (Order Groups) was calculated sequentially. With RxJava, using just some of the simplest operators we were able to leverage our multicore instances and process these demand prediction tasks in parallel. The ability to concisely express concurrent operations and specify fine-grained thread scheduling eliminates the callback hell traditionally synonymous with writing multithreaded programs in Java.

Before the refactor, our purchase order generation code resembled something like the following:

Here we sequentially stream order groups, generate a purchase order for each insert the results into the repository and return the set to the caller. Without even having to refactor our data sources, and simply wrapping our blocking sources with reactive types, we began to immediately enjoy the benefits of concurrent programming;

Such a small refactor results in the parallel generation of purchase orders on the computation scheduler; the RxJava scheduler whose thread pool size is limited to the number of cores on the machine. Therefore with an 8 core instance we can generate up to 8 purchase orders in parallel. The generated orders are then observed on RxJava’s optimised IO scheduler where we then schedule the repository insertion.

While one can benefit from small refactors of blocking operations into reactive Flowable types, in practise, one should structure their application according to the functional reactive programming paradigm to satisfy the three aforementioned Rs.

Such changes are (in part) replicable using Java 8’s parallel streams;

However, there are a few distinct advantages to the reactive approach. The clearest being the succinctness of the implementation. The second is that while parallelised, this Java streams approach is far from reactive as we must block the calling thread for the execution of the entire chain. Consequently, stream is still pulling, not pushing. Finally, in the RxJava approach, the switching of Schedulers is innately part of stream composition, and as a result we can observe (and in our case, store) the output of the upstream observable as soon as the items are emitted, and on the desired thread pool.

In the remainder of this post we will be touching upon a couple of utilities that we here at Picnic have devised using RxJava on our journey to becoming reactive. We will also take a look at where the Picnic backend is heading with reactive frameworks, using Spring’s latest reactive WebFlux library.

Picnic Rx Utilities

Like with any library that is used extensively within a company’s codebase, it didn’t take long before we were devising our own utilities to reduce boilerplate code and to make the lives of reactive developers easier when using RxJava. The utilities can be found at https://github.com/PicnicSupermarket/reactive-support.

Propagating Thread Contexts

While you can read all the articles you want about how RxJava provides concurrency for free, we have run into a couple of interesting problems that have highlighted the importance of reserving a concurrent mindset in the land of reactive programming.

One of the most pressing issues that we encountered was the propagation of thread local contexts to Rx scheduled threads. In RxJava, scheduling a concurrent action or event on a separate thread pool is as easy as specifying a Scheduler when observing and subscribing:

Flowable.defer(() -> io()).subscribeOn(Schedulers.io());

When we specify that a Flowable should be subscribed on the IO scheduler we are indicating that io() be executed on a separate IO thread. Therin lie the performance benefits of RxJava. However, for any code that lies within this scheduled io() method that somehow utilises, or uses libraries that utilise, ThreadLocal variables we must manually specify these ourselves.

In the Picnic backend, these contexts of interest are SLF4J’s MDC logging context map, and Spring’s SecurityContext. Without propagation, logging and calling secured methods such as in the following example prove problematic:

Any logging information such as trace id, username and ip are lost and logging loses much of its meaning (if not becoming completely unreadable) when chasing issues in production. Secondly, our security context is also lost along with any privileges that went with it, and secured methods may no longer be called from Rx scheduled tasks.

Fortunately using the RxJavaPlugins utility we can provide a schedule handler where we can set and unset our relevant thread contexts before and after running any RxJava scheduled runnable, in such a manner as shown below:

We call this from a Spring ServerletContextListener#contextInitialized method that now sets and unsets our desired thread contexts before and after any runnable is scheduled on an Rx Scheduler. We are careful to clear up our contexts after running a scheduled task to avoid the leaking of logging variables or more severely, privilege escalation. A generic version of this utility is available in our open source reactive support library, and can be used in the following fashion using RxThreadLocals;

Minimising DeferredResult boilerplate

In the parts of our backend where we do not as of yet use Spring 5, we discovered that we were writing the same boilerplate code for handling the conversion between reactive types and DeferredResults and SseEmitters over and over again.

DeferredResult s are Spring 4's solution for handling asynchronous REST queries, and they allow us also to handle an increasing number of requests to the backend when communicating over REST.

We simplified the idea of switching between an Observable and a DeferredResult to the following function;

called as show below, where we specify the collector for the collection type that we wish to return as the deferred result (thus serialised by Jackson to JSON);

Our Reactive Future: Moving to Spring WebFlux

Why should we only be concerned with reactive streams within services? If and where we can, reactive services should stream information to their consumers to avoid any reactive speed bumps, thus creating realtime streaming data pipelines. Systems such as Apache Kafka have gained great popularity for just this reason. Spring WebFlux now brings this idea to Java REST web services.

From a supply chain perspective at Picnic we are concerned about accurately predicting demand and placing orders at our wholesalers that represent as accurately as possible the orders from our customers. As mentioned previously, this is a resource intensive process that we parallelised with the help of RxJava.

For this task we have a number of interconnected systems that communicate over REST. The most relevant to this example are the Purchase Order Proposal service that handles the generation of predictions and the Purchase Order Management service that coordinates and automates the placing of these orders to our suppliers, updating of stock and informing the warehouse management system of expected wholesaler deliveries. A cron job kicks off the entire chain when placing orders that are due to be placed at our suppliers.

Whilst the services themselves implement RxJava and therefore to some degree can be labelled as reactive, the communication means are innately not streamed.

Spring WebFlux can help us overcome these reactive speed bumps, and for our use case we can stream and react to our generated purchase orders as soon as they are computed rather than awaiting the entire list.

Such a Spring 5 REST controller:

can be consumed using the WebFlux WebClient in the following manner:

Where the RxJava2Adapter simply converts a Reactor Flux to an RxJava Flowable (as mentioned above, both are merely implementations of Publisher).

Here we are now gluing two reactive services together with a stream, therefore making the pipeline reactive.

There is one clear disadvantage to this approach, the inability to assert backpressure from within a single HTTP JSON stream. It might be the case that the proposer service is producing orders faster than the management service can consume them. In this case it is not possible to implement backpressure so that the proposer slows the production of orders in line with the rate at which the manager service can consume them. It will be interesting to see if perhaps HTTP/2 multiplexing can be utilised to embed backpressure signals into the connection itself, or whether a new reactive application layer protocol is in fact better suited for reactive REST communication in the future.

Conclusion

At Picnic we are actively taking steps to moving towards a reactive platform. We have learnt that while reactive programming frameworks can help us make leaps and bounds in our journey to get there, we are often reminded to think about how concurrency really works under the hood. Whilst these languages also provide fluent and concise syntax, there are still opportunities to reduce boilerplate code. Finally, looking to the future and our move towards a microservices architecture, we turn towards technologies and methodologies that can aid us in the composition of reactive services and streaming pipelines, to form a truly reactive technology platform.

--

--

Tech Lead @ Picnic. Reactive Programming, Observability & Distributed Systems Enthusiast.