Apache Beam pipeline that bulk-decompresses files in Google Cloud Storage. matches files by glob pattern, detects compression from the extension, decompresses to a target bucket, and logs failures to a CSV. skips files that already exist at the destination. ships in both Java and Python.
```
python cli_dataflow_decompress.py \
  --input_file_pattern=gs://my-bucket/*.gz \
  --output_bucket=gs://my-output-bucket \
  --output_failure_file=gs://my-bucket/failed.csv
```

- glob matching — point it at `gs://bucket/path/*.gz` and it finds everything
- auto-detection — figures out compression type from the file extension (GZIP, BZIP2, DEFLATE, ZIP)
- idempotent — checks if the output file already exists before decompressing, safe to re-run
- dead-letter output — uncompressed files, malformed archives, and I/O errors go to a CSV error log instead of crashing the pipeline
- parallel decompression (Python) — uses a thread pool sized to CPU count, processes files in configurable batches
- strips extension — `data.json.gz` becomes `data.json` at the destination
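The destination-path behavior described above can be sketched as follows. This is a minimal illustration, not the pipeline's actual code; `destination_path` and the extension tuple are assumed names:

```python
import os

# Extensions the pipeline recognizes (per the support matrix below).
_COMPRESSION_EXTENSIONS = (".gz", ".bz2", ".deflate", ".zip")

def destination_path(input_path: str, output_bucket: str) -> str:
    """Map gs://in-bucket/path/data.json.gz -> gs://out-bucket/path/data.json."""
    # Drop the scheme and source bucket, keep the object path.
    object_path = input_path.split("/", 3)[3]
    root, ext = os.path.splitext(object_path)
    if ext.lower() in _COMPRESSION_EXTENSIONS:
        object_path = root
    return f"{output_bucket.rstrip('/')}/{object_path}"
```

in the real pipeline, an existence check against this computed path is what makes re-runs safe to perform.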
| type | Java | Python |
|---|---|---|
| GZIP | yes | yes |
| BZIP2 | yes | yes |
| DEFLATE | yes | yes |
| ZIP | yes | — |
detection is automatic via file extension. unrecognized extensions get routed to the error output.
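the detection-and-routing idea can be sketched like this (a hypothetical mapping; the real implementations use Beam's compression enums, and the names here are assumptions):

```python
# Extension -> compression label; anything unmatched returns None,
# which the pipeline treats as "route to the error output".
_EXTENSION_TO_COMPRESSION = {
    ".gz": "GZIP",
    ".bz2": "BZIP2",
    ".deflate": "DEFLATE",
    ".zip": "ZIP",
}

def detect_compression(filename: str):
    """Return the compression type for a filename, or None if unrecognized."""
    for ext, compression in _EXTENSION_TO_COMPRESSION.items():
        if filename.lower().endswith(ext):
            return compression
    return None
```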
designed as a Google Cloud Dataflow template (CLI_Dataflow_Decompress). uses ValueProvider for runtime parameter binding, Guava for byte copying, Apache Commons CSV for error output formatting.
```
java -cp <classpath> com.google.cloud.teleport.templates.CliDataflowDecompress \
  --inputFilePattern=gs://bucket/*.gz \
  --outputBucket=gs://output-bucket \
  --outputFailureFile=gs://bucket/failed.csv
```

standalone script. runs on Dataflow by default. uses GcsIO for streaming reads/writes and a ThreadPoolExecutor for parallel decompression within each worker.
```
python cli_dataflow_decompress.py \
  --input_file_pattern=gs://bucket/*.gz \
  --output_bucket=gs://output-bucket \
  --output_failure_file=gs://bucket/failed.csv \
  --batch_size=100
```

| flag | required | description |
|---|---|---|
| `--inputFilePattern` | yes | GCS glob pattern for input files |
| `--outputBucket` | yes | GCS bucket for decompressed output |
| `--outputFailureFile` | yes | GCS path for CSV error log |
plus standard Beam/Dataflow flags (--runner, --project, --region, etc.).
| flag | required | default | description |
|---|---|---|---|
| `--input_file_pattern` | yes | — | GCS glob pattern for input files |
| `--output_bucket` | yes | — | GCS bucket for decompressed output |
| `--output_failure_file` | yes | — | GCS path for CSV error log |
| `--batch_size` | no | 100 | files per batch |
plus standard Beam/Dataflow flags.
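the batching-plus-thread-pool pattern the Python worker uses can be sketched as below. this is an illustrative reduction, assuming local gzip files in place of GCS reads; `decompress_one` and `process_batch` are assumed names:

```python
import gzip
import os
from concurrent.futures import ThreadPoolExecutor

def decompress_one(path: str) -> bytes:
    """Decompress a single gzip file; stand-in for the per-file GCS work."""
    with gzip.open(path, "rb") as f:
        return f.read()

def process_batch(paths):
    """Decompress one batch of files in parallel, thread pool sized to CPU count."""
    workers = os.cpu_count() or 1
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(decompress_one, paths))
```

a thread pool (rather than processes) fits here because the work is dominated by I/O against GCS, where the GIL is released.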
decompressed files land in the output bucket with the same path structure, minus the compression extension.
failures go to a CSV:
```
Filename,Error
gs://bucket/bad-file.gz,"BZip2 format error: ..."
gs://bucket/not-compressed.txt,"file is not compressed"
```

three cases are caught and routed to the dead-letter CSV instead of failing the pipeline:
- uncompressed file — no recognized compression extension
- malformed archive — BZip2 format errors, incorrect zlib headers
- I/O errors — general read/write failures
no explicit retry logic beyond Beam's built-in bundle retry on worker failure.
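the catch-and-route idea can be sketched as a small helper (a simplification; the real pipelines emit failures through Beam's tagged outputs, and `try_decompress` is an assumed name):

```python
def try_decompress(path: str, data: bytes, decompress):
    """Attempt decompression; return ("ok", result) on success, or
    ("error", (path, message)) so the failure flows to the dead-letter
    CSV instead of raising and failing the bundle."""
    try:
        return ("ok", decompress(data))
    except Exception as e:  # malformed archive, I/O error, etc.
        return ("error", (path, str(e)))
```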
inferred from imports (no pom.xml in this repo — designed to live inside the DataflowTemplates project):
- `org.apache.beam:beam-sdks-java-core`
- `com.google.guava:guava`
- `org.apache.commons:commons-csv`
- `org.slf4j:slf4j-api`
apache-beam[gcp]
google-cloud-storage
CliDataflowDecompress.java — Java implementation (Dataflow template)
cli_dataflow_decompress.py — Python implementation (standalone)
Java key classes:
- `CliDataflowDecompress` — pipeline entry point and orchestration
- `CliDataflowDecompress.Decompress` — `DoFn` that handles per-file decompression logic
- `CliDataflowDecompress.Options` — pipeline options interface
Python key classes:
- `CliDataflowDecompressOptions` — CLI args and validation
- `Decompress` — `DoFn` with thread pool, handles decompression and GCS I/O
- `BatchElements` — `DoFn` that groups file metadata into batches
MIT