OSError: Couldn’t deserialize thrift: No more data to read. Deserializing page header failed

I am fetching data from event hub and uploading it to blob with blob_type AppendBlob it appending correctly but when i download and try to read that parquet file then it showing this error OSError: Couldn't deserialize thrift: No more data to read. Deserializing page header failed. and sometime this error also Unexpected end of stream: Page was smaller (4) than expected (13) could anyone help me in understanding both error and help me in solving former error.

import asyncio
from datetime import datetime
import time
from datetime import datetime
import pandas as pd
from io import BytesIO
from azure.storage.blob import BlobServiceClient
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (BlobCheckpointStore)

EVENT_HUB_CONNECTION_STR = ""
EVENT_HUB_NAME = ""
BLOB_STORAGE_CONNECTION_STRING = ""
BLOB_CONTAINER_NAME = ""

async def on_event(partition_context, event):
    global finalDF
    try:
       
        data = event.body_as_json(encoding='UTF-8')
        df=pd.DataFrame(data,index=[0])
        finalDF=pd.concat([finalDF,df])
       
        if finalDF.shape[0]>100:
            uniqueBPIds=(finalDF['batteryserialnumber'].unique()).tolist()
            parquet = BytesIO()
            for i in uniqueBPIds:
                tempdf=finalDF[finalDF['batteryserialnumber']==i]
                tempdf.to_parquet(parquet)
                parquet.seek(0)
                blob_service_client = BlobServiceClient.from_connection_string(BLOB_STORAGE_CONNECTION_STRING)
                blob_path = f'new8_{year}/{month}/{i}/{i}_{year}_{month}_{day}.parquet'
                blob_client = blob_service_client.get_blob_client(container=BLOB_CONTAINER_NAME, blob=blob_path)
                blob_client.upload_blob(data = parquet,overwrite=False,blob_type="AppendBlob")
            finalDF=pd.DataFrame()
            print('done')
    except Exception as e:
        print('ERROR',e)

    await partition_context.update_checkpoint(event)
    
async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
    )

    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,
        eventhub_name=EVENT_HUB_NAME,
    )
    async with client:
        await client.receive(on_event=on_event, starting_position="-1")

if __name__ == "__main__":
    k=0
    finalDF = pd.DataFrame()
    current_datetime = datetime.now()
    year, month, day = current_datetime.year, current_datetime.month, current_datetime.day

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Leave a Comment