Drinking From The Data Firehose
If you work with data a lot, and you have a lot of it, processing the entire corpus in one run becomes nearly impossible. Sometimes you simply can’t do it at all, since the data is coming in as a stream of events. Moreover, as your codebase grows, you’ll be forced to create a library that gets most of the routing out of the way, so that you can pay attention to details rather than having to grasp the entire flow.
There’s been a lot of progress on this subject lately in the Clojure community. The Prismatic folks released their processing library, Graph. Kyle Kingsbury created Riemann, which uses a similar approach internally. Zach Tellman, creator of Aleph, released Lamina for working with streams a couple of years ago. Eventsourced, Pipes from TinkerPop, and Storm by Nathan Marz can also be counted as good examples.
The basic idea remains the same. You have a stream of data coming in as events. You build a topology of functions that broadcast, transform, filter, aggregate or save the state of those events. At any given point in time you can inspect the intermediate result of a calculation when the stream of events is being fetched from some data source, or you can get results interactively (real-time, yo) and react to the system’s behavior.
After trying much the same approach on quite a few problems ourselves, we found that it works quite well. Of course, depending on the required throughput, our approach may not be exactly what you want in your production system, but the interface will most likely still be similar to the alternatives, even though implementation details will vary.
Today we are releasing our own library into this melting pot of JVM-based stream processing projects: EEP, our young entrant to this space.
When we first started investigating the state of the art in event processing, the intuitive choices for inspiration were Erlang (gen_event) and Node.js (don’t judge!). They certainly have very different approaches to concurrency, but there are similarities. In gen_event, the two functions that are used more often than others are gen_event:add_handler and gen_event:notify. The former subscribes you to an occurring event, the latter sends events to the emitter, which dispatches them to the handler. The Node.js approach is very similar: multiple handlers per event type, routed on emission.
Next we will briefly cover EEP concepts and demonstrate what it feels like to use it with some code examples.
Core concepts in EEP are:
- Emitter is responsible for handler registration and event routing. It holds everything together.
- Event is a tuple dispatched by the outside world into the emitter. An event is an arbitrary tuple of user-defined structure; no validation of that structure is performed internally.
- Event Type is a unique event type identifier, used for routing. It can be a number, symbol, keyword, string or anything else. All events coming into the emitter have a type associated with them.
- Handler is a function with optional state attached to it. The function is a callback, executed whenever its event type matches an incoming event. A single handler can be used for multiple event types, but an event type can only have one handler at a time.
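A minimal sketch of how these pieces fit together (assuming EEP’s `clojurewerkz.eep.emitter` namespace and helpers named after the concepts above; exact signatures may differ slightly from this illustration):

```clojure
(require '[clojurewerkz.eep.emitter :refer :all])

;; Emitter: holds handlers together and routes events between them
(def emitter (create))

;; Handler: a callback registered under the :pages event type
(defobserver emitter :pages (fn [event] (println "page load:" event)))

;; Event: an arbitrary user-defined structure, dispatched with its type
(notify emitter :pages {:url "/products/1"})
```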
Building Data Flows
Now, with these building blocks, we can go ahead and start building processing graphs. For that, we need to define several types of handlers that are aware of what the data looks like.
- filter receives events of a certain type and forwards those for which a predicate function returns true to one or more other handlers.
- splitter receives events of a certain type and dispatches them to the type returned by a predicate function. For example, you can split a stream of integers into even and odd ones and process them differently down the pipeline (see the sketch after this list).
- transformer receives typed tuples, applies a transformation function to each one of them and forwards the results to one or more other handlers. It’s similar to applying map to the elements of a list, except that the function is applied to a stream of data.
- aggregator is initialized with an initial value, then receives events of a certain type and aggregates state by applying an aggregate function to the current state and each incoming event. It’s similar to Clojure’s reduce, except that it’s applied to a stream of data.
- multicast receives events of a certain type and broadcasts them to several handlers with different types. For example, whenever an alert is received, you may want to send notifications via email, IRC and Jabber, and append the event to a log file.
- observer receives events of a certain type and runs a function (potentially with side effects) on each one of them.
- buffer receives events of a certain type and stores them in a circular buffer with a given capacity. As soon as capacity is reached, it distributes them to several other handlers.
- rollup acts in a manner similar to buffer, except that it’s time-bound rather than capacity-bound: whenever the time period elapses, it dispatches all accumulated events to several other handlers.
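Here’s a hedged sketch of the even/odd splitter mentioned above, combined with aggregators (again assuming handler-definition helpers named after the handler types; consult the EEP docs for exact signatures):

```clojure
(require '[clojurewerkz.eep.emitter :refer :all])

(def emitter (create))

;; splitter: route each integer to :even or :odd based on a predicate
(defsplitter emitter :integers (fn [i] (if (even? i) :even :odd)))

;; aggregator: sum each branch independently
(defaggregator emitter :even + 0)
(defaggregator emitter :odd  + 0)

;; push a few integers through the graph
(doseq [i (range 10)]
  (notify emitter :integers i))
```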
Let’s take a closer look at an example of stream processing. For instance, you have a discrete stream of events coming from web servers that holds information about page loads on your website. Interesting information to monitor would be:
- host (where the page load occurred)
- response code (HTTP status of the response)
- user agent information
- response duration
- url of the response
From that single payload type you can already derive an incredible amount of information, for example:
- slowest/fastest response time
- last 20 response times
- total number of responses (per given amount of time)
- response number breakdown by status code
- hottest URLs on the website (page loads breakdown by url)
- user agent breakdown
- count of 404 responses only
If you do it in the most straightforward way, you will end up with lots of ad-hoc routing code: metrics related to response time need only the response time field, you’ll need buffers for rollups that aggregate data for a certain period and stream it to the next computation unit, and so on.
We went through many use cases related to discrete data aggregation and worked out several entities that help create calculation topologies. Besides that, you’ll need a queue with some built-in routing capabilities that manages buffered aggregation, filters, streams data down to several handlers, and takes care of many other, not-so-obvious things.
Now we’ll need to declare the metrics we want to aggregate on. In order to make our processing graph more reusable, we’ll separate metric retrieval from metric calculation. This way we’ll be able to reuse the same aggregate function for several types of metrics and achieve the desired result by providing appropriate routing.
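A minimal sketch of that wiring, assuming page-load events are plain Clojure maps (all field and handler names below are illustrative) and using a multicast as the graph’s entrypoint:

```clojure
(require '[clojurewerkz.eep.emitter :refer :all])

(def emitter (create))

;; Every :page-load event is fanned out to per-metric routes
(defmulticast emitter :page-load
  [:response-time :status-code :urls :user-agents])
```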
Now, let’s define transformers that receive a complete event, take a single field out of it and redistribute it further down:
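For example (keywords act as lookup functions on maps, which makes field extraction trivial; handler names continue the illustrative scheme from above):

```clojure
;; Each transformer plucks one field from the event map and
;; forwards the value to the corresponding aggregation route
(deftransformer emitter :response-time :duration :slowest-response)
(deftransformer emitter :status-code   :status   :status-breakdown)
(deftransformer emitter :urls          :url      :hottest-urls)
(deftransformer emitter :user-agents   :ua       :ua-breakdown)
```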
Now, we can define our aggregate functions:
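A sketch of aggregators for the routes above; each one is just an ordinary reducing function plus an initial value:

```clojure
;; Slowest response time seen so far
(defaggregator emitter :slowest-response max 0)

;; Responses broken down by status code, e.g. {200 9841, 404 12}
(defaggregator emitter :status-breakdown
  (fn [acc status] (update-in acc [status] (fnil inc 0)))
  {})

;; Page loads broken down by URL
(defaggregator emitter :hottest-urls
  (fn [acc url] (update-in acc [url] (fnil inc 0)))
  {})

;; User agent breakdown
(defaggregator emitter :ua-breakdown
  (fn [acc ua] (update-in acc [ua] (fnil inc 0)))
  {})
```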
At this point our graph is complete: events flow from the :page-load entrypoint through the per-field transformers into the aggregators.
In order to pump some data into the processing graph, let’s generate some random events:
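One way to do that (all values below are made up):

```clojure
(def hosts    ["host01" "host02" "host03"])
(def statuses [200 200 200 200 301 404 500])
(def uas      ["Mozilla/5.0" "Opera/9.80" "curl/7.30.0"])
(def urls     ["/" "/products" "/about" "/signup"])

(defn random-event
  "Builds a fake page-load event with randomized fields."
  []
  {:host     (rand-nth hosts)
   :status   (rand-nth statuses)
   :ua       (rand-nth uas)
   :duration (+ 20 (rand-int 480)) ; milliseconds
   :url      (rand-nth urls)})
```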
And pump data to the emitter:
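Roughly like this; the state-inspection calls at the end assume EEP exposes handler state through `get-handler` and `state`, so check the docs for the exact accessors:

```clojure
;; Feed a thousand generated events through the :page-load entrypoint
(dotimes [_ 1000]
  (notify emitter :page-load (random-event)))

;; Intermediate results are available at any time, e.g.:
(state (get-handler emitter :slowest-response))
(state (get-handler emitter :status-breakdown))
```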
Why Stream Processing
There’re many advantages to using such an approach for data processing. First of all, whenever you’re working with a stream, you have the latest data available at all times. There’s no need to go through an entire corpus of data; you simply read off the state of the handlers you’re interested in.
Every handler is reusable, and you can build graphs in such a way that a handler has not just a single entrypoint but several. If the built-in EEP handlers are not enough for you, you can always take the IHandler protocol and extend it with any other handler of your preference; that gives you the ability to have sliding, tumbling or monotonic windows, different types of buffers, custom aggregators and so on.
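As a sketch, a custom handler might look roughly like this; the `run` and `state` method names are assumptions about IHandler’s surface, so consult the EEP source for the actual protocol:

```clojure
(require '[clojurewerkz.eep.emitter :refer :all])

;; A hypothetical stateful handler that counts the events it has seen.
;; IHandler's real method names/arities may differ from this sketch.
(defrecord CountingHandler [counter]
  IHandler
  (run [_ event]
    (swap! counter inc))
  (state [_]
    @counter))

(defn counting-handler []
  (->CountingHandler (atom 0)))
```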
What You Can Do With It
Event streams are very common in every system. One application that’s been quite popular in recent years is “logs as data” processed as a stream. Every production system produces a stream of events, and it is becoming increasingly obvious to engineers and business owners alike that tapping into that data can bring a lot of benefits.
To make this more useful, you can use stream processing libraries such as EEP to propagate events to mobile and Web clients, publish them to other apps using messaging technologies such as RabbitMQ, generate alerts and much more.
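For instance, an observer handler could republish incoming events to RabbitMQ via Langohr; a sketch reusing the emitter from the earlier examples (exchange and routing key names below are made up):

```clojure
(require '[langohr.core    :as rmq]
         '[langohr.channel :as lch]
         '[langohr.basic   :as lb])

(let [conn (rmq/connect)
      ch   (lch/open conn)]
  ;; Republish every alert this handler sees to a RabbitMQ exchange
  (defobserver emitter :alerts
    (fn [event]
      (lb/publish ch "monitoring" "alerts.page-load" (pr-str event)))))
```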
EEP is a generic project that can be used in a wide range of cases.
The initial implementation of EEP was based on thread pools and functioned reasonably well, but after some research we decided to see whether we could use the ring buffer abstraction from Disruptor. After several iterations we ended up using Reactor, a new event-driven programming framework from Pivotal. It was a game-changer: EEP got way faster, and routing got so much easier (and much faster, too).
Our Clojure interface to Reactor is a separate library, creatively named Meltdown. Now you can deploy a meltdown into production!
We will cover Meltdown in more detail in a separate blog post.
(Some) Future Plans
We’re working hard to bring good support for numerical analysis and a solid set of statistical functions right to your fingertips in EEP. You can watch our progress on GitHub and follow the news on Twitter @clojurewerkz.
EEP is a ClojureWerkz Project
- Monger, a Clojure MongoDB client for a more civilized age
- Langohr, a Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
- Cassaforte, a Clojure Cassandra client built around CQL
- Elastisch, a minimalistic Clojure client for ElasticSearch
- Welle, a Riak client with batteries included
- Neocons, a client for the Neo4J REST API
- Quartzite, a powerful scheduling library
and several others. If you like EEP, you may also like our other projects.