Inputs

An input is a source of data piped through an array of optional processors. Only one input is configured at the root of a pipeline config. However, the root input can be a broker which combines multiple inputs.

An input config section looks like this:

{
    "input": {
        "c8db": {
            "local": true,
            "name": "transformCollection1"
        }
    }
}

c8db

{
    "type":"c8db",
    "c8db":{
        "name":"name_of_the_collection",
        "local":true/false
    }
}

c8db is a collection of JSON documents. With this input type, every document saved into the given c8db collection is processed and forwarded to the output.

The c8db input type has the following attributes:

  • name: the name of the c8db collection. If the collection does not exist, it is created automatically.
  • local: indicates whether the c8db collection is created locally or globally. If local is true, the collection is created locally; if false, it is created globally. For example:
{
    "type":"c8db",
    "c8db":{
        "name": "pipeline-c8db-input",
        "local": true
    }
}

c8streams

{
    "type":"c8streams",
    "c8streams":{
        "name":"pipeline-c8streams-input",
        "local":true/false
    }
}

c8stream is a stream of events. With this input type, every event arriving on the given c8stream is processed and forwarded to the output.

The c8streams input type has the following attributes:

  • name: the name of the c8stream. If the stream does not exist, it is created automatically.
  • local: indicates whether the c8stream is created locally or globally. If local is true, the stream is created locally; if false, it is created globally. For example:
{
    "type":"c8streams",
    "c8streams":{
        "name": "c8pipeline-c8stream-input",
        "local": true
    }
}

c8restql

{
    "type":"c8restql",
    "c8restql":{
        "name":"name_of_stored_query",
        "user":"name_of_the_user",
        "frequency":30,
        "timeout":5,
        "bindVars":{}
    }
}

c8restql is a special input type that periodically runs a saved C8 query. The output of the query is forwarded on to the processors and output.

The c8restql input type has the following attributes:

  • name: the name of the stored query.
  • user: the name of the user to whom the query belongs.
  • frequency: the time interval (in seconds) at which the query is executed. Defaults to 30 seconds.
  • timeout: the time interval (in seconds) for which WaitForClose blocks until the output has closed down. Defaults to 5 seconds.
  • bindVars: bind variables passed to the C8 query, if any. For example:
{
    "type":"c8restql",
    "c8restql":{
        "name":"pipeline-c8restql-input",
        "user":"user1",
        "frequency":10,
        "timeout":10,
        "bindVars":{
            "@collection": "collection1",
            "value": "1000"
        }
    }
}

broker

{
	"type": "broker",
	"broker": {
		"copies": 1,
		"inputs": []
	}
}

The broker type allows you to combine multiple inputs, where each input will be read in parallel. A broker type is configured with its own list of input configurations and a field to specify how many copies of the list of inputs should be created.

Adding more input types allows you to merge streams from multiple sources into one. For example, reading from both RabbitMQ and Kafka:

{
	"input": {
		"broker": {
			"copies": 1,
			"inputs": [
				{
					"amqp": {
						"url": "amqp://guest:guest@localhost:5672/",
						"consumer_tag": "c8pipelines-consumer",
						"queue": "c8pipelines-queue"
					}
				},
				{
					"kafka": {
						"addresses": [
							"localhost:9092"
						],
						"client_id": "c8pipelines_kafka_input",
						"consumer_group": "c8pipelines_consumer_group",
						"partition": 0,
						"topic": "c8pipelines_stream"
					}
				}
			]
		}
	}
}

If the number of copies is greater than zero the list will be copied that number of times. For example, if your inputs were of type foo and bar, with 'copies' set to '2', you would end up with two 'foo' inputs and two 'bar' inputs.
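
As an illustrative sketch of this behaviour (the queue and topic names are hypothetical), setting copies to 2 on a broker with one amqp and one kafka input results in two consumers of each:

{
	"input": {
		"broker": {
			"copies": 2,
			"inputs": [
				{
					"amqp": {
						"url": "amqp://guest:guest@localhost:5672/",
						"queue": "example-queue"
					}
				},
				{
					"kafka": {
						"addresses": [
							"localhost:9092"
						],
						"topic": "example-topic"
					}
				}
			]
		}
	}
}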

Processors

It is possible to configure processors at the broker level, where they will be applied to all child inputs, as well as on the individual child inputs. If you have processors at both the broker level and on child inputs then the broker processors will be applied after the child input processors.
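
As a hedged sketch of this layout (the text processor and its operators are assumptions borrowed from the upstream project and may differ here), child-level and broker-level processors sit as follows:

{
	"input": {
		"broker": {
			"copies": 1,
			"inputs": [
				{
					"mqtt": {
						"urls": [
							"tcp://localhost:1883"
						],
						"topics": [
							"c8pipelines_topic"
						]
					},
					"processors": [
						{
							"text": {
								"operator": "trim_space"
							}
						}
					]
				}
			]
		},
		"processors": [
			{
				"text": {
					"operator": "to_upper"
				}
			}
		]
	}
}

Here the child-level trim_space processor runs first on messages from the mqtt input, and the broker-level to_upper processor runs afterwards.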

http_client

{
	"type": "http_client",
	"http_client": {
		"backoff_on": [
			429
		],
		"basic_auth": {
			"enabled": false,
			"password": "",
			"username": ""
		},
		"copy_response_headers": false,
		"drop_on": [],
		"headers": {
			"Content-Type": "application/octet-stream"
		},
		"max_retry_backoff": "300s",
		"oauth": {
			"access_token": "",
			"access_token_secret": "",
			"consumer_key": "",
			"consumer_secret": "",
			"enabled": false,
			"request_url": ""
		},
		"payload": "",
		"rate_limit": "",
		"retries": 3,
		"retry_period": "1s",
		"stream": {
			"delimiter": "",
			"enabled": false,
			"max_buffer": 1000000,
			"multipart": false,
			"reconnect": true
		},
		"timeout": "5s",
		"tls": {
			"client_certs": [],
			"enabled": false,
			"root_cas_file": "",
			"skip_cert_verify": false
		},
		"url": "http://localhost:4195/get",
		"verb": "GET"
	}
}

The HTTP client input type connects to a server and continuously performs requests for a single message.

You should set a sensible retry period and max backoff so as to not flood your target server.

The URL and header values of this type can be dynamically set using function interpolations.
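
As a hedged sketch, assuming the interpolation functions follow the upstream Benthos form (for example ${!timestamp_unix} and ${!hostname}), a dynamically constructed URL and header might look like this:

{
	"type": "http_client",
	"http_client": {
		"url": "http://localhost:4195/get?requested_at=${!timestamp_unix}",
		"verb": "GET",
		"headers": {
			"X-Requested-By": "${!hostname}"
		}
	}
}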

Streaming

If you enable streaming then C8Pipelines will consume the body of the response as a line-delimited list of message parts. Each part is read as an individual message unless multipart is set to true, in which case an empty line indicates the end of a message.
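
For example, a minimal sketch (the endpoint URL is hypothetical) that consumes a newline-delimited response stream as individual messages:

{
	"type": "http_client",
	"http_client": {
		"url": "http://localhost:4195/stream",
		"verb": "GET",
		"stream": {
			"enabled": true,
			"multipart": false,
			"delimiter": "",
			"max_buffer": 1000000,
			"reconnect": true
		}
	}
}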

tcp

{
	"type": "tcp",
	"tcp": {
		"address": "localhost:4194",
		"delimiter": "",
		"max_buffer": 1000000,
		"multipart": false
	}
}

Connects to a TCP server and consumes a continuous stream of messages.

If multipart is set to false each line of data is read as a separate message. If multipart is set to true each line is read as a message part, and an empty line indicates the end of a message.

If the delimiter field is left empty then line feed (\n) is used.
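
As an illustrative sketch (the address and delimiter are only examples), a TCP input reading multipart messages whose parts are separated by a custom delimiter might look like this:

{
	"type": "tcp",
	"tcp": {
		"address": "localhost:4194",
		"delimiter": "|||",
		"max_buffer": 1000000,
		"multipart": true
	}
}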

websocket

{
	"type": "websocket",
	"websocket": {
		"basic_auth": {
			"enabled": false,
			"password": "",
			"username": ""
		},
		"oauth": {
			"access_token": "",
			"access_token_secret": "",
			"consumer_key": "",
			"consumer_secret": "",
			"enabled": false,
			"request_url": ""
		},
		"open_message": "",
		"url": "ws://localhost:4195/get/ws"
	}
}

Connects to a websocket server and continuously receives messages.

It is possible to configure an open_message which, when set to a non-empty string, will be sent to the websocket server each time a connection is first established.
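
For instance, a minimal sketch (the URL and subscription payload are hypothetical) that sends a subscribe message on every new connection:

{
	"type": "websocket",
	"websocket": {
		"url": "ws://localhost:4195/get/ws",
		"open_message": "{\"action\":\"subscribe\",\"channel\":\"example\"}"
	}
}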

mqtt

{
	"type": "mqtt",
	"mqtt": {
		"clean_session": true,
		"client_id": "c8pipelines_input",
		"password": "",
		"qos": 1,
		"topics": [
			"c8pipelines_topic"
		],
		"urls": [
			"tcp://localhost:1883"
		],
		"user": ""
	}
}

Subscribe to topics on MQTT brokers.

Metadata

This input adds the following metadata fields to each message:

- mqtt_duplicate
- mqtt_qos
- mqtt_retained
- mqtt_topic
- mqtt_message_id

You can access these metadata fields using function interpolation.
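
As a hedged sketch, assuming the metadata interpolation form of the upstream project (${!metadata:field}), a hypothetical downstream component could reference the topic a message arrived on like this:

{
	"http_client": {
		"url": "http://localhost:8080/archive/${!metadata:mqtt_topic}",
		"verb": "POST"
	}
}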

amqp

{
	"type": "amqp",
	"amqp": {
		"bindings_declare": [],
		"consumer_tag": "c8-consumer",
		"max_batch_count": 1,
		"prefetch_count": 10,
		"prefetch_size": 0,
		"queue": "c8-queue",
		"queue_declare": {
			"durable": true,
			"enabled": false
		},
		"tls": {
			"client_certs": [],
			"enabled": false,
			"root_cas_file": "",
			"skip_cert_verify": false
		},
		"url": "amqp://guest:guest@localhost:5672/"
	}
}

Connects to an AMQP (0.9.1) queue. AMQP is a messaging protocol used by various message brokers, including RabbitMQ.

The field max_batch_count specifies the maximum number of prefetched messages to be batched together. When more than one message is batched they can be split into individual messages with the split processor.

It's possible for this input type to declare the target queue by setting queue_declare.enabled to true; if the queue already exists then the declaration passively verifies that it matches the target fields.

Similarly, it is possible to declare queue bindings by adding objects to the bindings_declare array. Binding declare objects take the form of:

{
  "exchange": "c8-exchange",
  "key": "c8-key"
}
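
Putting these together, a sketch (the exchange, key, and queue names are only examples) that declares its own queue and binding might look like this:

{
	"type": "amqp",
	"amqp": {
		"url": "amqp://guest:guest@localhost:5672/",
		"queue": "c8-queue",
		"queue_declare": {
			"enabled": true,
			"durable": true
		},
		"bindings_declare": [
			{
				"exchange": "c8-exchange",
				"key": "c8-key"
			}
		]
	}
}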

TLS is automatic when connecting to an amqps URL, but custom
settings can be enabled in the tls section.

Metadata

This input adds the following metadata fields to each message:

- amqp_content_type
- amqp_content_encoding
- amqp_delivery_mode
- amqp_priority
- amqp_correlation_id
- amqp_reply_to
- amqp_expiration
- amqp_message_id
- amqp_timestamp
- amqp_type
- amqp_user_id
- amqp_app_id
- amqp_consumer_tag
- amqp_delivery_tag
- amqp_redelivered
- amqp_exchange
- amqp_routing_key
- All existing message headers, including nested headers prefixed with the key
  of their respective parent.

You can access these metadata fields using
function interpolation.

kafka

{
	"type": "kafka",
	"kafka": {
		"addresses": [
			"localhost:9092"
		],
		"client_id": "c8pipelines_kafka_input",
		"commit_period": "1s",
		"consumer_group": "c8pipelines_consumer_group",
		"fetch_buffer_cap": 256,
		"max_batch_count": 1,
		"max_processing_period": "100ms",
		"partition": 0,
		"sasl": {
			"enabled": false,
			"password": "",
			"user": ""
		},
		"start_from_oldest": true,
		"target_version": "1.0.0",
		"tls": {
			"client_certs": [],
			"enabled": false,
			"root_cas_file": "",
			"skip_cert_verify": false
		},
		"topic": "c8pipelines_stream"
	}
}

Connects to a Kafka (0.8+) server. Offsets are managed within Kafka as per the consumer group (set via config). Only one partition per input is supported; if you wish to balance partitions across a consumer group, look at the kafka_balanced input type instead.

The field max_batch_count specifies the maximum number of prefetched messages to be batched together. When more than one message is batched they can be split into individual messages with the split processor.

The field max_processing_period should be set above the maximum estimated time taken to process a message.

The target version by default will be the oldest supported, as it is expected that the server will be backwards compatible. In order to support newer client features you should increase this version up to the known version of the target server.
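
For example, a sketch (the broker address, topic, and version are illustrative) that targets a newer broker, allows more processing time per message, and starts from the newest offset:

{
	"type": "kafka",
	"kafka": {
		"addresses": [
			"localhost:9092"
		],
		"topic": "c8pipelines_stream",
		"consumer_group": "c8pipelines_consumer_group",
		"partition": 0,
		"max_processing_period": "5s",
		"target_version": "2.1.0",
		"start_from_oldest": false
	}
}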

TLS

Custom TLS settings can be used to override system defaults. This includes providing a collection of root certificate authorities, providing a list of client certificates to use for client verification and skipping certificate verification.

Client certificates can either be added by file or by raw contents:

{
	"enabled": true,
	"client_certs": [
		{
			"cert_file": "./example.pem",
			"key_file": "./example.key"
		},
		{
			"cert": "foo",
			"key": "bar"
		}
	]
}

Metadata

This input adds the following metadata fields to each message:

- kafka_key
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_unix
- All existing message headers (version 0.11+)

The field kafka_lag is the calculated difference between the high water mark offset of the partition at the time of ingestion and the current message offset.

You can access these metadata fields using
function interpolation.

gcp_pubsub

{
	"type": "gcp_pubsub",
	"gcp_pubsub": {
		"max_batch_count": 1,
		"max_outstanding_bytes": 1000000000,
		"max_outstanding_messages": 1000,
		"project": "",
		"subscription": ""
	}
}

Consumes messages from a GCP Cloud Pub/Sub subscription.

The field max_batch_count specifies the maximum number of prefetched messages to be batched together.

Metadata

This input adds the following metadata fields to each message:

- gcp_pubsub_publish_time_unix
- All message attributes

You can access these metadata fields using
function interpolation.

kinesis

{
	"type": "kinesis",
	"kinesis": {
		"client_id": "c8pipelines_consumer",
		"commit_period": "1s",
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"dynamodb_table": "",
		"endpoint": "",
		"limit": 100,
		"region": "eu-west-1",
		"shard": "0",
		"start_from_oldest": true,
		"stream": "",
		"timeout": "5s"
	}
}

Receive messages from a Kinesis stream.

It's possible to use DynamoDB for persisting shard iterators by setting the table name. Offsets will then be tracked per client_id per shard_id. When using this mode you should create a table with namespace as the primary key and shard_id as a sort key.
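
As a sketch (the stream, table, and region names are only examples), enabling DynamoDB-backed shard iterators looks like this:

{
	"type": "kinesis",
	"kinesis": {
		"stream": "example-stream",
		"shard": "0",
		"region": "eu-west-1",
		"client_id": "c8pipelines_consumer",
		"dynamodb_table": "example-kinesis-offsets",
		"commit_period": "1s"
	}
}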

S3

{
	"type": "s3",
	"s3": {
		"bucket": "",
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"delete_objects": false,
		"download_manager": {
			"enabled": true
		},
		"endpoint": "",
		"force_path_style_urls": false,
		"max_batch_count": 1,
		"prefix": "",
		"region": "eu-west-1",
		"retries": 3,
		"sqs_body_path": "Records.*.s3.object.key",
		"sqs_bucket_path": "",
		"sqs_endpoint": "",
		"sqs_envelope_path": "",
		"sqs_max_messages": 10,
		"sqs_url": "",
		"timeout": "5s"
	}
}

Downloads objects in an Amazon S3 bucket, optionally filtered by a prefix. If an SQS queue has been configured then only object keys read from the queue will be downloaded. Otherwise, the entire list of objects found when this input is created will be downloaded. Note that the prefix configuration is only used when downloading objects without SQS configured.

Enabling the download manager can speed up file downloads, but results in file metadata not being copied.

If your bucket is configured to send events directly to an SQS queue then you need to set the sqs_body_path field to a dot path where the object key is found in the payload. However, it is also common practice to send bucket events to an SNS topic which
sends enveloped events to SQS, in which case you must also set the sqs_envelope_path field to where the payload can be found.

When using SQS events it's also possible to extract target bucket names from the events by specifying a path in the field sqs_bucket_path. For each SQS event, if that path exists and contains a string it will be used as the bucket of the download instead of the bucket field.
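
For example, a sketch (the bucket, queue URL, and paths are illustrative) for consuming SNS-enveloped bucket events from an SQS queue:

{
	"type": "s3",
	"s3": {
		"bucket": "example-bucket",
		"region": "eu-west-1",
		"sqs_url": "https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue",
		"sqs_envelope_path": "Message",
		"sqs_body_path": "Records.*.s3.object.key",
		"sqs_bucket_path": "Records.*.s3.bucket.name",
		"sqs_max_messages": 10
	}
}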

Here is a guide for setting up an SQS queue that receives events for new S3 bucket objects:

https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html

WARNING: When using SQS please make sure you have sensible values for sqs_max_messages and also the visibility timeout of the queue itself.

When C8Pipelines consumes an S3 item as a result of receiving an SQS message the message is not deleted until the S3 item has been sent onwards. This ensures at-least-once crash resiliency, but also means that if the S3 item takes longer to process than the visibility timeout of your queue then the same items might be processed multiple times.

Metadata

This input adds the following metadata fields to each message:

- s3_key
- s3_bucket
- s3_last_modified_unix*
- s3_last_modified (RFC3339)*
- s3_content_type*
- s3_content_encoding*
- All user defined metadata*

* Only added when NOT using download manager

You can access these metadata fields using
function interpolation.

sqs

{
	"type": "sqs",
	"sqs": {
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"delete_message": true,
		"endpoint": "",
		"max_number_of_messages": 1,
		"region": "eu-west-1",
		"timeout": "5s",
		"url": ""
	}
}

Receives messages from an Amazon SQS URL; only the message body is extracted into messages.

Metadata

This input adds the following metadata fields to each message:

- sqs_message_id
- sqs_receipt_handle
- sqs_approximate_receive_count
- All message attributes

You can access these metadata fields using
function interpolation.
