Producing and consuming Kafka messages in Go
Set up Kafka with Docker using the following docker-compose.yml:
version: '3'

services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
      - "9094:9094"
    environment:
      KAFKA_ENABLE_KRAFT: "yes"
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:9094"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
    volumes:
      - ./kafka-data:/tmp/kraft-combined-logs
    networks:
      - kafka-net

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      - kafka-net
    depends_on:
      - kafka

networks:
  kafka-net:
    driver: bridge

volumes:
  kafka-data:
Start the containers with docker-compose:
docker-compose up -d
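With default broker settings Kafka usually auto-creates topics on first use, but you can also create test-topic explicitly from Go with sarama's ClusterAdmin. This is an optional sketch; the partition count and the standalone main program are my own choices, not part of the original setup:

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V3_7_0_0

	// Connect through the EXTERNAL listener advertised by the compose file.
	admin, err := sarama.NewClusterAdmin([]string{"localhost:9094"}, config)
	if err != nil {
		log.Fatalf("Error creating cluster admin: %v", err)
	}
	defer admin.Close()

	// Single broker, so the replication factor must be 1; 3 partitions is an arbitrary choice.
	detail := &sarama.TopicDetail{NumPartitions: 3, ReplicationFactor: 1}
	if err := admin.CreateTopic("test-topic", detail, false); err != nil {
		// This also fails harmlessly if the topic already exists.
		log.Printf("CreateTopic: %v", err)
	} else {
		log.Println("Topic test-topic created")
	}
}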
Write the producer and consumer code. Both tests use the IBM/sarama client and connect through the EXTERNAL listener advertised as localhost:9094 in the compose file.
package kafka

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"testing"
	"time"

	"github.com/IBM/sarama"
)
// TestProducer sends a message every two seconds until the process is interrupted.
func TestProducer(t *testing.T) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by SyncProducer
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5

	brokers := []string{"localhost:9094"}
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("Error creating producer: %v", err)
	}
	defer producer.Close()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	topic := "test-topic"
	msgCount := 0
	for {
		select {
		case <-signals:
			fmt.Println("Interrupt received, shutting down")
			return
		default:
			message := &sarama.ProducerMessage{
				Topic: topic,
				Value: sarama.StringEncoder(fmt.Sprintf("Message %d", msgCount)),
			}
			partition, offset, err := producer.SendMessage(message)
			if err != nil {
				log.Printf("Failed to send message: %v", err)
			} else {
				log.Printf("Message sent to partition %d at offset %d", partition, offset)
			}
			msgCount++
			time.Sleep(2 * time.Second)
		}
	}
}
// TestConsumer joins the consumer group "test-group" and prints every message it receives.
func TestConsumer(t *testing.T) {
	config := sarama.NewConfig()
	config.Version = sarama.V3_7_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.BalanceStrategyRange,
	}

	groupID := "test-group"
	consumerGroup, err := sarama.NewConsumerGroup([]string{"localhost:9094"}, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumerGroup.Close()

	handler := consumerHandler{}
	ctx := context.Background()
	for {
		// Consume returns whenever a rebalance happens, so it must be called in a loop.
		if err := consumerGroup.Consume(ctx, []string{"test-topic"}, &handler); err != nil {
			log.Printf("Consume error: %v", err)
			time.Sleep(5 * time.Second)
		}
	}
}
// consumerHandler implements sarama.ConsumerGroupHandler.
type consumerHandler struct{}

func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Received message: Topic(%s) Partition(%d) Offset(%d) Key(%s) Value(%s)\n",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
		session.MarkMessage(msg, "") // mark as processed so the offset is committed
	}
	return nil
}
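The consumer loop above never exits on its own; in a test it simply runs until you stop it. In a real service you would normally tie the loop to a cancellable context so an interrupt stops consumption cleanly. A minimal sketch of that pattern, reusing the same consumerHandler (the run function name is mine, not from the original code):

// run consumes until the process receives an interrupt signal.
func run(consumerGroup sarama.ConsumerGroup) error {
	// signal.NotifyContext cancels ctx on Ctrl-C.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	handler := consumerHandler{}
	for {
		if err := consumerGroup.Consume(ctx, []string{"test-topic"}, &handler); err != nil {
			log.Printf("Consume error: %v", err)
		}
		// Stop looping once the context has been cancelled by the signal.
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}

To try the original tests, run the consumer and producer in separate terminals, e.g. go test -run TestConsumer -timeout 0 and go test -run TestProducer -timeout 0 (the timeout is disabled because both tests loop until interrupted).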