Node.js

Learn how to connect to Kafka in Sealos DevBox using Node.js

This guide will walk you through the process of connecting to Kafka using Node.js within your Sealos DevBox project.

Prerequisites

Install Required Packages

In your Cursor terminal, install the necessary packages:

npm install kafkajs dotenv

This command installs:

  • kafkajs: A modern Apache Kafka client for Node.js
  • dotenv: A zero-dependency module that loads environment variables from a .env file

Connection Setup

Set up the environment variables

First, let's set up the environment variables for our Kafka connection. Create a .env file in your project root with the following content:

.env
KAFKA_BROKERS=your_kafka_host:9092
KAFKA_CLIENT_ID=my-app
KAFKA_TOPIC=my-topic

Replace the placeholders with your actual Kafka broker address, client ID, and topic name from the Database app in Sealos.

Create a Kafka client

Create a new file named kafkaClient.js with the following content:

const { Kafka } = require('kafkajs');
require('dotenv').config();
 
const kafka = new Kafka({
  clientId: process.env.KAFKA_CLIENT_ID,
  brokers: process.env.KAFKA_BROKERS.split(','),
});
 
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'test-group' });
 
module.exports = { kafka, producer, consumer };

This file creates a Kafka client and exports it along with a producer and consumer.

Create a producer script

Now, let's create a file named producer.js to demonstrate how to produce messages:

const { kafka, producer } = require('./kafkaClient');
require('dotenv').config();
 
async function produceMessage() {
  try {
    await producer.connect();
    
    // Check if the topic exists
    const admin = kafka.admin();
    await admin.connect();
    const topics = await admin.listTopics();
    
    if (!topics.includes(process.env.KAFKA_TOPIC)) {
      console.log(`Topic ${process.env.KAFKA_TOPIC} does not exist. Creating it...`);
      await admin.createTopics({
        topics: [{ topic: process.env.KAFKA_TOPIC, numPartitions: 1, replicationFactor: 1 }]
      });
      console.log(`Topic ${process.env.KAFKA_TOPIC} created successfully.`);
    }
    
    await admin.disconnect();
 
    // Send the message
    const result = await producer.send({
      topic: process.env.KAFKA_TOPIC,
      messages: [
        { value: 'Hello from Sealos DevBox!' },
      ],
    });
    console.log('Message sent successfully', result);
  } catch (error) {
    console.error('Error producing message:', error);
    if (error.name === 'KafkaJSNumberOfRetriesExceeded') {
      console.error('Connection details:', {
        clientId: process.env.KAFKA_CLIENT_ID,
        brokers: process.env.KAFKA_BROKERS,
      });
    }
  } finally {
    await producer.disconnect();
  }
}
 
produceMessage();

This script does the following:

  1. Connects to Kafka using the producer.
  2. Checks if the specified topic exists, creating it if necessary.
  3. Sends a message to the topic.
  4. Handles errors, including connection issues.
  5. Disconnects from Kafka after the operation.

This approach ensures the topic exists before sending messages and provides detailed error information if the connection fails.

Create a consumer script

Create another file named consumer.js to demonstrate how to consume messages:

const { consumer } = require('./kafkaClient');
require('dotenv').config();
 
async function consumeMessages() {
  try {
    await consumer.connect();
    await consumer.subscribe({ topic: process.env.KAFKA_TOPIC, fromBeginning: true });
 
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log({
          topic,
          partition,
          offset: message.offset,
          value: message.value.toString(),
        });
      },
    });
  } catch (error) {
    console.error('Error consuming messages:', error);
  }
}
 
consumeMessages();

This consumer script:

  1. Connects to Kafka using the consumer instance.
  2. Subscribes to the specified topic, starting from the beginning of the log.
  3. Continuously runs and processes each incoming message.
  4. Logs the topic, partition, offset, and message value for each received message.
  5. Handles any errors that occur during the consumption process.

This setup allows for real-time processing of messages as they arrive in the Kafka topic.

Usage

To run the producer script, use the following command in your Cursor terminal:

node producer.js

To run the consumer script, open another terminal and use:

node consumer.js

The consumer will start listening for messages. When you run the producer script, you should see the message being received by the consumer.

Best Practices

  1. Use environment variables for Kafka configuration details.
  2. Implement proper error handling and logging.
  3. Use the kafkajs built-in retry mechanism for better reliability.
  4. Consider implementing a graceful shutdown mechanism for your consumer.
  5. Use compression for better performance when dealing with large messages or high throughput.

Troubleshooting

If you encounter connection issues:

  1. Verify your Kafka broker address in the .env file.
  2. Ensure your Kafka cluster is running and accessible.
  3. Check for any network restrictions in your DevBox environment.
  4. Confirm that the required packages are correctly installed.

For more detailed information on using Kafka with Node.js, refer to the KafkaJS documentation.

Edit on GitHub

Last updated on

On this page