Apache spark 如何将spark数据帧行展平为多个数据帧行
嗨,我有一个像这样打印的spark数据框(单行) 所以在一行中,我包装了一个数组,我想把它展平,创建一个数据帧,每个数组都有一个值,例如上面的行应该这样转换Apache spark 如何将spark数据帧行展平为多个数据帧行,apache-spark,dataframe,Apache Spark,Dataframe,嗨,我有一个像这样打印的spark数据框(单行) 所以在一行中,我包装了一个数组,我想把它展平,创建一个数据帧,每个数组都有一个值,例如上面的行应该这样转换 [abc,11918,46734,1487530800317] [abc,1233,1234,1487530800317] 所以我得到的数据帧是2行而不是1行,所以包装数组中的每个对应元素都应该放在新行中 在第一个答案后编辑1: 如果我的输入中有3个数组呢 WrappedArray(46734,1234,[abc,WrappedArray
[abc,11918,46734,1487530800317]
[abc,1233,1234,1487530800317]
所以我得到的数据帧是2行而不是1行,所以包装数组中的每个对应元素都应该放在新行中
在第一个答案后编辑1:
如果我的输入中有3个数组呢
WrappedArray(46734,1234,[abc,WrappedArray(11918,1233),WrappedArray(46734,1234),WrappedArray(1,2),1487530800317]
我的输出应该是
[abc,11918,46734,1,1487530800317]
[abc,1233,1234,2,1487530800317]
肯定不是最好的解决方案,但这会起作用:
case class TestFormat(a: String, b: Seq[String], c: Seq[String], d: String)
val data = Seq(TestFormat("abc", Seq("11918","1233"),
Seq("46734","1234"), "1487530800317")).toDS
val zipThem: (Seq[String], Seq[String]) => Seq[(String, String)] = _.zip(_)
val udfZip = udf(zipThem)
data.select($"a", explode(udfZip($"b", $"c")) as "tmp", $"d")
.select($"a", $"tmp._1" as "b", $"tmp._2" as "c", $"d")
.show
问题是,默认情况下,您无法确保两个序列的长度相等
可能更好的解决方案是将整个数据帧重新格式化为一个数据建模的结构,例如
root
-- a
-- d
-- records
---- b
---- c
感谢您回答@swebbo,您的回答帮助我完成了以下任务: 我这样做:
import org.apache.spark.sql.functions.{explode, udf}
import sqlContext.implicits._
val zipColumns = udf((x: Seq[Long], y: Seq[Long], z: Seq[Long]) => (x.zip(y).zip(z)) map {
case ((a,b),c) => (a,b,c)
})
val flattened = subDf.withColumn("columns", explode(zipColumns($"col3", $"col4", $"col5"))).select(
$"col1", $"col2",
$"columns._1".alias("col3"), $"columns._2".alias("col4"), $"columns._3".alias("col5"))
flattened.show
希望这是可以理解的:)谢谢你的回答,我这里有一个后续问题,如果我有3个数组[abc,WrappedArray(119181233),WrappedArray(467341234),WrappedArray(1,2),1487530800317],如果你对答案没意见,你可以投赞成票,而不是谢谢!
import org.apache.spark.sql.functions.{explode, udf}
import sqlContext.implicits._
val zipColumns = udf((x: Seq[Long], y: Seq[Long], z: Seq[Long]) => (x.zip(y).zip(z)) map {
case ((a,b),c) => (a,b,c)
})
val flattened = subDf.withColumn("columns", explode(zipColumns($"col3", $"col4", $"col5"))).select(
$"col1", $"col2",
$"columns._1".alias("col3"), $"columns._2".alias("col4"), $"columns._3".alias("col5"))
flattened.show