The Set Up

Imagine you are supposed to build a python service using machine learning model(trained offline) to detect if a web request is anomalous or not. The requests are coming at a rate of 1000 per second initially but will gradually increase as your main application reaches more customers. One can assume the requests to be like entries in the web logs and each request can have a potential attack. These requests are streaming in from a kafka consumer and needs to be replied with a prediction to a kafka producer.

ML service with kafka python
ML service with kafka python

We are talking of fast data here!

It goes without saying that we need to have a scalable ML service to match the speed of incoming requests in a digital world we are staying in. Point to note here is we are not dealing with big data but kind of fast data during prediction time. Scaling kafka consumer horizontally by increasing the number of pods in a kubernetes(k8s) cluster is a standard approach one would immediately think of. But we can do much more than that. Also, machine learning team mostly coding in Python “may” need to find a solution specific to python coding environment. If we have more tools and ways to tackle scalability within python, one can also use such techniques for serving predictions for fast data on IoT devices etc with minimal distributed computing available.

Scaling the kafka consumer for a machine learning service in python-Fast Data
Fast Data!

Horizontal scaling is same irrespective of the language used, due to amazing benefits of installing a service in a k8s cluster. One can increase the smaller configuration pods in the k8s cluster and scale the kafka based service easily. However there will be many scenarios when one is limited with the infrastructure available to deploy the kafka based service. For eg. machines where a service is running has more cpus(cores) available, it would make sense to use all of what is available. And also, it doesn’t make sense to spend heavily on multiple machines when kafka based streaming platform is already burning your wallet. Hence this article becomes more interesting only if we limit our discussion to what can be done when coding in python.

Don’t be a bad fish

With the given set up above, we have three major problems to solve.

Consume fast, process fast and produce fast.

If either of the step is slow, the entire service can become a bottleneck to the overall company’s backend system.

For the scale(amount of data) or the rate of requests (fast data) we are talking about, a small function like consume, predict or produce in a tiny ML service can cost the entire company a big customer!

Scaling the kafka consumer for a machine learning service in python-One bad fish
One bad fish can spoil the whole pond

Hopefully we are convinced by now that we need to make all these three functions fast with lowest possible latency. In the interest of the topic chosen here, we will limit our discussion to scale the consuming part and discuss process(predict) and produce in a separate article.

Get to the point

In order to scale the kafka consumer, one must also be aware of certain configurable settings or parameters which if not handled properly, can lead to kafka lags, basically extreme slowness of the consumer. So a brief on what those settings are:

  1. Auto commit configuration allow us to control if a consumer commits in background(asynchronous) or the user has to do it explicitly(synchronous). As a result, auto commit is not enabled, there would be a certain blocking time before polling and processing for another message. Also, pointed out in the previous link, one can do commit in batches with appropriate message.
  2. Kafka consumer is consuming from a topic that can have the data in multiple partitions. One consumer can consume from multiple partitions. But when kafka(or the producer) is distributing the data in multiple partitions, why not make multiple consumers too to consume in parallel from all the partitions. Though, it is useless to have number of consumers more than the number of partitions. Number of consumers should be at most the number of partitions and matching the number of partitions in ideal scenario.
  3. Other important relevant property is consumer group. Just like multiple partitions, there can be multiple consumer groups. One can give it a read to these two articles on how to leverage this property. I’ve not yet exploited multiple groups so not talking much about it here.

We will leverage the kafka properties in order to scale the consumer in python.

Multiple Consumers via multiprocessing

The first way is to have multiple processes running as many as the number of cores. This gives us as many consumers as possible on the same machine allowing the service to consume in parallel. Refer here for below code.

from multiprocessing import Process

from confluent_kafka.avro import AvroConsumer


consumer_config = {‘enable.auto.commit’:True, ‘bootstrap.servers’:’localhost:9092′, ‘group.id’:’group1′ }

# Deliberately kept the config very simple

topic_name = ‘raw_requests’

num_consumers = 4


def process_msg(msg):

  # As per your your application, process the message

  pass


def consume():

  consumer = AvroConsumer(consumer_config)

  consumer.subscribe(topic_name)

  while True:

    avro_msg = consumer.poll(1)

    <Various checks and validation on the msg>

    process_msg(avro_msg)

    #consumer.commit() for synchronous commit


  consumer.close()


def start_consumers(num_consumers):


  for i in range(num_consumers):

    process = Process(target=consume, args=())

    process.start()

  # Take care of restarting the process if it dies


  return

The above code snippet shows in a very simple way how to start as many consumers as the value of num_consumers. This takes the the advantage of multiprocessing module and starts a separate process for every consumer. There are multiple things in the above code which one needs to take care of. So please spend some time on different arguments to the function multiprocessing.Process().

Each consumer is nothing but an individual process in our case. So if we launch two different processes both consuming messages in parallel, effectively we have two consumers consuming from two different partitions of the same kafka topic.

Process messages in async via multi-threading

The purpose of doing this is to let processing become independent of consuming from the kafka topic. In the above code snippet, you can notice the next poll cannot happen until the message processing(call to process_msg()) is done. Consuming is kind of blocked by processing. What is the solution?

Make processing independent of consuming by executing it in a thread. Though multi-threading looks admiring at first glance, but in background threads in python execute in concurrent mode but not in parallel. For us parallel execution is taken care by the multiple processors. To untie or decouple the processing from consuming, we have multiple threads such that

the consumer thread keeps queuing messages and the process thread keeps dequeuing messages for processing. However, both are executing concurrently not in parallel, with the benefit of both independent of each other!

Everything remains same except of processing the message from consumer thread, we spawn a new thread for processing.

import threading

from queue import


Queueq = Queue(100)

def consume():

  consumer = AvroConsumer(consumer_config)

  consumer.subscribe(topic_name)

  while True:

    avro_msg = consumer.poll(1)

    <Various checks and validation on the msg>

    # process_msg(avro_msg)


    q.put(avro_msg)

    t = threading.Thread(process_msg, args=q)

    # In process_msg, you can dequeue to process the message

    #consumer.commit() for synchronous commit


consumer.close()

Again note that I have made extremely simple for explanatory purpose but one has to read on how threads operate in python and catching various exceptions like dead thread and starting a new thread etc. As pointed out in this link that

Kafka consumer is not thread safe. Multi-threaded access must be properly synchronised. It is the responsibility of the user to ensure that multi-threaded access is properly synchronised.

Both the above tasks of having multiple consumers and decoupling processing and consuming demands an extra vigilance on the configuration settings of the kafka consumer. One needs to look into setting like commit interval time (auto.commit.interval.ms), session timeout time (session.timeout.ms), maximum delay between poll (max.poll.interval.ms) etc. Incorrect setting of these parameters can lead to rebalancing of the consumer group and hence slowing down your service as none of the consumers work during rebalancing task!

I will soon be writing on how to add a producer in the same service while keeping everything still working, and how to train and predict faster with a simple ML model like SVM. The effort on those tasks will be comparatively less as once the messages are consumed, it is under the control of your service on how to process and how to produce.

Conclusion

In the end, just remember the following:

At any moment of a service processing the data consumed from a kafka consumer, there are only two main operations taking place — consuming and processing. If either of the operation is executing synchronously, it blocks the other operation. Auto commit configuration in kafka consumer allow us to keep polling the data asynchronously. This benefit, when combined with multi-threading in python, allow us to process asynchronously. In other words, auto commit feature let the polling to continue but if processing is taking time then there is a high chance of kafka lag building up! Therefore we also need to take care of the processing such that it doesn’t block from consuming messages. Python’s multi-threading comes to the rescue.

Not all ML engineering problems are about handling big data, there’s also streaming fast data 😉

Some references that might give more insight :

Source code for kafka.consumer.multiprocess

Kafka basics

Concurrency and parallel execution

Multi-threading in python

https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging

Recommended reads.