
Q: Apache Spark - Dealing with Sliding Windows on Temporal RDDs


I've been working quite a lot with Apache Spark over the last few months, but now I have received a pretty difficult task: computing average/minimum/maximum and so on over a sliding window on a paired RDD, where the key component is a date tag and the value component is a matrix. So each aggregation function should also return a matrix, where each cell holds the aggregate of the corresponding cell of every matrix in the time period.

I want to be able to say, for example, that I want the average for every 7 days, with a sliding window of one day. The window always moves by one step in the same unit as the window size (so if the window is 12 weeks, it moves one week at a time).

My initial thought is to simply iterate X times if we want an average per X days, and on each iteration group the elements by their date with an offset (a sketch of this follows the example below).

So if we have this scenario:

Days: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

Matrices: A B C D E F G H I J K L M N O

And we want the average per 5 days, then I will iterate 5 times; the grouping for each iteration is shown here:

First iteration:

Group 1: (1, A) (2, B) (3, C) (4, D) (5, E)

Group 2: (6, F) (7, G) (8, H) (9, I) (10, J)

Group 3: (11, K) (12, L) (13, M) (14, N) (15, O)

Second iteration:

Group 1: (2, B) (3, C) (4, D) (5, E) (6, F)

Group 2: (7, G) (8, H) (9, I) (10, J) (11, K)

Group 3: (12, L) (13, M) (14, N) (15, O)

Etcetera, and for each group, I have to do a fold/reduce procedure to get the average.

However, as you might imagine, this is pretty slow and probably a rather bad way to do it. I can't really figure out any better way to do it, though.
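For concreteness, here is a minimal sketch of that iterate-and-group idea (a hypothetical helper, not from the original post), assuming as simplifications that the keys are day indices starting at 1 and each matrix is flattened to an Array[Double]:

import org.apache.spark.rdd.RDD

// Naive sliding average: one pass over the data per offset. Each pass
// buckets the days into fixed-size windows shifted by `offset` and
// averages the flattened matrices element-wise within each bucket.
def slidingAverages(data: RDD[(Int, Array[Double])],
                    window: Int): Seq[RDD[(Int, Array[Double])]] =
  (0 until window).map { offset =>
    data
      .filter { case (day, _) => day > offset }                   // days before the shifted start drop out
      .map { case (day, m) => ((day - offset - 1) / window, m) }  // bucket id for this pass
      .groupByKey()
      .mapValues { ms =>
        val n = ms.size
        // element-wise sum of the flattened matrices, then divide by the count
        ms.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }).map(_ / n)
      }
  }

Each of the `window` resulting RDDs corresponds to one iteration above, so the cost is `window` full passes over the data -- exactly the slowness described.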


Answer 1:

If you convert to a DataFrame, this all gets a lot simpler -- you can just join the data back on itself and find the average. Say I have a series of data like this:

tsDF.show
date       amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78

Which rolls up as:

tsDF.groupBy($"date").agg($"date", sum($"amount"), count($"date")).show
date       SUM(amount) COUNT(date)
1970-01-01 22.0        3
1970-01-02 27.9        2
1970-01-03 15.0        3
1970-01-04 26.5        4
1970-01-05 33.76       4

I then would need to create a UDF to shift the date for the join condition (note I am only using a 2-day window, by using offset = -2):

import java.util.Calendar
import org.apache.spark.sql.functions.udf

// Shift a date back by `offset` days; used in the join condition below.
def dateShift(myDate: java.sql.Date): java.sql.Date = {
  val offset = -2
  val cal = Calendar.getInstance
  cal.setTime(myDate)
  cal.add(Calendar.DATE, offset)
  new java.sql.Date(cal.getTime.getTime)
}
val udfDateShift = udf[java.sql.Date, java.sql.Date](dateShift)

And then I could easily find a 2-day rolling average like this:

val windowDF = tsDF.select($"date")
  .groupBy($"date")
  .agg($"date")  // one row per distinct date
  .join(
    tsDF.select($"date" as "r_date", $"amount" as "r_amount"),
    $"r_date" > udfDateShift($"date") and $"r_date" <= $"date"
  )
  .groupBy($"date")
  .agg($"date", avg($"r_amount") as "2 day avg amount / record")

windowDF.show
date       2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325

While this isn't exactly what you were trying to do, it shows how you can use a DataFrame self-join to extract running averages from a data set. Hope you found this helpful.
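As an aside: on newer Spark versions the same rolling average can also be expressed with a SQL window function instead of a self-join. A sketch, assuming tsDF has the date and amount columns shown above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, unix_timestamp}

// Order rows by the date as seconds since the epoch, so that rangeBetween
// can express "from one day (86400 s) before the current row to the row itself".
val byDay = Window.orderBy(col("ts")).rangeBetween(-86400L, 0L)

val rolling = tsDF
  .withColumn("ts", unix_timestamp(col("date")))
  .withColumn("2_day_avg", avg(col("amount")).over(byDay))

This avoids the join entirely, though note that a window with orderBy and no partitionBy moves all rows into a single partition, so it scales only when there is some key to partition by.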


algorithm  scala  apache-spark