Data Generation & Transformation
This example demonstrates using the `generate` input to create synthetic data for testing pipelines. It's useful for development, testing transformations, and simulating data sources without external dependencies.
Download & Run
Quick Start:
```bash
# Download and run directly
curl -sSL https://docs.expanso.io/examples/data-transformer.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/data-transformer.yaml
expanso-edge run -f my-pipeline.yaml
```
Download: data-transformer.yaml
What This Pipeline Does
- Generates messages every 5 seconds with synthetic data
- Creates unique IDs, timestamps, and counter values
- Enriches with processing metadata
- Outputs to stdout for inspection
Complete Pipeline
```yaml
input:
  generate:
    interval: 5s
    mapping: |
      root.id = uuid_v4()
      root.message = "Hello from Expanso"
      root.timestamp = now()
      root.count = counter()

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

output:
  stdout:
    codec: lines
```
Configuration Breakdown
Input: Generate
```yaml
input:
  generate:
    interval: 5s
    mapping: |
      root.id = uuid_v4()
      root.message = "Hello from Expanso"
      root.timestamp = now()
      root.count = counter()
```
The `generate` input creates messages at a fixed interval using a Bloblang mapping. Each message gets:
- `id`: A unique UUID (e.g., `550e8400-e29b-41d4-a716-446655440000`)
- `message`: A static string
- `timestamp`: The current timestamp in RFC3339 format
- `count`: An incrementing counter starting from 1
See: Generate Input
Processor: Add Metadata
```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()
```
Copies the entire input message and adds a `processed_at` timestamp showing when the message was processed.
Output: Stdout
```yaml
output:
  stdout:
    codec: lines
```
Prints each message to stdout as a single line of JSON.
See: Stdout Output
Example Output
Each message will look like this:
{"id":"a1b2c3d4-e5f6-7890-abcd-ef1234567890","message":"Hello from Expanso","timestamp":"2024-01-15T10:30:00Z","count":1,"processed_at":"2024-01-15T10:30:00.123Z"}
{"id":"b2c3d4e5-f6a7-8901-bcde-f12345678901","message":"Hello from Expanso","timestamp":"2024-01-15T10:30:05Z","count":2,"processed_at":"2024-01-15T10:30:05.456Z"}
{"id":"c3d4e5f6-a7b8-9012-cdef-012345678902","message":"Hello from Expanso","timestamp":"2024-01-15T10:30:10Z","count":3,"processed_at":"2024-01-15T10:30:10.789Z"}
Common Variations
Generate Realistic Test Data
Create more complex synthetic data:
```yaml
input:
  generate:
    interval: 1s
    mapping: |
      root.user_id = "user-" + counter().string()
      root.event_type = ["login", "logout", "purchase", "view"].index(counter() % 4)
      root.timestamp = now()
      root.value = random_int(min: 1, max: 100)
      root.metadata.ip = "192.168.1." + (random_int(min: 1, max: 254)).string()
      root.metadata.user_agent = "TestAgent/1.0"
```
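With this mapping, a generated event might look like the following (the `value` and `ip` fields are random, so these numbers are purely illustrative):

```json
{"user_id":"user-1","event_type":"logout","timestamp":"2024-01-15T10:30:00Z","value":42,"metadata":{"ip":"192.168.1.87","user_agent":"TestAgent/1.0"}}
```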
Generate Batch Test Data
Generate a specific number of messages as fast as possible:
```yaml
input:
  generate:
    interval: ""  # Empty interval = as fast as possible
    count: 1000   # Generate exactly 1000 messages
    mapping: |
      root.id = uuid_v4()
      root.batch_num = counter()
      root.data = "test-data-" + counter().string()
```
Scheduled Processing
Use generate to trigger scheduled processing (e.g., every 5 minutes):
```yaml
input:
  generate:
    interval: '@every 5m'
    mapping: 'root = {}'

pipeline:
  processors:
    # Add your scheduled processing here
    - mapping: |
        root.scheduled_run = now()
        root.task = "periodic-cleanup"
```
Simulate Streaming Data
Generate continuous events with varying data:
```yaml
input:
  generate:
    interval: 100ms  # 10 messages per second
    mapping: |
      root.sensor_id = "sensor-" + (random_int(min: 1, max: 10)).string()
      root.temperature = random_int(min: 15, max: 30)
      root.humidity = random_int(min: 30, max: 80)
      root.timestamp = now()
```
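Sample readings (sensor IDs and values are random; these are illustrative):

```json
{"sensor_id":"sensor-3","temperature":22,"humidity":55,"timestamp":"2024-01-15T10:30:00.100Z"}
{"sensor_id":"sensor-7","temperature":27,"humidity":41,"timestamp":"2024-01-15T10:30:00.200Z"}
```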
Transform Generated Data
Apply complex transformations to synthetic data:
```yaml
input:
  generate:
    interval: 2s
    mapping: |
      root.raw_value = random_int(min: 0, max: 1000)
      root.timestamp = now()

pipeline:
  processors:
    - mapping: |
        # Calculate derived values
        root.value = this.raw_value
        root.category = if this.raw_value < 300 {
          "low"
        } else if this.raw_value < 700 {
          "medium"
        } else {
          "high"
        }
        root.normalized = this.raw_value / 1000.0
        root.processed_at = now()
```
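Note that because the processor assigns individual fields rather than starting from `root = this`, the output contains only the derived fields; `raw_value` and the original `timestamp` are dropped. A message generated with `raw_value` 437, for example, falls into the 300-699 band and comes out as:

```json
{"value":437,"category":"medium","normalized":0.437,"processed_at":"2024-01-15T10:30:02.123Z"}
```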
Use Cases
Development & Testing
Generate test data without external dependencies:
- Test transformations before connecting to real data sources
- Validate pipeline configurations
- Benchmark processing performance
- Debug routing logic
Scheduled Tasks
Trigger periodic processing:
- Run cleanup tasks every hour
- Fetch data from APIs on schedule (see the sketch after this list)
- Generate periodic reports
- Trigger batch jobs
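For instance, the API-fetching pattern pairs the empty trigger message with an `http` processor. This is a minimal sketch, assuming the standard `http` processor is available in your build; the URL is a placeholder:

```yaml
input:
  generate:
    interval: '@every 1h'
    mapping: 'root = {}'

pipeline:
  processors:
    - http:
        url: https://api.example.com/status  # hypothetical endpoint
        verb: GET

output:
  stdout:
    codec: lines
```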
Load Testing
Simulate high-volume data sources:
- Test pipeline throughput
- Validate error handling under load
- Measure resource usage
- Identify bottlenecks
Data Simulation
Create realistic test scenarios:
- Simulate sensor data for IoT testing
- Generate user events for analytics testing
- Create transaction data for fraud detection testing
- Mock API responses
Important Notes
Interval Formats
The interval field supports several formats:
```yaml
interval: 1s           # Every second
interval: 500ms        # Every 500 milliseconds
interval: 1m           # Every minute
interval: '@every 5m'  # Every 5 minutes (cron-style)
interval: ""           # As fast as possible (use with count)
```
Counter Behavior
The `counter()` function maintains state across messages:
- Starts at 1
- Increments by 1 for each message
- Persists across the lifetime of the input
- Resets if the pipeline restarts
Performance Considerations
For high-volume generation (see the sketch after this list):
- Use shorter intervals (e.g., `100ms` for 10 msgs/sec)
- Set `count` to limit the total number of messages
- Use `batch_size` to group messages
- Monitor resource usage
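A minimal sketch combining these settings (the numbers are illustrative, and `batch_size` is assumed to be supported by the generate input in your version):

```yaml
input:
  generate:
    interval: ""     # run flat out...
    count: 100000    # ...but stop after 100,000 messages
    batch_size: 100  # group messages into batches of 100
    mapping: 'root.id = counter()'

output:
  stdout:
    codec: lines
```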
Memory Management
When generating large volumes:
- Set an explicit `count` to prevent infinite generation
- Use an appropriate `interval` to control the rate
- Consider downstream processing capacity
- Monitor memory usage
Next Steps
- Generate Input - Full configuration options
- Mapping Processor - Transform data with Bloblang
- Bloblang Guide - Data transformation reference