In December 2018, we published the Plataformatec and Elixir: 2018 in review article, in which we shared some of our achievements throughout the year, as well as some of the goals we planned for 2019. One of these goals was to engage the R&D team to develop a new open source tool that could streamline data processing pipelines with Elixir.
Today, we are glad to announce the first official release of this tool: Broadway v0.1. Broadway was mainly designed to help developers build concurrent, multi-stage data ingestion and data processing pipelines. It allows developers to consume data efficiently from different sources, such as Amazon SQS, RabbitMQ, and others.
Motivation
We have worked with many companies building data processing pipelines and we have noticed that they were often reimplementing the same features and also running into common pitfalls when assembling complex GenStage topologies. The goal of Broadway is to significantly cut down the development time to assemble those pipelines, while providing many features and avoiding common pitfalls.
Features
Broadway comes with a handful of features that take the burden of defining concurrent GenStage topologies and provide a simple configuration API that automatically defines concurrent producers, concurrent processing, batch handling, and more, leading to both time and cost efficient ingestion and processing of data. Some of those features include:
- Back-pressure
- Automatic acknowledgements
- Batching
- Automatic restarts in case of failures
- Graceful shutdown
- Built-in testing
- Partitioning
Other features are already on the roadmap, such as:
- Rate-limiting
- Statistics / Metrics
- Back-off
How does it work?
Similarly to other process-based behaviours, we can create a Broadway-based data pipeline by defining a module like this:
defmodule MyBroadway do
use Broadway
alias Broadway.Message
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producers: [
sqs: [
module: {BroadwaySQS.Producer, queue_name: "my_queue"}
]
],
processors: [
default: [stages: 50]
],
batchers: [
s3_odd: [stages: 2, batch_size: 10],
s3_even: [stages: 1, batch_size: 10]
]
)
end
...callbacks...
end
The configuration above defines a pipeline with:
- 1 producer
- 50 processors
- 1 batcher named
:s3_odd
with 2 consumers - 1 batcher named
:s3_even
with 1 consumer
[producer_1]
/ \
/ \
/ \
/ \
[processor_1] [processor_2] ... [processor_50] <- process each message
/\ /\
/ \ / \
/ \ / \
/ x \
/ / \ \
/ / \ \
/ / \ \
[batcher_s3_odd] [batcher_s3_even]
/\ \
/ \ \
/ \ \
/ \ \
[consumer_s3_odd_1] [consumer_s3_odd_2] [consumer_s3_even_1] <- process each batch
In order to process the data provided by the SQS producer, we need to implement two Broadway callbacks: handle_message/3
, invoked by processors for each message, and handle_batch/4
, invoked by consumers for each batch:
defmodule MyBroadway do
use Broadway
alias Broadway.Message
...start_link...
@impl true
def handle_message(_, %Message{data: data} = message, _) when is_odd(data) do
message
|> Message.update_data(&process_data/1)
|> Message.put_batcher(:s3_odd)
end
def handle_message(_, %Message{data: data} = message, _) do
message
|> Message.update_data(&process_data/1)
|> Message.put_batcher(:s3_even)
end
@impl true
def handle_batch(:s3_odd, messages, _batch_info, _context) do
# Send batch of messages to S3 "odd" bucket
end
def handle_batch(:s3_even, messages, _batch_info, _context) do
# Send batch of messages to S3 "even" bucket
end
defp process_data(data) do
# Do some calculations, generate a JSON representation, etc.
end
end
At the end of the pipeline, messages are automatically acknowledged by the SQS producer.
Note: You can also use existing GenStage producers as the source of the pipeline. For more information see the Custom Producers Guide.
What’s next?
There’s a lot more about Broadway. We put a lot of effort in the documentation, including architectural aspects and a full guide on consuming events from Amazon SQS queues.
As with any first release, we expect to gather as much feedback as possible from the community so we can incorporate new use cases and improve the API appropriately. You can also contribute to this project in many ways, either by giving the project a try or building your own connector. The SQS connector presented in this post is already available. A RabbitMQ connector is also planned and should be available soon.
We plan to continue pushing the Elixir ecosystem forward! If you would like to build Elixir systems together with our team, reach out and we will be glad to discuss anything Elixir related, from data pipelines to web applications and distributed systems!
Happy coding!