Q: DataFrame.save() / sqlContext.load loses “nullable” status of schema

I have a couple of questions based on something I am seeing in spark-shell. The easiest thing is just to show the behavior:

scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
scala> val schema = StructType(Seq(StructField("foo", IntegerType, false)))
scala> val df1 = sqlContext.createDataFrame(sc.parallelize(Array(Row(1))), schema)
scala> df1.printSchema
root
 |-- foo: integer (nullable = false)
scala> df1.save("temp.parq", SaveMode.Overwrite)
scala> val df2 = sqlContext.load("temp.parq")
scala> df2.printSchema
root
 |-- foo: integer (nullable = true)

Note that the nullable property for column "foo" changed.
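
The same thing can be checked programmatically rather than by eyeballing printSchema, since StructType exposes each field's nullable flag:

scala> df1.schema("foo").nullable   // false, matching the printSchema output above
scala> df2.schema("foo").nullable   // true after the Parquet round trip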

I ran into issues with nullable columns when I turned on spark.sql.parquet.filterPushdown, and that's when I noticed this behavior.
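
For completeness, enabling that flag in my spark-shell session looked roughly like this (it can also be passed with --conf when launching the shell):

scala> sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")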

So my questions:

  1. Do I really care about this?
  2. If yes, then how can I tell whether the change to the schema happens before or after the save to Parquet file?

Answer 1:

If you look at the source code of Spark 1.3+, you will find the following comment:

  // This is a hack. We always set nullable/containsNull/valueContainsNull to true
  // for the schema of a parquet data

For example:

  /** Returns a new base relation with the given parameters and save given data into it. */
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val path = checkPath(parameters)
    val filesystemPath = new Path(path)
    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
    val doInsertion = (mode, fs.exists(filesystemPath)) match {
      case (SaveMode.ErrorIfExists, true) =>
        sys.error(s"path $path already exists.")
      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
        true
      case (SaveMode.Ignore, exists) =>
        !exists
    }

    val relation = if (doInsertion) {
      // This is a hack. We always set nullable/containsNull/valueContainsNull to true
      // for the schema of a parquet data.
      val df =
        sqlContext.createDataFrame(
          data.queryExecution.toRdd,
          data.schema.asNullable,
          needsConversion = false)
      val createdRelation =
        createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
      createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
      createdRelation
    } else {
      // If the save mode is Ignore, we will just create the relation based on existing data.
      createRelation(sqlContext, parameters)
    }

    relation
  }
}

I think that's what you're looking for. I don't know the reason for this behaviour, but it was really surprising to me.
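
As a side note, if you need the non-nullable schema back after loading, one workaround is to re-apply the original StructType to the loaded rows. This is only a sketch: createDataFrame takes the schema at face value and does not check the data for nulls, so use it only when you know the column really contains none.

    // re-impose the original schema on the rows read back from Parquet
    val df3 = sqlContext.createDataFrame(df2.rdd, schema)
    df3.printSchema()  // foo: integer (nullable = false)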

apache-spark