I want to count the triangles in a directed follower graph, where each input line is a follower,followee pair. I compute the triangles with a broadcast join: I collect the edges into a map with edgesRDD.collectAsMap() and broadcast that map to all nodes.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Combining in Spark")
  .config("spark.master", "local")
  .getOrCreate()
import spark.implicits._

// maxValue: node-id cutoff (its definition is not shown here)
val edgesRDD = spark.sparkContext.textFile(args(0))
  .map { line =>
    val parts = line.split(",")
    (parts(0).toInt, parts(1).toInt) // (follower, user)
  }
  .filter { case (follower, user) => follower < maxValue && user < maxValue }

// Collect the edges as a follower -> followee map and broadcast it to every node
val broadcastEdges = spark.sparkContext.broadcast(edgesRDD.collectAsMap())

// For each edge a -> b, look up b -> c and check whether c -> a closes the triangle
val trianglesRDD = edgesRDD
  .flatMap { case (a, b) =>
    broadcastEdges.value.get(b) match {
      case Some(c) if broadcastEdges.value.contains(c) && broadcastEdges.value(c) == a => Seq((a, b, c))
      case _ => Seq.empty
    }
  }

// Intended: each directed triangle is found once per rotation, hence / 3
val triangleCount = trianglesRDD.count() / 3
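The per-edge lookup above can be replayed locally without Spark, which makes it easier to see what the broadcast map actually contains. This is a minimal sketch, assuming that Seq.toMap is a fair stand-in for collectAsMap(): both produce an ordinary key -> value map, so each follower keeps only one followee (for toMap, the last one). The object name TriangleRepro is made up for illustration.

```scala
object TriangleRepro {
  // Same lookup as the Spark job, but on a plain Scala Map.
  // Assumption: Seq.toMap stands in for collectAsMap(); both keep a single value per key.
  def findTriangles(edges: Seq[(Int, Int)]): Seq[(Int, Int, Int)] = {
    val lookup: Map[Int, Int] = edges.toMap
    edges.flatMap { case (a, b) =>
      lookup.get(b) match {
        // Equivalent to: lookup.contains(c) && lookup(c) == a
        case Some(c) if lookup.get(c).contains(a) => Seq((a, b, c))
        case _ => Seq.empty
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1, 2), (2, 3), (2, 4), (3, 1))
    println(edges.toMap)          // key 2 keeps only one of its two followees
    println(findTriangles(edges)) // List((2,3,1))
  }
}
```

On the first input the map printed for node 2 holds only 4, and the single tuple found is (2,3,1), consistent with what the Spark job returns.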
What could be the problem with this code?

My input data is the edges 1,2 2,3 2,4 3,1, and the output should be (1,2,3), (2,1,3) and (3,1,2), since these three nodes form a cycle. But I only get (2,3,1). In contrast, with the input 1,2 2,3 2,4 4,1 I get the correct output: (1,2,4), (2,1,4) and (2,4,1).
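A likely cause is that collectAsMap() does not build a multimap: Spark's documentation for it states that when a key has multiple values, only one value per key is preserved. In the first input node 2 has two followees (3 and 4), so one of the edges 2,3 and 2,4 disappears from the broadcast map. If 2,3 is the one dropped, every rotation that needs node 2's followee to be 3 fails, leaving only (2,3,1); in the second input the surviving entry for node 2 happens to lie on the cycle. A sketch of one fix, assuming the graph still fits in driver memory as before, is to keep every followee per follower in a Map[Int, Set[Int]] and try all of them. It uses plain Scala collections, and the object name TriangleMultimap is hypothetical.

```scala
object TriangleMultimap {
  // Adjacency as a multimap: each follower keeps ALL of its followees.
  def adjacency(edges: Seq[(Int, Int)]): Map[Int, Set[Int]] =
    edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2).toSet }

  // Directed triangles a -> b -> c -> a, one tuple per starting edge (rotation).
  def findTriangles(edges: Seq[(Int, Int)]): Seq[(Int, Int, Int)] = {
    val adj = adjacency(edges)
    for {
      (a, b) <- edges
      c <- adj.getOrElse(b, Set.empty[Int]).toSeq
      if adj.getOrElse(c, Set.empty[Int]).contains(a)
    } yield (a, b, c)
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1, 2), (2, 3), (2, 4), (3, 1))
    val triangles = findTriangles(edges)
    println(triangles)          // List((1,2,3), (2,3,1), (3,1,2))
    println(triangles.size / 3) // each directed triangle is found once per rotation
  }
}
```

On the first input this yields the three rotations (1,2,3), (2,3,1) and (3,1,2) of the cycle 1 -> 2 -> 3 -> 1, i.e. one triangle after dividing by 3. In the Spark job, a broadcast map of the same shape could presumably be built with edgesRDD.groupByKey().mapValues(_.toSet).collectAsMap().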