为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Spark 可以使用 TCP 侦听器作为输入吗?

Apache Spark 可以使用 TCP 侦听器作为输入吗?

12345678_0001 2023-06-21 16:05:24
Apache Spark 可以使用 TCP 侦听器作为输入吗?如果是,也许有人有执行该操作的 java 代码示例。我试图找到关于此的示例,但所有教程都展示了如何通过 TCP 定义到数据服务器的输入连接,而不是使用等待传入数据的 TCP 侦听器。
查看完整描述

2 回答

?
繁星点点滴滴

TA贡献1803条经验 获得超3个赞

是的,可以使用 Spark 监听 TCP 端口并处理任何传入数据。您正在寻找的是Spark Streaming。

为了方便:

import org.apache.spark.*;

import org.apache.spark.api.java.function.*;

import org.apache.spark.streaming.*;

import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;


// Create a local StreamingContext with two working thread and batch interval of 1 second

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));


// Create a DStream that will connect to hostname:port, like localhost:9999

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);


// Split each line into words

JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());


// Count each word in each batch

JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));

JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);


// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print();


jssc.start();              // Start the computation

jssc.awaitTermination();   // Wait for the computation to terminate


查看完整回答
反对 回复 2023-06-21
?
慕姐8265434

TA贡献1813条经验 获得超2个赞

Spark没有内置的TCP服务器来等待生产者和缓冲数据。Spark 通过其 API 库在 TCP、Kafka 等的轮询机制上工作。要使用传入的 TCP 数据,您需要有一个 Spark 可以连接到的外部 TCP 服务器,如 Shaido 在示例中所解释的那样。



查看完整回答
反对 回复 2023-06-21
  • 2 回答
  • 0 关注
  • 90 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信