为了账号安全,请及时绑定邮箱和手机立即绑定

Python Spark - 在阈值后删除数据 - Pyspark

Python Spark - 在阈值后删除数据 - Pyspark

狐的传说 2023-10-26 10:23:10
如何删除最后TP == 1一个具有 48 小时缓冲区的数据?例如ID = A9,最后一个TP == 1是 on 2020-05-06 13:00。我想保留该组 ID 的所有数据,直到2020-05-06 13:00最后TP == 1加上接下来的 2 天?+---++--------+----------------+| id|       TP|            Date|+---+---------+----------------+| A1|     Null|2010-01-01 12:00|| A1|     Null|2010-01-01 13:00|| A1|        1|2010-01-02 01:00|| A1|     Null|2010-01-02 02:00|| A9|     Null|2010-05-05 12:00|| A9|        1|2010-05-05 13:00|| A9|        1|2010-05-06 13:00|| A9|     Null|2010-05-09 13:00|+---+---------+----------------+所需的数据框+---++--------+----------------+| id|       TP|            Date|+---+---------+----------------+| A1|     Null|2010-01-01 12:00|| A1|     Null|2010-01-01 13:00|| A1|        1|2010-01-02 01:00|| A1|     Null|2010-01-02 02:00|| A9|     Null|2010-05-05 12:00|| A9|        1|2010-05-05 13:00|| A9|        1|2010-05-06 13:00|+---+---------+----------------+这就是我在 Pandas 中所做的,但对于 15M+ 的观察结果效率不高main_pd = main.toPandas()bigdf = pd.DataFrame()for i in main_pd.ID.unique():  df = main_pd[main_pd.ID == i]  TPdate = df[df.TP == 1]['Date'].max()+pd.Timedelta('3 days 0 hours')  df = df[(df.Date <= TPdate)]  bigdf = bigdf.append(df)
查看完整描述

2 回答

?
月关宝盒

TA贡献1772条经验 获得超5个赞

IIUC,您可以使用窗口函数查找max(IF(TP=1, Date, NULL))每个id,然后按此阈值进行过滤:


from pyspark.sql import Window, functions as F

w1 = Window.partitionBy('id')


df_new = df.withColumn('Date', F.to_timestamp('Date', 'yyyy-MM-dd HH:mm')) \

    .withColumn('threshhold_date', F.expr("max(IF(TP=1, Date, NULL))").over(w1)) \

    .filter('Date <= threshhold_date + interval 2 days') 

df_new.show()

+---+----+-------------------+-------------------+

| id|  TP|               Date|    threshhold_date|

+---+----+-------------------+-------------------+

| A9|Null|2010-05-05 12:00:00|2010-05-06 13:00:00|

| A9|   1|2010-05-05 13:00:00|2010-05-06 13:00:00|

| A9|   1|2010-05-06 13:00:00|2010-05-06 13:00:00|

| A1|Null|2010-01-01 12:00:00|2010-01-02 01:00:00|

| A1|Null|2010-01-01 13:00:00|2010-01-02 01:00:00|

| A1|   1|2010-01-02 01:00:00|2010-01-02 01:00:00|

| A1|Null|2010-01-02 02:00:00|2010-01-02 01:00:00|

+---+----+-------------------+-------------------+


查看完整回答
反对 回复 2023-10-26
?
阿晨1998

TA贡献2037条经验 获得超6个赞

您可以简单地过滤数据帧TP = 1, 并使用collect()[0]来获取列的最大值Date作为变量。

使用以下命令向该变量添加 48 小时timedelta并过滤df:



from pyspark.sql.functions import *

from datetime import timedelta


date_var = df.filter(col("TP")==1).orderBy("date", ascending=False)\

                .collect()[0]["date"] + timedelta(hours=48)


df.filter(col("Date")<=date_var).show()


查看完整回答
反对 回复 2023-10-26
  • 2 回答
  • 0 关注
  • 67 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信