Q：Apache的火花处理时态RDDS滑动窗口

I've been working quite a lot with Apache Spark the last few months but now I have received a pretty difficult task, to compute average/minimum/maximum etcetera on a sliding window over a paired RDD where the Key component is a date tag and the value component is a matrix. So each aggregation function should also return a matrix, where for each cell the average for all of that cell in the time period is averaged.

I want to be able to say that I want the average for every 7 days, with a sliding window of one day. The sliding window movement unit is always one, and then the unit of the size of the window (so if it's every 12 weeks, the window movement unit is 1).

My initial thought now is to simply iterate, if we want an average per X days, X times, and for each time just group the elements by it's date, with an offset.

So if we have this scenario:

Days: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

Matrices: A B C D E F G H I J K L M N O

And we want the average per 5 days, I will iterate 5 times and show the grouping here:

First iteration:

Group 1: (1, A) (2, B) (3, C) (4, D) (5, E)

Group 2: (6, F) (7, G) (8, H) (9, I) (10, J)

Group 3: (11, K) (12, L) (13, M) (14, N) (15, O)

Second iteration:

Group 1: (2, B) (3, C) (4, D) (5, E) (6, F)

Group 2: (7, G) (8, H) (9, I) (10, J), (11, K)

Group 3: (12, L) (13, M) (14, N) (15, O)

Etcetera, and for each group, I have to do a fold/reduce procedure to get the average.

However as you might imagine, this is pretty slow and probably a rather bad way to do it. I can't really figure out any better way to do it though.

Etcetera，并为每个组，我要做一个折叠/减少程序得到的平均。

If you convert to a DataFrame, this all gets a lot simpler -- you can just self-join the data back on itself and find the average. Say I have a series of data like this:

``````tsDF.show
date       amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78
``````

Which rolls up as:

``````tsDF.groupBy(\$"date").agg(\$"date", sum(\$"amount"), count(\$"date")).show
date       SUM(amount) COUNT(date)
1970-01-01 22.0        3
1970-01-02 27.9        2
1970-01-03 15.0        3
1970-01-04 26.5        4
1970-01-05 33.76       4
``````

I then would need to create a UDF to shift the date for the join condition (note I am only using a 2 day window by using offset = -2):

``````def dateShift(myDate: java.sql.Date): java.sql.Date = {
val offset = -2;
val cal = Calendar.getInstance;
cal.setTime(myDate);
new java.sql.Date(cal.getTime.getTime)
}
val udfDateShift = udf[java.sql.Date,java.sql.Date](dateShift)
``````

And then I could easily find a 2-day rolling average like this:

``````val windowDF = tsDF.select(\$"date")
.groupBy(\$"date")
.agg(\$"date")
.join(
tsDF.select(\$"date" as "r_date", \$"amount" as "r_amount"),
\$"r_date" > udfDateShift(\$"date") and \$"r_date" <= \$"date"
)
.groupBy(\$"date")
.agg(\$"date",avg(\$"r_amount") as "2 day avg amount / record")

val windowDF.show
date       2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325
``````

While this isn't exactly what you were trying to do, you see how you can use a DataFrame self-join to extract running averages from a data set. Hope you found this helpful.

``````tsDF.show
date       amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78
``````

``````tsDF.groupBy(\$"date").agg(\$"date", sum(\$"amount"), count(\$"date")).show
date       SUM(amount) COUNT(date)
1970-01-01 22.0        3
1970-01-02 27.9        2
1970-01-03 15.0        3
1970-01-04 26.5        4
1970-01-05 33.76       4
``````

``````def dateShift(myDate: java.sql.Date): java.sql.Date = {
val offset = -2;
val cal = Calendar.getInstance;
cal.setTime(myDate);
new java.sql.Date(cal.getTime.getTime)
}
val udfDateShift = udf[java.sql.Date,java.sql.Date](dateShift)
``````

``````val windowDF = tsDF.select(\$"date")
.groupBy(\$"date")
.agg(\$"date")
.join(
tsDF.select(\$"date" as "r_date", \$"amount" as "r_amount"),
\$"r_date" > udfDateShift(\$"date") and \$"r_date" <= \$"date"
)
.groupBy(\$"date")
.agg(\$"date",avg(\$"r_amount") as "2 day avg amount / record")

val windowDF.show
date       2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325
``````

algorithm  scala  apache-spark