为了账号安全,请及时绑定邮箱和手机立即绑定

Spark SQL 分区依据、窗口、排序依据、计数

Spark SQL 分区依据、窗口、排序依据、计数

潇潇雨雨 2021-09-14 16:21:16
假设我有一个包含杂志订阅信息的数据框:subscription_id    user_id       created_at       expiration_date 12384               1           2018-08-10        2018-12-10 83294               1           2018-06-03        2018-10-03 98234               1           2018-04-08        2018-08-08 24903               2           2018-05-08        2018-07-08 32843               2           2018-03-25        2018-05-25 09283               2           2018-04-07        2018-06-07现在我想添加一个列,说明在当前订阅开始之前用户有多少以前的订阅已过期。换句话说,与给定用户相关联的到期日期在此订阅的开始日期之前。这是完整的所需输出:subscription_id    user_id       created_at       expiration_date   previous_expired 12384               1           2018-08-10        2018-12-10          1 83294               1           2018-06-03        2018-10-03          0 98234               1           2018-04-08        2018-08-08          0 24903               2           2018-05-08        2018-07-08          2 32843               2           2018-03-25        2018-05-03          1 09283               2           2018-01-25        2018-02-25          0尝试:编辑:使用 Python 尝试了各种滞后/领先/等,我现在认为这是一个 SQL 问题df = df.withColumn('shiftlag', func.lag(df.expires_at).over(Window.partitionBy('user_id').orderBy('created_at')))<--- 编辑,编辑:没关系,这行不通我想我用尽了滞后/领先/转移方法,发现它不起作用。我现在认为最好使用 Spark SQL 来做到这一点,也许使用 acase when来生成新列,结合 a having count,按 ID 分组?
查看完整描述

1 回答

?
神不在的星期二

TA贡献1963条经验 获得超6个赞

使用 PySpark 解决了这个问题:


我首先创建了另一个列,其中包含每个用户的所有到期日期数组:


joined_array = df.groupBy('user_id').agg(collect_set('expiration_date'))

然后将该数组加入到原始数据帧中:


joined_array = joined_array.toDF('user_idDROP', 'expiration_date_array')

df = df.join(joined_array, df.user_id == joined_array.user_idDROP, how = 'left').drop('user_idDROP')

然后创建一个函数来遍历数组,如果创建日期大于到期日期,则将计数加 1:


def check_expiration_count(created_at, expiration_array):

  if not expiration_array:

    return 0

  else:

   count = 0

    for i in expiration_array:

  if created_at > i:

    count += 1

return count


check_expiration_count = udf(check_expiration_count, IntegerType())

然后应用该函数创建一个具有正确计数的新列:


df = df.withColumn('count_of_subs_ending_before_creation', check_expiration_count(df.created_at, df.expiration_array))

瓦拉。完毕。谢谢大家(没有人帮忙,但还是谢谢)。希望有人在 2022 年发现这很有用


查看完整回答
反对 回复 2021-09-14
  • 1 回答
  • 0 关注
  • 227 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信