
Inputs

An input is a source of data piped through an array of optional processors:

input:
  label: my_redis_input

  redis_streams:
    url: tcp://localhost:6379
    streams:
      - expanso_stream
    body_key: body
    consumer_group: expanso_group

  # Optional list of processing steps
  processors:
    - mapping: |
        root.document = this.without("links")
        root.link_count = this.links.length()

Some inputs have a logical end. For example, a csv input ends once the last row is consumed. When this happens, the input gracefully terminates, and Expanso Edge will shut itself down once all messages have been fully processed.
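As a minimal sketch of such a finite input (the file path is illustrative, and the fields assume the standard csv input schema), the following reads each row as a message and ends once the file is exhausted:

input:
  csv:
    paths: [ ./data/records.csv ]  # hypothetical file; the input ends when it is fully consumed
    parse_header_row: true         # treat the first row as column names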

Brokering

Only one input is configured at the root of a pipeline config. However, the root input can be a broker, which combines multiple inputs and merges their streams:

input:
  broker:
    inputs:
      - kafka:
          addresses: [ TODO ]
          topics: [ foo, bar ]
          consumer_group: foogroup

      - redis_streams:
          url: tcp://localhost:6379
          streams:
            - expanso_stream
          body_key: body
          consumer_group: expanso_group

Labels

Inputs have an optional field `label` that can uniquely identify them in observability data such as metrics and logs. This is useful when running configs with multiple inputs; otherwise, their metrics labels are generated based on their composition.
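For example, the sketch below (the addresses, topics and paths are illustrative) gives each input in a broker its own label so their metrics and logs can be told apart:

input:
  broker:
    inputs:
      - label: orders_kafka        # metrics and logs for this input carry the orders_kafka label
        kafka:
          addresses: [ localhost:9092 ]
          topics: [ orders ]
          consumer_group: ordersgroup

      - label: audit_files         # distinguishes this input's metrics from the Kafka input above
        file:
          paths: [ ./audit/*.jsonl ]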

Database Inputs

Query databases to read data into your pipeline:

  • sql_select: Query database tables (MySQL, PostgreSQL, SQLite, etc.)
  • sql_raw: Execute custom SQL queries
  • mongodb: Query MongoDB collections
  • cassandra: Query Cassandra tables
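As a rough sketch, a sql_select input reading pending rows from a PostgreSQL table might look like the following (the DSN, table and column names are illustrative; the fields assume the standard sql_select schema):

input:
  sql_select:
    driver: postgres
    dsn: postgres://user:password@localhost:5432/mydb?sslmode=disable  # hypothetical connection string
    table: orders
    columns: [ id, status, created_at ]
    where: status = ?
    args_mapping: 'root = [ "pending" ]'  # arguments bound to the placeholder in the where clause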

See the Database Connectivity Guide for complete examples including:

  • Reading from PostgreSQL and MySQL
  • Using SQLite for edge caching and analytics
  • Database-to-database replication
  • Cloud database authentication (AWS RDS, Azure)

Browse Inputs

  • amqp_0_9: Connects to an AMQP (0.9.1) queue. AMQP is a messaging protocol used by various message brokers, including RabbitMQ. (Services, Messaging)
  • amqp_1: Reads messages from an AMQP (1.0) server. (Services, Messaging)
  • aws_kinesis: Receive messages from one or more Kinesis streams. (Services, AWS, Cloud)
  • aws_s3: Downloads objects within an Amazon S3 bucket, optionally filtered by a prefix, either by walking the items in the bucket or by streaming upload notifications... (Services, AWS, Cloud)
  • aws_sqs: Consume messages from an AWS SQS URL. (Services, AWS, Cloud)
  • azure_blob_storage: Downloads objects within an Azure Blob Storage container, optionally filtered by a prefix. (Services, Azure, Cloud)
  • azure_cosmosdb: Executes a SQL query against Azure CosmosDB and creates a batch of messages from each page of items. (Azure, Cloud)
  • azure_queue_storage: Dequeue objects from an Azure Storage Queue. (Services, Azure, Cloud)
  • azure_table_storage: Beta. (Services, Azure, Cloud)
  • batched: Consumes data from a child input and applies a batching policy to the stream. (Utility)
  • beanstalkd: Experimental. (Services)
  • broker: Allows you to combine multiple inputs into a single stream of data, where each input will be read in parallel. (Utility)
  • cassandra: Experimental. (Services, Database, NoSQL)
  • cockroachdb_changefeed: Experimental. (Integration)
  • csv: Reads one or more CSV files as structured records following the format described in RFC 4180. (Local, Parsing, Transform)
  • cypher: Experimental. (Services)
  • discord: Experimental. (Services, Social)
  • dynamic: A special broker type where the inputs are identified by unique labels and can be created, changed and removed during runtime via a REST HTTP interface... (Utility)
  • etcd: Beta. (Services)
  • file: Consumes data from files on disk, emitting messages according to a chosen codec. (Local, Files)
  • gcp_bigquery_select: Executes a `SELECT` query against BigQuery and creates a message for each row received. (Services, GCP, Cloud)
  • gcp_cloud_storage: Downloads objects within a Google Cloud Storage bucket, optionally filtered by a prefix. (Services, GCP, Cloud)
  • gcp_pubsub: Consumes messages from a GCP Cloud Pub/Sub subscription. (Services, GCP, Cloud)
  • gcp_spanner_cdc: Beta. (Services, GCP, Cloud)
  • generate: Generates messages at a given interval using a Bloblang mapping executed without a context. This allows you to generate messages for testing your pipeline... (Utility)
  • hdfs: Reads files from an HDFS directory, where each discrete file will be consumed as a single message payload. (Services)
  • http_client: Connects to a server and continuously performs requests for a single message. (Network, HTTP)
  • http_server: Receive messages POSTed over HTTP(S). HTTP 2.0 is supported when using TLS, which is enabled when key and cert files are specified. (Network, HTTP)
  • inproc (Utility)
  • kafka: Connects to Kafka brokers and consumes one or more topics. (Services, Messaging, Streaming)
  • kafka_franz: A Kafka input using the Franz Kafka client library. (Services, Messaging, Streaming)
  • mongodb: Executes a query and creates a message for each document received. (Services, Database, NoSQL)
  • mqtt: Subscribe to topics on MQTT brokers. (Services, Messaging, IoT)
  • nanomsg: Consumes messages via Nanomsg sockets (scalability protocols). (Network)
  • nats: Subscribe to a NATS subject. (Services, Messaging, Streaming)
  • nats_jetstream: Reads messages from NATS JetStream subjects. (Services, Messaging, Streaming)
  • nats_kv: Watches for updates in a NATS key-value bucket. (Services, Messaging, Streaming)
  • nats_object_store: Experimental. (Services, Messaging, Streaming)
  • nsq: Subscribe to an NSQ instance topic and channel. (Services)
  • parquet: Experimental. (Local)
  • pulsar: Reads messages from an Apache Pulsar server. (Services, Messaging, Streaming)
  • read_until: Reads messages from a child input until a consumed message passes a Bloblang query, at which point the input closes. It is also possible to configure... (Utility)
  • redis_list: Pops messages from the beginning of a Redis list using the BLPop command. (Services, Messaging, Caching)
  • redis_pubsub: Consume from a Redis publish/subscribe channel using either the SUBSCRIBE or PSUBSCRIBE commands. (Services, Messaging, Caching)
  • redis_scan: Scans the set of keys in the currently selected database and gets their values, using the Scan and Get commands. (Services, Messaging, Caching)
  • redis_streams: Pulls messages from Redis (v5.0+) streams with the XREADGROUP command. The `client_id` should be unique for each consumer of a group. (Services, Messaging, Caching)
  • resource: An input type that channels messages from a resource input, identified by its name. (Utility)
  • s2: Beta. (Services)
  • sequence: Reads messages from a sequence of child inputs, starting with the first and once that input gracefully terminates starts consuming from the next, and... (Utility)
  • sftp: Beta. (Network, Files)
  • socket: Connects to a TCP or Unix socket and consumes a continuous stream of messages. (Network)
  • socket_server: Creates a server that receives a stream of messages over a TCP, UDP or Unix socket. (Network)
  • sql_raw: Executes a select query and creates a message for each row received. (Services)
  • sql_select: Executes a select query and creates a message for each row received. (Services)
  • stdin: Consumes data piped to stdin, chopping it into individual messages according to the specified scanner. (Local)
  • subprocess: Executes a command, runs it as a subprocess, and consumes messages from it over stdout. (Utility)
  • twitter_search: Experimental. (Services, Social)
  • websocket: Connects to a websocket server and continuously receives messages. (Network, Streaming)
  • zmq4: Consumes messages from a ZeroMQ socket. (Network)
  • zmq4n: Consumes messages from a ZeroMQ socket. (Network)