为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Camel 自定义组件消费者不调用其他处理器

Apache Camel 自定义组件消费者不调用其他处理器

慕工程0101907 2022-07-06 19:05:38
我在Apache Camel中编写了一个自定义组件。骆驼成功地创建了它的消费者并使用我的 URI,但没有调用处理器。这是我的消费者代码片段(在 Kotlin 中):class SoroushBotConsumer (private val endpoint: MyEndpoint, processor: Processor) : DefaultConsumer(endpoint, processor) {    val objectMapper:ObjectMapper = ObjectMapper();    init {        startListening()    }    private fun startListening() {        val client = ClientBuilder.newBuilder().register(SseFeature::class.java).build()        val target = client.target("MY_URL"))        while(true){            var e: EventInput?  target.request().get(EventInput::class.java)!!            val inboundEvent = e.read()            val exchange = endpoint.createExchange()            exchange.getIn().body = objectMapper.readValue(inboundEvent.rawData,MessageModel::class.java)            try {                processor.process(exchange)            } catch (e: Exception) {                if (exchange.exception != null) {                exceptionHandler.handleException("Error processing exchange",exchange, exchange.exception)            }        }    }}在消费者中一切正常,但没有处理器被执行。这是我创建路线的方法!var context = DefaultCamelContext()context.addRoutes(object : RouteBuilder() {    override fun configure() {        from("myapp://getMessage/).process{            println(it.getIn())        }.to("myapp://sendMessage/")    }})context.start();Thread.sleep(100000);context.stop();它既不调用流程也不创建我的生产者。(它甚至不调用MyEndpoint::createProducer())当我用另一个端点替换我的from语句时file,一切正常。更新:ScheduledPollConsumer当我从实现方法扩展我的消费者时pull,一切都很好。
查看完整描述

2 回答

?
慕田峪7331174

TA贡献1828条经验 获得超13个赞

在 doStart 方法中设置无限循环并不是一个好主意,您将在其中劫持当前线程,然后该线程将永远不会终止。相反,您应该设置一个运行此作业的后台线程,并且您可以从 doStart 设置此线程并让它运行。换句话说,组件“接收”消息的方式是 100% 特定于组件的,因为每个组件都有自己的方式。然后在 doStop 方法中,您有逻辑来停止该后台线程并清理您的任何资源。



查看完整回答
反对 回复 2022-07-06
?
三国纷争

TA贡献1804条经验 获得超7个赞

是的,因为我们必须完成consumer的构造函数,并将接收消息的逻辑写在doStart()


class SoroushBotConsumer (private val endpoint: MyEndpoint, processor: Processor) : DefaultConsumer(endpoint, processor) {

    val objectMapper:ObjectMapper = ObjectMapper();

    override fun doStart() {

        val client = ClientBuilder.newBuilder().register(SseFeature::class.java).build()

        val target = client.target("MY_URL"))

        while(true){

            var e: EventInput?  target.request().get(EventInput::class.java)!!


            val inboundEvent = e.read()

            val exchange = endpoint.createExchange()

            exchange.getIn().body = objectMapper.readValue(inboundEvent.rawData,MessageModel::class.java)

            try {

                processor.process(exchange)

            } catch (e: Exception) {

                if (exchange.exception != null) {

                    exceptionHandler.handleException("Error processing exchange",exchange, exchange.exception)

                }

            }

        }

    }

}


查看完整回答
反对 回复 2022-07-06
  • 2 回答
  • 0 关注
  • 207 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号