How to find common pairs irrespective of their order in Pyspark RDD?

I want to find the pairs who have contacted one another. Following is the data:

Input is:

K-> M, H
M-> K, E
H-> F
B-> T, H
E-> K, H
F-> K, H, E
A-> Z

And the output is:

Output:
K, M //(this means K has supplied goods to M and M has also supplied some goods to K)
H, F

Here is the code I have written.

from pyspark import SparkContext

spark = SparkContext("local", "DoubleRDD")


def findpairs(ls):
    lst = []
    for i in range(0,len(ls)-1):
        for j in range(i+1, len(ls)):
            if ls[i] == tuple(reversed(ls[j])):
                lst.append(ls[i])   
    return(lst)

text  = spark.textFile("path to the .txt")
text  = text.map(lambda s: s.replace("->",","))
text  = text.map(lambda s: s.replace(",",""))
text  = text.map(lambda s: s.replace(" ",""))
pairs = text.flatMap(lambda x:  [(x[0],y) for y in x[1:]])
commonpairs = pairs.filter(lambda x: findpairs(x))
commonpairs.collect()
The output is: []
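The empty result comes from the filter step: `filter` calls `findpairs` on each individual 2-tuple, not on the full list of pairs, so `ls[i]` is a single letter while `tuple(reversed(ls[j]))` is a 1-tuple, and the comparison never matches. A standalone reproduction (no Spark needed):

```python
def findpairs(ls):
    lst = []
    for i in range(0, len(ls) - 1):
        for j in range(i + 1, len(ls)):
            if ls[i] == tuple(reversed(ls[j])):
                lst.append(ls[i])
    return lst

# filter() hands each element to the predicate one at a time,
# so ls is a single pair like ('K', 'M'), not the list of all pairs:
# 'K' is compared against ('M',) and never matches.
print(findpairs(('K', 'M')))  # []

# findpairs only does what was intended when given the whole list:
pairs = [('K', 'M'), ('K', 'H'), ('M', 'K'), ('H', 'F'), ('F', 'H')]
print(findpairs(pairs))  # [('K', 'M'), ('H', 'F')]
```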

Don't use RDDs; the problem can be solved using native Spark DataFrame functions. Read the text file as a Spark DataFrame:

import pyspark.sql.functions as F

df = spark.read.csv('data.txt', header=False, sep=r'-> ').toDF('x', 'y')

# +---+-------+
# |  x|      y|
# +---+-------+
# |  K|   M, H|
# |  M|   K, E|
# |  H|      F|
# |  B|   T, H|
# |  E|   K, H|
# |  F|K, H, E|
# |  A|      Z|
# +---+-------+

Split and explode the recipients (y) column:

df1 = df.withColumn('y', F.explode(F.split('y', r',\s+')))

# +---+---+
# |  x|  y|
# +---+---+
# |  K|  M|
# |  K|  H|
# |  M|  K|
# |  M|  E|
# |  H|  F|
# |  B|  T|
# |  B|  H|
# |  E|  K|
# |  E|  H|
# |  F|  K|
# |  F|  H|
# |  F|  E|
# |  A|  Z|
# +---+---+

Self-join the dataframe where the recipient in the left is the sender in the right dataframe. Then filter the dataframe such that the sender in the left is the same as the recipient in the right:

df1 = df1.alias('left').join(df1.alias('right'), on=F.expr("left.y == right.x"))
df1 = df1.filter("left.x == right.y")

# +---+---+---+---+
# |  x|  y|  x|  y|
# +---+---+---+---+
# |  K|  M|  M|  K|
# |  M|  K|  K|  M|
# |  H|  F|  F|  H|
# |  F|  H|  H|  F|
# +---+---+---+---+

Drop the duplicate combinations of sender and recipient:

df1 = df1.select('left.*').withColumn('pairs', F.array_sort(F.array('x', 'y')))
df1 = df1.dropDuplicates(['pairs']).drop('pairs')

# +---+---+
# |  x|  y|
# +---+---+
# |  H|  F|
# |  K|  M|
# +---+---+
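The same join-and-deduplicate logic can be sketched in plain Python, as a toy stand-in for the DataFrame pipeline above (the `edges` set here is an assumed hand-parsed version of the input data):

```python
# Directed (sender, recipient) edges, hand-parsed from the sample input.
edges = {('K', 'M'), ('K', 'H'), ('M', 'K'), ('M', 'E'), ('H', 'F'),
         ('B', 'T'), ('B', 'H'), ('E', 'K'), ('E', 'H'),
         ('F', 'K'), ('F', 'H'), ('F', 'E'), ('A', 'Z')}

# "Self-join" step: keep an edge only if the reversed edge also exists,
# then deduplicate by sorting each pair (the array_sort trick above).
mutual = {tuple(sorted(e)) for e in edges if (e[1], e[0]) in edges}

print(sorted(mutual))  # [('F', 'H'), ('K', 'M')]
```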

text  = spark.textFile("PATH TO .txt file")
text  = text.map(lambda s: s.replace("->", ","))
text  = text.map(lambda s: s.replace(",", ""))
text  = text.map(lambda s: s.replace(" ", ""))
pairs = text.flatMap(lambda x: [(tuple(sorted((x[0], y))), 1) for y in x[1:]])
pairs = pairs.groupByKey().mapValues(len)
cm = pairs.filter(lambda x: x[1] == 2).collect()
for pair, count in cm:
    print(pair)

I have written the above code and it produces the desired output:

('K', 'M')
('F', 'H')
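The core idea here, canonicalising each pair by sorting before counting so that mutual contacts collapse onto one key, can be checked without Spark, e.g. with `collections.Counter` (a local sketch of the same logic, not the Spark code itself; the `pairs` list is an assumed hand-parsed version of the input):

```python
from collections import Counter

# (sender, recipient) pairs, hand-parsed from the sample input.
pairs = [('K', 'M'), ('K', 'H'), ('M', 'K'), ('M', 'E'), ('H', 'F'),
         ('B', 'T'), ('B', 'H'), ('E', 'K'), ('E', 'H'),
         ('F', 'K'), ('F', 'H'), ('F', 'E'), ('A', 'Z')]

# Canonicalise each pair by sorting; a mutual contact then appears twice.
counts = Counter(tuple(sorted(p)) for p in pairs)
mutual = [p for p, n in counts.items() if n == 2]
print(mutual)  # [('K', 'M'), ('F', 'H')]
```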
