为了账号安全,请及时绑定邮箱和手机立即绑定

Spark Window函数-日期之间的range

/ 猿问

Spark Window函数-日期之间的range

慕村225694 2019-11-30 14:01:33

我有一个DataFrame包含数据的Spark SQL ,我要获取的是给定日期范围内当前行之前的所有行。因此,举例来说,我想让7天之前的所有行都排在给定行的前面。我发现我需要使用一个Window Function喜欢:


Window \

    .partitionBy('id') \

    .orderBy('start')

问题来了。我希望有rangeBetween7天的时间,但是在Spark文档中我什么都找不到。Spark甚至提供这种选择吗?现在,我只获得前面的所有行:


.rowsBetween(-sys.maxsize, 0)

但想要达到以下目标:


.rangeBetween("7 days", 0)

如果有人可以帮助我,我将非常感激。提前致谢!


查看完整描述

2 回答

?
aluckdog

火花> = 2.3


从Spark 2.3开始,可以使用SQL API使用间隔对象,但是DataFrameAPI支持仍在进行中。


df.createOrReplaceTempView("df")


spark.sql(

    """SELECT *, mean(some_value) OVER (

        PARTITION BY id 

        ORDER BY CAST(start AS timestamp) 

        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW

     ) AS mean FROM df""").show()


## +---+----------+----------+------------------+       

## | id|     start|some_value|              mean|

## +---+----------+----------+------------------+

## |  1|2015-01-01|      20.0|              20.0|

## |  1|2015-01-06|      10.0|              15.0|

## |  1|2015-01-07|      25.0|18.333333333333332|

## |  1|2015-01-12|      30.0|21.666666666666668|

## |  2|2015-01-01|       5.0|               5.0|

## |  2|2015-01-03|      30.0|              17.5|

## |  2|2015-02-01|      20.0|              20.0|

## +---+----------+----------+------------------+

火花<2.3


据我所知,在Spark和Hive中都不可能直接实现。两者的require ORDER BY子句都使用RANGE数字。我发现最接近的是转换为时间戳并以秒为单位进行操作。假设start列包含date类型:


from pyspark.sql import Row


row = Row("id", "start", "some_value")

df = sc.parallelize([

    row(1, "2015-01-01", 20.0),

    row(1, "2015-01-06", 10.0),

    row(1, "2015-01-07", 25.0),

    row(1, "2015-01-12", 30.0),

    row(2, "2015-01-01", 5.0),

    row(2, "2015-01-03", 30.0),

    row(2, "2015-02-01", 20.0)

]).toDF().withColumn("start", col("start").cast("date"))

一个小助手和窗口定义:


from pyspark.sql.window import Window

from pyspark.sql.functions import mean, col



# Hive timestamp is interpreted as UNIX timestamp in seconds*

days = lambda i: i * 86400 

最后查询:


w = (Window()

   .partitionBy(col("id"))

   .orderBy(col("start").cast("timestamp").cast("long"))

   .rangeBetween(-days(7), 0))


df.select(col("*"), mean("some_value").over(w).alias("mean")).show()


## +---+----------+----------+------------------+

## | id|     start|some_value|              mean|

## +---+----------+----------+------------------+

## |  1|2015-01-01|      20.0|              20.0|

## |  1|2015-01-06|      10.0|              15.0|

## |  1|2015-01-07|      25.0|18.333333333333332|

## |  1|2015-01-12|      30.0|21.666666666666668|

## |  2|2015-01-01|       5.0|               5.0|

## |  2|2015-01-03|      30.0|              17.5|

## |  2|2015-02-01|      20.0|              20.0|

## +---+----------+----------+------------------+

远非漂亮,但可行。


查看完整回答
反对 回复 2019-11-30
?
莫回无

我使用Spark 2.3,但第一个选项对我不起作用并引发异常。2.4.0版scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$) 中将解决一个JIRA问题:issues.apache.org/jira/browse/SPARK-25845 

查看完整回答
反对 回复 2019-11-30

添加回答

回复

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信