Step 3: Data Transformation
Now let's add some real processing. Imagine you have IoT sensors reporting temperature in Celsius, but you need Fahrenheit. This pipeline generates simulated sensor readings and adds a Fahrenheit field to each one, keeping the original Celsius value.
Create the Pipeline
Create sensor-pipeline.yaml:
input:
  generate:
    interval: 1s
    mapping: |
      root.device_id = "edge-sensor-01"
      root.temp_c = random_int(min: 15, max: 35)
      root.humidity = random_int(min: 30, max: 80)
pipeline:
  processors:
    - mapping: |
        root = this
        root.temp_f = (this.temp_c * 1.8) + 32
        root.processed_at = now()
output:
  stdout:
    codec: lines
Run it:
expanso-edge run --config sensor-pipeline.yaml
Output:
{"device_id":"edge-sensor-01","humidity":65,"processed_at":"2024-12-26T10:05:00Z","temp_c":22,"temp_f":71.6}
{"device_id":"edge-sensor-01","humidity":48,"processed_at":"2024-12-26T10:05:01Z","temp_c":28,"temp_f":82.4}
{"device_id":"edge-sensor-01","humidity":72,"processed_at":"2024-12-26T10:05:02Z","temp_c":19,"temp_f":66.2}
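If your own run shows long decimal tails in temp_f (floating-point multiplication can produce them for some inputs), one option is to round inside the mapping. The sketch below is a variation on the processor above, not part of the original pipeline. It assumes Bloblang's round() method, which rounds to the nearest whole number, is available in your expanso-edge build, and it scales by 10 to keep one decimal place.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Assumption: round() is available and rounds to the nearest integer,
        # so multiply by 10 first and divide afterwards to keep one decimal.
        root.temp_f = (((this.temp_c * 1.8) + 32) * 10).round() / 10
        root.processed_at = now()
```

The input and output sections stay the same; only the temp_f line changes.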
Understanding the Pipeline
[Generate Sensor Data] → [Transform: Add temp_f] → [Print to Terminal]
       (input)                 (processor)               (output)
Key concepts:
pipeline.processors - An array of transformations applied in order
root = this - Keep all existing fields
root.temp_f = ... - Add a new calculated field
Bloblang math - Standard operators work: *, +, /, -
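To make the "applied in order" point concrete, here is a sketch that splits the work across two mapping processors. The second one reads temp_f, which only exists because the first one ran before it. The field names temp_k, humidity_ratio, and temp_f_above_freezing are hypothetical, chosen just to exercise the +, /, and - operators.

```yaml
pipeline:
  processors:
    # Step 1: keep the original fields and add Fahrenheit
    - mapping: |
        root = this
        root.temp_f = (this.temp_c * 1.8) + 32
    # Step 2: runs after step 1, so this.temp_f is already present
    - mapping: |
        root = this
        root.temp_k = this.temp_c + 273.15
        root.humidity_ratio = this.humidity / 100
        root.temp_f_above_freezing = this.temp_f - 32
```

If the two processors were swapped, the step that subtracts from temp_f would find no such field on the message, which is why the order of the array matters.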