Q: How to read a nested collection in Spark

I have a parquet table with one of the columns being

array<struct<col1,col2,..colN>>

I can run queries against this table in Hive using LATERAL VIEW syntax.
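For illustration, such a query looks roughly like this (table and column names here are made up, since the real schema isn't shown):

SELECT t.id, x.col1, x.col2
FROM my_table t
LATERAL VIEW explode(nested_col) exploded AS x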

How do I read this table into an RDD, and more importantly, how do I filter, map, etc. over this nested collection in Spark?

I could not find any references to this in the Spark documentation. Thanks in advance for any information!

P.S. I felt it might be helpful to give some stats on the table. Number of columns in the main table: ~600. Number of rows: ~200m. Number of "columns" in the nested collection: ~10. Average number of records in the nested collection: ~35.

Answer 1:

There is no magic in the case of a nested collection. Spark handles an RDD[(String, Seq[String])] the same way it handles an RDD[(String, String)].

Reading such a nested collection from Parquet files can be tricky, though.

Let's take an example from the spark-shell (1.3.1):

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

Write the parquet file:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Read the parquet file:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of structs is ArrayBuffer[Row]; you could use any super-type of it instead of Seq[Row]. The 1 is the column index in the outer row. I used the method getAs here, but there are alternatives in the latest versions of Spark. See the source code of the Row trait.
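For example, with the schema above, the following should be equivalent in recent releases (a sketch; check the Row trait for the exact accessors available in your version):

// Lookup by field name instead of index (a later addition to Row):
val innersByName = row.getAs[Seq[Row]]("inners")
// Typed accessor by index:
val innersTyped = row.getSeq[Row](1)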

Now that you have an RDD[Outer], you can apply any transformation or action you want.

// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))

Note that we used the Spark SQL library only to read the Parquet file. You could, for example, select only the wanted columns directly on the DataFrame before mapping it to an RDD.

dataFrame.select('col1, 'col2).map { row => ... }
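A concrete variant of that for the example above, keeping only the columns we need before mapping (same logic as before, just with an explicit projection):

dataFrame.select('key, 'inners).map { row =>
  val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
  Outer(row.getString(0), inners)
}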

Answer 2:

I'll give a Python-based answer, since that's what I'm using. I think Scala has something similar (a Scala sketch follows the examples below).

The explode function was added in Spark 1.4.0 to handle nested arrays in DataFrames, according to the Python API docs.

Create a test dataframe:

from pyspark.sql import Row

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()

## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+

Use explode to flatten the list column:

from pyspark.sql.functions import explode

df.select(df.a, explode(df.intlist)).show()

## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+
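On the Scala side, the same function has existed in org.apache.spark.sql.functions since 1.4 as well; a minimal sketch, assuming a DataFrame df with the same schema as the Python example above:

import org.apache.spark.sql.functions.explode

df.select(df("a"), explode(df("intlist"))).show()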

Answer 3:

Another approach would be using pattern matching like this:

val rdd: RDD[(String, List[(String, String)])] = dataFrame.map(_.toSeq.toList match {
  case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
    case List(a: String, b: String) => (a, b)
  }).toList
})

You can pattern match directly on Row, but it is likely to fail for a few reasons.
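For completeness, the direct match would look like the sketch below. The Seq part is unchecked due to type erasure, and null fields won't match the typed patterns, which is part of why it can fail:

import org.apache.spark.sql.Row  // catalyst.expressions.Row in 1.3.x

val rdd: RDD[(String, List[(String, String)])] = dataFrame.map {
  case Row(key: String, inners: Seq[_]) =>
    key -> inners.asInstanceOf[Seq[Row]].map {
      case Row(a: String, b: String) => (a, b)
    }.toList
}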

Answer 4:

The answers above are all great and tackle this question from different sides; Spark SQL is also a quite useful way to access nested data.

Here's an example of how to use explode() directly in SQL to query a nested collection.

SELECT hholdid, tsp.person_seq_no 
FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
        FROM disc_mrt.unified_fact uf
     )

tsp_ids is a nested array of structs with many attributes, including person_seq_no, which I'm selecting in the outer query above.
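The same query through the DataFrame API would look roughly like this (a sketch, assuming a Spark 2.0 SparkSession named spark):

import org.apache.spark.sql.functions.{col, explode}

spark.table("disc_mrt.unified_fact")
  .select(col("hholdid"), explode(col("tsp_ids")).as("tsp"))
  .select("hholdid", "tsp.person_seq_no")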

The SQL above was tested in Spark 2.0. I did a small test and it doesn't work in Spark 1.6. This question was asked before Spark 2 was around, so this answer adds nicely to the list of available options for dealing with nested structures.

Notable unresolved JIRAs on explode() for SQL access:

Tags: hadoop, hive, apache-spark, parquet