为了账号安全,请及时绑定邮箱和手机立即绑定

使用pyspark并行化排序算法

使用pyspark并行化排序算法

德玛西亚99 2021-09-11 15:00:49
早上好,我开发了一个简单的归并排序算法,我想比较它在并行化和非并行化时的性能。首先,我生成一个数字列表来排序并检查合并排序对列表进行排序需要多长时间。我要做的下一件事是将数字列表传递给sc.parallelize()并转换为list,RDD然后将合并排序函数传递给mapPartitions()然后collect()。import randomimport timefrom pyspark import SparkContextdef execute_merge_sort(generated_list):    start_time = time.time()    sorted_list = merge_sort(generated_list)    elapsed = time.time() - start_time    print('Simple merge sort: %f sec' % elapsed)    return sorted_listdef generate_list(length):    N = length    generated_list = [random.random() for num in range(N)]    return generated_listdef merging(left_side, right_side):    result = []    i = j = 0    while i < len(left_side) and j < len(right_side):        if left_side[i] <= right_side[j]:            result.append(left_side[i])            i += 1        else:            result.append(right_side[j])            j += 1    if i == len(left_side):        result.extend(right_side[j:])    else:        result.extend(left_side[i:])    return resultdef merge_sort(generated_list):    if len(generated_list) <= 1:        return generated_list    middle_value = len(generated_list) // 2    sorted_list = merging(merge_sort(generated_list[:middle_value]), merge_sort(generated_list[middle_value:]))    return sorted_listdef is_sorted(num_array):    for i in range(1, len(num_array)):        if num_array[i] < num_array[i - 1]:            return False    return Truegenerate_list = generate_list(500000)sorted_list = execute_merge_sort(generate_list)sc = SparkContext()rdd = sc.parallelize(generate_list).mapPartitions(execute_merge_sort).collect()当我执行此操作时sc.parallelize(generate_list).mapPartitions(execute_merge_sort).collect(),出现以下错误:File "<ipython-input-15-1b7974b4fa56>", line 7, in execute_merge_sort  File "<ipython-input-15-1b7974b4fa56>", line 36, in merge_sortTypeError: object of type 'itertools.chain' has no len()任何帮助,将不胜感激。提前致谢。
查看完整描述

1 回答

?
当年话下

TA贡献1890条经验 获得超9个赞

我想出了如何解决TypeError: 'float' object is not iterable.

这可以通过使用flatMap(lambda x: x)和调用扁平化数据glom()以包装列表并使其可由函数执行来解决execute_merge_sort。通过执行以下行,返回的结果是一个包含排序列表的列表。

sc.parallelize(random_list_of_lists).flatMap(lambda x: x).glom().mapPartitions(execute_merge_sort_rdd).collect()



查看完整回答
反对 回复 2021-09-11
  • 1 回答
  • 0 关注
  • 241 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信