Flow v0.14 has been recently released with more fine grained control on data emission and tighter integration with GenStage.
In this blog post we will start with a brief recap of Flow and go over the new changes. We end the post with a description of the new Elixir Development Subscription service by Plataformatec and how it has helped us bring those improvements to Flow.
Quick introduction to Flow
Flow is a library for computational parallel flows in Elixir. It is built on top of GenStage which specifies how Elixir processes should communicate with back-pressure.
Flow is inspired by the MapReduce and Apache Spark models but focuses on single node performance. It aims to use all cores of your machines efficiently.
The “hello world” of data processing is a word counter. Here is how we would count the words in a file with Flow:
File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
If you have a machine with 4 cores, the example above will create 9 light-weight Elixir processes that run concurrently:
- 1 process for reading from the file (Flow.from_enumerable/1)
- 4 processes for performing map operations (everything before Flow.partition/2)
- 4 processes for performing reduce operations (everything after Flow.partition/2)
The key operation in the example above is precisely the partition/2 call. Since we want to count words, we need to make sure that we will always route the same word to the same partition, so all occurrences belong to a single place and not scattered around.
The other insight here is that map operations can always stream the data, as they simply transform it. The reduce operation, on the other hand, needs to accumulate the data until all input data is fully processed. If the Flow is unbounded (i.e. it never finishes), then you need to specify windows and triggers to check point the data (for example, check point the data every minute or after 100_000 entries or on some condition specified by business rules).
My ElixirConf 2016 keynote also provides an introduction to Flow (tickets to ElixirConf 2018 are also available!).
With this in mind, let’s see what Flow v0.14 brings.
Explicit control over reducing stages
Flow v0.14 gives more explicit control on how the reducing stage works. Let’s see a pratical example. Imagine you want to connect to Twitter’s firehose and count the number of mentions of all users on Twitter. Let’s start by adapting our word counter example:
SomeTwitterClient.stream_tweets!()
|> Flow.from_enumerable()
|> Flow.flat_map(fn tweet -> tweet["mentions"] end)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn mention, acc ->
Map.update(acc, mention, 1, & &1 + 1)
end)
|> Enum.to_list()
We changed our code to use some fictional twitter client that streams tweets and then proceeded to retrieve the mentions in each each tweet. The mentions are routed to partitions, which counts them. If we attempted to run the code above, the code would run until the machine eventually runs out of memory, as the Twitter firehose never finishes.
A possible solution is to use a window that controls the data accumulation. We will say that we want to accumulate the data for minute. When the minute is over, the “reduce” operation will emit its accumulator, which we will persist to some storage:
window = Flow.Window.periodic(1, :minute, :discard)
SomeTwitterClient.stream_tweets!()
|> Flow.from_enumerable()
|> Flow.flat_map(fn tweet -> tweet["mentions"] end)
|> Flow.partition(window: window)
|> Flow.reduce(fn -> %{} end, fn mention, acc ->
Map.update(acc, mention, 1, & &1 + 1)
end)
|> Flow.each_state(fn acc -> MyDb.persist_count_so_far(acc) end)
|> Flow.start_link()
The first change is in the first line. We create a window that lasts 1 minute and discards any accumulated state before starting the next window. We pass the window as argument to Flow.partition/1.
The remaining changes are after the Flow.reduce/3. Whenever the current window terminates, we see that a trigger is emitted. This trigger means that the reduce/3 stage will stop accumulating data and invoke the next functions in the Flow. One of these functions is each_state/2, that receives the state accumulated so far and persists it to a database.
Finally, since the flow is infinite, we are no longer calling Enum.to_list/1 at the end of the flow, but rather Flow.start_link/1, allowing it to run permanently as part of a supervision tree.
While the solution above is fine, it unfortunately has two implicit decisions in it:
- 
each_stateonly runs when the window finishes (i.e. a trigger is emitted) but this relationship is not clear in the code
- 
The control of the accumulator is kept in multiple places: the window definition says the accumulator must be discarded after each_statewhilereducecontrols its initial value
Flow v0.14 introduces a new function named on_trigger/2 to make these relationships clearer. As the name implies, on_trigger/2 is invoked with the reduced state whenever there is a trigger. The callback given to on_trigger/2 must return a tuple with a list of the events to emit and the new accumulator. Let’s rewrite our example:
window = Flow.Window.periodic(1, :minute)
SomeTwitterClient.stream_tweets!()
|> Flow.from_enumerable()
|> Flow.flat_map(fn tweet -> tweet["mentions"] end)
|> Flow.partition(window: window)
|> Flow.reduce(fn -> %{} end, fn mention, acc ->
Map.update(acc, mention, 1, & &1 + 1)
end)
|> Flow.on_trigger(fn acc ->
MyDb.persist_count_so_far(acc)
{[], %{}} # Nothing to emit, reset the accumulator
end)
|> Flow.start_link()
As you can see, the window no longer controls when data is discarded. on_trigger/2 gives developers full control on how to change the accumulator and which events to emit. For example, you may choose to keep part of the accumulator for the next window. Or you could process the accumulator to pick only the most mentioned users to emit to the next step in the flow.
Flow v0.14 also introduces a emit_and_reduce/3 function that allows you to emit data while reducing. Let’s say we want to track popular users in two ways:
- whenever a user reaches 100 mentions, we immediately send it to the next stage for processing and reset its counter
- 
for the remaining users, we will get the top 10 most mentioned per partition and send them to the next stage 
We can perform this as:
window = Flow.Window.periodic(1, :minute)
SomeTwitterClient.stream_tweets!()
|> Flow.from_enumerable()
|> Flow.flat_map(fn tweet -> tweet["mentions"] end)
|> Flow.partition(window: window)
|> Flow.emit_and_reduce(fn -> %{} end, fn mention, acc ->
counter = Map.get(acc, mention, 0) + 1
if counter == 100 do
{[mention], Map.delete(acc, mention)}
else
{[], Map.put(acc, mention, counter)}
end
end)
|> Flow.on_trigger(fn acc ->
most_mentioned =
acc
|> Enum.sort(acc, fn {_, count1}, {_, count2} -> count1 >= count2 end)
|> Enum.take(10)
{most_mentioned, %{}}
end)
|> Flow.shuffle()
|> Flow.map(fn mention -> IO.puts(mention) end)
|> Flow.start_link()
In the example above, we changed reduce/3 to emit_and_reduce/3, so we can emit events as we process them. Then we changed Flow.on_trigger/2 to also emit the most mentioned users.
Finally, we have added a call to Flow.shuffle/1, that will receive all of the events emitted by emit_and_reduce/3 and on_trigger/2 and shuffle them into a series of new stages for further parallel processing.
If you are familiar with data processing pipelines, you may be aware of two pitfalls in the solution above: 1. we are using processing time for handling events and 2. instead of a periodic window, it would probably be best to process events on sliding windows. For the former, you can learn more about the pitfalls of processing time vs event time in Flow’s documentation. For the latter, we note that Flow does not support sliding windows out of the box but they are straight-forward to implement on top of reduce/3 and on_trigger/2 above.
At the end of the day, the new functionality in Flow v0.14 gives developers more control over their flows while also making the code clearer.
Tighter integration with GenStage
Flow v0.14 also introduces new functions to make integration with GenStage easier. One of these functions is through_stages/3, which complements from_stages/2 and into_stages/3, allowing developers to pipe a flow through already running, hand-written stages:
Flow.from_stages([MyProducer])
|> Flow.map(...)
|> Flow.partition(...)
|> Flow.reduce(...)
|> Flow.through_stages([MyProducerConsumer])
|> Flow.map(...)
|> Flow.into_stages([MyConsumer])
While the above is handy, it is a little bit awkward. Since the *_stages functions expect already running stages, it means that you need to start those stages in a separate part of your application and then integrate them into the Flow.
For this reason, Flow v0.14 also introduces from_specs/2, through_specs/3 and into_specs/3, which receives child specifications that control how the stages are started. In this case, the Flow takes care of starting those stages and passing the data through them.
The Elixir Development Subscription
Some of the improvements done to Flow in version v0.14 were motivated by the feedback we have received from companies participating in our new service called Elixir Development Subscription.
The Elixir Development Subscription service helps companies build Elixir applications with speed and confidence, by leveraging Plataformatec’s engineering team for support and assistance.
If you are adopting Elixir or any of the tools in its ecosystem, such as GenStage, Flow, Phoenix, Ecto and others, and you would like to learn more about the service, please fill in the form below, and we will reach out to you as new spots become available!