Unable to read a CSV file with headers from an S3 bucket using FileSource.forBulkFileFormat in Apache Flink

My project requires reading a CSV file with a header row from an S3 bucket using Apache Flink's FileSource.forBulkFileFormat (Flink version 1.18.0). The code is written in Java, and S3 is accessed through the Hadoop file system via Flink's flink-s3-fs-hadoop library.

Example CSV data:

student_id,exam_id,subject,score,grade
1,1,Math,41,D
1,1,Spanish,51,C

The code below reads the file successfully when the CSV file has no header row. Code snippet used:

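import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.table.DeserializationSchemaAdapter;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

private static final StreamExecutionEnvironment ENV;
private static final StreamTableEnvironment TABLE_ENV;

static {
    // Run in batch mode with a single parallel task.
    ENV = StreamExecutionEnvironment.getExecutionEnvironment()
            .setRuntimeMode(RuntimeExecutionMode.BATCH)
            .setParallelism(1);
    TABLE_ENV = StreamTableEnvironment.create(ENV);
}

// S3 credentials for the flink-s3-fs-hadoop file system.
Configuration configuration = new Configuration();
configuration.setString("s3.access-key", "<<access-key>>");
configuration.setString("s3.secret-key", "<<secret-key>>");
FileSystem.initialize(configuration, null);

// Row type matching the five CSV columns.
DataType dataType = DataTypes.ROW(
        DataTypes.FIELD("student_id", DataTypes.INT()),
        DataTypes.FIELD("exam_id", DataTypes.INT()),
        DataTypes.FIELD("subject", DataTypes.STRING()),
        DataTypes.FIELD("score", DataTypes.INT()),
        DataTypes.FIELD("grade", DataTypes.STRING()));
RowType rowType = (RowType) dataType.getLogicalType();
CsvRowDataDeserializationSchema deserSchema =
        new CsvRowDataDeserializationSchema.Builder(rowType, InternalTypeInfo.of(rowType)).build();

// Bulk-format file source that deserializes each CSV line into RowData.
FileSource<RowData> source = FileSource.forBulkFileFormat(
                new DeserializationSchemaAdapter(deserSchema),
                new Path("<<s3FilePath>>"))
        .build();
DataStream<RowData> rowData = ENV.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

// Expose the stream as a table and print its schema and contents.
Table tempTable = TABLE_ENV.fromDataStream(rowData, Schema.newBuilder().fromRowDataType(dataType).build());
tempTable.printSchema();
tempTable.execute().print();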

Expected output: the code is able to read a CSV file that contains headers:

student_id,exam_id,subject,score,grade
1,1,Math,41,D
1,1,Spanish,51,C

Actual output: the code is only able to read a CSV file without headers:

1,1,Math,41,D
1,1,Spanish,51,C

Is there a way to read a CSV file with headers using FileSource.forBulkFileFormat?
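
To make the goal concrete, here is a minimal plain-Java sketch of the behaviour being asked for: skip the first line, then parse the remaining lines into typed rows. It deliberately uses no Flink API, and the class, method, and field names (HeaderSkippingCsvSketch, parse, StudentScore) are hypothetical, chosen only to mirror the sample file. It also shows one plausible reason the header row is a problem, on the assumption that header text such as student_id cannot be converted to the INT declared for that column.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink API): drop the header line, then parse typed rows.
// All names are hypothetical and only mirror the columns of the sample file.
class HeaderSkippingCsvSketch {

    /** One data row: student_id, exam_id, subject, score, grade. */
    static final class StudentScore {
        final int studentId;
        final int examId;
        final String subject;
        final int score;
        final String grade;

        StudentScore(int studentId, int examId, String subject, int score, String grade) {
            this.studentId = studentId;
            this.examId = examId;
            this.subject = subject;
            this.score = score;
            this.grade = grade;
        }

        @Override
        public String toString() {
            return studentId + "," + examId + "," + subject + "," + score + "," + grade;
        }
    }

    /**
     * Parses comma-separated text into typed rows.
     * When hasHeader is true, the first line is skipped instead of parsed.
     * Assumes simple fields with no quoting or embedded commas.
     */
    static List<StudentScore> parse(String csvText, boolean hasHeader) {
        List<StudentScore> rows = new ArrayList<>();
        String[] lines = csvText.split("\\R");
        for (int i = hasHeader ? 1 : 0; i < lines.length; i++) {
            String line = lines[i].trim();
            if (line.isEmpty()) {
                continue; // ignore blank lines
            }
            String[] fields = line.split(",", -1);
            // Integer.parseInt throws NumberFormatException on text such as "student_id".
            rows.add(new StudentScore(
                    Integer.parseInt(fields[0]),
                    Integer.parseInt(fields[1]),
                    fields[2],
                    Integer.parseInt(fields[3]),
                    fields[4]));
        }
        return rows;
    }

    public static void main(String[] args) {
        String csv = "student_id,exam_id,subject,score,grade\n"
                + "1,1,Math,41,D\n"
                + "1,1,Spanish,51,C\n";
        // Prints the two data rows; the header line is not parsed.
        for (StudentScore row : parse(csv, true)) {
            System.out.println(row);
        }
    }
}
```

In this sketch, calling parse(csv, false) on the same text throws a NumberFormatException on the header line, while parse(csv, true) returns the two data rows.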

  • This appears to be a duplicate of stackoverflow.com/questions/77762619/…

  • @Martjin Visser the linked question is not a duplicate. It is about a write operation, whereas this question is about a read operation. Please read and understand the question before marking it down.
