Let me show you step-by-step how to do feature engineering in real-time using Apache Kafka, Docker and Python.
The problem
Feature engineering is about transforming raw data into predictive signals for your Machine Learning model.
And for many real-world problems, you need to run this transformation in real-time. Otherwise, your features will be irrelevant.
For example 💁
Imagine you build a Machine Learning model that can predict stock price changes over the next 1 minute, but the input features the model needs are computed with a 1-hour delay.
No matter how good your model is, the whole system will NOT work 😔
So, as a professional ML engineer, you need to go beyond static CSV files and local Jupyter notebooks, and learn the tools that help you build production-ready real-time systems.
Let’s go through a step-by-step example.
Hands-on example 👩💻🧑🏽💻
Let me show you how to build a real-time feature-engineering service that
reads raw crypto-trade data from a Kafka topic (input topic),
transforms this raw data into Open-High-Low-Close (OHLC) features using stateful window aggregations, and
saves the final features into another Kafka topic (output topic), so the data is accessible to downstream services.
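To make the transformation concrete, this is what an OHLC aggregation does to one window of trades. A plain-Python sketch (the `price` field name is illustrative), before we make it streaming:

```python
def trades_to_ohlc(trades: list[dict]) -> dict:
    """Collapse a window of trades into one Open-High-Low-Close candle.

    Each trade is assumed to be a dict with a "price" field.
    """
    prices = [t["price"] for t in trades]
    return {
        "open": prices[0],    # first trade in the window
        "high": max(prices),
        "low": min(prices),
        "close": prices[-1],  # last trade in the window
    }

window = [{"price": 100.0}, {"price": 103.5}, {"price": 99.2}, {"price": 101.0}]
print(trades_to_ohlc(window))
# → {'open': 100.0, 'high': 103.5, 'low': 99.2, 'close': 101.0}
```

The hard part in production is not this arithmetic, but running it continuously over a never-ending stream of trades, which is exactly what the tools below solve.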
These are the tools we will use to build this:
Python Poetry to package our Python code professionally 😎
Quix Streams to do window aggregations in real-time ⚡
Docker to containerize our service, and ease the deployment to a production-environment 📦
Docker Compose to test things out locally.
A Kafka message broker, in this case Redpanda, to enable communication between our service and the rest of the infrastructure.
Steps
You can find all the source code in this repository
→ Give it a star ⭐ on Github to support my work 🙏
These are the 4 steps to build a real-time feature-engineering service.
Step 1. Create the project structure
We will use Python Poetry to create our ML project structure. You can install it in your system with a one-liner.
$ curl -sSL https://install.python-poetry.org | python3 -
Once installed, go to the command line and type
$ poetry new trade_to_ohlc --name src
to generate the following folder structure
trade_to_ohlc
├── README.md
├── src
│ └── __init__.py
├── pyproject.toml
└── tests
└── __init__.py
Then type
$ cd trade_to_ohlc && poetry install
to create your virtual environment and install your local package src
in editable mode.
Step 2. Start a local Kafka cluster with Docker Compose
To develop and run our feature engineering service locally we first need to spin up a local lightweight Kafka cluster.
And the simplest, most straightforward way to do so is to use Docker and Redpanda.
What is Redpanda? 🟥🐼
Redpanda is a Kafka API-compatible data streaming platform, written in C++, that eliminates most of the complexities of the original Apache Kafka while improving performance.
To spin up a minimal Redpanda cluster with one broker you add this docker-compose.yml
version: '3.7'
name: ohlc-data-pipeline
services:
redpanda:
container_name: redpanda
image: docker.redpanda.com/redpandadata/redpanda:v23.2.19
...
console:
container_name: redpanda-console
image: docker.redpanda.com/redpandadata/console:v2.3.8
...
depends_on:
- redpanda
Now go to the command line and start your local Redpanda cluster by running
$ docker compose up -d
Congratulations! You have a Redpanda cluster up and running.
It is time to focus on the feature engineering logic.
Step 3. Feature engineering script
Our script needs to do 3 things:
Read input data from a Kafka topic,
Transform this data into OHLC features, and
Save the final data into another Kafka topic.
And the good news is, you can do all 3 things in Python using the Quix Streams library.
Install the Quix Streams library inside your virtual environment
$ poetry add quixstreams
And create a new Python file, main.py, where you will define your feature engineering logic
trade_to_ohlc
├── README.md
├── main.py
├── src
│ └── __init__.py
├── pyproject.toml
└── tests
└── __init__.py
Inside this main.py file you
Create a Quix Application, which will handle all the low-level communication with Kafka for you.
import os

from quixstreams import Application

app = Application(
    broker_address=os.environ["KAFKA_BROKER_ADDRESS"],
    consumer_group="json__trade_to_ohlc_consumer_group",
)
Define the input and output Kafka topics of our application
input_topic = app.topic('raw_trade', value_deserializer="json")
output_topic = app.topic('ohlc', value_serializer="json")
Define your feature engineering logic. In this case we use 10-second window aggregations, with a Pandas-like API.
from datetime import timedelta

WINDOW_SECONDS = 10

# Create your streaming data frame
sdf = app.dataframe(input_topic)

# 10-second tumbling window aggregations
sdf = sdf.tumbling_window(timedelta(seconds=WINDOW_SECONDS), 0) \
    .reduce(reduce_price, init_reduce_price) \
    .final()
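The snippet references two callbacks, reduce_price and init_reduce_price, that are not shown above. A minimal sketch of what they could look like, assuming each trade message carries a "price" field:

```python
def init_reduce_price(trade: dict) -> dict:
    # Called on the first trade of each window: seed the OHLC candle.
    price = trade["price"]
    return {"open": price, "high": price, "low": price, "close": price}


def reduce_price(candle: dict, trade: dict) -> dict:
    # Called for every subsequent trade in the window:
    # fold it into the running candle.
    price = trade["price"]
    candle["high"] = max(candle["high"], price)
    candle["low"] = min(candle["low"], price)
    candle["close"] = price
    return candle
```

Quix Streams calls the initializer on the first message of each window and the reducer on every message after that, so the candle is built incrementally instead of buffering the whole window in memory.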
Produce the result to the output topic
sdf = sdf.to_topic(output_topic)
Start processing incoming messages with
app.run(sdf)
If you now run your feature engineering script
$ poetry run python main.py
...
INFO:quixstreams.app:Waiting for incoming messages
your script will just hang, waiting for incoming messages to arrive from Kafka.
Why are there no incoming messages? 🤔
Simply because there is no data yet in the input Kafka topic raw_trade.
To fix this you can either
Write a second script that dumps mock raw data into the topic, or
Use the actual service that will generate production raw data.
In this case, I opted for option 2, and here you can find the complete implementation of the trade producer service.
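If you go for option 1 instead, the mock data itself is just a stream of trade-like dictionaries. A sketch of a generator (the product_id, price, and timestamp fields are illustrative), which you would then produce into the raw_trade topic with your Kafka client of choice:

```python
import random
from datetime import datetime, timezone


def generate_mock_trades(n: int, start_price: float = 100.0) -> list[dict]:
    """Generate n fake trades whose prices follow a small random walk."""
    trades, price = [], start_price
    for _ in range(n):
        # Nudge the price up or down by at most 0.1%
        price = round(price * (1 + random.uniform(-0.001, 0.001)), 2)
        trades.append({
            "product_id": "BTC-USD",
            "price": price,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
    return trades


for trade in generate_mock_trades(3):
    print(trade)
```

Mock data is great for local development; just keep its schema identical to the real producer's, so you can swap one for the other without touching the feature engineering service.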
Step 4. Dockerization of our service
So far you have a feature engineering service that works locally. However, to make sure your app will also work once deployed in a production environment, such as a Kubernetes cluster, you need to dockerize it.
For that, you need to add a Dockerfile to your repo
trade_to_ohlc
├── Dockerfile
├── README.md
├── main.py
├── poetry.lock
├── pyproject.toml
├── src
│ └── ...
└── tests
└── __init__.py
with the following layered instructions
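The exact Dockerfile is in the repository. A minimal sketch of what it could look like for a Poetry-managed project (details, like the Python version, may differ from the actual file):

```dockerfile
FROM python:3.10-slim

# Install Poetry inside the image
RUN pip install poetry

WORKDIR /app

# Copy the dependency files first, so Docker caches
# the (slow) install layer across code changes
COPY pyproject.toml poetry.lock ./
RUN poetry install --no-root

# Copy the source code
COPY . .

CMD ["poetry", "run", "python", "main.py"]
```

The layering matters: because the dependency install sits above the `COPY . .` of your source code, editing main.py does not trigger a full reinstall on the next build.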
From this Dockerfile you can build your Docker image
$ docker build -t ${DOCKER_IMAGE_NAME} .
and run the container locally, remembering to pass the broker address the app reads from the environment
$ docker run -e KAFKA_BROKER_ADDRESS=${KAFKA_BROKER_ADDRESS} ${DOCKER_IMAGE_NAME}
Another way to spin up your service is to add it to your docker-compose.yml file, together with the trade producer service.
version: '3.7'
name: ohlc-data-pipeline
services:
redpanda:
container_name: redpanda
image: docker.redpanda.com/redpandadata/redpanda:v23.2.19
...
console:
container_name: redpanda-console
image: docker.redpanda.com/redpandadata/console:v2.3.8
...
depends_on:
- redpanda
trade-producer:
container_name: trade-producer
restart: always
build:
context: "./trade_producer"
...
depends_on:
- redpanda
trade-to-ohlc:
container_name: trade-to-ohlc
restart: always
build:
context: "./trade_to_ohlc"
...
depends_on:
- redpanda
Now go to the command line and run everything with
$ docker compose up -d
Congratulations! You have a production-ready real-time feature engineering script up and running.
Wanna learn to build real-time ML systems, together? 🏗️🙋🏾♂️
On September 16th, 150+ brave students and myself will start building, step by step, a real-time ML system that predicts crypto prices.
After completing this 4-week program (+ A LOT of hard work on your end) you will know how to
Build modular and scalable real-time ML systems
For the business problem you care about
Using any real-time data source
Following MLOps best practices, for fast iteration and a short time-to-market.
And of course, we will implement Feature Engineering in Real-time ⚡
Wanna know more about
Building a Real-time ML System. Together?
↓↓↓
Talk to you next week,
Peace and Love
Pau