为了账号安全,请及时绑定邮箱和手机立即绑定

pyspark计算当前时间和上次活动时间之间差异的移动平均值

pyspark计算当前时间和上次活动时间之间差异的移动平均值

海绵宝宝撒 2021-11-09 16:23:20
我有一些这样的记录。A    B1    2018-12-252    2019-01-151    2019-01-203    2018-01-012    2019-01-014    2018-04-093    2018-11-081    2018-03-20我想要得到的是这样的东西。第一步,在组内按升序排列。(不需要按A订购)A    B1    2018-03-201    2018-12-251    2019-01-203    2018-01-013    2018-11-082    2019-01-012    2019-01-154    2018-04-09第二步,获取组内连续行之间的时间差。A    B            C1    2018-03-20   NaN1    2018-12-25   2801    2019-01-20   263    2018-01-01   NaN3    2018-11-08   3112    2019-01-01   NaN2    2019-01-15   144    2018-04-09   NaN第三步,得到窗口大小为2的C的移动平均。(因为我只提供了很少的行作为例子,为了方便就选择大小2)A    B            C     moving_avg1    2018-03-20   NaN   NaN1    2018-12-25   280   2801    2019-01-20   26    1533    2018-01-01   NaN   NaN3    2018-11-08   311   3112    2019-01-01   NaN   NaN2    2019-01-15   14    144    2018-04-09   NaN   NaN如果 Windows 函数可以处理这种情况,该解决方案实际上不需要生成 C 列。我列出每个步骤只是为了确保您可以清楚地了解问题所在。结果集将如下所示A    B            moving_avg1    2018-03-20   NaN1    2018-12-25   2801    2019-01-20   1533    2018-01-01   NaN3    2018-11-08   3112    2019-01-01   NaN2    2019-01-15   144    2018-04-09   NaN注意:这是在 pyspark 上并使用数据框。不是在 Python 上使用 Pandas。
查看完整描述

2 回答

?
湖上湖

TA贡献2003条经验 获得超2个赞

可能有更聪明的方法来实现这一点,但您也可以使用 RDD :


from operator import add

from numpy import mean

from datetime import datetime


data = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"),

        (2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]

data = sc.parallelize(data).mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d")]).reduceByKey(add)


def computeMvgAvg(values):

sorted_date = sorted(values)

diffs = []

mvg_avg = []

for i in range(1, len(sorted_date)):

    diffs.append(int((sorted_date[i] - sorted_date[i-1]).total_seconds()/86400))

    mvg_avg.append(int(mean(diffs)))

diffs = [None] + diffs

mvg_avg = [None] + mvg_avg

return zip(sorted_date, diffs, mvg_avg)


sch = StructType([

   StructField("A", StringType(), True),

   StructField("B", DateType(), True),

   StructField("C", IntegerType(), True),

   StructField("moving_avg", IntegerType(), True)

])

data.flatMapValues(myMapValues).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()


+---+----------+----+----------+

|  A|         B|   C|moving_avg|

+---+----------+----+----------+

|  1|2018-03-20|null|      null|

|  1|2018-12-25| 280|       280|

|  1|2019-01-20|  26|       153|

|  2|2019-01-01|null|      null|

|  2|2019-01-15|  14|        14|

|  3|2018-01-01|null|      null|

|  3|2018-11-08| 311|       311|

|  4|2018-04-09|null|      null|

+---+----------+----+----------+


查看完整回答
反对 回复 2021-11-09
?
慕姐8265434

TA贡献1813条经验 获得超2个赞

文档: 窗口

文档: 滞后

# Creating a Dataframe

from pyspark.sql.window import Window

from pyspark.sql.functions import col, to_date, lag, datediff, when, udf

df = sqlContext.createDataFrame([(1,'2018-12-25'),(2,'2019-01-15'),(1,'2019-01-20'),(3,'2018-01-01'),

                                 (2,'2019-01-01'),(4,'2018-04-09'),(3,'2018-11-08'),(1,'2018-03-20')],

                                 ['A','B'])

df = df.withColumn('B',to_date(col('B'), 'yyyy-MM-dd'))


# Using window and lag functions to find the value from previous row

my_window = Window.partitionBy('A').orderBy('A','B')


# Creating a UDF to calculate average of window sized 2.

def row_avg(c1,c2):

    count_non_null = 2

    total = 0

    if c1 == None:

        c1 = 0

        count_non_null = count_non_null - 1

    if c2 == None:

        c2 = 0

        count_non_null = count_non_null - 1

    if count_non_null == 0:

        return None

    else:

        return int((c1+c2)/count_non_null)


row_avg = udf(row_avg)


df = df.withColumn('B_Lag_1', lag(col('B'),1).over(my_window))\

       .withColumn('C', datediff(col('B'),col('B_Lag_1'))).drop('B_Lag_1')\

       .withColumn('C_Lag_1', lag(col('C'),1).over(my_window))\

       .withColumn('moving_avg',row_avg(col('C'),col('C_Lag_1'))).drop('C','C_Lag_1')

df.show()

+---+----------+----------+

|  A|         B|moving_avg|

+---+----------+----------+

|  1|2018-03-20|      null|

|  1|2018-12-25|       280|

|  1|2019-01-20|       153|

|  3|2018-01-01|      null|

|  3|2018-11-08|       311|

|  2|2019-01-01|      null|

|  2|2019-01-15|        14|

|  4|2018-04-09|      null|

+---+----------+----------+


查看完整回答
反对 回复 2021-11-09
  • 2 回答
  • 0 关注
  • 128 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信