为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Beam:无法通过 docker-compose 访问 Pub/Sub

Apache Beam:无法通过 docker-compose 访问 Pub/Sub

婷婷同学_ 2022-07-27 09:50:46
我已经构建了一个软件,它使用 GCP Pub/Sub 作为消息队列,使用 Apache Beam 构建管道,使用 Flask 构建网络服务器。它在生产中运行顺利,但我无法将所有部分与 docker-compose 连接在一起,特别是 Apache Beam 管道。我遵循Dataflow 管道和 pubsub 模拟器,通过将localhostSO 答案中docker-compose.yaml的  pubsub_emulator:    build: docker_images/message_queue    ports:      - 8085:8085  webserver:    build: docker_images/webserver    environment:      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085      PUBSUB_PROJECT_ID: my-dev    restart: unless-stopped    ports:      - 8899:8080    depends_on:      - pubsub_emulator   pipeline:    build: docker_images/pipeline    environment:      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085      PUBSUB_PROJECT_ID: my-dev    restart: unless-stopped    depends_on:      - pubsub_emulator网络服务器能够访问 Pub/Sub 模拟器并生成主题。但是,管道在启动时失败,并显示MalformedURLException:Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: no protocol: pubsub_emulator:8085/v1/projects/my-dev/subscriptions/sync_beam_1702190853678138166管道的选项看起来不错,我用以下方式定义它们:final String pubSubEmulatorHost = System.getenv("PUBSUB_EMULATOR_HOST"); BasePipeline.PipeOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()                                .as(BasePipeline.PipeOptions.class);options.as(DataflowPipelineOptions.class).setStreaming(true);options.as(PubsubOptions.class).setPubsubRootUrl(pubSubEmulatorHost);Pipeline pipeline = Pipeline.create(options);有人知道正在发生的事情以及如何解决它吗?唯一的解决方案是否意味着将模拟器和管道设置在同一个 docker 中?
查看完整描述

1 回答

?
收到一只叮咚

TA贡献1821条经验 获得超5个赞

您可以尝试将值更改为以下内容:


http://pubsub_emulator:8085

作为抱怨丢失的错误protocol,预计会出现http在您的情况下


根据Apache Beam SDK,该值应为完全限定的 URL:


// getPubsubRootUrl

@Default.String(value="https://pubsub.googleapis.com")

 @Hidden

java.lang.String getPubsubRootUrl()

// Root URL for use with the Google Cloud Pub/Sub API.

但是,如果您来自 python 背景,您会注意到这里显示的使用gRPC Python的Python SDK只需要包含地址和端口的服务器地址


# A snippet from google-cloud-python library.

if os.environ.get("PUBSUB_EMULATOR_HOST"):

    kwargs["channel"] = grpc.insecure_channel(

        target=os.environ.get("PUBSUB_EMULATOR_HOST")

    )

grpc.insecure_channel(target, options=None)

Creates an insecure Channel to a server.


The returned Channel is thread-safe.


Parameters: 

target – The server address


查看完整回答
反对 回复 2022-07-27
  • 1 回答
  • 0 关注
  • 115 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号