I have a Dataproc Serverless app using PySpark. The job is fairly straightforward: it reads Parquet from GCS laid out like this:
bucket
├── d_date_sale=2023-11-23
│   └── xx.parquet
├── d_date_sale=2023-11-24
│   ├── 1.parquet
│   └── 2.parquet
├── d_date_sale=2023-11-25
│   └── xx.parquet
├── d_date_sale=2023-11-26
│   └── xx.parquet
└── ...
Overall the structure might include 800+ Parquet files that the Spark job will read. Each Parquet file is roughly 5-10 MB (this is not something I can control).
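For context, the read side boils down to something like this (a minimal sketch: the bucket name is a placeholder, and the basePath option just keeps d_date_sale available as a column):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder bucket; the real job builds the path list from its config.
src = "gs://my-source-bucket"
df = (
    spark.read
    .option("basePath", src)          # keep d_date_sale as a column
    .parquet(f"{src}/d_date_sale=*")  # glob over the date partitions
)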
Now the task of the job is to transform the data into another structure, which will create a lot more files since the target layout adds another level of partitioning (store_id):
bucket
├── store_id=00001
│   ├── d_date_sale=2023-11-23
│   │   └── foo.parquet
│   ├── d_date_sale=2023-11-24
│   │   └── foo.parquet
│   ├── d_date_sale=2023-11-25
│   │   └── foo.parquet
│   ├── d_date_sale=2023-11-26
│   │   └── foo.parquet
│   └── ...
├── store_id=00002
│   ├── d_date_sale=2023-11-26
│   │   └── foo.parquet
│   └── ...
└── store_id=00003
    ├── d_date_sale=2023-11-26
    │   └── foo.parquet
    └── ...
The output files will mostly be smaller, around 1-2 MB each.
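To put a number on "a lot more files": the write produces one directory per distinct (store_id, d_date_sale) pair, so the output partition count can be estimated up front (sketch only; df is the DataFrame read from the source layout):

# Rough estimate of how many output partition directories the write will create.
n_dirs = df.select("store_id", "d_date_sale").distinct().count()
print(f"expected output partition directories: {n_dirs}")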
I have observed that the writes to the temp directory .staging-spark-* go fairly fast (judging by the _SUCCESS file timestamp), but the copying step can take 3-4 times longer than the previous step. I can’t wrap my head around why – any ideas are appreciated 🙂
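For reference, the commit-related settings in effect during that copy phase can be inspected from the session like this (a minimal sketch using standard Spark/Hadoop config keys; spark is the active SparkSession):

# Settings that govern how files are promoted from the staging dir to their final paths.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
print("fileoutputcommitter algorithm version:",
      hconf.get("mapreduce.fileoutputcommitter.algorithm.version", "default"))
print("partitionOverwriteMode:",
      spark.conf.get("spark.sql.sources.partitionOverwriteMode"))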
Configuration details:
version: 2.1
spark.dataproc.scaling.version: 2
spark.executor.cores: 8
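(The two spark.* properties are passed as batch properties at submission time; they can be double-checked from inside the job with a quick sketch like this:)

# Confirm the submitted properties actually reached the Spark session.
for key in ("spark.executor.cores", "spark.dataproc.scaling.version"):
    print(key, "=", spark.conf.get(key, "not set"))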
Attaching a snippet of the code (it's pretty much the gist of it):
import functools

import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# `types` here refers to a project-local module with the type aliases used below.


class CloudStorage(object):
    @staticmethod
    def _add_duplicate_partition_key(
        src_df: DataFrame,
        partition_key: str,
    ) -> DataFrame:
        duplicated_partition_key = f"_{partition_key}"
        return src_df.withColumn(
            colName=duplicated_partition_key, col=F.col(partition_key)
        )

    @staticmethod
    def write(
        df: DataFrame,
        partitions_cols: list[str] | None,
        dest_bucket: str,
        data_type: types.SupportedSourceTypes,
        mode: types.ParquetWriteModeTypes,
    ):
        # Spark drops the partition columns from the data files; we want to keep
        # them, so we duplicate each one under a _{colname} prefix.
        dup_part_key_df: DataFrame = functools.reduce(
            lambda res_df, p_key: CloudStorage._add_duplicate_partition_key(
                res_df, p_key
            ),
            partitions_cols,
            df,
        )
        # repartitioned = dup_part_key_df.repartition(*[F.col(f"_{column}") for column in partitions_cols])
        dup_part_key_df.write.partitionBy(partitions_cols).mode(mode).option(
            "compression", "snappy"
        ).format("parquet").save(f"gs://{dest_bucket}/data/{data_type}/")
### Example call ###
paths = [f"gs://{self.config.dest_bucket}/{prefix}*.parquet" for prefix in partition_dates]
df = self.spark.read.load(path=paths, format="parquet")
CloudStorage.write(df, cols, dest_bucket, ...)
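For completeness, the commented-out repartition in write(), if enabled, would shuffle by the duplicated partition columns right before the save, so each output partition directory is typically written as a single larger file (sketch only; same names as in the snippet above):

# Variant of the final write with the repartition enabled: one shuffle keyed on the
# duplicated partition columns, so all rows for a given (store_id, d_date_sale)
# land in one task and each output directory typically gets a single file.
repartitioned = dup_part_key_df.repartition(
    *[F.col(f"_{column}") for column in partitions_cols]
)
repartitioned.write.partitionBy(partitions_cols).mode(mode).option(
    "compression", "snappy"
).format("parquet").save(f"gs://{dest_bucket}/data/{data_type}/")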