How to join 2 Datasets in a way similar to RDD?
Problem Description:
I have two DataFrames:
val a = Map(1 -> 2, 3 -> 5).toSeq.toDF("key", "value")
val b = Map(1 -> 4, 2 -> 5).toSeq.toDF("key", "value")
I join them on the key column:
a.join(b, "key").show(false)
+---+-----+-----+
|key|value|value|
+---+-----+-----+
|1  |2    |4    |
+---+-----+-----+
I would like this result instead:
+---+------+
|key|value |
+---+------+
|1  |[2, 4]|
+---+------+
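I would like the value column to be an array that aggregates the values from both sides. Is there an idiomatic way to do this? Joining two pair RDDs gets close to this by default, because the matched values come back together as a tuple, (key, (value1, value2)).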
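For comparison, here is a minimal sketch of the RDD side on the same data, assuming a local SparkSession (in spark-shell, `spark` and `sc` already exist). `join` on pair RDDs keeps only matching keys and pairs their values as a tuple, while `cogroup` keeps every key and collects each side's values, which is the closest RDD analogue of an "aggregated" result. The names `rddA`, `rddB`, `joined` and `grouped` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell this simply returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("rdd-join-sketch").getOrCreate()
val sc = spark.sparkContext

val rddA = sc.parallelize(Seq(1 -> 2, 3 -> 5))
val rddB = sc.parallelize(Seq(1 -> 4, 2 -> 5))

// join: inner join on the key, values paired as a tuple -> RDD[(Int, (Int, Int))]
val joined: Array[(Int, (Int, Int))] = rddA.join(rddB).collect()
println(joined.mkString(", ")) // (1,(2,4))

// cogroup: every key from either side, with each side's values collected
// -> RDD[(Int, (Iterable[Int], Iterable[Int]))]; converted to Lists and sorted for stable printing
val grouped: Array[(Int, (List[Int], List[Int]))] =
  rddA.cogroup(rddB)
    .mapValues { case (left, right) => (left.toList, right.toList) }
    .collect()
    .sortBy(_._1)
println(grouped.mkString(", ")) // (1,(List(2),List(4))), (2,(List(),List(5))), (3,(List(5),List()))
```

So the RDD `join` result is a tuple per matched key rather than an array; turning it into a collection is still a separate `map` step.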
Solution – 1
For a fully typed approach, you could use the Dataset API with `joinWith`:
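import spark.implicits._ // for toDF, as[...] and the encoders used by map; already in scope in spark-shell

// The keys and values in the sample data are Ints, so type the case classes accordingly
case class MyKeyValuePair(key: Int, value: Int)
case class MyOutput(key: Int, values: Seq[Int])
val a = Map(1 -> 2, 3 -> 5).toSeq.toDF("key", "value").as[MyKeyValuePair]
val b = Map(1 -> 4, 2 -> 5).toSeq.toDF("key", "value").as[MyKeyValuePair]
// joinWith keeps both sides as typed objects: Dataset[(MyKeyValuePair, MyKeyValuePair)]
val output = a.joinWith(b, a("key") === b("key"), "inner").map {
  case (pair1, pair2) => MyOutput(pair1.key, Seq(pair1.value, pair2.value))
}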
output.show
+---+------+
|key|values|
+---+------+
|  1|[2, 4]|
+---+------+
This gives you fine-grained control over exactly what your output looks like, and the mapping from each joined pair to MyOutput is checked at compile time (the join condition itself is still an untyped Column expression). You just have to define a few case classes to type your Datasets with.
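If you don't need typed case classes, a minimal untyped sketch with the DataFrame API is to join as in the question and then pack both value columns into one with `functions.array`. It assumes a local SparkSession; the name `merged` is illustrative. Because `a` and `b` are separate DataFrames, `a("value")` and `b("value")` still identify the two value columns after joining on "key".

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col}

// Assumption: a local session; in spark-shell this simply returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("df-array-join").getOrCreate()
import spark.implicits._

val a = Map(1 -> 2, 3 -> 5).toSeq.toDF("key", "value")
val b = Map(1 -> 4, 2 -> 5).toSeq.toDF("key", "value")

// Join on "key" (single key column in the result), then combine the
// left and right value columns into a single array<int> column named "value"
val merged = a.join(b, "key")
  .select(col("key"), array(a("value"), b("value")).as("value"))

merged.show(false) // a single row: key 1, value [2, 4]
```

This stays in the untyped DataFrame world, so there is no compile-time checking, but it needs no case classes and keeps the column name "value" from the question.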
Hope this helps!