Identify duplicate and non-duplicate records in a dataframe

I have a dataframe that contains both duplicate and distinct records. I need to identify which records are duplicates and which are distinct, and split them into two separate dataframes.

Input:

custid | cust_name | loc | prodid
1234   | John      | US  | P133
1234   | John      | US  | P133
1234   | John      | US  | P133
5678   | Mike      | CHN | P456
4325   | Peter     | RUS | P247
3458   | Andy      | IND | P764 
3458   | Andy      | IND | P764

Output:
DF1 (Dups):

custid | cust_name | loc | prodid
1234   | John      | US  | P133
1234   | John      | US  | P133
1234   | John      | US  | P133
3458   | Andy      | IND | P764 
3458   | Andy      | IND | P764

DF2 (Non-Dups):

custid | cust_name | loc | prodid
5678   | Mike      | CHN | P456
4325   | Peter     | RUS | P247

Can someone please help?

Create a window specification that partitions by every column and count the rows in each partition. Where the count is greater than 1, flag the row with a boolean has_dupes, then filter the two subsets on that flag:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Count identical rows within a window partitioned by every column
W = Window.partitionBy(*df.columns)
df = df.withColumn('has_dupes', F.count(F.lit(1)).over(W) > 1)

df_dupes = df.filter('has_dupes')
df_nodupes = df.filter('not has_dupes')

df_nodupes.show()
+------+---------+---+------+---------+
|custid|cust_name|loc|prodid|has_dupes|
+------+---------+---+------+---------+
|  4325|    Peter|RUS|  P247|    false|
|  5678|     Mike|CHN|  P456|    false|
+------+---------+---+------+---------+

df_dupes.show()
+------+---------+---+------+---------+
|custid|cust_name|loc|prodid|has_dupes|
+------+---------+---+------+---------+
|  1234|     John| US|  P133|     true|
|  1234|     John| US|  P133|     true|
|  1234|     John| US|  P133|     true|
|  3458|     Andy|IND|  P764|     true|
|  3458|     Andy|IND|  P764|     true|
+------+---------+---+------+---------+
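If the helper column is not wanted in the final output, it can be dropped after filtering (a small follow-up to the above):

df_dupes = df_dupes.drop('has_dupes')
df_nodupes = df_nodupes.drop('has_dupes')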

Using Spark SQL:

df.createOrReplaceTempView("df_main")

df_non_dup = spark.sql("""select * from df_main where custid in (select custid from df_main group by custid having count(*) = 1)""")

df_dup = spark.sql("""select * from df_main where custid not in (select custid from df_main group by custid having count(*) = 1)""")
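The queries above key only on custid. If duplicates should instead be judged on the full row, a hedged variant of the same idea groups on every column against the same df_main view (flipping the HAVING condition to count(*) = 1 with the same join gives the non-duplicates):

df_dup = spark.sql("""
    select m.*
    from df_main m
    join (select custid, cust_name, loc, prodid
          from df_main
          group by custid, cust_name, loc, prodid
          having count(*) > 1) d
      on m.custid = d.custid and m.cust_name = d.cust_name
     and m.loc = d.loc and m.prodid = d.prodid
""")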

Using distinct() and a pandas round-trip:

df1 = df.select('*').distinct()                            # one copy of every distinct row
pandasDF1 = df1.toPandas().astype(str)
pandasDF = df.toPandas().astype(str)
pandasDF2 = pandasDF[~pandasDF.isin(pandasDF1)].dropna()   # keep rows where no element matches the distinct frame (aligned by index)
df2 = spark.createDataFrame(pandasDF2)
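The pandas round-trip above relies on index alignment between the two frames and can be skipped altogether. A sketch that stays in Spark (assumption: Spark 2.4+ for exceptAll; df is the original dataframe):

distinct_rows = df.dropDuplicates()                    # one copy of every row
extra_copies = df.exceptAll(distinct_rows)             # the surplus copies of repeated rows
dupe_rows = extra_copies.dropDuplicates()              # one copy of each row that repeats
df_dup = df.join(dupe_rows, on=df.columns, how='inner')          # all copies of duplicated rows
df_non_dup = df.join(dupe_rows, on=df.columns, how='left_anti')  # rows that occur only once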

Here’s one straightforward way to do it: get a count for each row while grouping on all the columns, then filter on that count to create two dataframes, one with the duplicated customer ids and another with the unique ones, and finally join each back to the original dataframe.

import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)
# Optional: legacy date/time parsing behaviour; not needed for this example
sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")

data1 = [
    [1234, "John", "US", "P133"],
    [1234, "John", "US", "P133"],
    [1234, "John", "US", "P133"],
    [5678, "Mike", "CHN", "P456"],
    [4325, "Peter", "RUS", "P247"],
    [3458, "Andy", "IND", "P764"],
    [3458, "Andy", "IND", "P764"],

]

df1Columns = ["custid", "cust_name", "loc", "prodid"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

df1.show(n=100, truncate=False)


# Count how many times each full row occurs
df1_count = df1.groupby(["custid", "cust_name", "loc", "prodid"]).agg(F.count("*").alias("count"))
df1_count.show(n=100, truncate=False)

df1_select = df1_count.select("custid", "count")
df1_select.show(n=100, truncate=False)

# Split the custids into those occurring more than once and exactly once
df1_more_than_one = df1_select.filter(F.col("count") > 1)
df1_exactly_one = df1_select.filter(F.col("count") == 1)

df1_more_than_one.show(n=100, truncate=False)
df1_exactly_one.show(n=100, truncate=False)


# Join back to the original dataframe to recover the full rows
df1_duplicates = df1.join(df1_more_than_one, on=["custid"]).drop("count")
df1_non_duplicates = df1.join(df1_exactly_one, on=["custid"]).drop("count")

print("Duplicates dataframe below")
df1_duplicates.show(n=100, truncate=False)
print("Non duplicates dataframe below")
df1_non_duplicates.show(n=100, truncate=False)

Output:

+------+---------+---+------+
|custid|cust_name|loc|prodid|
+------+---------+---+------+
|1234  |John     |US |P133  |
|1234  |John     |US |P133  |
|1234  |John     |US |P133  |
|5678  |Mike     |CHN|P456  |
|4325  |Peter    |RUS|P247  |
|3458  |Andy     |IND|P764  |
|3458  |Andy     |IND|P764  |
+------+---------+---+------+

+------+---------+---+------+-----+
|custid|cust_name|loc|prodid|count|
+------+---------+---+------+-----+
|5678  |Mike     |CHN|P456  |1    |
|1234  |John     |US |P133  |3    |
|3458  |Andy     |IND|P764  |2    |
|4325  |Peter    |RUS|P247  |1    |
+------+---------+---+------+-----+

+------+-----+
|custid|count|
+------+-----+
|5678  |1    |
|1234  |3    |
|3458  |2    |
|4325  |1    |
+------+-----+

+------+-----+
|custid|count|
+------+-----+
|1234  |3    |
|3458  |2    |
+------+-----+

+------+-----+
|custid|count|
+------+-----+
|5678  |1    |
|4325  |1    |
+------+-----+

Duplicates dataframe below
+------+---------+---+------+
|custid|cust_name|loc|prodid|
+------+---------+---+------+
|1234  |John     |US |P133  |
|1234  |John     |US |P133  |
|1234  |John     |US |P133  |
|3458  |Andy     |IND|P764  |
|3458  |Andy     |IND|P764  |
+------+---------+---+------+

Non duplicates dataframe below
+------+---------+---+------+
|custid|cust_name|loc|prodid|
+------+---------+---+------+
|5678  |Mike     |CHN|P456  |
|4325  |Peter    |RUS|P247  |
+------+---------+---+------+
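The join back here is on custid alone, which works for this data because each custid always carries the same values in the other columns. If that assumption does not hold, a variant of the same approach joins back on every column (a sketch reusing df1 from above and introducing a new name, df1_counts_all):

df1_counts_all = df1.groupby(df1.columns).agg(F.count("*").alias("count"))

df1_duplicates = df1.join(df1_counts_all.filter(F.col("count") > 1), on=df1.columns).drop("count")
df1_non_duplicates = df1.join(df1_counts_all.filter(F.col("count") == 1), on=df1.columns).drop("count")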

import pyspark.sql.functions as f

# df_basket1 is the input dataframe; flag every row whose full-row group occurs more than once
df_with_flag = df_basket1.join(
    df_basket1.groupBy(df_basket1.columns).agg((f.count("*") > 1).cast("int").alias("Duplicate_indicator")),
    on=df_basket1.columns,
    how="inner"
)
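From there, the two subsets follow by filtering on the flag (a sketch using the df_with_flag name introduced above):

df_dup = df_with_flag.filter(f.col("Duplicate_indicator") == 1).drop("Duplicate_indicator")
df_non_dup = df_with_flag.filter(f.col("Duplicate_indicator") == 0).drop("Duplicate_indicator")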
