Read time: 6 minutes
Two weeks ago I challenged you to build your first feature pipeline. So I decided to take the challenge myself.
Real-time ML is a fascinating topic, which I wanna go deeper into in the following weeks and months. Because of this, I started creating a sequence of small projects where I build real-time products, beginning with a real-time feature pipeline.
Real-time feature pipeline
A real-time feature pipeline is a program that
ingests real-time raw data (e.g. from an external web socket, or from an enterprise message bus like Kafka),
processes this raw data into features (aka inputs for your ML model), and
sends these features to downstream services (e.g. a Feature Store, or back to the message bus).
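To make these three stages concrete, here is a minimal sketch in plain Python, with no streaming library involved. An in-memory list stands in for the websocket and another list stands in for the Feature Store. The message fields (`product_id`, `price`, `size`) and the `notional` feature are assumptions for illustration only.

```python
import json
from typing import Dict, Iterable, Iterator, List


def ingest(raw_messages: Iterable[str]) -> Iterator[Dict]:
    """Stage 1: read raw messages (a list stands in for a websocket or Kafka)."""
    for message in raw_messages:
        yield json.loads(message)


def to_features(trades: Iterable[Dict]) -> Iterator[Dict]:
    """Stage 2: turn each raw trade into model-ready features."""
    for trade in trades:
        price = float(trade["price"])
        size = float(trade["size"])
        yield {
            "product_id": trade["product_id"],
            "price": price,
            "notional": price * size,  # hypothetical feature: traded value
        }


def send(features: Iterable[Dict], sink: List[Dict]) -> None:
    """Stage 3: push features downstream (a list stands in for a Feature Store)."""
    for row in features:
        sink.append(row)


# Hypothetical sample messages, hard-coded so the sketch runs offline
raw = [
    '{"product_id": "ETH-USD", "price": "1500.0", "size": "0.5"}',
    '{"product_id": "ETH-USD", "price": "1501.5", "size": "2"}',
]
store: List[Dict] = []
send(to_features(ingest(raw)), store)
```

Because each stage is a generator, every message flows through all three stages one at a time, which is the same shape a real streaming pipeline has.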
Traditionally, real-time feature engineering has been a rather obscure topic among data scientists and ML engineers.
The reason is that Python, the lingua franca of the ML world, is too slow for high-throughput stream processing. Hence, traditional stream processing engines and tools have been written in more efficient languages, like Java and Scala, which lie outside the core skills of most ML engineers. Data engineers, who often have a software engineering background, have taken on the responsibility of implementing and managing real-time pipelines using JVM-based tools like Apache Spark or Apache Flink.
Luckily, things have changed with the emergence of Rust, a modern compiled language that integrates very naturally with Python. Bytewax, a Python library built on top of a Rust engine, is a great example of this.
Bytewax is the library that I decided to use for this little project. Let’s see how it works and how I used it to build my MVP.
From raw trades to OHLC data
You can find all the code in this GitHub repo, and the final app is publicly deployed on Streamlit Cloud 👉🏽 click here to see it in action
Bytewax is designed following a dataflow paradigm. The dataflow is the sequence of operations and transformations necessary to
ingest incoming data → e.g. read real-time data from a Coinbase websocket,
transform it sequentially, using either
stateless transformations → e.g. parse strings to Python dictionaries, or
stateful transformations → e.g. aggregate trade prices over 30-second windows, and
output the final data to an output sink → e.g. plot the final OHLC data on a Streamlit/Bokeh UI component.
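The difference between stateless and stateful transformations can be sketched in plain Python, without Bytewax. In this sketch, `parse` only looks at the current message, while `aggregate` has to remember prices across messages that fall into the same 30-second window. The `time` and `price` fields and the sample messages are assumptions for illustration.

```python
import json
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List

WINDOW_SECONDS = 30


def parse(message: str) -> dict:
    """Stateless: the output depends only on the current message."""
    return json.loads(message)


def window_start(ts: datetime, length: int = WINDOW_SECONDS) -> datetime:
    """Floor a timestamp to the start of its tumbling window."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % length, tz=timezone.utc)


def aggregate(messages: List[str]) -> Dict[datetime, List[float]]:
    """Stateful: prices accumulate per window across many messages."""
    prices_by_window: Dict[datetime, List[float]] = defaultdict(list)
    for message in messages:
        trade = parse(message)
        ts = datetime.fromisoformat(trade["time"])
        prices_by_window[window_start(ts)].append(float(trade["price"]))
    return dict(prices_by_window)


# Hypothetical trades: two land in the first window, one in the second
sample = [
    '{"time": "2023-01-01T00:00:05+00:00", "price": "100.0"}',
    '{"time": "2023-01-01T00:00:20+00:00", "price": "101.0"}',
    '{"time": "2023-01-01T00:00:40+00:00", "price": "99.5"}',
]
windows = aggregate(sample)
```

A streaming engine does the same bookkeeping incrementally and also decides when a window is complete, which is the part you do not want to write by hand.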
To execute a dataflow, you first need to create it and then attach each of the steps, including input fetching and output writing. Only then can you run it.
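# imports (module paths assume the Bytewax version used in the repo)
import json
from bytewax.dataflow import Dataflow
from bytewax.execution import run_main
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import ManualOutputConfig
# input_builder, key_on_product, build_array, acc_values, calculate_features
# and output_builder are custom functions defined in the repo
# instantiate dataflow object
flow = Dataflow()
# read input data from websocket
flow.input("input", ManualInputConfig(input_builder))
# parse string data to Python dictionaries
# 1-to-1 stateless operation
flow.map(json.loads)
# generate a key to index the data (key, data)
# this is a preliminary step to use a stateful operator (see next step)
# 1-to-1 stateless operation
flow.map(key_on_product)
# aggregate events (aka trades) in 30-second windows
# stateful operation: cc is the clock config and wc the window config,
# both defined in the repo
flow.fold_window("30_sec", cc, wc, build_array, acc_values)
# compute Open-High-Low-Close prices for each window
# 1-to-1 stateless operation
flow.map(calculate_features)
# send output to Streamlit/Bokeh chart
flow.capture(ManualOutputConfig(output_builder))
# time to run it!
run_main(flow)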
Each dataflow step is defined by
an operation primitive, for example map(…) is a 1-to-1 mapping, and
a custom function (e.g. json.loads) that specifies what this 1-to-1 mapping does (in this case, parse a string into a Python dictionary).
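To give a feel for the custom functions, here is a minimal sketch of what they could look like. The function names match the ones used in the dataflow, but the bodies, the signatures, and the message fields are assumptions for illustration, not the code from the repo. Plain Python (`reduce` and a list comprehension) stands in for the Bytewax primitives, so the sketch runs without Bytewax installed.

```python
from functools import reduce
from typing import Dict, List, Tuple


def key_on_product(trade: Dict) -> Tuple[str, Dict]:
    """Custom function for a map step: wrap the trade as (key, value)."""
    return trade["product_id"], trade


def build_array() -> List[float]:
    """Builder for the window step: the empty accumulator of a new window."""
    return []


def acc_values(prices: List[float], trade: Dict) -> List[float]:
    """Folder for the window step: add one trade's price to the accumulator."""
    prices.append(float(trade["price"]))
    return prices


def calculate_features(keyed_prices: Tuple[str, List[float]]) -> Tuple[str, Dict]:
    """Custom function for a map step: compute OHLC from a window's prices."""
    key, prices = keyed_prices
    ohlc = {
        "open": prices[0],
        "high": max(prices),
        "low": min(prices),
        "close": prices[-1],
    }
    return key, ohlc


# Hypothetical trades that all fall into the same window
trades = [
    {"product_id": "ETH-USD", "price": p}
    for p in ("1500.0", "1510.5", "1495.0", "1502.0")
]
keyed = [key_on_product(t) for t in trades]  # what a map step would do
prices = reduce(acc_values, (t for _, t in keyed), build_array())  # the fold
key, ohlc = calculate_features(("ETH-USD", prices))
```

The primitive decides how the function is applied (once per event for a map, once per event within a window for a fold), while the function itself stays a small, easily testable piece of Python.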
💡 Tip
Writing stream processing pipelines in Bytewax is all about understanding which primitives to choose and which custom functions to implement to build each of the dataflow steps.
In my code, I split the dataflow construction into 2 files:
src/dataflow.py → defines the dataflow from input to the penultimate step.
src/frontend.py → adds the last step to the dataflow, to plot the data on a Streamlit/Bokeh chart.
If you are interested and wanna dig deeper into the implementation details, I recommend you take a look at the GitHub repo. And give it a ⭐ if you found it useful.
Next steps
The app I just described is a good first project. However, it lacks one crucial ingredient: data persistence. Every time you reload the frontend app, you lose all the historical OHLC data processed until then.
To solve that, I am planning to add a Feature Store, where we can store (and later serve) the OHLC data generated by the feature pipeline.
Need help building ML products?
Join the new Serverless ML community and connect with hundreds of ML builders worldwide. Ask the community, and share your pain points and victories.
Keep on building.
Keep on learning.
Peace and love.
Pau
In this first project, I decided to build a Streamlit app that
fetches real-time trade data from the Coinbase Websocket API,
transforms this trade data into OHLC data in real-time using Bytewax, and
plots the OHLC data on a Streamlit/Bokeh chart.