亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在數據集中存儲自定義對象?

如何在數據集中存儲自定義對象?

婷婷同學_ 2019-06-19 15:45:49
如何在數據集中存儲自定義對象?根據介紹星火數據集:在我們期待Spark2.0的同時,我們計劃對數據集進行一些令人興奮的改進,特別是:.自定義編碼器-盡管我們目前為各種各樣的類型自動生成編碼器,但我們希望為自定義對象打開一個API。并嘗試將自定義類型存儲在Dataset導致以下錯誤:無法找到存儲在數據集中的類型的編碼器。導入sqlContext.Inductions支持原始類型(Int、String等)和Producttype(CASE類)。_對序列化其他類型的支持將在以后的版本中添加?;颍寒惓#何凑业骄幋a器用于.有什么解決辦法嗎?注意,這個問題僅作為CommunityWiki答案的入口點存在。隨時更新/改進問題和答案。
查看完整描述

3 回答

?
飲歌長嘯

TA貢獻1951條經驗 獲得超3個贊

更新

這個答案仍然是有效和信息豐富的,盡管現在情況更好,自從2.2/2.3,這增加了內置編碼器的支持SetSeqMapDateTimestamp,和BigDecimal..如果您堅持只使用case類和通常的Scala類型來創建類型,那么應該可以只使用SQLImplicits.


不幸的是,在這方面幾乎沒有增加任何幫助。尋覓@since 2.0.0在……里面Encoders.scalaSQLImplicits.scala查找主要與原始類型有關的內容(以及對Case類的一些調整)。所以,首先要說的是:目前對自定義類編碼器沒有真正好的支持。..這樣的話,下面是一些我們可以期望做得很好的技巧,考慮到我們目前所擁有的一切。作為一個預先的免責聲明:這不會完美的工作,我會盡我最大的努力使所有的限制清楚和預先。

到底是什么問題?

當您想創建數據集時,SPark“需要一個編碼器(將T類型的JVM對象與內部SparkSQL表示形式相互轉換),該編碼器通常是通過從SparkSession,也可以通過調用Encoders“(摘自醫生createDataset)。編碼器將采用以下形式Encoder[T]哪里T是您正在編碼的類型。第一個建議是增加import spark.implicits._(這給了你這些(第二個建議是顯式地傳遞隱式編碼器,使用這,這個一組編碼器相關功能。

沒有普通類可用的編碼器,所以

import spark.implicits._class MyObj(val i: Int)// ...val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

將給出以下隱式相關編譯時錯誤:

無法找到存儲在數據集中的類型的編碼器。導入sqlContext.Inductions支持原始類型(Int、String等)和Producttype(CASE類)。_對序列化其他類型的支持將在以后的版本中添加。

但是,如果將剛才用于在某個類中獲取上述錯誤的任何類型包裝,則Product,錯誤會被延遲到運行時,所以

import spark.implicits._case class Wrap[T](unwrap: T)class MyObj(val i: Int)
// ...val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

編譯很好,但是在運行時會失敗。

未支持的OperationException:沒有為MyObj找到編碼器

這樣做的原因是,實施者SPark創建的Induces實際上只在運行時(通過Scala關系)生成。在本例中,在編譯時所有的SPark檢查都是最外層的類擴展Product(所有的CASE類都這樣做),并且只在運行時才意識到它仍然不知道如何處理MyObj(如果我試圖創建一個Dataset[(Int,MyObj)]-星火等待運行時繼續運行MyObj)。這些是迫切需要解決的核心問題:

  • 一些擴展的類

    Product

    編譯,盡管在運行時總是崩潰,而且
  • 沒有辦法傳遞嵌套類型的自定義編碼器(我無法僅為

    MyObj

    使它知道如何編碼

    Wrap[MyObj]

    (Int,MyObj)).

只管用kryo

每個人建議的解決方案是使用kryo編碼器。

import spark.implicits._class MyObj(val i: Int)implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

不過,這很快就會變得很乏味。特別是當您的代碼正在操作各種數據集、加入、分組等時。那么,為什么不直接默示這一切都是自動完成的呢?

import scala.reflect.ClassTagimplicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

現在,我似乎可以做任何我想做的事情(下面的示例在spark-shell哪里spark.implicits._自動導入)

class MyObj(val i: Int)val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))val d2 = d1.map(d => (d.i+1,d)).alias("d2")
 // mapping works fine and ..val d3 = d1.map(d => (d.i,  d)).alias("d3") 
 // .. deals with the new typeval d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

或者幾乎。問題是使用kryo導致SPark只將DataSet中的每一行存儲為平面二進制對象。為mapfilterforeach這就足夠了,但是對于像join,星火確實需要將它們分隔成列。檢查架構d2d3,您可以看到只有一個二進制列:

d2.printSchema// root//  |-- value: binary (nullable = true)

元組的部分解

因此,在Scala中使用InstitucesinScala的魔力(更多在6.26.3過載分辨率),我可以為自己做一系列能做得盡可能好的事情,至少對于元組來說是這樣,并且可以很好地與現有的Institutions一起工作:

import org.apache.spark.sql.{Encoder,Encoders}import scala.reflect.ClassTagimport spark.implicits._  
// we can still take advantage of all the old implicitsimplicit def single[A](implicit c: ClassTag[A]):
 Encoder[A] = Encoders.kryo[A](c)implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)// ... you can keep making these

然后,帶著這些請求,我可以讓上面的例子起作用,盡管用了一些列重命名。

class MyObj(val i: Int)val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))val d2 = d1.map(d => (d.i+1,d))
.toDF("_1","_2").as[(Int,MyObj)].alias("d2")val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

我還沒有弄清楚如何獲得預期的元組名稱(_1_2.)默認情況下不用重命名-如果有人想玩這個游戲,這,這個名字"value"被介紹和這,這個通常添加元組名稱的位置。但是,關鍵是我現在有了一個很好的結構化模式:

d4.printSchema// root//  |-- _1: struct (nullable = false)//  |    |-- _1: integer (nullable = true)//  |   
 |-- _2: binary (nullable = true)//  |-- _2: struct (nullable = false)//  |    |-- _1: integer (nullable = true)//  | 
    |-- _2: binary (nullable = true)

總之,這個解決辦法是:

  • 允許我們為元組獲得單獨的列(因此我們可以再次加入元組,耶!)
  • 我們可以再一次依賴于請求(所以不需要經過。)

    kryo

    (到處都是)
  • 幾乎完全向后兼容

    import spark.implicits._

    (涉及一些重命名)
  • 是嗎?

    讓我們加入

    kyro

    序列化二進制列,更不用說那些可能具有
  • 將某些元組列重命名為“value”(如果有必要的話,可以通過轉換將其撤消),會產生令人不快的副作用。

    .toDF

    ,指定新的列名,并將其轉換回DataSet-模式名稱似乎通過聯接(最需要它們的地方)被保留。

一般類的部分解

這個不太愉快,也沒有很好的解決辦法。但是,現在我們有了上面的元組解決方案,我有一個預感-來自另一個答案的隱式轉換解決方案也不會那么痛苦,因為您可以將更復雜的類轉換為元組。然后,在創建DataSet之后,您可能會使用dataframe方法重命名這些列。如果一切順利,這是真的一個改進,因為我現在可以在類的字段上執行聯接。如果我只使用了一個平面二進制kryo序列化程序是不可能的。

下面是一個做了一些事情的例子:我有一個類MyObj其中有類型的字段。Intjava.util.UUID,和Set[String]..第一個照顧好自己。第二個,雖然我可以使用kryo如果存儲為String(自UUID這通常是我想反對的事情。第三個真正屬于二進制列。

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])// alias for the type to convert to and fromtype MyObjEncoded = 
(Int, String, Set[String])// implicit conversionsimplicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)implicit 
def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

現在,我可以使用這個機器創建一個具有良好模式的數據集:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar")))).toDF("i","u","s").as[MyObjEncoded]

模式向我展示了I列的正確名稱和前兩種情況,這兩種情況我都可以使用。

d.printSchema// root//  |-- i: integer (nullable = false)//  |-- u: string (nullable = true)//  |-- s: binary (nullable = true)


查看完整回答
反對 回復 2019-06-19
?
繁花如伊

TA貢獻2012條經驗 獲得超12個贊

您可以使用UDT注冊,然后使用案例類、元組等.所有正確的工作與您的用戶定義的類型!

假設您想使用自定義Enum:

trait CustomEnum { def value:String }case object Foo extends CustomEnum  { val value = "F" }case object Bar extends CustomEnum  
{ val value = "B" }object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get}

登記如下:

// First define a UDT class for it:class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]}// Then Register the UDT Class!
  // NOTE: you have to put this file into the org.apache.spark package!UDTRegistration.register(classOf[CustomEnum].
  getName, classOf[CustomEnumUDT].getName)

那就用它!

case class UsingCustomEnum(id:Int, en:CustomEnum)val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)).toDS()seq.filter(_.en == Foo).show()println(seq.collect())

假設您想使用多態記錄:

trait CustomPolycase class FooPoly(id:Int) extends CustomPolycase class BarPoly(value:String, secondValue:Long) extends CustomPoly

..它的用法是這樣的:

case class UsingPoly(id:Int, poly:CustomPoly)Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false}).show()

您可以編寫一個自定義的UDT,它將所有內容編碼為字節(我在這里使用java序列化,但更好的方法可能是檢測SPark的Kryo上下文)。

首先定義UDT類:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]}

然后注冊:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!UDTRegistration.register(classOf[CustomPoly].
getName, classOf[CustomPolyUDT].getName)

那你就可以用它了!

// As shown above:case class UsingPoly(id:Int, poly:CustomPoly)Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false}).show()


查看完整回答
反對 回復 2019-06-19
  • 3 回答
  • 0 關注
  • 694 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號