Kafka to S3 Pipeline
This example demonstrates consuming messages from Apache Kafka and writing them to Amazon S3 as batched JSON files. Useful for data lake ingestion, event stream archiving, or ETL workflows.
Download & Run
Quick Start:
```bash
# Download and run directly
curl -sSL https://docs.expanso.io/examples/kafka-to-s3.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/kafka-to-s3.yaml
expanso-edge run -f my-pipeline.yaml
```
Download: kafka-to-s3.yaml
What This Pipeline Does
- Consumes messages from a Kafka topic
- Parses and validates JSON payloads
- Enriches messages with Kafka metadata (topic, partition, offset) and a processing timestamp
- Batches messages for efficient S3 uploads
- Writes batched data to S3 with partitioned paths
Complete Pipeline
Configure your Kafka brokers and S3 bucket, then run:
```yaml
input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - events
    consumer_group: expanso-s3-archiver
    start_from_oldest: true

pipeline:
  processors:
    # Parse JSON payload, dropping messages that fail to parse
    - mapping: |
        root = content().parse_json().catch(deleted())

    # Add Kafka metadata for traceability
    - mapping: |
        root.kafka_topic = @kafka_topic
        root.kafka_partition = @kafka_partition
        root.kafka_offset = @kafka_offset
        root.processed_at = now()

output:
  aws_s3:
    bucket: my-data-lake
    path: events/${!timestamp_format("2006/01/02/15")}/batch-${!timestamp_unix_nano()}.json
    content_type: application/json
    region: us-east-1
    batching:
      count: 100
      period: 30s
      processors:
        # Combine each batch into a single JSON array before upload
        - archive:
            format: json_array
```
Configuration Breakdown
Input: Kafka
```yaml
input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - events
    consumer_group: expanso-s3-archiver
    start_from_oldest: true
```
The kafka input connects to your Kafka cluster and consumes messages from the specified topic. Parallelism is controlled by checkpoint_limit, and offsets are committed periodically to track progress.
Key settings:
- addresses: Your Kafka broker addresses
- topics: The topic(s) to consume from
- consumer_group: Groups consumers for offset tracking and load balancing
- start_from_oldest: Start from the earliest messages on the first run
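As noted above, parallelism is governed by checkpoint_limit. A minimal sketch raising it, assuming the field sits alongside the other kafka input settings (the value shown is illustrative, and the default depends on your expanso-edge version):

```yaml
input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - events
    consumer_group: expanso-s3-archiver
    start_from_oldest: true
    # Allow more messages per partition to be in flight before an
    # offset commit is required (value is illustrative)
    checkpoint_limit: 1024
```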
See: Kafka Input
Processors: Parse, Enrich, Batch
1. Parse JSON
```yaml
- mapping: |
    root = content().parse_json().catch(deleted())
```
Parses each Kafka message as JSON. Messages that fail to parse are dropped via catch(deleted()) rather than failing the pipeline.
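If you would rather keep unparseable payloads than drop them, one possible variation (a sketch; the raw_payload field name is just an example) wraps the original content instead:

```yaml
- mapping: |
    # Fall back to wrapping the raw content when JSON parsing fails
    root = content().parse_json().catch({"raw_payload": content().string()})
```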
2. Add Metadata
```yaml
- mapping: |
    root.kafka_topic = @kafka_topic
    root.kafka_partition = @kafka_partition
    root.kafka_offset = @kafka_offset
    root.processed_at = now()
```
Enriches each message with Kafka metadata for traceability:
- kafka_topic: Source topic
- kafka_partition: Partition number
- kafka_offset: Message offset
- processed_at: Processing timestamp
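For example, if a producer published {"user_id": 42, "action": "login"} to the events topic, the enriched record would look roughly like this (values are illustrative, and the exact timestamp and numeric formatting may differ):

```json
{
  "user_id": 42,
  "action": "login",
  "kafka_topic": "events",
  "kafka_partition": 3,
  "kafka_offset": 18273,
  "processed_at": "2024-01-15T10:30:00Z"
}
```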
3. Batch Messages
```yaml
batching:
  count: 100
  period: 30s
  processors:
    - archive:
        format: json_array
```
The archive processor runs inside the output's batching policy, so each batch of up to 100 messages (or whatever has accumulated after 30 seconds) is combined into a single JSON array and uploaded as one object. Placing it here rather than in the pipeline section ensures it operates on whole batches instead of individual messages.
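If your query engine prefers newline-delimited JSON (Athena's JSON SerDe and many Spark readers expect one object per line), a sketch of the same batching policy with a different archive format; you would likely also adjust the file extension and content type to match:

```yaml
batching:
  count: 100
  period: 30s
  processors:
    # One JSON object per line instead of a single JSON array
    - archive:
        format: lines
```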
See: Archive Processor
Output: AWS S3
```yaml
output:
  aws_s3:
    bucket: my-data-lake
    path: events/${!timestamp_format("2006/01/02/15")}/batch-${!timestamp_unix_nano()}.json
    content_type: application/json
    region: us-east-1
    batching:
      count: 100
      period: 30s
      processors:
        - archive:
            format: json_array
```
Writes batched messages to S3 with a time-partitioned path structure. The archive step inside the batching policy combines each batch into a single object, and the path uses interpolation functions to organize files by date and ensure unique filenames.
Path structure: events/YYYY/MM/DD/HH/batch-timestamp.json
This enables efficient querying with tools like Athena, Spark, or Presto that support partition pruning.
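If your tables are defined with Hive-style partition keys, a variation of the path using the same interpolation functions might look like the sketch below (the year/month/day/hour key names are only examples and should match your table definition):

```yaml
output:
  aws_s3:
    # Hive-style partition keys in the object path
    path: events/year=${!timestamp_format("2006")}/month=${!timestamp_format("01")}/day=${!timestamp_format("02")}/hour=${!timestamp_format("15")}/batch-${!timestamp_unix_nano()}.json
```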
See: AWS S3 Output
Common Variations
Add Compression
Compress files before upload to reduce storage costs:
```yaml
output:
  aws_s3:
    bucket: my-data-lake
    path: events/${!timestamp_format("2006/01/02/15")}/batch-${!timestamp_unix_nano()}.json.gz
    content_type: application/json
    content_encoding: gzip
    region: us-east-1
    batching:
      count: 100
      period: 30s
      processors:
        # Combine the batch into one JSON array, then compress it
        - archive:
            format: json_array
        - compress:
            algorithm: gzip
```
Route by Topic
When consuming from multiple topics, route to different S3 prefixes:
```yaml
input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - orders
      - users
      - events
    consumer_group: expanso-s3-archiver

output:
  aws_s3:
    bucket: my-data-lake
    path: ${!@kafka_topic}/${!timestamp_format("2006/01/02/15")}/batch-${!timestamp_unix_nano()}.json
    content_type: application/json
    region: us-east-1
    batching:
      count: 100
      period: 30s
      processors:
        - archive:
            format: json_array
```
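Note that with output batching, a single batch can mix messages from several topics, and the object path is resolved once per uploaded object. If each file should contain records from only one topic, one option (a sketch, assuming a Benthos-style group_by_value processor is available in the batching policy) is to split each batch by topic before archiving:

```yaml
batching:
  count: 100
  period: 30s
  processors:
    # Split each batch into one group per topic, then archive each group
    - group_by_value:
        value: '${! @kafka_topic }'
    - archive:
        format: json_array
```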
Add Error Handling
Send failed messages to a dead letter topic instead of dropping them:
```yaml
pipeline:
  processors:
    - try:
        - mapping: |
            root = content().parse_json()
        - mapping: |
            root.kafka_topic = @kafka_topic
            root.kafka_partition = @kafka_partition
            root.kafka_offset = @kafka_offset
            root.processed_at = now()
    - catch:
        - mapping: |
            root.error = error()
            root.raw_message = content()
            root.failed_at = now()
        - output:
            kafka:
              addresses:
                - localhost:9092
              topic: dead-letter-events
```
Next Steps
- Kafka Input - Full Kafka configuration options
- AWS S3 Output - S3 configuration and batching
- Archive Processor - Batching patterns
- AWS Guide - AWS credentials and configuration