GOlang 生产消费kafa

docker搭建kafka

version: '3'

services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"    # 外部访问端口
      - "9093:9093"    # Controller端口
      - "9094:9094"
    environment:
      KAFKA_ENABLE_KRAFT: "yes"
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"

      # 关键修改开始
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"  # 新增配置
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:9094"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      # 关键修改结束

      KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"

    volumes:
      - ./kafka-data:/tmp/kraft-combined-logs
    networks:
      - kafka-net
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092  # 使用内部地址
    networks:
      - kafka-net
    depends_on:
      - kafka

networks:
  kafka-net:
    driver: bridge

volumes:
  kafka-data:

启动docker-compose

docker-compose up -d

编写生产者和消费者代码

package kafka

import (
    "context"
    "fmt"
    "github.com/IBM/sarama"
    "log"
    "os"
    "os/signal"
    "testing"
    "time"
)

func TestProducer(t *testing.T) {
    // 配置生产者
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 5

    // 连接 Kafka 集群
    brokers := []string{"localhost:9094"}
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        log.Fatalf("Error creating producer: %v", err)
    }
    defer producer.Close()

    // 处理中断信号
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // 生产消息
    topic := "test-topic"
    msgCount := 0
    for {
        select {
        case <-signals:
            fmt.Println("Interrupt received, shutting down")
            return
        default:
            message := &sarama.ProducerMessage{
                Topic: topic,
                Value: sarama.StringEncoder(fmt.Sprintf("Message %d", msgCount)),
            }

            // 发送消息
            partition, offset, err := producer.SendMessage(message)
            if err != nil {
                log.Printf("Failed to send message: %v", err)
            } else {
                log.Printf("Message sent to partition %d at offset %d", partition, offset)
            }

            msgCount++
            time.Sleep(2 * time.Second)
        }
    }

}

func TestConsumer(t *testing.T) {
    config := sarama.NewConfig()
    config.Version = sarama.V3_7_0_0 // 匹配Kafka版本
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
        sarama.BalanceStrategyRange,
    }

    groupID := "test-group"
    consumerGroup, err := sarama.NewConsumerGroup([]string{"localhost:9094"}, groupID, config)
    if err != nil {
        log.Fatalf("Error creating consumer group: %v", err)
    }
    defer consumerGroup.Close()

    handler := consumerHandler{}
    ctx := context.Background()

    for {
        err := consumerGroup.Consume(ctx, []string{"test-topic"}, &handler)
        if err != nil {
            log.Printf("Consume error: %v", err)
            time.Sleep(5 * time.Second)
        }
    }
}

// 消费者处理器
type consumerHandler struct{}

func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (h *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Received message: Topic(%s) Partition(%d) Offset(%d) Key(%s) Value(%s)\n",
            msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
        session.MarkMessage(msg, "")
    }
    return nil
}

results matching ""

    No results matching ""