Streaming Reads with Python and Google Cloud Storage

In data processing, efficiency and reliability are paramount. As a data engineer, you’ll often need to read files in resource-constrained environments. One common approach is to stream a file and process it in smaller chunks. I recently came across a way to accomplish this using Google Cloud Storage (GCS), Python, and a CRC32C checksum to verify the file’s integrity. A couple of reasons why this approach is useful, and why this post exists: streaming keeps memory usage low even for very large files, and verifying the CRC32C digest against the blob’s metadata catches transmission errors and data corruption before downstream processing trusts the results.

Implementing the Approach

To capitalize on these benefits, I put together a Python snippet that builds on the Google Cloud Storage Python library. The Crc32cReader class wraps a file-like object and updates a CRC32C digest as the file is read in chunks. This ensures data integrity while providing an efficient way to handle large files from GCS.

Here’s the main code implementation:

import base64
import struct

from contextlib import contextmanager
from typing import IO

# The crc32c package (pip install crc32c) exposes crc32c(data, value=0)
from crc32c import crc32c
from google.cloud import storage

# Size of each streamed read, in bytes (1 MiB here; tune for your workload)
CHUNK_SIZE = 1024 * 1024

class Crc32cReader:
    """The Google Python client doesn't calculate the CRC32C digest when streaming a file for consumption.
    This wrapper provides a way to calculate the CRC32C digest as a file-like object is read in chunks.
    """

    def __init__(self, fileobj: IO[bytes]) -> None:
        self._fileobj: IO[bytes] = fileobj
        self.digest: int = 0

    def _update(self, chunk: bytes) -> None:
        """Given a chunk read from the file, update the hexdigest"""
        self.digest = crc32c(chunk, self.digest)

    def read(self, size: int = -1) -> bytes:
        chunk: bytes = self._fileobj.read(size)
        self._update(chunk)
        return chunk

    def read1(self, size: int = -1) -> bytes:
        return self.read(size)

    def hexdigest(self) -> str:
        """Return the hexdigest of the hasher.
        The Base64 encoded CRC32C is in big-endian byte order.
        See https://cloud.google.com/storage/docs/hashes-etags
        """
        return base64.b64encode(struct.pack(">I", self.digest)).decode("utf-8")


@contextmanager
def read_file_as_stream(bucket_name: str, file_path: str):
    """
    Read a GCS blob as a stream.

    Args:
        bucket_name: The name of the bucket that contains the blob.
        file_path: The name of the blob.

    Yields:
        A file-like object whose reads update a running CRC32C digest.
    """
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name=file_path)

    # You must reload the blob to fetch its metadata, such as the crc32c digest
    blob.reload()
    expected_checksum: str = blob.crc32c

    calculated_checksum: str = ""
    # Note: blob.open() streams the file in chunks rather than downloading it all at once
    with blob.open(mode="rb", chunk_size=CHUNK_SIZE) as blob_in:
        crc32c_reader = Crc32cReader(blob_in)
        yield crc32c_reader
        calculated_checksum = crc32c_reader.hexdigest()

    if calculated_checksum != expected_checksum:
        raise ValueError(
            f"Checksum mismatch. Expected {expected_checksum}, calculated {calculated_checksum}"
        )

The read_file_as_stream context manager can be used by any consumer that needs to read a file from GCS as a stream. Once the file has been fully read, the calculated CRC32C digest is compared against the checksum stored in the GCS metadata, and an exception is raised if there is a mismatch due to a transmission error or data corruption.
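As a quick illustration, here is a minimal sketch of a consumer that counts newlines in a large text blob without holding the whole file in memory. The function name count_newlines and the bucket and object names are placeholders, not part of the original snippet:

def count_newlines(bucket_name: str, file_path: str) -> int:
    """Count newline characters in a GCS blob while verifying its CRC32C digest."""
    newlines = 0
    with read_file_as_stream(bucket_name, file_path) as blob_stream:
        while True:
            chunk = blob_stream.read(CHUNK_SIZE)
            if not chunk:
                break
            newlines += chunk.count(b"\n")
    # If the computed digest doesn't match the blob's crc32c metadata,
    # read_file_as_stream raises ValueError when the with-block exits.
    return newlines


line_count = count_newlines("my-bucket", "path/to/large-file.txt")

Note that the consumer needs to read the blob to the end: the digest only covers the bytes that were actually read, so stopping early will (correctly) surface as a checksum mismatch when the context manager exits.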

Conclusion

By streaming files, we can build data processing workflows that are more efficient, reliable, and resource-conscious. This approach is particularly beneficial for handling large files, ensuring data integrity, and optimizing resource usage.
