Rust

Learn how to connect to Kafka in Sealos DevBox using Rust

This guide shows how to connect to Kafka from a Rust application within your Sealos DevBox project.

Prerequisites

Install Required Dependencies

In Cursor, open your Cargo.toml file and add the necessary dependencies:

[dependencies]
rdkafka = "0.28"
tokio = { version = "1.28", features = ["full"] }
dotenv = "0.15"

These dependencies include:

  • rdkafka: A high-level Apache Kafka client library for Rust
  • tokio: An asynchronous runtime for Rust
  • dotenv: A library for loading environment variables from a file

Connection Setup

Set up the environment variables

First, let's set up the environment variables for our Kafka connection. Create a .env file in your project root with the following content:

.env
KAFKA_BROKERS=your_kafka_bootstrap_servers:9092
KAFKA_TOPIC=your_topic_name
KAFKA_GROUP_ID=rust-consumer-group

Replace the placeholders with the actual connection details of your Kafka cluster from the Database app in Sealos.
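
Before wiring these values into a Kafka client, it can help to validate them in one place. The following is a minimal, standard-library-only sketch of that idea. The `KafkaSettings` struct and the `settings_from` function are hypothetical names introduced for illustration, and the comma-separated broker list is an assumption about how you might fill in KAFKA_BROKERS.

```rust
use std::env;

/// Connection settings read from the variables defined in `.env`.
/// (Hypothetical helper type, not part of rdkafka.)
#[derive(Debug, PartialEq)]
struct KafkaSettings {
    brokers: Vec<String>,
    topic: String,
    group_id: String,
}

/// Builds the settings from any key lookup, so the parsing can be exercised
/// without touching the real process environment.
fn settings_from(lookup: impl Fn(&str) -> Option<String>) -> Result<KafkaSettings, String> {
    // A variable counts as missing if it is unset or blank.
    let required = |key: &str| {
        lookup(key)
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty())
            .ok_or_else(|| format!("{key} must be set"))
    };

    // Assumption: KAFKA_BROKERS may hold several comma-separated host:port pairs.
    let brokers: Vec<String> = required("KAFKA_BROKERS")?
        .split(',')
        .map(|b| b.trim().to_string())
        .filter(|b| !b.is_empty())
        .collect();
    if brokers.is_empty() {
        return Err("KAFKA_BROKERS must contain at least one host:port".to_string());
    }

    Ok(KafkaSettings {
        brokers,
        topic: required("KAFKA_TOPIC")?,
        group_id: required("KAFKA_GROUP_ID")?,
    })
}

fn main() {
    // Reads the real environment; load `.env` first if you rely on it.
    match settings_from(|key| env::var(key).ok()) {
        Ok(settings) => println!("{settings:?}"),
        Err(problem) => eprintln!("configuration error: {problem}"),
    }
}
```

Returning a `Result` instead of calling `expect` lets the caller decide how to report a missing variable.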

Create the main.rs file

Create a new file named src/main.rs with the following content:

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::message::Message;
use std::time::Duration;
use dotenv::dotenv;
use std::env;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    dotenv().ok();
 
    let brokers = env::var("KAFKA_BROKERS").expect("KAFKA_BROKERS must be set");
    let topic = env::var("KAFKA_TOPIC").expect("KAFKA_TOPIC must be set");
    let group_id = env::var("KAFKA_GROUP_ID").expect("KAFKA_GROUP_ID must be set");
 
    // Producer setup ("group.id" is a consumer-only setting, so it is not set here)
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", &brokers)
        .set("message.timeout.ms", "5000")
        .create()?;
 
    // Produce a message
    let delivery_status = producer
        .send(
            FutureRecord::to(&topic)
                .payload("Hello from Sealos DevBox!")
                .key("key"),
            Duration::from_secs(0),
        )
        .await;
 
    println!("Delivery status: {:?}", delivery_status);
 
    // The awaited send above has already reported delivery; this short pause
    // is only a precaution before the consumer starts
    tokio::time::sleep(Duration::from_secs(1)).await;
 
    // Consumer setup
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", &group_id)  // Use the group ID from KAFKA_GROUP_ID
        .set("bootstrap.servers", &brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        .set("auto.offset.reset", "earliest")  // Start from the oldest message when the group has no committed offset
        .create()?;
 
    consumer.subscribe(&[&topic])?;
 
    // Consume messages
    println!("Waiting for messages...");
    let mut message_count = 0;
    let max_messages = 5;  // Set the maximum number of messages to receive
    
    while message_count < max_messages {
        match tokio::time::timeout(Duration::from_secs(5), consumer.recv()).await {
            Ok(Ok(msg)) => {
                println!("Received message: {:?}", msg.payload_view::<str>());
                message_count += 1;
            }
            Ok(Err(e)) => println!("Error while receiving message: {:?}", e),
            Err(_) => {
                println!("No more messages received after {} seconds. Exiting.", 5);
                break;
            }
        }
    }
    
    println!("Received {} messages in total.", message_count);
 
    Ok(())
}

This program demonstrates how to:

  1. Set up a Kafka producer and send a message to a topic.
  2. Set up a Kafka consumer and read messages from a topic.
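
The consumer loop above prints `msg.payload_view::<str>()` with `{:?}`, which shows the nested `Option` and `Result` wrappers rather than the plain text. A small helper can turn the raw bytes into a printable string instead. The sketch below uses only the standard library; `payload_text` is a hypothetical name, and it assumes you pass it the `Option<&[u8]>` returned by the message's `payload()` method.

```rust
/// Turns a raw Kafka payload into printable text.
/// (Hypothetical helper; takes the bytes as returned by `msg.payload()`.)
fn payload_text(payload: Option<&[u8]>) -> String {
    match payload {
        // A record may carry no payload at all.
        None => "<empty payload>".to_string(),
        // Invalid UTF-8 sequences are replaced with U+FFFD instead of failing.
        Some(bytes) => String::from_utf8_lossy(bytes).into_owned(),
    }
}

fn main() {
    println!("{}", payload_text(Some("Hello from Sealos DevBox!".as_bytes())));
    println!("{}", payload_text(None));
}
```

Inside the loop this could be used as `println!("Received message: {}", payload_text(msg.payload()));`. The lossy conversion is a design choice for logging; if invalid UTF-8 should be treated as an error, keep the `Result` from `payload_view` instead.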

Usage

To run the application, use the following command in your Cursor terminal:

cargo run

This will compile and execute the main function, demonstrating the connection to Kafka, message production, and consumption.

Best Practices

  1. Use environment variables for Kafka configuration details.
  2. Implement proper error handling using Rust's Result type.
  3. Use asynchronous programming with Tokio so the application is not blocked while waiting on network I/O.
  4. Consider implementing more robust consumer logic for production use, including proper error handling and graceful shutdown.
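
As one possible shape for the graceful shutdown mentioned in point 4, the sketch below runs a worker loop that stops when a shared flag is set. It is a standard-library stand-in, not Kafka code: an `mpsc` channel plays the role of the message stream, and `run_until_stopped` is a hypothetical name. In the Tokio-based program above, the same idea would more likely be expressed with `tokio::select!` and a signal handler.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Handles messages from `rx` until the channel closes or `stop` is set.
/// Returns how many messages were handled.
fn run_until_stopped(rx: mpsc::Receiver<String>, stop: Arc<AtomicBool>) -> usize {
    let mut handled = 0;
    while !stop.load(Ordering::SeqCst) {
        // A short timeout keeps the loop responsive to the stop flag.
        match rx.recv_timeout(Duration::from_millis(50)) {
            Ok(msg) => {
                println!("handled: {msg}");
                handled += 1;
            }
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    handled
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let stop = Arc::new(AtomicBool::new(false));

    let worker = {
        let stop = Arc::clone(&stop);
        thread::spawn(move || run_until_stopped(rx, stop))
    };

    for i in 0..3 {
        tx.send(format!("message {i}")).unwrap();
    }

    // Give the worker a moment, then request shutdown and wait for it to finish.
    thread::sleep(Duration::from_millis(200));
    stop.store(true, Ordering::SeqCst);
    let handled = worker.join().unwrap();
    println!("handled {handled} messages before shutdown");
}
```

Joining the worker before exiting gives it the chance to finish the message it is currently handling, which is the main point of a graceful shutdown.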

Troubleshooting

If you encounter connection issues:

  1. Verify your Kafka broker addresses in the .env file.
  2. Ensure your Kafka cluster is running and accessible from your DevBox environment.
  3. Check for any network restrictions in your DevBox environment.
  4. Confirm that all required dependencies are correctly specified in your Cargo.toml file.
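
To separate network problems from client configuration problems, a plain TCP check against each broker address can be a useful first step. The sketch below uses only the standard library. `broker_reachable` is a hypothetical helper, the `localhost:9092` fallback is an assumption for when KAFKA_BROKERS is not set, and a successful TCP connection only shows that the port is open, not that Kafka itself is healthy.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Returns true if a TCP connection to `address` (host:port) succeeds within `timeout`.
fn broker_reachable(address: &str, timeout: Duration) -> bool {
    match address.to_socket_addrs() {
        // A host name may resolve to several addresses; one success is enough.
        Ok(mut addrs) => addrs.any(|addr| TcpStream::connect_timeout(&addr, timeout).is_ok()),
        // Malformed or unresolvable addresses count as unreachable.
        Err(_) => false,
    }
}

fn main() {
    // Assumption: fall back to localhost:9092 when KAFKA_BROKERS is not set.
    let brokers = std::env::var("KAFKA_BROKERS").unwrap_or_else(|_| "localhost:9092".to_string());

    for broker in brokers.split(',').map(str::trim).filter(|b| !b.is_empty()) {
        let status = if broker_reachable(broker, Duration::from_secs(3)) {
            "reachable"
        } else {
            "unreachable"
        };
        println!("{broker}: {status}");
    }
}
```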

For more detailed information on using Kafka with Rust, refer to the rdkafka documentation.
