Python

Learn how to connect to Kafka in Sealos DevBox using Python

This guide will walk you through the process of connecting to Kafka using Python within your Sealos DevBox project.

Prerequisites

Activating the Python Environment

Before you start, you need to activate the Python virtual environment in your DevBox. Open the terminal within Cursor IDE and run:

source ./bin/activate

You should see your prompt change, indicating that the virtual environment is now active.

Installing Required Packages

In your Cursor terminal, install the necessary packages:

pip install kafka-python python-dotenv

This command installs:

  • kafka-python: The Apache Kafka client for Python
  • python-dotenv: A Python package that allows you to load environment variables from a .env file

Connection Setup

Set up the environment variables

First, let's set up the environment variables for our Kafka connection. Create a .env file in your project root with the following content:

.env
KAFKA_BOOTSTRAP_SERVERS=your_kafka_bootstrap_servers:9092
KAFKA_TOPIC=your_topic_name
KAFKA_CONSUMER_GROUP=your_consumer_group_id

Replace the placeholders with your actual Kafka credentials from the Database app in Sealos.

Create a Kafka client module

Create a new file named kafka_client.py with the following content:

kafka_client.py
import os
from dotenv import load_dotenv
from kafka import KafkaProducer, KafkaConsumer
 
# Load environment variables
load_dotenv()
 
def get_kafka_producer():
    try:
        producer = KafkaProducer(bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'))
        print("Successfully connected to Kafka producer")
        return producer
    except Exception as e:
        print(f"Error connecting to Kafka producer: {e}")
        return None
 
def get_kafka_consumer(topic, group_id=None):
    try:
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id=group_id or 'my-default-group'
        )
        print(f"Successfully connected to Kafka consumer for topic: {topic}")
        return consumer
    except Exception as e:
        print(f"Error connecting to Kafka consumer: {e}")
        return None

This module provides two main functions:

  1. get_kafka_producer(): This function creates a Kafka producer using the bootstrap servers specified in the environment variables.
  2. get_kafka_consumer(topic): This function creates a Kafka consumer for a specified topic.

Create a test script

Now, let's create a test script to verify our connection and perform some basic Kafka operations. Create a file named test_kafka.py with the following content:

test_kafka.py
import os
from dotenv import load_dotenv
from kafka_client import get_kafka_producer, get_kafka_consumer
 
# Load environment variables
load_dotenv()
 
def test_kafka_producer():
    producer = get_kafka_producer()
    if producer:
        topic = os.getenv('KAFKA_TOPIC')
        message = "Hello from Sealos DevBox!"
        producer.send(topic, message.encode('utf-8'))
        producer.flush()
        print(f"Message sent to topic {topic}: {message}")
        producer.close()
 
def test_kafka_consumer():
    topic = os.getenv('KAFKA_TOPIC')
    group_id = os.getenv('KAFKA_CONSUMER_GROUP')
    consumer = get_kafka_consumer(topic, group_id)
    if consumer:
        print(f"Waiting for messages on topic {topic}...")
        for message in consumer:
            print(f"Received message: {message.value.decode('utf-8')}")
            break  # Exit after receiving one message
        consumer.close()
 
if __name__ == "__main__":
    test_kafka_producer()
    test_kafka_consumer()

This script demonstrates how to:

  1. Create a Kafka producer and send a message to a topic.
  2. Create a Kafka consumer and read a message from a topic.

Running the Test Script

To run the test script, make sure your virtual environment is activated, then execute:

python test_kafka.py

If everything is set up correctly, you should see output indicating successful connection to Kafka, message sending, and message receiving.

Best Practices

  1. Always activate the virtual environment before running your Python scripts or installing packages.
  2. Use environment variables to store sensitive information like Kafka bootstrap servers.
  3. Handle exceptions appropriately to manage potential errors.
  4. Consider using asynchronous Kafka clients for better performance in production environments.
  5. Implement proper logging instead of print statements in production code.

Troubleshooting

If you encounter connection issues:

  1. Ensure you've activated the virtual environment with source ./bin/activate.
  2. Verify that your Kafka cluster is running and accessible.
  3. Double-check your Kafka credentials in the .env file.
  4. Check the Kafka logs in the Database app for any error messages.
  5. Make sure your DevBox environment has network access to the Kafka bootstrap servers.

For more detailed information on using Kafka with Python, refer to the kafka-python documentation.

Edit on GitHub

Last updated on

On this page