Belirtilen bir şema ile boş bir DataFrame nasıl oluşturulur?

Scala'da belirli bir şema ile DataFrame üzerinde oluşturmak istiyorum. JSON read kullanmayı denedim (yani boş dosyayı okumayı) ama bunun en iyi uygulama olduğunu sanmıyorum.

Aşağıdaki şemaya sahip bir veri çerçevesi istediğinizi varsayalım:

root
 |-- k: string (nullable = true)
 |-- v: integer (nullable = false)

Basitçe bir veri çerçevesi için şema tanımlar ve boş RDD[Satır] kullanırsınız:

import org.apache.spark.sql.types.{
    StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("k", StringType, true) ::
    StructField("v", IntegerType, false) :: Nil)

// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema) 
spark.createDataFrame(sc.emptyRDD[Row], schema)

PySpark eşdeğeri neredeyse aynıdır:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("k", StringType(), True), StructField("v", IntegerType(), False)
])

# or df = sc.parallelize([]).toDF(schema)

# Spark < 2.0 
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)

Örtük kodlayıcıları (yalnızca Scala) Tuple gibi Product türleriyle kullanma:

import spark.implicits._

Seq.empty[(String, Int)].toDF("k", "v")

veya vaka sınıfı:

case class KV(k: String, v: Int)

Seq.empty[KV].toDF

veya

spark.emptyDataset[KV].toDF
Yorumlar (0)

Spark 2.0.0'dan itibaren aşağıdakileri yapabilirsiniz.

Vaka Sınıfı

Bir Person case sınıfı tanımlayalım:

scala> case class Person(id: Int, name: String)
defined class Person

SparkSession implicit Encoders öğesini içe aktarın:

scala> import spark.implicits._
import spark.implicits._

Ve boş bir Dataset[Person] oluşturmak için SparkSession kullanın:

scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]

Şema DSL

Ayrıca bir Şema "DSL" de kullanabilirsiniz (bkz. org.apache.spark.sql.ColumnName içinde Veri Çerçeveleri için destek fonksiyonları).

scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)

scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> emptyDF.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
Yorumlar (3)
import scala.reflect.runtime.{universe => ru}
def createEmptyDataFrame[T: ru.TypeTag] =
    hiveContext.createDataFrame(sc.emptyRDD[Row],
      ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
    )
  case class RawData(id: String, firstname: String, lastname: String, age: Int)
  val sourceDF = createEmptyDataFrame[RawData]
Yorumlar (0)