我对 Kafka 很陌生,在向生产者推送价值时收到此消息func Produce(topic string, key string, message interface{}) { headers := map[string][]byte{ MSG_HEADER_KEY_CORRELATIONID: []byte("1234"), MSG_HEADER_KEY_REQUESTID: []byte(uuid.NewString()), MSG_HEADER_KEY_TESTID: []byte("456"), MSG_HEADER_KEY_MESSAGETYPE: []byte("TestLookupRequest"), } kheaders := make([]kafka.Header, 0, len(headers)) for k, v := range headers { kheaders = append(kheaders, kafka.Header{Key: k, Value: v}) } var err error servers := "XXXXXX" protocol := "SASL_SSL" mechanisms := "PLAIN" username := "XXXXXXX" password := "XXXXXXX" Producer, err = kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": servers, "security.protocol": protocol, "sasl.username": username, "sasl.password": password, "sasl.mechanism": mechanisms, }) if err != nil { panic(err) } defer Producer.Close() value, _ := json.Marshal(message) err = Producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Key: []byte("12345"), Headers: kheaders, Value: value, Timestamp: time.Now().UTC(), TimestampType: kafka.TimestampCreateTime, }, nil) if err != nil { panic(err) } Producer.Flush(30)}%4|1641074998.615|终止|rdkafka#producer-1| [thrd:app]:生产者以 1 条消息(881 字节)仍在队列或传输中终止:使用 flush() 等待未完成的消息传递有关如何解决此问题的任何帮助?
1 回答

沧海一幻觉
TA贡献1824条经验 获得超5个赞
请尝试更长的超时时间Flush()
;30 毫秒可能还不够。或者尝试使用本例中的通道: https ://github.com/confluentinc/confluent-kafka-go/blob/80c58f81b6cc32d3ed046609bf660a41a061b23d/examples/producer_example/producer_example.go
- 1 回答
- 0 关注
- 294 浏览
添加回答
举报
0/150
提交
取消