Cassaforte is a new Clojure client for
Apache Cassandra 1.2+. It is built around CQL 3 and focuses on ease
of use. You will likely find that using Cassandra from Clojure has
never been so easy.
1.2.0 is a minor release that introduces one minor feature, fixes a
couple of bugs, and makes Cassaforte compatible with Cassandra 2.0.
Changes between Cassaforte 1.1.x and 1.2.0
Cassandra Java Driver Update
The Cassandra Java driver has been updated to 1.0.3, which
supports Cassandra 2.0.
Fix problem with batched prepared statements
insert-batch did not play well with prepared statements; this has been fixed. You can now use insert-batch
with prepared statements as usual.
Hayt query generator update
Hayt has been updated to 1.1.3, which contains fixes for the token function and some internal improvements
that do not affect any APIs.
Added new Consistency level DSL
The consistency level can now also be passed as a symbol, without resolving it to a ConsistencyLevel instance.
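As a rough sketch of what this looks like, assuming the policies namespace and with-consistency-level macro (the names here are illustrative; check the Cassaforte documentation for the exact API):

```clojure
;; a hedged sketch: the namespace and macro names are assumptions;
;; the point is that a bare keyword such as :quorum is now accepted
;; instead of a ConsistencyLevel instance
(require '[clojurewerkz.cassaforte.policies :as cp])

(cp/with-consistency-level :quorum
  ;; queries executed in this scope use QUORUM consistency
  )
```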
A query DSL has been added for managing users: create-user, alter-user, drop-user, grant, revoke,
list-users, and list-permissions, for both multi and regular sessions.
News and Updates
New releases and updates are announced on
Twitter. Cassaforte also has a
mailing list; feel
free to ask questions and report issues there.
Automatic recovery of channels that are created without an explicit
number now works correctly.
Contributed by Joe Freeman.
clj-http Upgrade
The clj-http dependency has been updated to 0.7.6.
Clojure 1.3 is No Longer Supported
Langohr requires Clojure 1.4+ as of this version.
More Convenient Publisher Confirms Support
langohr.confirm/wait-for-confirms is a new function that
waits until all outstanding confirms for messages
published on the given channel arrive. It optionally
takes a timeout:
```clojure
(langohr.confirm/wait-for-confirms ch)

;; wait up to 200 milliseconds
(langohr.confirm/wait-for-confirms ch 200)
```
Route One is a Clojure DSL for URL/URI/path generation
from a route map, compatible with Compojure’s Clout.
1.0.0-rc2 is a development milestone release that further improves Compojure
integration.
Changes between Route One 1.0.0-rc1 and 1.0.0-rc2
Tight Compojure integration
It is now possible to define named Compojure routes with Route One:
```clojure
(ns my-app
  (:require [compojure.core :as compojure])
  (:use clojurewerkz.route-one.compojure))

(compojure/defroutes main-routes
  ;; will use /about as a template
  (GET about request (handlers.root/root-page request))
  ;; will use /documents as a template
  (GET documents request (handlers.root/documents-page request)))
```
This will generate main-routes exactly the way Compojure
does, but will also add helper functions for building URLs
(about-path, about-url, documents-path, documents-url, and so
on).
To use this feature, you’ll have to bring in Compojure as a dependency
to your project:
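A minimal project.clj fragment might look like the following; the version numbers shown are illustrative, not prescriptive:

```clojure
;; project.clj (versions shown are illustrative)
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [compojure "1.1.5"]
                 [clojurewerkz/route-one "1.0.0-rc2"]])
```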
Last week we introduced a new project we’ve been working on for
a few months, EEP.
To make event processing concurrent and parallel in EEP, there needs
to be a way to transfer events (messages) from threads that
produce them to threads that consume them. In addition,
it is desirable to also be able to filter events
consumers are interested in.
There are multiple message passing libraries available on the JVM.
Some of them are stable and very mature but have a very small
contributor base; others are used very actively but are less
convenient to consume from Clojure; some do not offer the features we
wanted. So we decided to wait and see, and not make the choice yet.
Enter Reactor
This move turned out to be the right one: a couple of months after
the first EEP prototype was put up on GitHub to ask for some feedback
from our friends (hi, Darach!), folks at Pivotal introduced Reactor, a “foundational framework for asynchronous programming
on the JVM”.
Reactor core is an event (message) passing library with several
features that we found very handy for stream processing:
Consumers may consume events selectively (events have routing keys)
Message passing implementation is pluggable
It’s very easy to run multiple reactors in the same JVM
After seeing that it took only two hours to write a first version of Meltdown,
with some tests, we were convinced that Reactor was a great choice for our
needs.
Meltdown Goes to School
Since Reactor was a really young project by the time we started
using it (we may have been the first people outside of Pivotal
to build something on top of Reactor at that point),
it took a few iterations and serious breaking API changes
in both libraries until we were confident enough to port
EEP to it.
In the end it took Alex a few hours to make the switch,
which again demonstrates how well Reactor and Meltdown
fit EEP.
The Future of Meltdown
Meltdown still does not cover all of Reactor’s functionality and
Reactor is under active development, so Meltdown will stay
alpha for some time. Since we’ve announced it here, we will do
our best to write some initial documentation guides
for it.
In the meantime, feel free to give Meltdown a try. Check it out
in the REPL and try modeling a problem that needs message passing
with it. It will likely already take you a long way (except
for the really sparse documentation).
If you work with data a lot, and you have a lot of it, it becomes
nearly impossible to process the entire corpus in one run. Sometimes
you simply can’t do that at all, since the data comes in the form of
events. Moreover, as your codebase grows, you’ll be forced to create
a library that gets most of the routing out of the way, so that
you can pay attention to the details rather than having to grasp the entire
flow.
There’s been a lot of progress on this subject lately in the Clojure
community. The Prismatic folks released their processing library,
Graph. Kyle Kingsbury created
Riemann, which uses a similar
approach internally. Zach Tellman, creator of Aleph, released
Lamina for working with streams
a couple of years ago.
Eventsourced,
Pipes from Tinkerpop, and
Storm by Nathan Marz can also
be counted as good examples.
The basic idea remains the same. You have a stream of data coming in
in the form of events. You build a topology of functions that broadcast,
transform, filter, aggregate, or save the state of those events. At any
given point in time you can inspect the intermediate result of the
calculation when the stream of events is being fetched from some
data source, or you can get results interactively (real-time,
yo) and react to the system’s behavior.
After trying much the same approach on quite a few things, we
found it worked quite well. Of course, depending on the required throughput,
our approach may not be exactly what you want to use in your production
system, but most likely the interface will still be similar to the
alternatives, even though implementation details will vary.
Today we are releasing our own library into this melting pot of JVM-based
stream processing projects.
When we first started investigating the state of the art in event processing,
the intuitive choices for inspiration were Erlang (gen_event) and
Node.js (don’t judge!). They certainly
have very different approaches to concurrency, but there are similarities.
In gen_event, the two
functions used more often than others are
gen_event:add_handler and gen_event:notify. The former subscribes a handler to
events; the latter sends events to the emitter, which dispatches
them to the handlers. The Node.js approach is
very similar: multiple handlers per event type, routed on emission.
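In EEP terms, the same two operations look roughly like the following sketch, using the emitter API that appears in the examples later in this post (the namespace in the require form is an assumption):

```clojure
;; add_handler/notify analogues in EEP; the require'd namespace is an
;; assumption, the function names match the examples later in the post
(require '[clojurewerkz.eep.emitter :refer [new-emitter defobserver notify]])

(def emitter (new-emitter))

;; analogous to gen_event:add_handler: subscribe a handler to a type
(defobserver emitter :page-load (fn [event] (println "Got event:" event)))

;; analogous to gen_event:notify: dispatch an event to the emitter
(notify emitter :page-load {:url "/about" :status 200})
```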
Next we will briefly cover EEP concepts and demonstrate what it feels
like to use it with some code examples.
Core concepts
Core concepts in EEP are:
Emitter is responsible for handler registration and event
routing. It holds everything together.
Event is a tuple dispatched by the outside world into the emitter. An event is an
arbitrary tuple of user-defined structure; no internal validation
of that structure is performed.
Event Type is a unique event type identifier, used for routing. It can
be a number, a symbol, a keyword, a string, or anything else. All events
coming into the emitter have a type associated with them.
Handler is a function with optional state attached to it. The function is a
callback, executed whenever the event type matches an incoming
event. A single handler can be used for multiple event types, but
an event type can only have one handler at a time.
Building Data Flows
Now, with these building blocks we can go ahead and start building
processing graphs. For that, we need to define several types of handlers
that are aware of what data looks like.
filter receives events of a certain type, and forwards those
for which filter-fn returns true to one or more other handlers
splitter receives events of a certain type, and dispatches them
to the type returned by a predicate function. For example, you can split
a stream of integers into even and odd ones and process them
differently down the pipeline.
transformer receives typed tuples, applies a
transformation function to each one of them, and forwards them to
one or more other handlers. It is similar to applying map to the
elements of a list, except that the function is applied to a stream of data.
aggregator is initialized with an initial value, then receives events of
a certain type and aggregates state by applying an aggregate function to
the current state and an incoming event. It is similar to Clojure’s reduce,
except that it is applied to a stream of data.
multicast receives events of a certain type and broadcasts them
to several handlers with different types. For example, whenever an
alert is received, you may want to send notifications via email,
IRC, and Jabber, and append the event to a log file.
observer receives events of a certain type and runs a function
(potentially with side effects) on each one of them.
buffer receives events of a certain type and stores them in a
circular buffer with a given capacity. As soon as capacity is
reached, it distributes them to several other handlers.
rollup acts in a manner similar to buffer, except that it is
time-bound rather than capacity-bound: whenever the time period
elapses, it dispatches all accumulated events to several other handlers.
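To make the splitter example above concrete, here is a sketch that routes integers to even and odd substreams. The defsplitter and defobserver names are assumed to follow the same def* convention as the handlers used later in this post:

```clojure
;; sketch: split a stream of integers into :even and :odd substreams;
;; defsplitter/defobserver follow the def* convention used below
(defsplitter emitter :integers (fn [i] (if (even? i) :even :odd)))

(defobserver emitter :even (fn [i] (println i "is even")))
(defobserver emitter :odd  (fn [i] (println i "is odd")))

;; routed to the :even observer
(notify emitter :integers 4)
```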
Let’s take a closer look at an example of stream processing. Suppose
you have a discrete stream of events coming from web
servers that holds information about page loads on your
website. Interesting information to monitor would be:
host (where the page load occurred)
response code (HTTP status of the response)
user agent information
response duration
url of the response
From that single payload type you can already derive a great deal
of information, for example:
slowest/fastest response time
last 20 response times
total number of responses (per given amount of time)
response number breakdown by status code
hottest URLs on the website (page loads breakdown by url)
user agent breakdown
count only 404s
If you do this in the most straightforward way, you will end up with lots
of ad-hoc code related to routing: metrics that concern
response time need only the response time, you’ll need buffers for
rollups that aggregate data for a certain period and stream it to the
next computation unit, and so on.
We went through many use cases related to discrete data
aggregation and worked out several entities that help to create
calculation topologies. Besides that, you’ll need a queue with
some built-in routing capabilities that manages buffered aggregation,
filters, streams data down to several handlers, and handles many other
not-so-obvious things.
Now, we’ll need to declare the metrics we want to aggregate on. In order
to make our processing graph more reusable, we’ll separate metric
retrieval from metric calculation. This way, we’ll be able to reuse the same
aggregate function for several types of metrics and achieve the
desired result by providing appropriate routing.
```clojure
(def emitter (new-emitter))

;; Redistribute an event to all transformers and aggregators
(defmulticast emitter :page-load
  [:total-request-count :load-time-metrics :status-code-metrics
   :user-agent-metrics :url-metrics])
```
Now, let’s define transformers that receive a complete event, take a
single field out of it and redistribute it further down:
```clojure
;; Take only :load-time for metrics related to load time
(deftransformer emitter :load-time-metrics :load-time
  [:load-time-slowest :load-time-fastest :load-times-last-20])

;; Take only :status for metrics related to status code
(deftransformer emitter :status-code-metrics :status :count-by-status-code)

;; Take only :user-agent for metrics related to user agent
(deftransformer emitter :user-agent-metrics :user-agent :count-by-user-agent)

;; Take only :url for metrics related to url
(deftransformer emitter :url-metrics :url :count-by-url)
```
Now, we can define our aggregate functions:
```clojure
;; Define a counter aggregator for all requests
(defaggregator emitter :total-request-count (fn [acc _] (inc acc)) 0)

;; Preserve only slowest load time
(defaggregator emitter :load-time-slowest
  (fn [previous current]
    (if (and previous (< previous current))
      previous
      current))
  nil)

;; Preserve only fastest load time
(defaggregator emitter :load-time-fastest
  (fn [previous current]
    (if (and previous (> previous current))
      previous
      current))
  nil)

(let [count-aggregate (fn [acc metric]
                        (assoc acc metric (inc (get acc metric 0))))]
  ;; Aggregate counts by status code
  (defaggregator emitter :count-by-status-code count-aggregate {})
  ;; Aggregate counts by user agent
  (defaggregator emitter :count-by-user-agent count-aggregate {})
  ;; Aggregate counts by url
  (defaggregator emitter :count-by-url count-aggregate {}))

;; Define a buffer for last 20 events
(defbuffer emitter :load-times-last-20 20)
```
Now our graph is ready.
In order to pump some data into the processing graph, let’s generate some random events:
```clojure
(defn rand-between
  "Generates a random number between two points"
  [start end]
  (+ start (rand-int (- end start))))

(def hosts ["host01" "host02" "host03" "host04" "host05"])
(def urls ["/url01" "/url02" "/url03" "/url04" "/url05"])
(def user-agents ["Chrome" "Mozilla" "Safari" "Firefox"])
(def status-codes [200 404 500 302])

(defn gen-events
  "Generates an infinite stream of random data"
  ([] (gen-events [] 0))
  ([c i]
     (lazy-cat c
               (gen-events [{:event_id i
                             :host (get hosts (rand-between 0 (count hosts)))
                             :status (get status-codes (rand-between 0 (count status-codes)))
                             :url (get urls (rand-between 0 (count urls)))
                             :user-agent (get user-agents (rand-between 0 (count user-agents)))
                             :load-time (rand-between 300 500)}]
                           (inc i)))))

(defn median
  "Calculates median for given array of numbers"
  [data]
  (let [sorted (sort data)
        n (count data)
        i (bit-shift-right n 1)]
    (if (even? n)
      (/ (+ (nth sorted (dec i)) (nth sorted i)) 2)
      (nth sorted (bit-shift-right n 1)))))
```
And pump data to the emitter:
```clojure
(doseq [event (take 20000 (gen-events))]
  (notify emitter :page-load event))

(println "Total request count: " (state (get-handler emitter :total-request-count)))
(println "Count by url:" (state (get-handler emitter :count-by-url)))
(println "Count by user agent:" (state (get-handler emitter :count-by-user-agent)))
(println "Count by status:" (state (get-handler emitter :count-by-status-code)))
(println "Fastest load time:" (state (get-handler emitter :load-time-fastest)))
(println "Slowest load time:" (state (get-handler emitter :load-time-slowest)))
(println "Last 20 load times:" (state (get-handler emitter :load-times-last-20)))
(println "Median of last 20 load times:"
         (float (median (state (get-handler emitter :load-times-last-20)))))
```
Why Stream Processing
There are many advantages to using such an approach to data
processing. First of all, whenever you’re working with a stream,
you have the latest data available at all times. There’s no need to
go through the entire corpus of data; you just get the state of the
handlers you are interested in.
Every handler is reusable, and you can build graphs so that a handler has
several entry points rather than a single one. If the built-in EEP handlers are not
enough for you, you can always implement the IHandler protocol with any other handler
of your preference, which gives you the ability to have sliding, tumbling, or monotonic windows,
different types of buffers, custom aggregators, and so on.
What You Can Do With It
Event streams are very common in every system. One application that’s been quite popular
in recent years is “logs as data” processed as a stream. Every production system produces
a stream of events and it becomes increasingly obvious to both engineers and business
owners alike that tapping into that data can bring a lot of benefits.
To make this more useful, you can use stream processing libraries such
as EEP to propagate events to mobile and Web clients, publish them to
other apps using messaging technologies such as RabbitMQ, generate
alerts and much more.
EEP is a generic project that can be used in a wide range of cases.
Enter Meltdown
The initial implementation of EEP was based on thread pools and was
functioning reasonably well, but after some research we decided to look
into whether we could use the ring buffer abstraction from
Disruptor. After several
iterations we ended up using
Reactor, a new event-driven
programming framework from Pivotal. It was a
game-changer: EEP got way faster, and routing got so much easier (and much
faster, too).
Our Clojure interface to Reactor is a separate library, creatively
named Meltdown. Now you can deploy a meltdown into production!
We will cover Meltdown in more detail in a separate blog post.
(Some) Future Plans
We’re working hard to bring good support for numerical analysis and a good set of statistical
functions right to your fingertips in EEP. You can watch our progress on GitHub and follow the news on Twitter @clojurewerkz.
Spyglass now depends on org.clojure/clojure version 1.5.1. It is
still compatible with Clojure 1.3+, and if your project.clj depends
on a different version, that version will be used; 1.5 is simply the default now.
We encourage all users to upgrade to 1.5; it is a drop-in replacement
for the majority of projects out there.
Asynchronous Cache Store
Spyglass now ships both sync and async implementations of clojure.core.cache.
To instantiate an async store, use clojurewerkz.spyglass.cache/async-spyglass-cache-factory.
clojurewerkz.spyglass.cache/spyglass-cache-factory has been renamed to clojurewerkz.spyglass.cache/sync-spyglass-cache-factory.
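Usage might look like the following sketch; the factory arity shown (a client plus a TTL in seconds) is an assumption, so check the Spyglass documentation for the exact signature:

```clojure
;; a hedged sketch; the factory arguments (client, TTL) are assumptions
(require '[clojurewerkz.spyglass.client :as c]
         '[clojurewerkz.spyglass.cache :as sc])

(let [client (c/text-connection "127.0.0.1:11211")
      cache  (sc/sync-spyglass-cache-factory client 600)]
  ;; cache implements the clojure.core.cache protocol
  cache)
```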
```clojure
;; uses credentials from environment variables, e.g. on Heroku:
(c/text-connection "127.0.0.1:11211"
                   (System/getenv "MEMCACHE_USERNAME")
                   (System/getenv "MEMCACHE_PASSWORD"))
```
When you need to fine-tune things and want to use a custom connection factory, you need
to instantiate an auth descriptor and pass it explicitly, like so:
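A sketch of what that can look like: AuthDescriptor/typical is part of SpyMemcached's net.spy.memcached.auth API, while the :auth-descriptor option key for the connection factory is an assumption here:

```clojure
;; hedged sketch: the :auth-descriptor option key is an assumption
(require '[clojurewerkz.spyglass.client :as c])
(import 'net.spy.memcached.auth.AuthDescriptor)

(let [ad (AuthDescriptor/typical (System/getenv "MEMCACHE_USERNAME")
                                 (System/getenv "MEMCACHE_PASSWORD"))]
  (c/text-connection "127.0.0.1:11211"
                     (c/text-connection-factory :auth-descriptor ad)))
```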
Futures returned by async Spyglass operations now implement “blocking dereferencing”:
they can be dereferenced with a timeout and default value, just like futures created
with clojure.core/future and similar.
Contributed by Joseph Wilk.
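For example, with an async get (assuming clojurewerkz.spyglass.client/async-get and an existing connection conn):

```clojure
;; dereference an async result with a 100 ms timeout and a default value
(let [f (c/async-get conn "a-key")]
  (deref f 100 :timed-out))
```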
Support For Configurable Connections
New functions clojurewerkz.spyglass.client/text-connection-factory and
clojurewerkz.spyglass.client/bin-connection-factory provide a Clojuric
way of instantiating connection factories. Those factories, in turn, can be
passed to new arities of clojurewerkz.spyglass.client/text-connection and
clojurewerkz.spyglass.client/bin-connection to control failure mode,
default transcoder and so on:
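A sketch with illustrative option names (the exact option keys accepted by the factory functions are assumptions):

```clojure
;; hedged sketch; :failure-mode and its value are illustrative options
(require '[clojurewerkz.spyglass.client :as c])

(c/text-connection "127.0.0.1:11211"
                   (c/text-connection-factory :failure-mode :redistribute))
```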
SyncSpyglassCache uses synchronous operations from clojurewerkz.spyglass.client. Asynchronous implementation
that returns futures will be added in the future.
SpyMemcached 2.8.10
SpyMemcached has been upgraded to 2.8.10.
Improved Couchbase Support
clojurewerkz.spyglass.couchbase/connection is a new function that connects to Couchbase with the given
bucket and credentials. It returns a client that regular clojurewerkz.spyglass.memcached functions can
use.
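A usage sketch; the argument order (server list, bucket, credentials) and the c/set call are assumptions based on the description above:

```clojure
;; hedged sketch: argument names and order are assumptions
(require '[clojurewerkz.spyglass.couchbase :as cb]
         '[clojurewerkz.spyglass.client :as c])

(let [conn (cb/connection ["http://127.0.0.1:8091/pools"] "a-bucket" "bucket-password")]
  ;; regular Spyglass functions accept the returned client
  (c/set conn "a-key" 300 "a-value"))
```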
Route One is a Clojure DSL for URL/URI/path generation
from a route map, compatible with Compojure’s Clout.
Route One is intentionally a very small library that lets you do two things:
Route definition
Path generation from routes
Route One can be used as part of Web applications, mail delivery services and any other
application that needs to generate URLs using a predefined map of routes.
Supported Clojure Versions
Route One targets Clojure 1.4+, is tested against 3 Clojure versions x 3 JDKs on travis-ci.org, and is released under the Eclipse Public License.