Railgun: A new weapon for mission critical streaming tasks

Practically all distributed streaming engines today implement sliding windows as hopping windows — to a point that they are called the same by systems like Flink or Spark Streaming. There are many reasons for this design decision and we dived deep into this topic in a previous post. This raises the question of whether it is even possible […]

Practically all distributed streaming engines today implement sliding windows as hopping windows — to a point that they are called the same by systems like Flink or Spark Streaming.

There are many reasons for this design decision and we dived deep into this topic in a previous post.

This raises the question of whether it is even possible to have accurate metrics per event while respecting tight latency Service Level Agreements (SLAs), and have it run in a distributed setting.

In this post we want to talk about Railgun, a novel, elastic, and distributed streaming engine implemented by Feedzai to provide accurate metrics while complying with millisecond-level latencies on high percentiles (≥ 99.9%).

Sounds too good to be true? Let’s look at the several pieces and techniques that enable Railgun’s accuracy and performance.

Event Reservoir

Accurate metrics per event require bookkeeping all the events. Namely, and to support multiple window types (real-time windows, delayed windows, tumbling windows, etc.), for a specific point in time we need to know:

  1. what events have expired for a metric aggregation.
  2. what events have arrived for a metric aggregation.

To do this in a few milliseconds, we need a way to store and access events efficiently. The Event Reservoir is our secret sauce to make this possible, by storing raw events in disk and providing iterators to support the two actions above, in lightning speed.

For efficiency, the Event Reservoir works with chunks of events. Whenever a new event arrives, it is added to the current open chunk, which is kept in memory.

Chunks have a target byte size. When chunks achieve this size, they are considered closed and get written asynchronously to persistent storage. As soon as they are closed, chunks become immutable. This immutability allows us to apply aggressive compression and serialization algorithms to minimize I/O storage.

Importantly, the Event Reservoir takes advantage of the time-based, sequential access of events. In fact, and regardless of the type of window, events are always consumed in order.

Event Reservoir

This allows us to define reservoir iterators that efficiently load chunks of events from persistent storage to memory. To know what events have arrived and what have expired, we only need to keep the head and the tail of the window in memory.

With this approach, memory consumption is optimal, and we can support extremely long windows with the same performance and accuracy as small windows.

To make sure events are always available in memory, whenever we start to consume a new chunk we eagerly load the next chunk into cache, before it is needed. This allows us to avoid expensive I/O syscalls and serialization operations in the critical path of processing an event.

How cool is that?

But that’s not all…

Still, if for some reason, chunks are evicted from the application cache right before they are requested, thus resulting in syscall to fetch from disk, the request probably won’t trigger an actual read request to the disk. This is because chunks are organized as a sequence in a single file, up to a maximum configured size, which then rolls to a newly created sequence file.

As such, this approach exploits the I/O subsystem of the operating system to read ahead the chunk contents into the page cache. When a chunk not in cache is requested, the operating system can deliver it directly from page cache, thus paying only the deserialization cost — a fraction of what it would be if an actual I/O request to disk was required.

Optimized Plan DAGs

All metric executions within a Reservoir are optimized to minimize computations.

When a new metric is added, we add it to a DAG of operations. A DAG in Railgun always follows the order: WindowFilterGroup ByAggregator.

This design restricts Railgun’s metric expressibility to follow a strict order of operations, but allows us to optimize performance, by avoid repeating unnecessary computations, especially the ones related with windows.

Since we often have metrics sharing the same Window, Filter, and Group By, the plan optimizes these by reusing the DAG’s prefix path. For instance, consider these two queries with three metric aggregations:

  • SELECT SUM(amount), COUNT(*) FROM payments
  • SELECT AVG(amount) FROM payments
    GROUP BY merchant [RANGE 5 MINUTES]

Then, the DAG of these queries is:

Optimized DAG Example

where all metrics share the same window, but metrics in the first query group the events by the card field, while metrics in the second query group the events by the merchant field.

Aggregation State Store: RocksDB

Similarly to Flink and other systems, we store in RocksDB the state of the metric aggregations. We use RocksDB given its proven maturity and low-latency performance for a key-value store.

Messaging Layer: Kafka

We use Kafka as the underlying messaging system, but also as a way to be fault-tolerant and to distribute load between the several Railgun instances.

Since both the state store and the reservoir write their state to persistent storage often, when a fault happens we only need to recover a very small amount of events, which can be efficiently pulled from Kafka topics upon request.

Kafka’s Zookeeper is also responsible for detecting when a Railgun Node (configured also as a Kafka Consumer) crashed and is no longer able to poll messages.

As a messaging system, Kafka stores messages in topics and provides built-in capabilities to split topics into several partitions.

In our case, we configure as many topics per stream as different top-level group bys (e.g. card, merchant, etc.). Then, for each topic, we tune the number of partitions to achieve a good level of parallelism and support a high throughput.

We exploit Kafka’s internal division of topic-partition to determine the number of Railgun Task Processors.

The Task Processor is the main unit of concurrency inside Railgun, and where metrics are computed. We have at least as many Task Processors as different topic-partitions configured in Kafka.

Task Processors live inside a Processor, and a Processor is our main unit of parallelism. Each Processor uses a single dedicated thread to minimize context switching and synchronization.

Technically there is no difference between two Processors in a single physical Railgun Node, or two Processors in two different physical Railgun Nodes. This flexibility allows us to better exploit the machines available for the deployment.

Putting it all together

The following image shows the Railgun execution from the perspective of an event, end-to-end, from the moment the event enters the front-end in REST (step 1), to the moment Railgun replies to the event submission with all the metric aggregations computed for this event (step 6).

Tale of an Event in Railgun

Show me the numbers: Flink vs Railgun

By now you are probably wondering how this works in practice, right?

To test Railgun, we compared it with one of the most popular and robust Streaming Engines: Apache Flink.

We used a Kubernetes deployment with 3 separate VMs:

  • 1 Kafka node(including Zookeeper) with 10 partitions per topic.
  • 1 injector sending events at a sustained throughput of 500 ev/s.
  • 1 computing engine — Railgun or Flink — with a JVM heap of 10GB where we compute just one metric:
    SUM(amount) GROUP BY card OVER 60min Window.
    For the latter, in Railgun we use a real-time sliding window, whereas in Flink we use hopping windows.

Our goal is to validate how latencies vary between engines with the same throughput and cluster setup.

We measure latency from the injector based on the reply message time: the end-to-end latency since we send the message until the moment we consume the aggregation response. Latencies are corrected to take into account the coordination omission problem.

All runs are of 35 minutes, where the first 5 minutes are for warm up, and ignored for latency purposes. We use an anonymized fraud dataset from one of our clients to simulate real-world dictionary cardinality for aggregation states.

For Flink, since real-time sliding windows are not available out of the box, we use hopping windows, where we vary the size of the hop from 5min to 5seconds. In a previous post, we’ve explained the relationship between the hop size and accuracy. But in a nutshell, the larger the hop size, the less frequent metrics are accurate. To ensure that Flink is optimized for latency, rather than throughput, we set Flink’s Kafka Client to use a batch timeout of zero.

For Railgun we use a real-time sliding window, accurate for every event, to compute the exact same metric.

Latency Railgun vs Flink at 500ev/s

The image above shows how Flink latencies are affected when we increase the hop’s granularity. Clearly, in this setup, with hops of 10s or less, Flink is unable to keep up, even with a small throughput of 500 ev/s.

In most of Feedzai’s setups, we are required to score events in less than 250ms in the 99.9% percentile. Note that, to fulfill this requirement we would need hops of at least 1 minute, which would severely compromise the metrics’ accuracy.

In opposition, Railgun is able to deliver accurate metrics per event with lower latencies than Flink in all percentiles.

Do you want to know more? Please check out our paper to find more benchmarks and details regarding Railgun!

This work was done in collaboration with João Oliveirinha and Pedro Cardoso.

Railgun: A new weapon for mission critical streaming tasks was originally published in Feedzai Techblog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Source: Feedzai