Remove duplicate keys from Spark Scala

+1 vote
asked Jul 27, 2015 by user2200660

I am using Spark 1.2 with Scala and have a pair RDD of (String, String). A sample of the records looks like:

<Key, value>
id_1, val_1_1; val_1_2
id_2, val_2_1; val_2_2
id_3, val_3_1; val_3_2
id_1, val_4_1; val_4_2

I just want to remove all records with a duplicate key; in the above example, the fourth record will be removed because id_1 is a duplicate key.

Please help.

Thanks.

2 Answers

+11 votes
answered Jul 28, 2015 by jean-logeart

You can use reduceByKey:

import org.apache.spark.rdd.RDD

val rdd: RDD[(K, V)] = // ...
// Keep one value per key (here, arbitrarily the first argument passed to the reduce function).
val res: RDD[(K, V)] = rdd.reduceByKey((v1, v2) => v1)
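
For the sample data in the question, a minimal self-contained sketch of this approach (variable names such as pairs and deduped are just placeholders) might look like:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local").setAppName("Dedup")
val sc = new SparkContext(conf)

// The pairs from the question; id_1 appears twice.
val pairs = sc.parallelize(Seq(
  ("id_1", "val_1_1; val_1_2"),
  ("id_2", "val_2_1; val_2_2"),
  ("id_3", "val_3_1; val_3_2"),
  ("id_1", "val_4_1; val_4_2")
))

// Keep one value per key. Note that reduceByKey makes no ordering guarantee,
// so which of the two id_1 values survives is arbitrary.
val deduped = pairs.reduceByKey((v1, _) => v1)
deduped.collect().foreach(println)
sc.stop()
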
+1 vote
answered Nov 8 by mattinbits

If it is necessary to always select the first entry for a given key, then, combining @JeanLogeart's answer with the comment from @Paul, you can tag each record with its position via zipWithIndex and keep the value with the lowest index, since reduceByKey alone does not guarantee which value is retained:

import org.apache.spark.{SparkConf, SparkContext}

val data = List(
  ("id_1", "val_1_1; val_1_2"),
  ("id_2", "val_2_1; val_2_2"),
  ("id_3", "val_3_1; val_3_2"),
  ("id_1", "val_4_1; val_4_2")
)
val conf = new SparkConf().setMaster("local").setAppName("App")
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(data)
// Tag each record with its position, keep the lowest-index value per key, then drop the index.
val resultRDD = dataRDD.zipWithIndex
  .map { case ((key, value), index) => (key, (value, index)) }
  .reduceByKey((v1, v2) => if (v1._2 < v2._2) v1 else v2)
  .mapValues(_._1)
resultRDD.collect().foreach(println)
sc.stop()
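
Run locally, the collected output should contain one tuple per key, with the first-seen value kept for id_1 (print order may vary), along the lines of:

(id_1,val_1_1; val_1_2)
(id_2,val_2_1; val_2_2)
(id_3,val_3_1; val_3_2)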