
Scala Spark: ListBuffer is empty

largeQ 2019-12-06 14:53:56
In the code below, the length of the list buffer prints correctly at comment 1, but the code at comment 2 never produces any output. Why does this happen?

val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
var wktReader: WKTReader = new WKTReader()

val dataSet = sc.textFile("dataSet.txt")
val items = new ListBuffer[String]()

dataSet.foreach { e =>
  items += e
  println("len = " + items.length) //1. here length is ok
}
println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
items.foreach { x => print(x) } //2. this code is never executed

The log is here:

16/11/20 01:16:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/11/20 01:16:52 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.56.1:4040
16/11/20 01:16:53 INFO Executor: Starting executor ID driver on host localhost
16/11/20 01:16:53 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58608.
16/11/20 01:16:53 INFO NettyBlockTransferService: Server created on 192.168.56.1:58608
16/11/20 01:16:53 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.56.1, 58608)
16/11/20 01:16:53 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.56.1:58608 with 347.1 MB RAM, BlockManagerId(driver, 192.168.56.1, 58608)
16/11/20 01:16:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.56.1, 58608)
Starting app

2 Answers

万千封印


Apache Spark does not provide shared memory, therefore this:


dataSet.foreach { e =>
  items += e
  println("len = " + items.length) //1. here length is ok
}

modifies a local copy of items on each executor. The original items list defined on the driver is never modified. As a result, this:


items.foreach { x => print(x) }

executes, but there is nothing to print.


Please check Understanding Closures in the Spark programming guide.
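
To see this in isolation, here is a minimal, self-contained sketch (the object name ClosureDemo and the local master setting are made up for illustration) showing that the driver's buffer stays empty even though each task appends to its own deserialized copy:

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-demo").setMaster("local[*]"))

    val buf = ListBuffer[Int]()
    // The foreach closure (including buf) is serialized and shipped to the tasks,
    // so each task appends to its own deserialized copy of buf.
    sc.parallelize(1 to 5).foreach(e => buf += e)

    println(buf.isEmpty) // prints true: the driver's original buffer was never modified
    sc.stop()
  }
}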


Although it is not necessarily what I would recommend here, you could replace items with an accumulator:


val acc = sc.collectionAccumulator[String]("Items")
dataSet.foreach(e => acc.add(e))
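
A brief usage note on the sketch above: once the foreach action has completed, the accumulated values can be read back on the driver through acc.value, which for a collection accumulator is a java.util.List[String]:

// Back on the driver, after the action has finished:
println(acc.value) // java.util.List[String] containing the lines of dataSet.txt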


动漫人物


Spark runs your code on the executors and returns the results. The code above doesn't work as intended. If you need to add elements from within foreach, you have to collect the data on the driver first and then append it to the driver-side buffer. However, collecting the data is not a good idea when you have big data.


val items = new ListBuffer[String]()

val rdd = spark.sparkContext.parallelize(1 to 10, 4)
rdd.collect().foreach(data => items += data.toString())
println(items)

Output:


ListBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
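As a side note, a sketch of a slightly more idiomatic variant (assuming the same rdd as above): do the transformation on the executors and collect once, rather than appending element by element on the driver:

import scala.collection.mutable.ListBuffer

// Transform on the executors, collect once, then materialize on the driver.
val items = ListBuffer(rdd.map(_.toString).collect(): _*)
println(items) // ListBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)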

