Data Generation & Transformation

This example demonstrates using the generate input to create synthetic data for testing pipelines. It's useful for development, testing transformations, and simulating data sources without external dependencies.

Download & Run

Quick Start:

# Download and run directly
curl -sSL https://docs.expanso.io/examples/data-transformer.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/data-transformer.yaml
expanso-edge run -f my-pipeline.yaml

Download: data-transformer.yaml

What This Pipeline Does

  1. Generates messages every 5 seconds with synthetic data
  2. Creates unique IDs, timestamps, and counter values
  3. Enriches with processing metadata
  4. Outputs to stdout for inspection

Complete Pipeline

input:
  generate:
    interval: 5s
    mapping: |
      root.id = uuid_v4()
      root.message = "Hello from Expanso"
      root.timestamp = now()
      root.count = counter()

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

output:
  stdout:
    codec: lines

Configuration Breakdown

Input: Generate

input:
  generate:
    interval: 5s
    mapping: |
      root.id = uuid_v4()
      root.message = "Hello from Expanso"
      root.timestamp = now()
      root.count = counter()

The generate input creates messages at a fixed interval using a Bloblang mapping. Each message gets:

  • id: Unique UUID (e.g., 550e8400-e29b-41d4-a716-446655440000)
  • message: Static string
  • timestamp: Current timestamp in RFC3339 format
  • count: Incrementing counter starting from 1
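
Before the processor runs, a raw generated message looks like this (the id and timestamp values are illustrative):

{"id":"550e8400-e29b-41d4-a716-446655440000","message":"Hello from Expanso","timestamp":"2024-01-15T10:30:00Z","count":1}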

See: Generate Input

Processor: Add Metadata

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

Copies the entire input message and adds a processed_at timestamp showing when the message was processed.
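
The same result can be expressed as a single assignment using Bloblang's merge method (an equivalent sketch, not a required form):

pipeline:
  processors:
    - mapping: |
        root = this.merge({"processed_at": now()})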

Output: Stdout

output:
  stdout:
    codec: lines

Prints each message to stdout as a single line of JSON.

See: Stdout Output
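
To persist messages instead of printing them, the stdout output can be swapped for a file output (a sketch; the path is an example, and this assumes the file output component is available in your build):

output:
  file:
    path: ./generated-data.jsonl
    codec: lines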

Example Output

Each message will look like this:

{"id":"a1b2c3d4-e5f6-7890-abcd-ef1234567890","message":"Hello from Expanso","timestamp":"2024-01-15T10:30:00Z","count":1,"processed_at":"2024-01-15T10:30:00.123Z"}
{"id":"b2c3d4e5-f6a7-8901-bcde-f12345678901","message":"Hello from Expanso","timestamp":"2024-01-15T10:30:05Z","count":2,"processed_at":"2024-01-15T10:30:05.456Z"}
{"id":"c3d4e5f6-a7b8-9012-cdef-012345678902","message":"Hello from Expanso","timestamp":"2024-01-15T10:30:10Z","count":3,"processed_at":"2024-01-15T10:30:10.789Z"}

Common Variations

Generate Realistic Test Data

Create more complex synthetic data:

input:
  generate:
    interval: 1s
    mapping: |
      root.user_id = "user-" + counter().string()
      root.event_type = ["login", "logout", "purchase", "view"].index(counter() % 4)
      root.timestamp = now()
      root.value = random_int(min: 1, max: 100)
      root.metadata.ip = "192.168.1." + (random_int(min: 1, max: 254)).string()
      root.metadata.user_agent = "TestAgent/1.0"

Generate Batch Test Data

Generate a specific number of messages as fast as possible:

input:
  generate:
    interval: ""  # Empty interval = as fast as possible
    count: 1000   # Generate exactly 1000 messages
    mapping: |
      root.id = uuid_v4()
      root.batch_num = counter()
      root.data = "test-data-" + counter().string()

Scheduled Processing

Use generate to trigger scheduled processing (e.g., every 5 minutes):

input:
  generate:
    interval: '@every 5m'
    mapping: 'root = {}'

pipeline:
  processors:
    # Add your scheduled processing here
    - mapping: |
        root.scheduled_run = now()
        root.task = "periodic-cleanup"

Simulate Streaming Data

Generate continuous events with varying data:

input:
  generate:
    interval: 100ms  # 10 messages per second
    mapping: |
      root.sensor_id = "sensor-" + (random_int(min: 1, max: 10)).string()
      root.temperature = random_int(min: 15, max: 30)
      root.humidity = random_int(min: 30, max: 80)
      root.timestamp = now()

Transform Generated Data

Apply complex transformations to synthetic data:

input:
  generate:
    interval: 2s
    mapping: |
      root.raw_value = random_int(min: 0, max: 1000)
      root.timestamp = now()

pipeline:
  processors:
    - mapping: |
        # Calculate derived values
        root.value = this.raw_value
        root.category = if this.raw_value < 300 {
          "low"
        } else if this.raw_value < 700 {
          "medium"
        } else {
          "high"
        }
        root.normalized = this.raw_value / 1000.0
        root.processed_at = now()
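
The same categorization can also be written with a Bloblang match expression, which is often easier to extend with more buckets (a sketch with identical logic):

root.category = match {
  this.raw_value < 300 => "low",
  this.raw_value < 700 => "medium",
  _ => "high"
}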

Use Cases

Development & Testing

Generate test data without external dependencies:

  • Test transformations before connecting to real data sources
  • Validate pipeline configurations
  • Benchmark processing performance
  • Debug routing logic

Scheduled Tasks

Trigger periodic processing:

  • Run cleanup tasks every hour
  • Fetch data from APIs on schedule (see the sketch after this list)
  • Generate periodic reports
  • Trigger batch jobs
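
As a sketch of the API-fetching case, a generate trigger can drive an http processor (the URL is a placeholder, and this assumes the http processor is available in your build):

input:
  generate:
    interval: '@every 1h'
    mapping: 'root = {}'

pipeline:
  processors:
    - http:
        url: https://api.example.com/data # Hypothetical endpoint
        verb: GET

output:
  stdout:
    codec: lines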

Load Testing

Simulate high-volume data sources:

  • Test pipeline throughput (see the sketch after this list)
  • Validate error handling under load
  • Measure resource usage
  • Identify bottlenecks
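
A minimal throughput test pairs an uncapped generate input with an output that discards messages, so only pipeline cost is measured (a sketch, assuming the drop output component is available):

input:
  generate:
    interval: ""  # As fast as possible
    count: 100000 # Bounded run
    mapping: 'root.id = uuid_v4()'

output:
  drop: {}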

Data Simulation

Create realistic test scenarios:

  • Simulate sensor data for IoT testing
  • Generate user events for analytics testing
  • Create transaction data for fraud detection testing
  • Mock API responses

Important Notes

Interval Formats

The interval field supports several formats:

interval: 1s          # Every second
interval: 500ms       # Every 500 milliseconds
interval: 1m          # Every minute
interval: '@every 5m' # Every 5 minutes (cron-style)
interval: ""          # As fast as possible (use with count)

Counter Behavior

The counter() function maintains state across messages:

  • Starts at 1
  • Increments by 1 for each message
  • Persists across the lifetime of the input
  • Resets if the pipeline restarts
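
Because counter() increments once per message, it can drive deterministic round-robin values, as in the event-type example earlier (a sketch using modulo indexing):

mapping: |
  # counter() starts at 1, so counter() % 3 cycles 1, 2, 0, 1, 2, 0, ...
  root.priority = ["high", "medium", "low"].index(counter() % 3)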

Performance Considerations

For high-volume generation:

  • Use shorter intervals (e.g., 100ms for 10 msgs/sec)
  • Set count to limit total messages
  • Use batch_size to group messages (see the sketch after this list)
  • Monitor resource usage
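
For example, a bounded high-volume run might combine these settings (a sketch; batch_size groups generated messages into batches before they enter the pipeline):

input:
  generate:
    interval: ""    # No pacing between batches
    count: 10000    # Stop after 10,000 messages total
    batch_size: 100 # Emit messages in batches of 100
    mapping: 'root.id = uuid_v4()'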

Memory Management

When generating large volumes:

  • Set explicit count to prevent infinite generation
  • Use appropriate interval to control rate
  • Consider downstream processing capacity
  • Monitor memory usage

Next Steps