{"id":7690,"date":"2018-07-18T16:29:39","date_gmt":"2018-07-18T19:29:39","guid":{"rendered":"http:\/\/blog.plataformatec.com.br\/?p=7690"},"modified":"2018-10-17T16:47:20","modified_gmt":"2018-10-17T19:47:20","slug":"whats-new-in-flow-v0-14","status":"publish","type":"post","link":"http:\/\/blog.plataformatec.com.br\/2018\/07\/whats-new-in-flow-v0-14\/","title":{"rendered":"What’s new in Flow v0.14"},"content":{"rendered":"
<p>Flow v0.14 was recently released with finer-grained control over data emission and tighter integration with GenStage.<\/p>\n
<p>In this blog post we will start with a brief recap of Flow and go over the new changes. We end the post with a description of the new Elixir Development Subscription service by Plataformatec and how it has helped us bring those improvements to Flow.<\/p>\n
<p>Flow is a library for computational parallel flows in Elixir. It is built on top of GenStage, which specifies how Elixir processes should communicate with back-pressure.<\/p>\n
<p>Flow is inspired by the MapReduce and Apache Spark models but focuses on single-node performance. It aims to use all cores of your machine efficiently.<\/p>\n
<p>The “hello world” of data processing is a word counter. Here is how we would count the words in a file with <code>Flow<\/code>:<\/p>\n
<pre><code>File.stream!(\"path\/to\/some\/file\")\n|> Flow.from_enumerable()\n|> Flow.flat_map(&String.split\/1)\n|> Flow.partition()\n|> Flow.reduce(fn -> %{} end, fn word, acc ->\n  Map.update(acc, word, 1, & &1 + 1)\nend)\n|> Enum.to_list()\n<\/code><\/pre>\n
<p>If you have a machine with 4 cores, the example above will create 9 lightweight Elixir processes that run concurrently:<\/p>\n
<ul>\n<li>1 process for reading from the file (<code>Flow.from_enumerable\/1<\/code>)<\/li>\n<li>4 processes for performing map operations (everything before <code>Flow.partition\/2<\/code>)<\/li>\n<li>4 processes for performing reduce operations (everything after <code>Flow.partition\/2<\/code>)<\/li>\n<\/ul>\n
<p>The key operation in the example above is precisely the <code>partition\/2<\/code> call. Since we want to count words, we need to make sure that we always route the same word to the same partition, so all occurrences of a word end up in a single place instead of being scattered around.<\/p>\n
<p>The other insight here is that map operations can always stream the data, as they simply transform it. The <code>reduce<\/code> operation, on the other hand, needs to accumulate the data until all input data is fully processed. If the flow is unbounded (i.e. it never finishes), then you need to specify windows and triggers to checkpoint the data (for example, checkpoint the data every minute, after 100_000 entries, or on some condition specified by business rules).<\/p>\n
<p>My ElixirConf 2016 keynote also provides an introduction to Flow (tickets to ElixirConf 2018 are also available!).<\/p>\n
<p>With this in mind, let’s see what Flow v0.14 brings.<\/p>\n
<h2>Explicit control over reducing stages<\/h2>\n
<p>Flow v0.14 gives more explicit control over how the reducing stage works. Let’s see a practical example. Imagine you want to connect to Twitter’s firehose and count the number of mentions of all users on Twitter. Let’s start by adapting our word counter example:<\/p>\n
<pre><code>SomeTwitterClient.stream_tweets!()\n|> Flow.from_enumerable()\n|> Flow.flat_map(fn tweet -> tweet[\"mentions\"] end)\n|> Flow.partition()\n|> Flow.reduce(fn -> %{} end, fn mention, acc ->\n  Map.update(acc, mention, 1, & &1 + 1)\nend)\n|> Enum.to_list()\n<\/code><\/pre>\n
<p>We changed our code to use a fictional Twitter client that streams tweets and then retrieved the mentions in each tweet. The mentions are routed to partitions, which count them. If we attempted to run the code above, it would run until the machine eventually ran out of memory, as the Twitter firehose never finishes.<\/p>\n
<p>A possible solution is to use a window that controls the data accumulation. We will say that we want to accumulate the data for one minute. When the minute is over, the “reduce” operation will emit its accumulator, which we will persist to some storage:<\/p>\n
<pre><code>window = Flow.Window.periodic(1, :minute, :discard)\n\nSomeTwitterClient.stream_tweets!()\n|> Flow.from_enumerable()\n|> Flow.flat_map(fn tweet -> tweet[\"mentions\"] end)\n|> Flow.partition(window: window)\n|> Flow.reduce(fn -> %{} end, fn mention, acc ->\n  Map.update(acc, mention, 1, & &1 + 1)\nend)\n|> Flow.each_state(fn acc -> MyDb.persist_count_so_far(acc) end)\n|> Flow.start_link()\n<\/code><\/pre>\n
<p>The first change is in the first line. We create a window that lasts 1 minute and discards any accumulated state before starting the next window. We pass the window as an argument to <code>Flow.partition\/2<\/code>.<\/p>\n
<p>The remaining changes are after <code>Flow.reduce\/3<\/code>. Whenever the current window terminates, a trigger is emitted. This trigger means that the <code>reduce\/3<\/code> stage will stop accumulating data and invoke the next functions in the flow. One of these functions is <code>each_state\/2<\/code>, which receives the state accumulated so far and persists it to a database.<\/p>\n
<p>Finally, since the flow is infinite, we are no longer calling <code>Enum.to_list\/1<\/code> at the end of the flow, but rather <code>Flow.start_link\/1<\/code>, allowing it to run permanently as part of a supervision tree.<\/p>\n
<p>While the solution above is fine, it unfortunately has two implicit decisions in it:<\/p>\n
<ul>\n<li>\n<p><code>each_state<\/code> only runs when the window finishes (i.e. when a trigger is emitted), but this relationship is not clear in the code<\/p>\n<\/li>\n<li>\n<p>The control of the accumulator is kept in multiple places: the window definition says the accumulator must be discarded after <code>each_state<\/code>, while <code>reduce<\/code> controls its initial value<\/p>\n<\/li>\n<\/ul>\n
<p>Flow v0.14 introduces a new function named <code>on_trigger\/2<\/code> to make these relationships clearer. As the name implies, <code>on_trigger\/2<\/code> is invoked with the reduced state whenever there is a trigger. The callback given to <code>on_trigger\/2<\/code> must return a tuple with a list of the events to emit and the new accumulator. Let’s rewrite our example:<\/p>\n
<pre><code>window = Flow.Window.periodic(1, :minute)\n\nSomeTwitterClient.stream_tweets!()\n|> Flow.from_enumerable()\n|> Flow.flat_map(fn tweet -> tweet[\"mentions\"] end)\n|> Flow.partition(window: window)\n|> Flow.reduce(fn -> %{} end, fn mention, acc ->\n  Map.update(acc, mention, 1, & &1 + 1)\nend)\n|> Flow.on_trigger(fn acc ->\n  MyDb.persist_count_so_far(acc)\n  {[], %{}} # Nothing to emit, reset the accumulator\nend)\n|> Flow.start_link()\n<\/code><\/pre>\n
<p>As you can see, the window no longer controls when data is discarded. <code>on_trigger\/2<\/code> gives developers full control over how to change the accumulator and which events to emit. For example, you may choose to keep part of the accumulator for the next window. Or you could process the accumulator to pick only the most mentioned users to emit to the next step in the flow.<\/p>\n
<p>Flow v0.14 also introduces an <code>emit_and_reduce\/3<\/code> function that allows you to emit data while reducing. Let’s say we want to track popular users in two ways:<\/p>\n
<ol>\n<li>\n<p>whenever a user reaches 100 mentions within the window, we will emit it right away and remove it from the accumulator<\/p>\n<\/li>\n<li>\n<p>when the window finishes, for the remaining users, we will get the top 10 most mentioned per partition and send them to the next stage<\/p>\n<\/li>\n<\/ol>\n
<p>We can perform this as:<\/p>\n
<pre><code>window = Flow.Window.periodic(1, :minute)\n\nSomeTwitterClient.stream_tweets!()\n|> Flow.from_enumerable()\n|> Flow.flat_map(fn tweet -> tweet[\"mentions\"] end)\n|> Flow.partition(window: window)\n|> Flow.emit_and_reduce(fn -> %{} end, fn mention, acc ->\n  counter = Map.get(acc, mention, 0) + 1\n\n  if counter == 100 do\n    {[mention], Map.delete(acc, mention)}\n  else\n    {[], Map.put(acc, mention, counter)}\n  end\nend)\n|> Flow.on_trigger(fn acc ->\n  most_mentioned =\n    acc\n    |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)\n    |> Enum.take(10)\n\n  {most_mentioned, %{}}\nend)\n|> Flow.shuffle()\n|> Flow.map(fn event -> IO.inspect(event) end)\n|> Flow.start_link()\n<\/code><\/pre>\n
<p>In the example above, we changed <code>reduce\/3<\/code> to <code>emit_and_reduce\/3<\/code>, so we can emit events as we process them. Then we changed <code>Flow.on_trigger\/2<\/code> to also emit the most mentioned users.<\/p>\n
<p>Finally, we have added a call to <code>Flow.shuffle\/1<\/code>, which will receive all of the events emitted by <code>emit_and_reduce\/3<\/code> and <code>on_trigger\/2<\/code> and shuffle them into a series of new stages for further parallel processing.<\/p>\n
<p>If you are familiar with data processing pipelines, you may be aware of two pitfalls in the solution above: (1) we are using processing time for handling events and (2) instead of a periodic window, it would probably be best to process events on sliding windows. For the former, you can learn more about the pitfalls of processing time vs event time in Flow’s documentation. For the latter, we note that Flow does not support sliding windows out of the box, but they are straightforward to implement on top of <code>reduce\/3<\/code> and <code>on_trigger\/2<\/code> above.<\/p>\n
<p>At the end of the day, the new functionality in Flow v0.14 gives developers more control over their flows while also making the code clearer.<\/p>\n
<h2>Tighter integration with GenStage<\/h2>\n
<p>Flow v0.14 also introduces new functions to make integration with GenStage easier. One of these functions is <code>through_stages\/3<\/code>, which complements <code>from_stages\/2<\/code> and <code>into_stages\/3<\/code>, allowing developers to pipe a flow through already running, hand-written stages:<\/p>\n
<pre><code>Flow.from_stages([MyProducer])\n|> Flow.map(...)\n|> Flow.partition(...)\n|> Flow.reduce(...)\n|> Flow.through_stages([MyProducerConsumer])\n|> Flow.map(...)\n|> Flow.into_stages([MyConsumer])\n<\/code><\/pre>\n
<p>While the above is handy, it is a little bit awkward. Since the <code>*_stages<\/code> functions expect already running stages, you need to start those stages in a separate part of your application and then integrate them into the flow.<\/p>\n
<p>For this reason, Flow v0.14 also introduces <code>from_specs\/2<\/code>, <code>through_specs\/3<\/code> and <code>into_specs\/3<\/code>, which receive child specifications that control how the stages are started. In this case, the flow takes care of starting those stages and passing the data through them.<\/p>\n
<h2>The Elixir Development Subscription<\/h2>\n
<p>Some of the improvements done to Flow in version v0.14 were motivated by the feedback we have received from companies participating in our new service called <strong>Elixir Development Subscription<\/strong>.<\/p>\n
<p>The Elixir Development Subscription service helps companies build Elixir applications with speed and confidence by leveraging Plataformatec’s engineering team for support and assistance.<\/p>\n
<p>If you are adopting Elixir or any of the tools in its ecosystem, such as GenStage, Flow, Phoenix, Ecto and others, and you would like to learn more about the service, please fill in the form below, and we will reach out to you as new spots become available!<\/p>\n
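As a companion to the emit_and_reduce/3 and on_trigger/2 example in “Explicit control over reducing stages”, the per-partition logic can be sketched in plain Elixir, without Flow. This is a minimal sketch under stated assumptions, not how Flow implements these functions: it models a single partition and a single window, it simulates the trigger by calling the callback once every mention has been reduced, and the module and function names (PopularUsers, reduce_mention/2, on_trigger/1, run_window/1) are hypothetical. Mentions are represented as atoms only to keep the example short.

```elixir
# Plain-Elixir sketch (no Flow dependency) of one reducing partition during
# one window. PopularUsers and its functions are hypothetical names.
defmodule PopularUsers do
  # Emit a user as soon as it reaches this many mentions in the window.
  @threshold 100
  # Number of most mentioned users emitted when the window closes.
  @top 10

  # Same logic as the emit_and_reduce/3 callback in the post: bump the count
  # and, once it hits the threshold, emit the mention and drop it from acc.
  def reduce_mention(mention, acc) do
    counter = Map.get(acc, mention, 0) + 1

    if counter == @threshold do
      {[mention], Map.delete(acc, mention)}
    else
      {[], Map.put(acc, mention, counter)}
    end
  end

  # Same logic as the on_trigger/2 callback in the post: emit the top users as
  # {mention, count} tuples and reset the accumulator for the next window.
  def on_trigger(acc) do
    most_mentioned =
      acc
      |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)
      |> Enum.take(@top)

    {most_mentioned, %{}}
  end

  # Reduces every mention in order, then simulates the end-of-window trigger.
  # Returns {all_emitted_events, accumulator_for_next_window}.
  def run_window(mentions) do
    {emitted, acc} = Enum.flat_map_reduce(mentions, %{}, &reduce_mention/2)
    {most_mentioned, next_acc} = on_trigger(acc)
    {emitted ++ most_mentioned, next_acc}
  end
end
```

For example, PopularUsers.run_window(List.duplicate(:alice, 100) ++ List.duplicate(:bob, 3) ++ [:carol]) returns {[:alice, {:bob, 3}, {:carol, 1}], %{}}: :alice is emitted as soon as it reaches 100 mentions, and the remaining users are emitted, most mentioned first, when the window closes. As in the Flow example, a user mentioned again after being emitted starts a fresh count.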