Dataproc Serverless – Slow writes to GCS

I have a Dataproc Serverless app using PySpark. The job is fairly straightforward: it reads from GCS, from a structure like this:

bucket
├── _d_date_sale=2023-11-23
│   └── xx.parquet
├── d_date_sale=2023-11-24
│   ├── 1.parquet
│   ├── 2.parquet
│   └── ...
├── d_date_sale=2023-11-25
│   └── xx.parquet
├── d_date_sale=2023-11-26
│   └── xx.parquet
└── ...

Overall the structure might include 800+ parquet files that the Spark job will read. Each parquet file is around ~5-10 MB (this is not something I can control).
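
For context, a minimal sketch of how a layout like this can be read (the bucket path is a placeholder; my actual read is in the example call at the bottom). Pointing Spark at the base path makes it discover d_date_sale as a partition column:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reading the base path lets Spark discover the d_date_sale=... directories
# as a partition column instead of listing every parquet file by hand.
src_df = spark.read.format("parquet").load("gs://bucket/")
src_df.printSchema()  # schema includes d_date_sale inferred from the paths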

Now the task of the job is to transform the data into another structure, which will create a lot more files, since the new target structure adds another level of partitioning (i.e. store_id):

bucket
├── store_id=00001
│   ├── d_date_sale=2023-11-23
│   │   └── foo.parquet
│   ├── d_date_sale=2023-11-24
│   │   └── foo.parquet
│   ├── d_date_sale=2023-11-25
│   │   └── foo.parquet
│   ├── d_date_sale=2023-11-26
│   │   └── foo.parquet
│   └── ...
├── store_id=00002
│   ├── d_date_sale=2023-11-26
│   │   └── foo.parquet
│   └── ...
└── store_id=00003
    ├── d_date_sale=2023-11-26
    │   └── foo.parquet
    └── ...

The output files will mostly be smaller, around ~1-2 MB.
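
To make that concrete, a minimal sketch of the write that produces this layout (the column names come from the trees above; the bucket paths, the data/sales/ segment, and the overwrite mode are placeholders, and the generic version I actually use is in the snippet further down):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.format("parquet").load("gs://bucket/")  # source layout above

# partitionBy order determines the directory nesting: store_id=... first,
# then d_date_sale=... inside it.
(
    df.write
    .partitionBy("store_id", "d_date_sale")
    .mode("overwrite")
    .option("compression", "snappy")
    .format("parquet")
    .save("gs://dest-bucket/data/sales/")
)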

I have observed that the writes to the temp directory .staging-spark-* go fairly fast (judging by the _SUCCESS file timestamp), but the copying step can take 3-4 times longer than the previous step. I can't wrap my head around why; any ideas are appreciated 🙂
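
In case it's relevant, a small sketch of how I'd dump the committer-related settings from the session, since the staging-then-copy behaviour above is what the commit step does (these are standard Spark/Hadoop property names, not something defined in my code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Print the values visible on the session; "unset" just means nothing is
# resolvable for that key on the session conf.
for key in [
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version",
    "spark.sql.sources.partitionOverwriteMode",
    "spark.sql.parquet.output.committer.class",
]:
    print(key, "=", spark.conf.get(key, "unset"))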


Configuration details:

   version: 2.1
   spark.dataproc.scaling.version: 2
   spark.executor.cores: 8
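
For what it's worth, a sketch of reading the effective values back from inside the job (the runtime version is a batch-level setting, so only the two Spark properties are checked here):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The runtime version (2.1) is not a session conf, so only the Spark
# properties are readable here.
for key in ["spark.executor.cores", "spark.dataproc.scaling.version"]:
    print(key, "=", spark.conf.get(key, "unset"))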

Attaching a snippet of the code (it's pretty much the gist of it):


import functools

import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# ("types" referenced below is our own module; its import is omitted here.)


class CloudStorage(object):
    @staticmethod
    def _add_duplicate_partition_key(
        src_df: DataFrame,
        partition_key: str,
    ) -> DataFrame:
        # Copy the partition column under a _{colname} alias so the values
        # survive partitionBy() dropping the original column from the data.
        duplicated_partition_key = f"_{partition_key}"

        return src_df.withColumn(
            colName=duplicated_partition_key, col=F.col(partition_key)
        )

    @staticmethod
    def write(
        df: DataFrame,
        partitions_cols: list[str] | None,
        dest_bucket: str,
        data_type: types.SupportedSourceTypes,
        mode: types.ParquetWriteModeTypes,
    ):
        # Spark drops the partition columns from the written data, but we
        # want to keep them, so we duplicate each one under a _{colname} copy.
        dup_part_key_df: DataFrame = functools.reduce(
            lambda res_df, p_key: CloudStorage._add_duplicate_partition_key(
                res_df, p_key
            ),
            partitions_cols,
            df,
        )

        # repartitioned = dup_part_key_df.repartition(*[F.col(f"_{column}") for column in partitions_cols])

        (
            dup_part_key_df.write
            .partitionBy(partitions_cols)
            .mode(mode)
            .option("compression", "snappy")
            .format("parquet")
            .save(f"gs://{dest_bucket}/data/{data_type}/")
        )
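
For completeness, the repartition variant that the commented-out line above hints at would look roughly like this inside write() (same variables as in the method; I'm not sure yet whether it's the right thing for my case):

        # Sketch: shuffle on the duplicated _<col> copies before writing, so
        # each (store_id, d_date_sale) combination is written by a single task
        # and ends up as fewer, larger files per partition directory.
        repartitioned = dup_part_key_df.repartition(
            *[F.col(f"_{column}") for column in partitions_cols]
        )
        (
            repartitioned.write
            .partitionBy(partitions_cols)
            .mode(mode)
            .option("compression", "snappy")
            .format("parquet")
            .save(f"gs://{dest_bucket}/data/{data_type}/")
        )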

### Example call ### 
 paths = [f"gs://{self.config.dest_bucket}/{prefix}*.parquet" for prefix in partition_dates]
 df = self.spark.read.load(path=paths, format="parquet")
 CloudStorage.write(df, cols, dest_bucket, ..)
