为了账号安全,请及时绑定邮箱和手机立即绑定

Sarama Kafka 消费者组函数返回

Sarama Kafka 消费者组函数返回

Go
呼唤远方 2022-04-26 15:24:29
我对 Go Lang 非常陌生,并试图对使用 Sarama 库从 Kafka 消费消息的开源库进行一些调整。原始代码可以在这里找到。原始包实现了一个 PartitionConsumer,如果不需要跨多个消费者使用同一主题的读取一致性,它就可以正常工作,但是,这对我不起作用。我在同一个应用程序中做了一些工作,使用我在网上找到的一些示例来实现 sarama NewConsumerGroup 包。KafkaConfig 为消费者携带 groupID 和 Topic。当我运行这个程序时,消费者启动并使用正确的组从正确的主题中读取,并使用在此函数中创建的 ConsumerClaim 将其打印到 STDOUT:func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {    for message := range claim.Messages() {        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)        session.MarkMessage(message, "")    }    return nil}然而,我认为我需要的是NewKafkaInput函数返回*KafkaInput添加到结构的声明中的消息(如果我在这里使用了错误的术语,请原谅我,这是我的第一个 Go 牛仔竞技表演)。... i := &KafkaInput{        config: config,        // consumers: make([]sarama.PartitionConsumer, len(partitions)),        // messages:  make(chan *sarama.ConsumerMessage, 256),        messages: make(chan *sarama.ConsumerMessage, 256),    }    return i}在此处完成的原始示例中:func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {    ...    go func(consumer sarama.PartitionConsumer) {                defer consumer.Close()                for message := range consumer.Messages() {                    i.messages <- message                }            }(consumer)    ...}我花了好几天的时间来研究将函数移入和移出NewKafakInput函数,尝试将消息添加到KafakInput函数外部的结构以及介于两者之间的所有内容。我只是无法让它工作。该NewKafakInput函数需要返回*KafkaInput任何消息,以便此函数可以完成:func (i *KafkaInput) Read(data []byte) (int, error) {    message := <-i.messages    if !i.config.useJSON {        copy(data, message.Value)        return len(message.Value), nil    }    var kafkaMessage KafkaMessage    json.Unmarshal(message.Value, &kafkaMessage)    buf, err := kafkaMessage.Dump()    if err != nil {        log.Println("Failed to decode access log entry:", err)        return 0, err    }    copy(data, buf)    return len(buf), nil}完全有可能我也把这件事弄得一团糟,但感谢任何帮助和输入。
查看完整描述

1 回答

?
素胚勾勒不出你

TA贡献1827条经验 获得超9个赞

这是我的问题的解决方案。我有 goroutines 阻塞了主要功能,他们需要被打破。如果下面的代码没有任何意义,这里是我正在修改的程序的链接:https ://github.com/buger/goreplay 。如果我能得到所有者的回复,我计划清理代码并提交拉取请求,或者可能发布一个分叉。


package main


import (

    "context"

    "encoding/json"

    "strings"


    "os"


    "log"


    "github.com/Shopify/sarama"

)


// KafkaInput is used for recieving Kafka messages and

// transforming them into HTTP payloads.

type KafkaInput struct {

    sarama.ConsumerGroup

    config   *KafkaConfig

    consumer Consumer

    messages chan *sarama.ConsumerMessage

}


// Consumer represents a Sarama consumer group consumer

type Consumer struct {

    ready    chan bool

    messages chan *sarama.ConsumerMessage

}


var (

    brokers  = ""

    version  = ""

    group    = ""

    topics   = ""

    assignor = ""

    oldest   = true

    verbose  = false

)


// NewKafkaInput creates instance of kafka consumer client.

func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {

    /**

     * Construct a new Sarama configuration.

     * The Kafka cluster version has to be defined before the consumer/producer is initialized.

     */

    c := sarama.NewConfig()

    // Configuration options go here


    log.Printf("KafkaConfig: %s", config.host)

    log.Printf("KafkaConfig: %s", config.group)

    log.Printf("KafkaConfig: %s", config.topic)


    log.Println("Starting a new Sarama consumer")


    if verbose {

        sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

    }


    version, err := sarama.ParseKafkaVersion("2.1.1")

    if err != nil {

        log.Panicf("Error parsing Kafka version: %v", err)

    }


    c.Version = version


    if oldest {

        c.Consumer.Offsets.Initial = sarama.OffsetOldest

    }


    group, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)


    /**

     * Setup a new Sarama consumer group

     */

    consumer := Consumer{

        ready:    make(chan bool),

        messages: make(chan *sarama.ConsumerMessage, 256),

    }


    i := &KafkaInput{

        ConsumerGroup: group,

        config:        config,

        messages:      make(chan *sarama.ConsumerMessage, 256),

        consumer:      consumer,

    }


    go i.loop([]string{config.topic})

    i.messages = consumer.messages

    return i

}


//ConsumeClaim and stuff

func (i *KafkaInput) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {

    for msg := range c.Messages() {

        s.MarkMessage(msg, "")

        i.Push(msg)

    }

    return nil

}


func (i *KafkaInput) loop(topic []string) {

    ctx := context.Background()

    for {

        if err := i.Consume(ctx, []string{i.config.topic}, i); err != nil {

            return

        }

    }

}


// Push Messages

func (i *KafkaInput) Push(m *sarama.ConsumerMessage) {

    if i.consumer.messages != nil {

        log.Printf("MSGPUSH: %s", m)

        i.consumer.messages <- m

    }

}


func (i *KafkaInput) Read(data []byte) (int, error) {


    message := <-i.messages

    log.Printf("Msg: %s", string(message.Value))

    if !i.config.useJSON {

        copy(data, message.Value)

        return len(message.Value), nil

    }


    var kafkaMessage KafkaMessage

    json.Unmarshal(message.Value, &kafkaMessage)


    buf, err := kafkaMessage.Dump()

    if err != nil {

        log.Println("Failed to decode access log entry:", err)

        return 0, err

    }


    copy(data, buf)


    return len(buf), nil


}


func (i *KafkaInput) String() string {

    return "Kafka Input: " + i.config.host + "/" + i.config.topic

}


// Setup is run at the beginning of a new session, before ConsumeClaim

func (i *KafkaInput) Setup(s sarama.ConsumerGroupSession) error {

    return nil

}


// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (i *KafkaInput) Cleanup(s sarama.ConsumerGroupSession) error {

    return nil

}


查看完整回答
反对 回复 2022-04-26
  • 1 回答
  • 0 关注
  • 653 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号