Go

Learn how to connect to Kafka in Sealos DevBox using Go

This guide will walk you through the process of connecting to Kafka using Go within your Sealos DevBox project.

Prerequisites

Install Required Packages

In your Cursor terminal, install the necessary packages:

go get github.com/joho/godotenv
go get github.com/confluentinc/confluent-kafka-go/v2/kafka

This command installs:

  • github.com/confluentinc/confluent-kafka-go/v2/kafka: The Confluent Kafka client for Go

System Dependencies

The confluent-kafka-go package requires librdkafka as a system dependency. In Sealos DevBox, you might need to install it manually. Run the following commands in your Cursor terminal:

sudo apt-get update
sudo apt-get install -y gcc libc6-dev librdkafka-dev

Connection Setup

Set up the environment variables

First, let's set up the environment variables for our Kafka connection. Create a .env file in your project root with the following content:

.env
KAFKA_BROKER=your_kafka_host:9092
KAFKA_GROUP_ID=group-id
KAFKA_TOPIC=topic-name

Replace the placeholders with your actual Kafka broker address, group ID, and topic name from the Database app in Sealos.

Create the main.go file

Create a new file named main.go with the following content:

main.go
package main
 
import (
	"fmt"
	"log"
	"os"
 
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/joho/godotenv"
)
 
var (
	broker  string
	groupId string
	topic   string
)
 
func loadEnv() error {
	// Load environment variables from .env file
	err := godotenv.Load()
	if err != nil {
		log.Fatal("Error loading .env file")
	}
 
	broker = os.Getenv("KAFKA_BROKER")
	groupId = os.Getenv("KAFKA_GROUP_ID")
	topic = os.Getenv("KAFKA_TOPIC")
 
	return nil
}
 
func startProducer() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":        broker,
		"allow.auto.create.topics": true,
	})
	if err != nil {
		panic(err)
	}
 
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()
 
	for _, word := range []string{"message 1", "message 2", "message 3"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}
}
 
func startConsumer() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		"group.id":          groupId,
		"auto.offset.reset": "earliest",
	})
 
	if err != nil {
		panic(err)
	}
	c.Subscribe(topic, nil)
 
	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			break
		}
	}
 
	c.Close()
}
 
func main() {
	if err := loadEnv(); err != nil {
		fmt.Println(err)
		return
	}
 
	startProducer()
	startConsumer()
}

Let's break down the main components of this code:

  1. Imports and Variables: We import the necessary packages and define global variables for the broker address, group ID, and topic name.

  2. startProducer function:

    • Creates a new Kafka producer
    • Uses a goroutine to handle delivery reports
    • Produces sample messages to the specified topic
  3. startConsumer function:

    • Creates a new Kafka consumer
    • Subscribes to the specified topic
    • Continuously reads messages from the topic and prints them
  4. Main function: Calls both startProducer() and startConsumer() to demonstrate producing and consuming messages.

Usage

To run the application, use the following command in your Cursor terminal:

go run main.go

This will execute the main function, demonstrating both producing and consuming messages with Kafka.

Best Practices

  1. In a real-world scenario, separate the producer and consumer into different applications or services.
  2. Use environment variables for Kafka configuration instead of hardcoding values.
  3. Implement proper error handling and logging.
  4. Implement graceful shutdown to properly close Kafka connections.

Troubleshooting

If you encounter connection issues:

  1. Verify your Kafka broker address in the broker variable.
  2. Ensure your Kafka cluster is running and accessible.
  3. Check for any network restrictions in your DevBox environment.
  4. Confirm that the required packages and system dependencies are correctly installed.
  5. If you encounter cgo related errors, make sure you have the necessary build tools installed (sudo apt-get install build-essential).

For more detailed information on using Kafka with Go, refer to the confluent-kafka-go documentation.

Edit on GitHub

Last updated on

On this page