为了账号安全,请及时绑定邮箱和手机立即绑定

优化 pyspark 中的行访问和转换

优化 pyspark 中的行访问和转换

慕尼黑5688855 2022-12-14 17:36:37
我在 S3 存储桶中有一个以 jason 形式存在的大型数据集 (5GB)。我需要转换数据的模式,并使用 ETL 脚本将转换后的数据写回 S3。所以我使用爬虫来检测架构并将数据加载到 pyspark 数据框中,然后更改架构。现在我遍历数据框中的每一行并将其转换为字典。删除空列,然后将字典转换为字符串并写回 S3。以下是代码:#df is the pyspark dataframecolumns = df.columnsprint(columns)s3 = boto3.resource('s3')cnt = 1for row in df.rdd.toLocalIterator():    data = row.asDict(True)    for col_name in columns:        if data[col_name] is None:            del data[col_name]    content = json.dumps(data)    object = s3.Object('write-test-transaction-transformed', str(cnt)).put(Body=content)    cnt = cnt+1print(cnt)我用过 toLocalIterator。上面代码的执行是串行执行的吗?如果是那么如何优化它?有没有更好的方法来执行上述逻辑?
查看完整描述

3 回答

?
波斯汪

TA贡献1811条经验 获得超4个赞

假设,数据集中的每一行都是 json 字符串格式


import pyspark.sql.functions as F


def drop_null_cols(data):

    import json

    content = json.loads(data)

    for key, value in list(content.items()):

        if value is None:

            del content[key]


    return json.dumps(content)


drop_null_cols_udf = F.udf(drop_null_cols, F.StringType())


df = spark.createDataFrame(

    ["{\"name\":\"Ranga\", \"age\":25, \"city\":\"Hyderabad\"}",

     "{\"name\":\"John\", \"age\":null, \"city\":\"New York\"}",

     "{\"name\":null, \"age\":31, \"city\":\"London\"}"],

    "string"

).toDF("data")


df.select(

    drop_null_cols_udf("data").alias("data")

).show(10,False)

如果输入数据框有 cols 并且输出只需要不是 null cols json


df = spark.createDataFrame(

        [('Ranga', 25, 'Hyderabad'),

         ('John', None, 'New York'),

         (None, 31, 'London'),

        ],

        ['name', 'age', 'city']

    )


df.withColumn(

    "data", F.to_json(F.struct([x for x in df.columns]))

).select(

    drop_null_cols_udf("data").alias("data")

).show(10, False)


#df.write.format("csv").save("s3://path/to/file/) -- save to s3

结果


+-------------------------------------------------+

|data                                             |

+-------------------------------------------------+

|{"name": "Ranga", "age": 25, "city": "Hyderabad"}|

|{"name": "John", "city": "New York"}             |

|{"age": 31, "city": "London"}                    |

+-------------------------------------------------+


查看完整回答
反对 回复 2022-12-14
?
智慧大石

TA贡献1946条经验 获得超3个赞

我将遵循以下方法(用 scala 编写,但可以在 python 中以最小的变化实现)-


找到数据集计数并将其命名为totalCount

val totalcount = inputDF.count()

查找count(col)所有数据框列,并将字段映射到它们的计数


这里对于输入数据框的所有列,计算计数

请注意,count(anycol)返回提供的列全部非空的行数。例如 - 如果一列有 10 行值,如果说有 5 个值,null则计数(列)变为 5

获取第一行Map[colName, count(colName)]称为fieldToCount

val cols = inputDF.columns.map { inputCol =>

      functions.count(col(inputCol)).as(inputCol)

    }

// Returns the number of rows for which the supplied column are all non-null.

    // count(null) returns 0

    val row = dataset.select(cols: _*).head()

    val fieldToCount = row.getValuesMap[Long]($(inputCols))

获取要删除的列


在此处使用步骤#2 中创建的 Map,并将计数小于 totalCount 的列标记为要删除的列

从输入数据框中选择所有列,并count == totalCount根据要求以任何格式将处理后的输出数据框保存在任何地方。

请注意,this approach will remove all the column having at least one null value

val fieldToBool = fieldToCount.mapValues(_ < totalcount)

val processedDF = inputDF.select(fieldToBool.filterNot(_._2).map(_.1) :_*)

// save this processedDF anywhere in any format as per requirement

我相信这种方法会比您目前使用的方法表现更好


查看完整回答
反对 回复 2022-12-14
?
Cats萌萌

TA贡献1805条经验 获得超9个赞

我解决了上面的问题。我们可以简单地查询数据框的空值。df = df.filter(df.column.isNotNull()) 从而删除所有存在 null 的行。所以如果有 n 列,我们需要 2^n 次查询来筛选出所有可能的组合。在我的例子中,有 10 列,所以总共有 1024 个查询,这是可以接受的,因为 sql 查询是并行化的。



查看完整回答
反对 回复 2022-12-14
  • 3 回答
  • 0 关注
  • 85 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信