Spark broadcast join with an RDD broadcast via RDD.collectAsMap() for triangle counting

I want to count the triangles in a graph whose input is (follower, followee) pairs. I use a broadcast join to find the triangles: I call edges.collectAsMap() and broadcast the resulting map to all nodes.

val spark = SparkSession.builder()
  .appName("Combining in Spark")
  .config("spark.master", "local")
  .getOrCreate()

import spark.implicits._
val edgesRDD = spark.sparkContext.textFile(args(0))
  .map(line => {
    val parts = line.split(",")
    (parts(0).toInt, parts(1).toInt)  // (follower, user)
  })
  .filter { case (follower, user) => follower < maxValue && user < maxValue }

val broadcastEdges = spark.sparkContext.broadcast(edgesRDD.collectAsMap())

val trianglesRDD = edgesRDD
  .flatMap { case (a, b) =>
    broadcastEdges.value.get(b) match {
      case Some(c) if broadcastEdges.value.contains(c) &&
                      broadcastEdges.value(c) == a => Seq((a, b, c))
      case _ => Seq.empty
    }
  }

val triangleCount = trianglesRDD.count() / 3

what could be the problem with this code?

My input data is 1,2 2,3 2,4 3,1 (one edge per line). The output should be (1,2,3), (2,3,1), and (3,1,2), since nodes 1, 2, and 3 form a cycle, but I only get (2,3,1). Yet if my input data is 1,2 2,3 2,4 4,1, I get the correct outputs: (1,2,4), (2,4,1), and (4,1,2).
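One thing I noticed while debugging may be related: collectAsMap() keeps only one value per key, so a follower with several followees (like node 2 here) loses edges in the broadcast map. A minimal pure-Scala sketch, using Seq#toMap to stand in for collectAsMap() (same duplicate-key behavior), shows this:

```scala
object EdgeMapCheck {
  def main(args: Array[String]): Unit = {
    // Edges from the first example: 1->2, 2->3, 2->4, 3->1
    val edges = Seq((1, 2), (2, 3), (2, 4), (3, 1))

    // Like collectAsMap(), toMap keeps only one value per key,
    // so node 2's two outgoing edges collapse into one entry.
    val edgeMap = edges.toMap

    println(edgeMap.size) // 3 entries, although there are 4 edges
    println(edgeMap(2))   // only one of node 2's followees survives
  }
}
```

Is this loss of duplicate keys what breaks my triangle detection, or is something else wrong?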
