3 answers
Contributor with 1951 experience points, more than 3 upvotes
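Update
More recent Spark releases add built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal. If your types are built only from case classes and the usual Scala types, the implicits in SQLImplicits should be enough.
Beyond that, little has changed: searching for @since 2.0.0 in Encoders.scala or SQLImplicits.scala mostly turns up encoders for primitive types, so custom classes still have no real built-in support. What follows are workarounds, each with limitations.
What exactly is the problem?
Creating a Dataset requires an encoder, which is normally supplied through implicits from a SparkSession or created explicitly with the static methods on Encoders (see the documentation for createDataset). An encoder has the form Encoder[T], where T is the type being encoded. Adding import spark.implicits._ brings the implicit encoders into scope, but none of them covers a regular class, so the following does not compile: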
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
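The compiler reports:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases.
Wrapping the type in a class that extends Product only postpones the error until run time: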
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
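This compiles, but fails at run time with:
java.lang.UnsupportedOperationException: No Encoder found for MyObj
At compile time Spark only checks that the outermost class extends Product (as every case class does); it discovers only at run time that it cannot encode MyObj. A Dataset[(Int,MyObj)] fails the same way. Two problems follow:
- Some classes that extend Product compile even though they always crash at run time.
- There is no way to pass in a custom encoder for a nested type: I cannot give Spark an encoder for MyObj alone and have it work out how to encode Wrap[MyObj] or (Int,MyObj).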
Just use kryo
The usual suggestion is to use the kryo encoder:
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Declaring an encoder for every type gets tedious quickly, so a single generic implicit can do it automatically:

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
  org.apache.spark.sql.Encoders.kryo[A](ct)
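With that, almost anything seems to work. (The example below does not work in spark-shell, where spark.implicits._ is imported automatically.)

class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3")  // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

With kryo, Spark stores every row as a single flat binary object. That is enough for map, filter, and foreach, but an operation such as join needs the data split into columns. The schema of d2 or d3 shows just one binary column:

d2.printSchema
// root
//  |-- value: binary (nullable = true)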
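Partial solution for tuples
Using Scala implicits, a series of encoders can be defined that handles tuples as well as possible and works alongside the existing implicits: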
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
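val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

I have not found a way to get the expected tuple column names (_1, _2, ...) by default; without the renaming, the column is called "value". The important point is that the schema is now structured:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)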
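In summary, this workaround:
- gives us separate columns for tuples, so we can join on tuples again;
- lets us rely on the implicits again, so there is no need to pass kryo around everywhere;
- is almost entirely backwards compatible with import spark.implicits._ (some renaming is involved);
- does not let us join on the kryo-serialized binary columns, let alone on fields those objects may have;
- has the unpleasant side effect of renaming some tuple columns to "value". If necessary, this can be undone by converting with .toDF, specifying new column names, and converting back to a Dataset. The schema names seem to be preserved through joins, where they are needed most.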
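Partial solution for classes in general
There is no good general solution here. With the tuple encoders above, though, a more complex class can be converted to a tuple through implicit conversions, which makes it possible to join on its fields; a single flat kryo encoding would not allow that.
In the example below, MyObj has fields of type Int, java.util.UUID, and Set[String]. The Int takes care of itself. The UUID could be serialized with kryo, but it is more useful stored as a String, since a UUID is usually something to join against. The Set[String] simply goes into a binary column.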
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
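The conversion pair can be checked without Spark. The following is only a minimal sketch: it restates MyObj and MyObjEncoded so that it is self-contained, uses explicit rather than implicit conversions, and wraps everything in a hypothetical MyObjRoundTrip object that is not part of the answer above.

```scala
import java.util.UUID

object MyObjRoundTrip {
  // Same shape as MyObj above, restated so the sketch stands alone.
  class MyObj(val i: Int, val u: UUID, val s: Set[String])

  // The tuple representation that Spark would actually store.
  type MyObjEncoded = (Int, String, Set[String])

  // Explicit versions of the two conversions.
  def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
  def fromEncoded(e: MyObjEncoded): MyObj =
    new MyObj(e._1, UUID.fromString(e._2), e._3)

  val original = new MyObj(1, UUID.randomUUID(), Set("foo"))
  val restored = fromEncoded(toEncoded(original))

  // MyObj is a plain class without equals, so compare field by field.
  val roundTripOk: Boolean =
    restored.i == original.i &&
    restored.u == original.u &&
    restored.s == original.s
}
```

If roundTripOk is true, no information is lost by storing the UUID as a String, which is what makes the u column usable as a join key.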
Contributor with 2012 experience points, more than 12 upvotes
Suppose you want to use a custom enum:
import org.apache.spark.sql.types._  // DataType, UserDefinedType, UDTRegistration

trait CustomEnum { def value: String }
case object Foo extends CustomEnum { val value = "F" }
case object Bar extends CustomEnum { val value = "B" }
object CustomEnum {
  def fromString(str: String) = Seq(Foo, Bar).find(_.value == str).get
}

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then register the UDT class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

case class UsingCustomEnum(id: Int, en: CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
// Print each collected row; println on the array itself would only print its reference.
seq.collect().foreach(println)
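One thing to be aware of: fromString above calls .get, so an unknown code throws a NoSuchElementException during deserialization. A possible variation, sketched here without Spark, returns an Option instead. The names CustomEnumLookup and fromStringOption are hypothetical and only serve this illustration.

```scala
object CustomEnumLookup {
  sealed trait CustomEnum { def value: String }
  case object Foo extends CustomEnum { val value = "F" }
  case object Bar extends CustomEnum { val value = "B" }

  // All known members; a new member has to be added here as well.
  val all: Seq[CustomEnum] = Seq(Foo, Bar)

  // Hypothetical helper: None for an unknown code instead of an exception.
  def fromStringOption(str: String): Option[CustomEnum] =
    all.find(_.value == str)

  // True when every member survives value -> lookup unchanged.
  val roundTripOk: Boolean =
    all.forall(e => fromStringOption(e.value).contains(e))
}
```

Whether deserialize should fail loudly or map an unknown code to some default is a design choice; the sketch only makes the failure case explicit.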
Suppose you want to use a polymorphic record:

trait CustomPoly
case class FooPoly(id: Int) extends CustomPoly
case class BarPoly(value: String, secondValue: Long) extends CustomPoly

case class UsingPoly(id: Int, poly: CustomPoly)

// Bind the Dataset to a name so the filter below can refer to it.
val polySeq = Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

// A UDT that encodes the whole value to bytes:
class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  // Java serialization is used here; the value is stored in a binary column.
  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }
  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
// As shown above:
case class UsingPoly(id: Int, poly: CustomPoly)

val polySeq = Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
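The serialize and deserialize logic of CustomPolyUDT can be tried on its own, without Spark or the UDT machinery. This is only a sketch under that assumption: the PolyRoundTrip object is hypothetical, and it restates the record types so the block is self-contained.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object PolyRoundTrip {
  trait CustomPoly
  // Case classes are Serializable, which Java serialization requires.
  case class FooPoly(id: Int) extends CustomPoly
  case class BarPoly(value: String, secondValue: Long) extends CustomPoly

  // Encode any CustomPoly to bytes, as the UDT does for its binary column.
  def serialize(obj: CustomPoly): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // Decode the bytes back; the concrete subclass is recovered from the stream.
  def deserialize(bytes: Array[Byte]): CustomPoly = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[CustomPoly] finally ois.close()
  }
}
```

Because the concrete class travels with the bytes, pattern matching on FooPoly or BarPoly keeps working after a round trip. The price is an opaque binary column that Spark cannot filter or join on directly.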
