Java

Learn how to connect to Kafka in Sealos DevBox using Java

This guide will walk you through the process of connecting to Kafka using Java within your Sealos DevBox project.

Prerequisites

Project Setup

Create a new Maven project

In your Sealos DevBox terminal, initialize a new Maven project:

# Generate the quickstart skeleton non-interactively, then move its contents into the current directory.
mvn archetype:generate -DgroupId=com.example -DartifactId=kafka-java-example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
mv kafka-java-example/* .
rm -rf kafka-java-example
# Remove the generated unit tests. After the move they live under src/test (there is no ./test),
# and they depend on JUnit, which the replacement pom.xml in this guide does not declare.
rm -rf src/test

Update pom.xml

Replace the content of your pom.xml file with the following:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Kafka examples: declares the client and logging dependencies,
     compiles for Java 11, and produces a shaded (dependency-bundled) JAR at package time. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <!-- Coordinates; artifactId and version determine the JAR name used in the run commands. -->
    <groupId>com.example</groupId>
    <artifactId>kafka-java-example</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Java language level; repeated in the compiler plugin configuration below. -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
 
    <dependencies>
        <!-- Kafka producer/consumer client API used by the example classes. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>
        <!-- Simple SLF4J binding for logging output. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.5</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <!-- Bundles the project classes and their dependencies into one JAR during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Sets the manifest Main-Class. NOTE(review): the guide starts the examples with
                                     "java -cp ... <class>", so this entry only matters for "java -jar";
                                     com.example.App is presumably the class generated by the archetype - confirm. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.App</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

This pom.xml file includes the necessary dependencies (Kafka client and SLF4J for logging) and configures the Maven Shade plugin to create an executable JAR.

Create a configuration file

Create a file named kafka.properties in the src/main/resources directory:

# Broker address (host:port); loaded by the examples and passed to the Kafka producer and consumer.
bootstrap.servers=your_kafka_bootstrap_servers:9092
# Application-level key: the examples read it with getProperty("topic") to choose the topic.
topic=your_topic_name
# Consumer group id; loaded with the other settings and handed to the Kafka clients.
group.id=your_consumer_group_id

Replace the placeholders with your actual Kafka credentials from the Database app in Sealos.

Create Java classes

Create the following Java classes in the src/main/java/com/example directory:

  1. KafkaProducerExample.java:
package com.example;
 
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
 
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
 
/**
 * Sends a single greeting message to the topic named in {@code kafka.properties}
 * and prints the broker's acknowledgement (or the send error).
 */
public class KafkaProducerExample {
    /** Configuration file location, resolved against the current working directory. */
    private static final String CONFIG_PATH = "src/main/resources/kafka.properties";

    /**
     * Loads the configuration, sends one message and waits for it to be delivered.
     *
     * @param args unused
     * @throws IllegalStateException if the configuration does not define {@code topic}
     */
    public static void main(String[] args) {
        Properties props = loadConfig();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Validate before creating the producer so a bad config fails fast without opening a client.
        String topic = props.getProperty("topic");
        if (topic == null || topic.isBlank()) {
            throw new IllegalStateException("Missing 'topic' in " + CONFIG_PATH);
        }
        String message = "Hello from Sealos DevBox!";

        // try-with-resources closes the producer even if building or sending the record throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);

            // The callback runs asynchronously once the send succeeds or fails.
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Message sent successfully. Topic: " + metadata.topic() +
                            ", Partition: " + metadata.partition() +
                            ", Offset: " + metadata.offset());
                } else {
                    System.err.println("Error sending message: " + exception.getMessage());
                }
            });

            // Block until the buffered record has been sent so the callback fires before exit.
            producer.flush();
        }
    }

    /**
     * Reads the Kafka settings from {@link #CONFIG_PATH}.
     *
     * @return the loaded properties
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
     */
    private static Properties loadConfig() {
        Properties props = new Properties();
        try (FileInputStream fis = new FileInputStream(CONFIG_PATH)) {
            props.load(fis);
        } catch (IOException e) {
            throw new RuntimeException("Error loading Kafka configuration", e);
        }
        return props;
    }
}

This class demonstrates how to create a Kafka producer, send a message, and handle the result asynchronously.

  2. KafkaConsumerExample.java:
package com.example;
 
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
 
/**
 * Subscribes to the topic named in {@code kafka.properties} and prints every
 * record it receives. Runs until the process is terminated.
 */
public class KafkaConsumerExample {
    /** Configuration file location, resolved against the current working directory. */
    private static final String CONFIG_PATH = "src/main/resources/kafka.properties";

    /**
     * Loads the configuration, subscribes to the topic and polls in an endless loop.
     *
     * @param args unused
     * @throws IllegalStateException if the configuration does not define {@code topic}
     */
    public static void main(String[] args) {
        Properties props = loadConfig();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Validate before creating the consumer so a bad config fails fast without opening a client.
        String topic = props.getProperty("topic");
        if (topic == null || topic.isBlank()) {
            throw new IllegalStateException("Missing 'topic' in " + CONFIG_PATH);
        }

        // try-with-resources closes the consumer if polling ever throws.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));

            while (true) {
                // Wait up to 100 ms for new records, then print whatever arrived.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value() +
                            " from topic: " + record.topic() +
                            ", partition: " + record.partition() +
                            ", offset: " + record.offset());
                }
            }
        }
    }

    /**
     * Reads the Kafka settings from {@link #CONFIG_PATH}.
     *
     * @return the loaded properties
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
     */
    private static Properties loadConfig() {
        Properties props = new Properties();
        try (FileInputStream fis = new FileInputStream(CONFIG_PATH)) {
            props.load(fis);
        } catch (IOException e) {
            throw new RuntimeException("Error loading Kafka configuration", e);
        }
        return props;
    }
}

This class shows how to create a Kafka consumer, subscribe to a topic, and continuously poll for new messages.

Both classes use a loadConfig() method to read the Kafka properties from the kafka.properties file, allowing for easy configuration changes without modifying the code.

Build and Run

To build and run the project, use the following commands in your terminal from the project root (the examples load src/main/resources/kafka.properties via a relative path):

# Build the shaded JAR (project classes plus dependencies) into target/.
mvn clean package
# Run from the project root: both examples read src/main/resources/kafka.properties via a relative path.
# The producer sends one message and exits.
java -cp target/kafka-java-example-1.0-SNAPSHOT.jar com.example.KafkaProducerExample
# The consumer polls in an endless loop; stop it with Ctrl+C.
java -cp target/kafka-java-example-1.0-SNAPSHOT.jar com.example.KafkaConsumerExample

Run the producer and consumer in separate terminal windows to see the message being sent and received.

Best Practices

  1. Use a properties file to store Kafka configuration details.
  2. Implement proper error handling and logging.
  3. Use the try-with-resources statement to ensure that Kafka producers and consumers are properly closed.
  4. Consider using Kafka's AdminClient for managing topics and other Kafka resources.
  5. Implement proper serialization and deserialization for your message keys and values.

Troubleshooting

If you encounter connection issues:

  1. Verify your Kafka credentials in the kafka.properties file.
  2. Ensure your Kafka cluster is running and accessible from your DevBox environment.
  3. Check for any network restrictions in your DevBox environment.
  4. Confirm that all required dependencies are correctly specified in your pom.xml file.

For more detailed information on using Kafka with Java, refer to the Apache Kafka documentation.

Edit on GitHub

Last updated on

On this page