go-queue 之 kq 的生产者不支持配置SASL认证信息

go-queue 之 kq 的生产者不支持配置SASL认证信息

go-queue​ 的生产者写入时,没有支持配置SASL认证,官方的示例

type ServiceContext struct {
    Config         config.Config
  .....
    KqPusherClient *kq.Pusher
}

func NewServiceContext(c config.Config) *ServiceContext {
    return &ServiceContext{
        Config:         c,
    .....
        KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),
    }
}

NewPusher​ 的实现:

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
	producer := &kafka.Writer{
		Addr:        kafka.TCP(addrs...),
		Topic:       topic,
		Balancer:    &kafka.LeastBytes{},
		Compression: kafka.Snappy,
	}

    // ...
}

go-queue​ 在 segmentio/kafka-go​ 这个包基础上,使用 go-zero​ 进行了上层统一封装,看了下segmentio/kafka-go​包的使用说明,在创建kafka.Writer对象的时候,支持通过Transport来配置SASL认证信息

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

// Transports are responsible for managing connection pools and other resources,
// it's generally best to create a few of these and share them across your
// application.
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

w := kafka.Writer{
	Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:     "topic-A",
	Balancer:  &kafka.Hash{},
	Transport: sharedTransport,
}

所以对go-queue​的NewPusher​源码做了部分更改:

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
	producer := &kafka.Writer{
		Addr:        kafka.TCP(addrs...),
		Topic:       topic,
		Balancer:    &kafka.LeastBytes{},
		Compression: kafka.Snappy,
		Transport: &kafka.Transport{ // 添加的部分
			SASL: plain.Mechanism{
				Username: "username", // SASL 用户名
				Password: "password", // SASL 密码
			},
		},
	}
    
    // ...
}

Posted in PHP

发表评论

您的电子邮箱地址不会被公开。