¿Cómo crear un DataFrame vacío con un esquema especificado?

Quiero crear un DataFrame con un esquema especificado en Scala. He intentado utilizar JSON leer (me refiero a la lectura de archivos vacíos), pero no creo que eso es la mejor práctica.

Supongamos que desea un marco de datos con el siguiente esquema:

idioma: ninguno -->

root
 |-- k: string (nullable = true)
 |-- v: integer (nullable = false)

Basta con definir el esquema para un marco de datos y utilizar RDD[Row] vacío:

lenguaje: scala -->

import org.apache.spark.sql.types.{
    StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("k", StringType, true) ::
    StructField("v", IntegerType, false) :: Nil)

// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema) 
spark.createDataFrame(sc.emptyRDD[Row], schema)

El equivalente en PySpark es casi idéntico:

lenguaje: python -->

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("k", StringType(), True), StructField("v", IntegerType(), False)
])

# or df = sc.parallelize([]).toDF(schema)

# Spark < 2.0 
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)

Uso de codificadores implícitos (sólo Scala) con tipos Product como Tuple:

lenguaje: scala -->

import spark.implicits._

Seq.empty[(String, Int)].toDF("k", "v")

o case class:

lenguaje: scala -->

case class KV(k: String, v: Int)

Seq.empty[KV].toDF

o

spark.emptyDataset[KV].toDF
Comentarios (0)

A partir de Spark 2.0.0, puedes hacer lo siguiente.

Case Class

Definamos una clase de caso Persona:

scala> case class Person(id: Int, name: String)
defined class Person

Importar spark SparkSession implícito Encoders:

scala> import spark.implicits._
import spark.implicits._

Y utilizar SparkSession para crear un Dataset[Person] vacío:

scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]

Esquema DSL

También puedes usar un Esquema "DSL" (ver Funciones de soporte para DataFrames en org.apache.spark.sql.ColumnName).

scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)

scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> emptyDF.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
Comentarios (3)
import scala.reflect.runtime.{universe => ru}
def createEmptyDataFrame[T: ru.TypeTag] =
    hiveContext.createDataFrame(sc.emptyRDD[Row],
      ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
    )
  case class RawData(id: String, firstname: String, lastname: String, age: Int)
  val sourceDF = createEmptyDataFrame[RawData]
Comentarios (0)