3 回答
TA贡献1811条经验 获得超4个赞
假设,数据集中的每一行都是 json 字符串格式
import pyspark.sql.functions as F
def drop_null_cols(data):
import json
content = json.loads(data)
for key, value in list(content.items()):
if value is None:
del content[key]
return json.dumps(content)
drop_null_cols_udf = F.udf(drop_null_cols, F.StringType())
df = spark.createDataFrame(
["{\"name\":\"Ranga\", \"age\":25, \"city\":\"Hyderabad\"}",
"{\"name\":\"John\", \"age\":null, \"city\":\"New York\"}",
"{\"name\":null, \"age\":31, \"city\":\"London\"}"],
"string"
).toDF("data")
df.select(
drop_null_cols_udf("data").alias("data")
).show(10,False)
如果输入数据框有 cols 并且输出只需要不是 null cols json
df = spark.createDataFrame(
[('Ranga', 25, 'Hyderabad'),
('John', None, 'New York'),
(None, 31, 'London'),
],
['name', 'age', 'city']
)
df.withColumn(
"data", F.to_json(F.struct([x for x in df.columns]))
).select(
drop_null_cols_udf("data").alias("data")
).show(10, False)
#df.write.format("csv").save("s3://path/to/file/) -- save to s3
结果
+-------------------------------------------------+
|data |
+-------------------------------------------------+
|{"name": "Ranga", "age": 25, "city": "Hyderabad"}|
|{"name": "John", "city": "New York"} |
|{"age": 31, "city": "London"} |
+-------------------------------------------------+
TA贡献1946条经验 获得超3个赞
我将遵循以下方法(用 scala 编写,但可以在 python 中以最小的变化实现)-
找到数据集计数并将其命名为totalCount
val totalcount = inputDF.count()
查找count(col)所有数据框列,并将字段映射到它们的计数
这里对于输入数据框的所有列,计算计数
请注意,count(anycol)返回提供的列全部非空的行数。例如 - 如果一列有 10 行值,如果说有 5 个值,null则计数(列)变为 5
获取第一行Map[colName, count(colName)]称为fieldToCount
val cols = inputDF.columns.map { inputCol =>
functions.count(col(inputCol)).as(inputCol)
}
// Returns the number of rows for which the supplied column are all non-null.
// count(null) returns 0
val row = dataset.select(cols: _*).head()
val fieldToCount = row.getValuesMap[Long]($(inputCols))
获取要删除的列
在此处使用步骤#2 中创建的 Map,并将计数小于 totalCount 的列标记为要删除的列
从输入数据框中选择所有列,并count == totalCount根据要求以任何格式将处理后的输出数据框保存在任何地方。
请注意,this approach will remove all the column having at least one null value
val fieldToBool = fieldToCount.mapValues(_ < totalcount)
val processedDF = inputDF.select(fieldToBool.filterNot(_._2).map(_.1) :_*)
// save this processedDF anywhere in any format as per requirement
我相信这种方法会比您目前使用的方法表现更好
TA贡献1805条经验 获得超9个赞
我解决了上面的问题。我们可以简单地查询数据框的空值。df = df.filter(df.column.isNotNull()) 从而删除所有存在 null 的行。所以如果有 n 列,我们需要 2^n 次查询来筛选出所有可能的组合。在我的例子中,有 10 列,所以总共有 1024 个查询,这是可以接受的,因为 sql 查询是并行化的。
添加回答
举报