Handling data streams
When version 2 of the Play! Framework was released, I was very interested in its new ability to handle data streams reactively.
As a technical proof of concept, I wrote a parser that works on chunks of data instead of loading the whole content into memory.
My source was a file containing the geographical coordinates of Wikipedia articles.
(This file is the result of an experiment by Triposo, showing how Wikipedia has spread across the planet since the start of the project. Don't forget to check out Triposo's other labs, they are great!)
The Play2 architecture is event-based and gives us some tools for working with streams of data:
- Enumerators produce chunks of data
- Enumeratees transform these chunks
- Iteratees consume these chunks
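To make the three roles concrete, here is a minimal sketch using Play's iteratee library (package play.api.libs.iteratee), assuming Play 2.1 or later on the classpath. The object and value names are my own, not part of the project. An Enumerator produces three numbers, an Enumeratee doubles them, and an Iteratee sums them up.

```scala
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object IterateeBasics {
  // Enumerator: produces the chunks 1, 2, 3
  val numbers: Enumerator[Int] = Enumerator(1, 2, 3)

  // Enumeratee: transforms each chunk (here it doubles it)
  val doubler: Enumeratee[Int, Int] = Enumeratee.map[Int](_ * 2)

  // Iteratee: consumes the chunks and folds them into one result
  val sum: Iteratee[Int, Int] = Iteratee.fold[Int, Int](0)(_ + _)

  // Wire the three together and wait for the result: 2 + 4 + 6
  def run(): Int = {
    val result: Future[Int] = (numbers &> doubler) |>>> sum
    Await.result(result, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The global ExecutionContext is imported because, in newer Play versions, map and fold take an implicit one; in Play 2.1 the import is simply unused.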
(For more information, you can read:
- Is socket.push(bytes) all you need to program Realtime Web apps? by Sadek Drobi, CTO of Zenexity.
- Zound, a PlayFramework 2 audio streaming experiment using Iteratees by Gaetan Renaudeau
- If you read French: Realtime Web Application, un exemple avec Play2 by Nicolas Martignole)
Data is produced by an Enumerator that sends the input file line by line:
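import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Enumerator
import scala.concurrent.Future
import scala.io.Source

def lineEnumerator(source: Source): Enumerator[String] = {
  val lines = source.getLines()
  Enumerator.fromCallback1[String](_ => {
    // Emit the next line, or None to signal the end of the input
    val line = if (lines.hasNext) Some(lines.next()) else None
    Future.successful(line)
  }, source.close) // close the file once the Enumerator is done
}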
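To try lineEnumerator without the real data file, one option is to feed it an in-memory Source and run it into an Iteratee that collects every line. This is a small test harness of my own (the object name and sample text are made up); the enumerator is repeated so the snippet compiles on its own.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

object LineEnumeratorCheck {
  // Same enumerator as above, repeated so this snippet is self-contained
  def lineEnumerator(source: Source): Enumerator[String] = {
    val lines = source.getLines()
    Enumerator.fromCallback1[String](_ => {
      Future.successful(if (lines.hasNext) Some(lines.next()) else None)
    }, source.close)
  }

  // Iteratee that collects every chunk into a list, preserving order
  val collectAll: Iteratee[String, List[String]] =
    Iteratee.fold[String, List[String]](Nil)((acc, line) => line :: acc).map(_.reverse)

  def run(): List[String] = {
    val source = Source.fromString("first\nsecond\nthird")
    Await.result(lineEnumerator(source) |>>> collectAll, 5.seconds)
  }
}
```

Only one line is pulled from the iterator per callback, which is what keeps memory usage flat on a large file.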
An Enumeratee then tries to parse each line into a Coordinate, producing None when the line does not match:
val lineParser: Enumeratee[String, Option[Coordinate]] = Enumeratee.map[String] { line =>
  // three tab-separated columns: the first is ignored, then latitude and longitude
  line.split("\t") match {
    case Array(_, IsDouble(latitude), IsDouble(longitude)) => Some(Coordinate(latitude, longitude))
    case _ => None
  }
}
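The parser relies on a Coordinate class and an IsDouble extractor that are not shown here. A plausible definition of both, as an assumption rather than the project's actual code, could look like this; the match is extracted into a plain function so it can be checked without Play.

```scala
import scala.util.Try

// Assumed shape of the Coordinate class used by lineParser
case class Coordinate(latitude: Double, longitude: Double)

// Hypothetical extractor: matches a String only if it parses as a Double
object IsDouble {
  def unapply(s: String): Option[Double] = Try(s.trim.toDouble).toOption
}

object ParseDemo {
  // Same pattern match as in lineParser, without the Enumeratee wrapper
  def parse(line: String): Option[Coordinate] = line.split("\t") match {
    case Array(_, IsDouble(latitude), IsDouble(longitude)) => Some(Coordinate(latitude, longitude))
    case _ => None
  }
}
```

Writing IsDouble as an extractor object lets the validation happen directly inside the pattern, so malformed lines simply fall through to the `case _ => None` branch.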
The Enumerator is composed with the Enumeratee like this:
val source = scala.io.Source.fromFile(Play.getExistingFile("conf/coosbyid.txt").get)
lineEnumerator(source) &> lineParser
I know, a method named "&>" may scare some of you away. Please stay! The symbol works like the pipe in bash, and it is easy to understand:
lineEnumerator(source) &> lineParser
is the same as (removing infix notation)
lineEnumerator(source).&>(lineParser)
which is the same as (since "&>" is an alias for "through")
lineEnumerator(source).through(lineParser)
Use the last form if the first one is not to your taste... :)
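There is nothing magic about "&>": in Scala a symbolic name is an ordinary method name, and a one-argument method can be called with infix notation. The toy Producer class below is my own illustration, not Play's Enumerator; it declares "&>" as an alias delegating to a named method, which is how the post describes "&>" and "through".

```scala
// Toy producer, NOT Play's Enumerator: it only illustrates the Scala syntax
final case class Producer[A](items: List[A]) {
  // Named method
  def through[B](f: A => B): Producer[B] = Producer(items.map(f))
  // Symbolic alias that simply delegates to `through`
  def &>[B](f: A => B): Producer[B] = through(f)
}

object SyntaxDemo {
  val double: Int => Int = _ * 2
  val show: Int => String = n => s"#$n"

  // These three expressions are equivalent
  val infix  = Producer(List(1, 2, 3)) &> double &> show
  val dotted = Producer(List(1, 2, 3)).&>(double).&>(show)
  val named  = Producer(List(1, 2, 3)).through(double).through(show)
}
```

Because "&>" does not end with a colon, it is left-associative, so a chain of steps reads left to right just like a shell pipeline.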
With a final Enumeratee that produces JSON, I can send the stream directly to the browser using Server-Sent Events:
val source = scala.io.Source.fromFile(Play.getExistingFile("conf/coosbyid.txt").get)
val jsonStream = lineEnumerator(source) &> lineParser &> validCoordinate &> asJson
val eventDataStream = jsonStream &> EventSource()
Ok.chunked(eventDataStream).as("text/event-stream")
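The steps validCoordinate and asJson are not shown above. Judging only from their names, a possible sketch, assuming play-iteratees and play-json from Play 2.1 or later and a hypothetical JSON shape with "lat" and "lng" fields, would be:

```scala
import play.api.libs.iteratee.Enumeratee
import play.api.libs.json._
import scala.concurrent.ExecutionContext.Implicits.global

// Assumed shape of the post's Coordinate class
case class Coordinate(latitude: Double, longitude: Double)

object CoordinateSteps {
  // Keep only the lines that were parsed successfully, unwrapping the Option
  val validCoordinate: Enumeratee[Option[Coordinate], Coordinate] =
    Enumeratee.collect[Option[Coordinate]] { case Some(c) => c }

  // Hypothetical JSON shape for one coordinate
  def toJson(c: Coordinate): JsValue =
    Json.obj("lat" -> c.latitude, "lng" -> c.longitude)

  // Turn each coordinate into a JSON value
  val asJson: Enumeratee[Coordinate, JsValue] =
    Enumeratee.map[Coordinate](toJson)
}
```

Enumeratee.collect combines filtering and mapping in one step: chunks that do not match the partial function (here, the None values) are simply dropped from the stream.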
What to notice:
- Only chunks of data are held in memory. The source file is never loaded completely.
- Each step of the process is isolated in an Enumerator or Enumeratee, making it easy to modify, reuse, or recombine.
- The Enumerator here reads a file, but it could just as well read data from a web service or from a database.
Server-Sent Events
When we want to send events in "real time" to the browser, what technologies are available?
- polling: the browser polls the server every x milliseconds to check for new messages. This method is not very efficient, because many requests are needed to give the illusion of real-time updates.
- long-polling (or Comet): the browser opens a connection to the server (for example in an iframe), and the server keeps the connection open. When the server wants to push data to the client, it sends the data over the open connection. The client receives the data and opens a new connection for further messages. With this method, the browser constantly shows that it is waiting for data. This technique does not scale on thread-per-connection servers, because each open connection holds a thread. In a Java EE environment, we need asynchronous servlets (Servlet 3.0 or later) to keep the server from collapsing.
- Server-Sent Events (SSE) are quite similar to Comet. The main difference is that the browser manages the connection; for example, it reconnects automatically if the connection drops.
- WebSockets provide bidirectional, full-duplex communication channels over a protocol different from HTTP.
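SSE is a simple text format sent over an HTTP response with the content type text/event-stream. Play's EventSource() Enumeratee takes care of the framing; the sketch below does not reproduce Play's implementation, it only illustrates the wire format from the SSE specification (the object and method names are my own).

```scala
object SseFormat {
  /** Formats one Server-Sent Events message as it travels over the wire.
    * Each line of the payload gets its own "data:" field and a blank line
    * terminates the event. The optional "event:" field names the event type. */
  def frame(data: String, eventName: Option[String] = None): String = {
    val eventLine = eventName.map(name => s"event: $name\n").getOrElse("")
    // limit -1 keeps trailing empty lines instead of dropping them
    val dataLines = data.split("\n", -1).map(line => s"data: $line\n").mkString
    eventLine + dataLines + "\n"
  }
}
```

When no "event:" field is sent, the browser dispatches the message as a "message" event, which is the type the JavaScript listener in this post subscribes to.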
I chose Server-Sent Events over WebSockets for the following reasons:
- I've already played with WebSockets and wanted to try something new.
- WebSockets are great and can communicate in both directions, but they use a separate protocol that can be difficult to integrate into an existing infrastructure (proxies, load balancers, firewalls...). Server-Sent Events, on the other hand, use plain HTTP. The Heroku PaaS does not support WebSockets yet, but it does support SSE. When pushing data from the server to clients is all you need, SSE may be the most appropriate choice, and it is well supported (except in IE for the moment).
Server-Sent Events API
The JavaScript API is very simple:
var feed = new EventSource('/stream');
// receive messages
feed.addEventListener('message', function(e) {
  var data = JSON.parse(e.data);
  // do something with data
}, false);
Visualizing the results
As the stream sends coordinates, my first attempt was to display them on a 3D globe. For this I used three.js, which made it very simple. The first results were promising, but sadly the browser could not render that many points in 3D, so I had to find an alternative.
My second attempt was to display the coordinates on a 2D canvas, and that worked well, although it is less impressive than a 3D map... :)
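Drawing on a 2D canvas requires mapping latitude and longitude to pixels. The post does not say which projection is used; the simplest choice is an equirectangular projection, sketched here as an assumption (in Scala to stay consistent with the rest of the code, although in the browser it would be written in JavaScript).

```scala
object Projection {
  /** Maps latitude/longitude (degrees) to canvas pixels with a simple
    * equirectangular projection: longitude -180..180 spans the width,
    * latitude 90..-90 spans the height (canvas y grows downwards). */
  def toCanvas(latitude: Double, longitude: Double, width: Int, height: Int): (Double, Double) = {
    val x = (longitude + 180.0) / 360.0 * width
    val y = (90.0 - latitude) / 180.0 * height
    (x, y)
  }
}
```

For example, on a 360x180 canvas the point (0, 0) lands exactly in the middle, at (180.0, 90.0).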
You can see the result on Heroku: http://wiki-growth.herokuapp.com/
The source code is available on GitHub: https://github.com/yanns/play2-wiki-growth-sse
You can run it yourself with Play and let Heroku sleep.