Today's article is the second half of the half-baked article I wrote last Saturday.
As promised, I will show you how to bootstrap the development cluster and build the first version of our pipeline, which maps an unstructured blob of text (in this case, a piece of news) into a structured object (in this case, a list of JSON objects representing sentiment scores for different crypto coins).
Tip
Unstructured-to-structured data mappers are the building blocks of robust agentic systems. If you get familiar with this problem, you are well on your way to building agentic systems THAT WORK.
So, without further ado, let's do it!
All the source code is available in this repository.
> Go to the repo, and give it a star if you got value from it.
Tools
To follow along, you will need to install the following tools on your machine:
Docker (must have)
kind to create a local Kubernetes cluster.
kubectl to interact with the Kubernetes cluster.
uv to manage the Python environment of each service we will build.
Alternatively, you can reopen the project using the devcontainer in the root of the repository. Inside the devcontainer you will have `kind` and `kubectl` installed.
So, the only MUST HAVE tool you need on your machine is a Docker runtime, for example Docker Desktop.
Tip
If you are a Windows user, I strongly recommend using the devcontainer.
Step 1. Create the development Kubernetes cluster
It is important to set up a local development environment that resembles as much as possible the production environment. The more similar your dev and prod environments are, the easier it will be to deploy changes to the system with speed and reliability.
In production, Kai and Nil's trading firm (like maaaany companies out there) uses a Kubernetes cluster to run all the
infrastructure services, for example Apache Kafka, to allow async communication between the different services, and
business services, for example Nil's amazing trading bots... and soon our super smart sentiment extractor service.
Now, the closest-to-production way I know to set up a local environment like this is with kind.
What is kind?
Kind is a Kubernetes distribution where every cluster node is a Docker container.
It is a great way to get started with Kubernetes as an AI engineer, and it is the tool we will use to set up our development Kubernetes cluster.
You can create your development cluster with
make dev-environment
Again, if you don't like `make`, or you don't have it installed, you can just run:
cd deployments/dev/kind && sh ./create_cluster.sh
This will create a one-node kind cluster with the following services:
Apache Kafka, deployed with Strimzi, which allows data to be sent and received asynchronously between our 2 services.
Kafka UI, which you can use to inspect the topics of your Kafka cluster.
To access the Kafka UI you will need to forward one of your local ports to it.
First of all, ensure that `kubectl` is pointing to the correct cluster.
export KUBECONFIG=$HOME/.kube/config-rwml-68f3
Then, you can map your local port 9092 to the UI's port 8080 with `kubectl`.
kubectl port-forward svc/kafka-ui -n kafka 9092:8080
Now you can access the Kafka UI at `http://localhost:9092`.
BOOM!
Step 2. Build the news-producer service
Our pipeline will be composed of 2 services:
The news producer, which fetches news in real time from this awesome news API our CEO Kai mentioned in our first meeting, and pushes them to a Kafka topic.
The news sentiment extractor, which consumes the news from the Kafka topic, parses them into sentiment scores, and pushes these into another Kafka topic.
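Putting the two together, the data flow looks like this (the topic names are the ones we will define in a moment):

```
news API --> news-producer --> `news` topic --> sentiment-extractor --> `news_sentiment` topic
```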
I told you this first version of the pipeline would be a bit bare-bones, so I will mock both the data producer and the sentiment extractor (no LLMs yet, that is for next week).
Now, even at this early stage, I strongly recommend you separate configuration parameters from business logic.
For example, for our first service, the news producer, we will have a `config.py` file that contains the info our service needs to talk to Kafka.
from pydantic_settings import BaseSettings

class Config(BaseSettings):
    # Address of the Kafka broker running inside the kind cluster
    kafka_broker_address: str = 'localhost:31234'
    # Topic the service will push the news to
    kafka_input_topic: str = 'news'
Tip
I am setting some default values to get up and running quickly, but feel free to source these parameters using environment variables and a tool like direnv.
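In case it is not obvious why this buys us flexibility: pydantic's `BaseSettings` matches environment variables to field names case-insensitively, so you can override any default without touching the code. A quick sanity check (the broker address below is just a made-up example):

```python
import os

# Override the 'localhost:31234' default via an environment variable
# (in practice you would set this in your shell or with direnv)
os.environ['KAFKA_BROKER_ADDRESS'] = 'my-broker:9092'  # hypothetical address

config = Config()
print(config.kafka_broker_address)  # -> my-broker:9092
```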
The main loop is simple: we use QuixStreams (the most user-friendly real-time data processing Python library I know of) to push messages one by one into the `news` Kafka topic.
These news items are not real ones, but fake ones I am mocking with the `MockAPI` class.
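To make this concrete, here is a minimal sketch of what that loop could look like. It is not the exact code from the repo: my `MockAPI` stub below is an assumption (it just cycles through two hard-coded news items), but the producer calls follow the standard QuixStreams pattern:

```python
import itertools
import time

from loguru import logger
from quixstreams import Application


class MockAPI:
    """Stand-in for the real news API (my guess at what the repo's MockAPI does)."""

    def __init__(self):
        self._news = itertools.cycle([
            {'id': '1', 'timestamp_ms': 1718000000,
             'text': 'Reuters: Goldman Sachs about to buy 1B BTC, and sell 1B ETH'},
            {'id': '3', 'timestamp_ms': 1718000000, 'text': 'The grass is green'},
        ])

    def get_news(self) -> dict:
        return next(self._news)


app = Application(broker_address='localhost:31234')  # value from config.py
topic = app.topic(name='news', value_serializer='json')
mock_api = MockAPI()

with app.get_producer() as producer:
    while True:
        news = mock_api.get_news()
        # Serialize the dict into a Kafka message and push it to the `news` topic
        message = topic.serialize(key=news['id'], value=news)
        producer.produce(topic=topic.name, key=message.key, value=message.value)
        logger.debug(f'News {news} pushed to Kafka')
        time.sleep(1)
```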
What is a Kafka topic?
A Kafka topic is a named queue of messages.
In our case, we will have 2 topics:
`news`, which will contain the news articles.
`news_sentiment`, which will contain the sentiment scores.
In the real world, many companies use Kafka as a storage system, where they push EVERYTHING. This is not the original use case of Kafka, but it has become a safety net for many companies to replay events in case of a disaster and repopulate their analytics databases and feature stores.
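This replay pattern is also why consumers let you choose where a fresh consumer group starts reading from. As one illustration (the consumer group name below is hypothetical), in QuixStreams you control this with `auto_offset_reset`:

```python
from quixstreams import Application

# A brand-new consumer group with auto_offset_reset='earliest' re-reads the
# topic from the beginning, which is what makes the replay / repopulate
# pattern possible.
app = Application(
    broker_address='localhost:31234',
    consumer_group='rebuild-analytics-db',  # hypothetical consumer group
    auto_offset_reset='earliest',
)
```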
You can run the service as a standalone Python script with
cd services/news-producer
uv run src/news_producer/main.py
and you should see a console output like this:
2025-06-10 14:12:30.764 | DEBUG | __main__:run:37 - News {'id': '1', 'timestamp_ms': 1718000000, 'text': 'Reuters: Goldman Sachs about to buy 1B BTC, and sell 1B ETH'} pushed to Kafka
2025-06-10 14:12:31.765 | DEBUG | __main__:run:37 - News {'id': '3', 'timestamp_ms': 1718000000, 'text': 'The grass is green'} pushed to Kafka
2025-06-10 14:12:32.771 | DEBUG | __main__:run:37 - News {'id': '1', 'timestamp_ms': 1718000000, 'text': 'Reuters: Goldman Sachs about to buy 1B BTC, and sell 1B ETH'} pushed to Kafka
2025-06-10 14:12:33.772 | DEBUG | __main__:run:37 - News {'id': '3', 'timestamp_ms': 1718000000, 'text': 'The grass is green'} pushed to Kafka
Nice. Let's move on to the next service.
Step 3. Build the sentiment-extractor service
Again, no secrets here. A little bit of:
QuixStreaming
Kafking
Pythoning, and
Mocking
(I just came up with these verbs. Please use them at your own discretion)
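If you want a bit more substance than my made-up verbs, here is a rough sketch of what this service's main loop could look like with QuixStreams' `StreamingDataFrame` API. The `extract_sentiment` heuristic below is my own dumb stand-in (next week an LLM takes its place), so treat it as an illustration rather than the repo's exact code:

```python
from loguru import logger
from quixstreams import Application

app = Application(
    broker_address='localhost:31234',  # value from config.py
    consumer_group='sentiment-extractor',
)
input_topic = app.topic(name='news', value_deserializer='json')
output_topic = app.topic(name='news_sentiment', value_serializer='json')


def extract_sentiment(news: dict) -> list[dict]:
    """Dumb mocked mapper: one news item -> a list of sentiment scores."""
    text = news['text'].lower()
    scores = []
    for coin in ('BTC', 'ETH'):
        if f'buy 1b {coin.lower()}' in text:
            scores.append({'coin': coin, 'score': 1, 'timestamp_ms': news['timestamp_ms']})
        elif f'sell 1b {coin.lower()}' in text:
            scores.append({'coin': coin, 'score': -1, 'timestamp_ms': news['timestamp_ms']})
    return scores


sdf = app.dataframe(input_topic)
# expand=True fans out each returned list into one Kafka message per element
sdf = sdf.apply(extract_sentiment, expand=True)
sdf = sdf.update(lambda scores: logger.debug(f'Sentiment scores: {scores}'))
sdf = sdf.to_topic(output_topic)

app.run()
```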
You can run the service as a standalone Python script with
cd services/sentiment-extractor
uv run src/sentiment_extractor/main.py
And you should see a console output like this:
2025-06-10 14:12:59.894 | DEBUG | __main__:<lambda>:66 - Sentiment scores: {'coin': 'BTC', 'score': 1, 'timestamp_ms': 1718000000}
2025-06-10 14:12:59.895 | DEBUG | __main__:<lambda>:66 - Sentiment scores: {'coin': 'ETH', 'score': -1, 'timestamp_ms': 1718000000}
2025-06-10 14:13:00.902 | DEBUG | __main__:<lambda>:66 - Sentiment scores: {'coin': 'BTC', 'score': 1, 'timestamp_ms': 1718000000}
Look. The sentiment scores are very dumb, but the whole skeleton of the pipeline is there.
Which means that next week we can start adding some life to it, and start using LLMs to extract sentiment scores.
In the meantime, I will also find a good source of crypto news we can use to test the pipeline.
Talk to you next week :-)
Do you want to learn real world AI engineering?
I am no AI influencer.
I am too old for that.
I am a human-engineer-maths-olympian-turned-ml-engineer-who-along-the-way-met-the-woman-of-his-life-and-was-blessed-with-2-beautiful-boys-at-the-time-when-he-decided-to-go-all-in-and-teach-everything-he-has-learned-in-the-last-10-years-building-stats-ml-things.
Yes. I teach live.
Live coding. You and me.
From idea to system. Step by step.
This is the expertise companies are looking (and paying) for.
This is what I teach in each of my live courses.
They are not easy. But hey, who told you real world ML was easy?
Do you want to learn how to build real-time ML systems (like in crypto trading firms)?
Do you want to learn agentic workflows that WORK and learn to deploy and operate them in production Kubernetes clusters?
When you enrol in any of my courses you also get lifetime access to my private Discord community. We are 700. And we have good brains. Come join us and level up.
See you on the other side.