Kafka to S3 Pipeline

This example demonstrates consuming messages from Apache Kafka and writing them to Amazon S3 as batched JSON files. Useful for data lake ingestion, event stream archiving, or ETL workflows.

Download & Run

Quick Start:

# Download and run directly
curl -sSL https://docs.expanso.io/examples/kafka-to-s3.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/kafka-to-s3.yaml
expanso-edge run -f my-pipeline.yaml

Download: kafka-to-s3.yaml

What This Pipeline Does

  1. Consumes messages from a Kafka topic
  2. Parses and validates JSON payloads
  3. Enriches messages with metadata (Kafka partition, offset, timestamp)
  4. Batches messages for efficient S3 uploads
  5. Writes batched data to S3 with partitioned paths

Complete Pipeline

Configure your Kafka brokers and S3 bucket, then run:

input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - events
    consumer_group: expanso-s3-archiver
    start_from_oldest: true

pipeline:
  processors:
    # Parse JSON payload, dropping messages that aren't valid JSON
    - mapping: |
        root = this.parse_json().catch(deleted())

    # Add Kafka metadata for traceability
    - mapping: |
        root.kafka_topic = @kafka_topic
        root.kafka_partition = @kafka_partition
        root.kafka_offset = @kafka_offset
        root.processed_at = now()

    # Batch messages into a JSON array
    - archive:
        format: json_array

output:
  aws_s3:
    bucket: my-data-lake
    path: events/${!timestamp_format("2006/01/02/15")}/batch-${!timestamp_unix_nano()}.json
    content_type: application/json
    region: us-east-1
    batching:
      count: 100
      period: 30s
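
To avoid editing the file for each environment, the broker addresses, bucket, and region can come from environment variables rather than being hard-coded. A minimal sketch, assuming Benthos-style ${VAR:default} interpolation is supported by the runtime; the variable names KAFKA_BROKERS, S3_BUCKET, and AWS_REGION are illustrative, and only the affected fields are shown:

input:
  kafka:
    addresses:
      - "${KAFKA_BROKERS}"
    topics:
      - events
    consumer_group: expanso-s3-archiver
    start_from_oldest: true

output:
  aws_s3:
    bucket: "${S3_BUCKET:my-data-lake}"
    region: "${AWS_REGION:us-east-1}"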

Configuration Breakdown

Input: Kafka

input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - events
    consumer_group: expanso-s3-archiver
    start_from_oldest: true

The kafka input connects to your Kafka cluster and consumes messages from the specified topic. Parallelism is controlled by checkpoint_limit, and offsets are committed periodically to track progress.

Key settings:

  • addresses: Your Kafka broker addresses
  • topics: The topic(s) to consume from
  • consumer_group: Groups consumers for offset tracking and load balancing
  • start_from_oldest: Start from earliest messages on first run

See: Kafka Input
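
If one unacknowledged message per partition becomes a throughput bottleneck, the input's checkpointing can be tuned. A hedged sketch, assuming Benthos-style kafka input fields (checkpoint_limit is mentioned above; verify commit_period against the Kafka Input reference):

input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - events
    consumer_group: expanso-s3-archiver
    start_from_oldest: true
    # Allow up to 1024 unacknowledged messages in flight before reads pause
    checkpoint_limit: 1024
    # How often resolved offsets are committed back to Kafka
    commit_period: 1s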

Processors: Parse, Enrich, Batch

1. Parse JSON

- mapping: |
    root = this.parse_json().catch(deleted())

Parses each Kafka message as JSON. Messages that fail to parse are deleted with catch(deleted()), so invalid payloads are dropped rather than erroring the pipeline.

2. Add Metadata

- mapping: |
    root.kafka_topic = @kafka_topic
    root.kafka_partition = @kafka_partition
    root.kafka_offset = @kafka_offset
    root.processed_at = now()

Enriches each message with Kafka metadata for traceability:

  • kafka_topic: Source topic
  • kafka_partition: Partition number
  • kafka_offset: Message offset
  • processed_at: Processing timestamp
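
For example, a payload of {"user_id": 42, "action": "click"} read from partition 3 at offset 1045 would leave this stage looking roughly like this (values illustrative):

{
  "user_id": 42,
  "action": "click",
  "kafka_topic": "events",
  "kafka_partition": 3,
  "kafka_offset": 1045,
  "processed_at": "2025-01-15T10:30:00Z"
}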

3. Batch Messages

- archive:
    format: json_array

Uses the archive processor to group messages into JSON arrays. Combined with the output batching config, this creates efficient batches for S3 uploads.

See: Archive Processor
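
If the downstream query engine prefers newline-delimited JSON (NDJSON) over JSON arrays, the archive processor also offers a lines format; a sketch of that alternative (confirm the format name in the Archive Processor reference):

# Emit one JSON document per line instead of a JSON array
- archive:
    format: lines

Tools such as Athena and Spark read line-delimited JSON natively, which can simplify table definitions over the archived data.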

Output: AWS S3

output:
  aws_s3:
    bucket: my-data-lake
    path: events/${!timestamp_format("2006/01/02/15")}/batch-${!timestamp_unix_nano()}.json
    content_type: application/json
    region: us-east-1
    batching:
      count: 100
      period: 30s

Writes batched messages to S3 with a time-partitioned path structure. The path uses interpolation functions to organize files by date and ensure unique filenames.

Path structure: events/YYYY/MM/DD/HH/batch-timestamp.json

This enables efficient querying with tools like Athena, Spark, or Presto that support partition pruning.
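
With the path above, uploaded objects land under keys such as these (timestamps illustrative):

s3://my-data-lake/events/2025/01/15/10/batch-1736937000123456789.json
s3://my-data-lake/events/2025/01/15/11/batch-1736940600987654321.json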

See: AWS S3 Output

Common Variations

Add Compression

Compress files before upload to reduce storage costs:

output:
  aws_s3:
    bucket: my-data-lake
    path: events/${!timestamp_format("2006/01/02/15")}/batch-${!timestamp_unix_nano()}.json.gz
    content_type: application/json
    content_encoding: gzip
    region: us-east-1
    batching:
      count: 100
      period: 30s
      processors:
        - compress:
            algorithm: gzip

Route by Topic

When consuming from multiple topics, route to different S3 prefixes:

input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - orders
      - users
      - events
    consumer_group: expanso-s3-archiver

output:
  aws_s3:
    bucket: my-data-lake
    path: ${!@kafka_topic}/${!timestamp_format("2006/01/02/15")}/batch-${!timestamp_unix_nano()}.json
    content_type: application/json
    region: us-east-1
    batching:
      count: 100
      period: 30s

Add Error Handling

Send failed messages to a dead letter topic instead of dropping them:

pipeline:
  processors:
    - try:
        - mapping: |
            root = this.parse_json()
        - mapping: |
            root.kafka_topic = @kafka_topic
            root.kafka_partition = @kafka_partition
            root.kafka_offset = @kafka_offset
            root.processed_at = now()
    - catch:
        - mapping: |
            root.error = error()
            root.raw_message = content()
            root.failed_at = now()
        - output:
            kafka:
              addresses:
                - localhost:9092
              topic: dead-letter-events

Next Steps