为了账号安全,请及时绑定邮箱和手机立即绑定

Spark 未使用二进制文件在并行 Pyspark 中运行 RDD

Spark 未使用二进制文件在并行 Pyspark 中运行 RDD

汪汪一只猫 2023-12-26 15:14:41
我是 Spark 的新手,开始用 Python 编写一些脚本。我的理解是Spark并行执行Transformation(map)。def some_function(name, content):    print(name, datetime.now())    time.sleep(30)    return contentconfig = SparkConf().setAppName("sample2").setMaster("local[*]")filesRDD = SparkContext(conf=config).binaryFiles("F:\\usr\\temp\\*.zip")inputfileRDD = filesRDD.map(lambda job_bundle: (job_bundle[0], some_function(job_bundle[0], job_bundle[1])))print(inputfileRDD.collect())上面的代码.zip从文件夹中收集文件列表并对其进行处理。当我执行它时,我看到这是按顺序发生的。输出file:/F:/usr/temp/sample.zip 2020-10-22 10:42:37.089085file:/F:/usr/temp/sample1.zip 2020-10-22 10:43:07.103317您可以看到它在 30 秒后开始处理第二个文件。意思是完成第一个文件后。我的代码出了什么问题?为什么它不并行执行RDD?你能帮我么 ?
查看完整描述

1 回答

?
红糖糍粑

TA贡献1815条经验 获得超6个赞

我不确切知道该方法如何binaryFiles跨 Spark 分区对文件进行分区。似乎与此相反textFiles,它倾向于只创建一个分区。让我们看看一个名为 的示例目录dir,其中包含 5 个文件。


> ls dir

test1  test2  test3  test4  test5

如果我使用textFile,事情就会并行运行。我不提供输出,因为它不是很漂亮,但你可以自己检查。我们可以验证事物是否与 并行运行getNumPartitions。


>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))

# ugly output, but everything starts at the same time,

# except maybe the last one since you have 4 cores.

>>> sc.textFile("dir").getNumPartitions()

5

由于binaryFiles情况不同,并且由于某种原因,所有内容都进入同一个分区。


>>> sc.binaryFiles("dir").getNumPartitions()

1

我什至尝试使用 10k 个文件,所有内容仍然位于同一分区。我相信这背后的原因是,在scala中,binaryFiles返回一个带有文件名的RDD和一个允许读取文件的对象(但不执行读取)。因此速度很快,并且生成的 RDD 很小。因此,将其放在一个分区上就可以了。在 scala 中,我们可以在使用后使用重新分区binaryFiles,一切都会很好。


scala> sc.binaryFiles("dir").getNumPartitions

1

scala> sc.binaryFiles("dir").repartition(4).getNumPartitions

4

scala> sc.binaryFiles("dir").repartition(4)

    .foreach{ case (name, ds) => { 

        println(System.currentTimeMillis+": "+name)

        Thread.sleep(2000)

        // do some reading on the DataStream ds

    }}

1603352918396: file:/home/oanicol/sandbox/dir/test1

1603352918396: file:/home/oanicol/sandbox/dir/test3

1603352918396: file:/home/oanicol/sandbox/dir/test4

1603352918396: file:/home/oanicol/sandbox/dir/test5

1603352920397: file:/home/oanicol/sandbox/dir/test2

python 中的问题是binaryFiles实际上将文件读取到一个分区上。另外,这对我来说非常神秘,但是 pyspark 2.4 中的以下代码行会产生与您注意到的相同的行为,这是没有意义的。


# this should work but does not

sc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))

# this does not work either, which is strange but it would not be advised anyway

# since all the data would be read on one partition

sc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))

然而,由于binaryFiles实际读取文件,您可以使用wholeTextFile它将文件作为文本文件读取并按预期运行:


# this works

sc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))


查看完整回答
反对 回复 2023-12-26
  • 1 回答
  • 0 关注
  • 35 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信