Outputs

An output is a sink where we wish to send our consumed data after applying an optional array of processors. Only one output is configured at the root of a C8Pipeline config. However, the output can be a broker which combines multiple outputs under a chosen brokering pattern.

An output config section looks like this:

{
	"output": {
		"type": "foo",
		"foo": {
			"bar": "baz"
		},
		"processors": [
			{
				"type": "qux"
			}
		]
	}
}

Back Pressure

C8Pipeline outputs apply back pressure to components upstream. This means that if your output target starts blocking traffic, C8Pipeline will gracefully stop consuming until the issue is resolved.

Retries

When a C8Pipeline output fails to send a message, the error is propagated back up to the input where, depending on the protocol, it will either be pushed back to the source as a Noack (AMQP) or reattempted indefinitely with the commit withheld until success (c8streams).

It's possible to instead have C8Pipeline indefinitely retry an output until success with a retry output. Some other outputs, such as the broker, might also retry indefinitely depending on their configuration.

Multiplexing Outputs

It is possible to perform content-based multiplexing of messages to specific outputs, either by using the switch output, or by using a broker with the fan_out pattern and a filter processor on each output (a processor that drops messages when its condition does not pass). Conditions are content-aware logical operators that can be combined using boolean logic.

For more information regarding conditions, including a full list of available conditions, please read the docs here.
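
As a sketch of the fan_out approach, assuming hypothetical output types foo and bar and the text condition shown in the switch example later in this document:

{
	"output": {
		"broker": {
			"pattern": "fan_out",
			"outputs": [
				{
					"foo": {
						"foo_field_1": "value1"
					},
					"processors": [
						{
							"type": "filter",
							"filter": {
								"text": {
									"operator": "contains",
									"arg": "foo"
								}
							}
						}
					]
				},
				{
					"bar": {
						"bar_field_1": "value2"
					},
					"processors": [
						{
							"type": "filter",
							"filter": {
								"text": {
									"operator": "contains",
									"arg": "bar"
								}
							}
						}
					]
				}
			]
		}
	}
}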

Dead Letter Queues

It's possible to create fallback outputs for when an output target fails using a broker output with the 'try' pattern.
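
As a sketch, assuming a hypothetical primary output type foo and a fallback output type bar:

{
	"output": {
		"broker": {
			"pattern": "try",
			"outputs": [
				{
					"foo": {
						"foo_field_1": "value1"
					}
				},
				{
					"bar": {
						"bar_field_1": "value2"
					}
				}
			]
		}
	}
}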

Contents

  1. c8db
  2. c8streams
  3. c8restql
  4. amqp
  5. broker
  6. cache
  7. drop
  8. drop_on_error
  9. dynamodb
  10. elasticsearch
  11. gcp_pubsub
  12. hdfs
  13. http_client
  14. kafka
  15. kinesis
  16. mqtt
  17. retry
  18. s3
  19. sns
  20. sqs
  21. switch
  22. tcp
  23. udp
  24. websocket

c8db

{
    "type":"c8db",
    "c8db":{
        "name":"name_of_the_collection",
        "local":true/false
    }
}

c8db is a collection of JSON documents. When using this output type, every document coming from the processors will be saved to the given c8db collection.

Following is the list of attributes of the c8db output type:

  • name: indicates the name of the c8db collection. If it does not exist, it will be created automatically.
  • local: indicates whether the c8db collection is created locally or globally. If local is true then the collection is created locally. If local is false then it is created globally.

For example:

{
    "type":"c8db",
    "c8db":{
        "name": "pipeline-c8db-output",
        "local": true
    }
}

c8streams

{
    "type":"c8streams",
    "c8streams":{
        "name":"name_of_the_stream",
        "local":true/false
    }
}

A c8stream is a stream of events. When using this output type, every document coming from the processors will be sent to the given c8stream.

Following is the list of attributes of the c8streams output type:

  • name: indicates the name of the c8stream. If it does not exist, it will be created automatically.
  • local: indicates whether the c8stream is created locally or globally. If local is true then the c8stream is created locally. If local is false then it is created globally.

For example:

{
    "type":"c8streams",
    "c8streams":{
        "name": "c8pipeline-c8stream-output",
        "local": true
    }
}

c8restql

{
    "type":"c8restql",
    "c8restql":{
        "name":"name_of_stored_query",
        "user":"name_of_the_user",
        "bindVars":{}
    }
}

c8restql is a special output type. Using the c8restql output type we can run a saved c8 query for each incoming message from the processors. The output of the c8 query will be logged.

Following is the list of attributes of the c8restql output type:

  • name: indicates the name of the stored query.
  • user: the name of the user to whom the query belongs.
  • bindVars: the default input to the c8 query if the incoming message is empty or invalid JSON.

For example:

{
    "type":"c8restql",
    "c8restql":{
        "name":"pipeline-c8restql-output",
        "user":"user1",
        "bindVars":{
            "@collection": "collection1",
            "value": "1000"
        }
    }
}

amqp

{
	"type": "amqp",
	"amqp": {
		"exchange": "c8-exchange",
		"exchange_declare": {
			"durable": true,
			"enabled": false,
			"type": "direct"
		},
		"immediate": false,
		"key": "c8-key",
		"mandatory": false,
		"persistent": false,
		"tls": {
			"client_certs": [],
			"enabled": false,
			"root_cas_file": "",
			"skip_cert_verify": false
		},
		"url": "amqp://guest:guest@localhost:5672/"
	}
}

Sends messages to an AMQP (0.91) exchange. AMQP is a messaging protocol used by various message brokers, including RabbitMQ. The metadata from each message is delivered as headers.

It's possible for this output type to create the target exchange by setting exchange_declare.enabled to true; if the exchange already exists then the declaration passively verifies that the settings match.

Exchange type options are: direct|fanout|topic|x-custom

TLS is automatic when connecting to an amqps URL, but custom settings can be enabled in the tls section.

The field key can be dynamically set using function interpolations described here.
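
For example, assuming each message document carries a type field, the routing key could be set per message like this (a sketch, not a default):

{
	"output": {
		"amqp": {
			"url": "amqp://guest:guest@localhost:5672/",
			"exchange": "c8-exchange",
			"key": "${!json_field:type}"
		}
	}
}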

broker

{
	"type": "broker",
	"broker": {
		"copies": 1,
		"outputs": [],
		"pattern": "fan_out"
	}
}

The broker output type allows you to configure multiple output targets by listing them:

{
	"output": {
		"broker": {
			"pattern": "fan_out",
			"outputs": [
				{
					"foo": {
						"foo_field_1": "value1"
					}
				},
				{
					"bar": {
						"bar_field_1": "value2",
						"bar_field_2": "value3"
					}
				},
				{
					"baz": {
						"baz_field_1": "value4"
					},
					"processors": [
						{
							"type": "baz_processor"
						}
					]
				}
			]
		},
		"processors": [
			{
				"type": "some_processor"
			}
		]
	}
}

The broker pattern determines the way in which messages are allocated to outputs and can be chosen from the following:

fan_out

With the fan out pattern all outputs will be sent every message that passes through C8Pipeline, in parallel.

If an output applies back pressure it will block all subsequent messages, and if an output fails to send a message it will be retried continuously until completion or service shut down.

fan_out_sequential

Similar to the fan out pattern except outputs are written to sequentially, meaning an output is only written to once the preceding output has confirmed receipt of the same message.

round_robin

With the round robin pattern each message will be assigned a single output following their order. If an output applies back pressure it will block all subsequent messages. If an output fails to send a message then the message will be re-attempted with the next output, and so on.

greedy

The greedy pattern results in higher output throughput at the cost of potentially disproportionate message allocations to those outputs. Each message is sent to a single output, which is determined by allowing outputs to claim messages as soon as they are able to process them. This results in certain faster outputs potentially processing more messages at the cost of slower outputs.

try

The try pattern attempts to send each message to only one output, starting from the first output on the list. If an output attempt fails then the broker attempts to send to the next output in the list and so on.

This pattern is useful for triggering events in the case where certain output targets have broken. For example, if you had an output type http_client but wished to reroute messages whenever the endpoint becomes unreachable you could use a try broker.
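
A sketch of that scenario, assuming an unreachable HTTP endpoint should fall back to a c8streams stream with the illustrative name http-fallback:

{
	"output": {
		"broker": {
			"pattern": "try",
			"outputs": [
				{
					"http_client": {
						"url": "http://localhost:4195/post",
						"verb": "POST"
					}
				},
				{
					"c8streams": {
						"name": "http-fallback",
						"local": true
					}
				}
			]
		}
	}
}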

Utilising More Outputs

When using brokered outputs with patterns such as round robin or greedy it is possible to have multiple messages in-flight at the same time. In order to fully utilise this you need to have a greater number of input sources than output sources.

Processors

It is possible to configure processors at the broker level, where they will be applied to all child outputs, as well as on the individual child outputs. If you have processors at both the broker level and on child outputs then the broker processors will be applied before the child outputs' processors.

cache

{
	"type": "cache",
	"cache": {
		"key": "${!count:items}-${!timestamp_unix_nano}",
		"target": ""
	}
}

Stores message fields as items in a cache.

For example:

{
	"output": {
		"cache": {
			"target": "foo",
			"key": "${!json_field:document.id}"
		}
	}
}

In order to create a unique key value per item you should use function interpolations described here.

drop

{
	"type": "drop",
	"drop": {}
}

Drops all messages.

drop_on_error

{
	"type": "drop_on_error",
	"drop_on_error": {}
}

Attempts to write messages to a child output and if the write fails for any reason the message is dropped instead of being reattempted. This output can be combined with a child retry output in order to set an explicit number of retry attempts before dropping a message.

For example, the following configuration attempts to send to a hypothetical output type foo three times, but if all three attempts fail the message is dropped entirely:

{
	"output": {
		"drop_on_error": {
			"retry": {
				"max_retries": 2,
				"output": {
					"type": "foo"
				}
			}
		}
	}
}

dynamodb

{
	"type": "dynamodb",
	"dynamodb": {
		"backoff": {
			"initial_interval": "1s",
			"max_elapsed_time": "30s",
			"max_interval": "5s"
		},
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"endpoint": "",
		"json_map_columns": {},
		"max_retries": 3,
		"region": "eu-west-1",
		"string_columns": {},
		"table": "",
		"ttl": "",
		"ttl_key": ""
	}
}

Inserts items into a DynamoDB table.

The field string_columns is a map of column names to string values, where the values are function interpolated per message of a batch. This allows you to populate string columns of an item by extracting fields from the document payload or metadata, as follows:

{
	"string_columns": {
		"id": "${!json_field:id}",
		"title": "${!json_field:body.title}",
		"topic": "${!metadata:kafka_topic}",
		"full_content": "${!content}"
	}
}

The field json_map_columns is a map of column names to JSON dot paths, where the path is extracted from each document and converted into a map value. Both an empty path and the path . are interpreted as the root of the document. This allows you to populate map columns of an item as follows:

{
	"json_map_columns": {
		"user": "path.to.user",
		"whole_document": "."
	}
}

A column name can be empty:

{
	"json_map_columns": {
		"": "."
	}
}

In this case the top-level document fields will be written at the root of the item, potentially overwriting previously defined column values. If a path is not found within a document the column will not be populated.
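
Putting these together, a sketch of a complete dynamodb output (the table name and document paths here are illustrative, not defaults) might look like this:

{
	"output": {
		"dynamodb": {
			"table": "my-table",
			"region": "eu-west-1",
			"string_columns": {
				"id": "${!json_field:id}",
				"full_content": "${!content}"
			},
			"json_map_columns": {
				"user": "path.to.user"
			}
		}
	}
}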

Credentials

Set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

elasticsearch

{
	"type": "elasticsearch",
	"elasticsearch": {
		"aws": {
			"credentials": {
				"id": "",
				"profile": "",
				"role": "",
				"role_external_id": "",
				"secret": "",
				"token": ""
			},
			"enabled": false,
			"endpoint": "",
			"region": "eu-west-1"
		},
		"backoff": {
			"initial_interval": "1s",
			"max_elapsed_time": "30s",
			"max_interval": "5s"
		},
		"basic_auth": {
			"enabled": false,
			"password": "",
			"username": ""
		},
		"id": "${!count:elastic_ids}-${!timestamp_unix}",
		"index": "benthos_index",
		"max_retries": 0,
		"pipeline": "",
		"sniff": true,
		"timeout": "5s",
		"type": "doc",
		"urls": [
			"http://localhost:9200"
		]
	}
}

Publishes messages into an Elasticsearch index. If the index does not exist then it is created with a dynamic mapping.

Both the id and index fields can be dynamically set using function interpolations described here.
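
For example, assuming messages carry a kafka_topic metadata value, the index could be derived per message; this is a sketch rather than a recommended default:

{
	"output": {
		"elasticsearch": {
			"urls": [
				"http://localhost:9200"
			],
			"index": "${!metadata:kafka_topic}",
			"id": "${!count:elastic_ids}-${!timestamp_unix}"
		}
	}
}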

AWS Credentials

Set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

gcp_pubsub

{
	"type": "gcp_pubsub",
	"gcp_pubsub": {
		"project": "",
		"topic": ""
	}
}

Sends messages to a GCP Cloud Pub/Sub topic. Metadata from messages are sent as attributes.

hdfs

{
	"type": "hdfs",
	"hdfs": {
		"directory": "",
		"hosts": [
			"localhost:9000"
		],
		"path": "${!count:files}-${!timestamp_unix_nano}.txt",
		"user": "benthos_hdfs"
	}
}

Sends message parts as files to an HDFS directory. Each file is written to the path specified with the path field; in order to have a different path for each file you should use function interpolations described here.

http_client

{
	"type": "http_client",
	"http_client": {
		"backoff_on": [
			429
		],
		"basic_auth": {
			"enabled": false,
			"password": "",
			"username": ""
		},
		"copy_response_headers": false,
		"drop_on": [],
		"headers": {
			"Content-Type": "application/octet-stream"
		},
		"max_retry_backoff": "300s",
		"oauth": {
			"access_token": "",
			"access_token_secret": "",
			"consumer_key": "",
			"consumer_secret": "",
			"enabled": false,
			"request_url": ""
		},
		"propagate_response": false,
		"rate_limit": "",
		"retries": 3,
		"retry_period": "1s",
		"timeout": "5s",
		"tls": {
			"client_certs": [],
			"enabled": false,
			"root_cas_file": "",
			"skip_cert_verify": false
		},
		"url": "http://localhost:4195/post",
		"verb": "POST"
	}
}

Sends messages to an HTTP server. The request will be retried for each message whenever the response code falls outside the range 200-299 (inclusive). It is possible to list codes outside of this range in the drop_on field in order to prevent retry attempts.

The period of time between retries is linear by default. Response codes that are within the backoff_on list will instead apply exponential backoff between retry attempts.
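
For instance, a sketch that drops messages on 409 responses rather than retrying them, while applying exponential backoff on 429 responses (the chosen codes are illustrative):

{
	"output": {
		"http_client": {
			"url": "http://localhost:4195/post",
			"verb": "POST",
			"drop_on": [
				409
			],
			"backoff_on": [
				429
			]
		}
	}
}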

When the number of retries expires the output will reject the message. The behaviour after this depends on the pipeline, but usually it simply means the send is attempted again until successful whilst applying back pressure.

The URL and header values of this type can be dynamically set using function interpolations described here.

The body of the HTTP request is the raw contents of the message payload. If the message has multiple parts the request will be sent according to RFC 1341.

Propagating Responses

It's possible to propagate the response from each HTTP request back to the input source by setting propagate_response to true. Only inputs that support synchronous responses are able to make use of these propagated responses.

kafka

{
	"type": "kafka",
	"kafka": {
		"ack_replicas": false,
		"addresses": [
			"localhost:9092"
		],
		"backoff": {
			"initial_interval": "0s",
			"max_elapsed_time": "5s",
			"max_interval": "1s"
		},
		"client_id": "benthos_kafka_output",
		"compression": "none",
		"key": "",
		"max_msg_bytes": 1000000,
		"max_retries": 0,
		"round_robin_partitions": false,
		"sasl": {
			"enabled": false,
			"password": "",
			"user": ""
		},
		"target_version": "1.0.0",
		"timeout": "5s",
		"tls": {
			"client_certs": [],
			"enabled": false,
			"root_cas_file": "",
			"skip_cert_verify": false
		},
		"topic": "benthos_stream"
	}
}

The kafka output type writes messages to a Kafka broker. These messages are acknowledged, and the acknowledgement is propagated back to the input. The config field ack_replicas determines whether we wait for acknowledgement from all replicas or just a single broker.

It is possible to specify a compression codec to use out of the following options: none, snappy, lz4 and gzip.

If the field key is not empty then each message will be given its contents as a key.

Both the key and topic fields can be dynamically set using function interpolations described here.

By default the partitioner will select partitions based on a hash of the key value. If the key is empty then a partition is chosen at random. You can alternatively force the partitioner to round-robin partitions with the field round_robin_partitions.
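
For example, assuming each document contains a user.id field, keying messages by that field keeps a given user's messages on the same partition (a sketch, not a default):

{
	"output": {
		"kafka": {
			"addresses": [
				"localhost:9092"
			],
			"topic": "benthos_stream",
			"key": "${!json_field:user.id}"
		}
	}
}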

kinesis

{
	"type": "kinesis",
	"kinesis": {
		"backoff": {
			"initial_interval": "1s",
			"max_elapsed_time": "30s",
			"max_interval": "5s"
		},
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"endpoint": "",
		"hash_key": "",
		"max_retries": 0,
		"partition_key": "",
		"region": "eu-west-1",
		"stream": ""
	}
}

Sends messages to a Kinesis stream.

Both the partition_key (required) and hash_key (optional) fields can be dynamically set using function interpolations described here.
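
For example, assuming each document carries an id field (the stream name below is illustrative), the partition key could be set per message like this:

{
	"output": {
		"kinesis": {
			"stream": "my-stream",
			"region": "eu-west-1",
			"partition_key": "${!json_field:id}"
		}
	}
}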

Credentials

Set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

mqtt

{
	"type": "mqtt",
	"mqtt": {
		"client_id": "benthos_output",
		"password": "",
		"qos": 1,
		"topic": "benthos_topic",
		"urls": [
			"tcp://localhost:1883"
		],
		"user": ""
	}
}

Pushes messages to an MQTT broker.

retry

{
	"type": "retry",
	"retry": {
		"backoff": {
			"initial_interval": "500ms",
			"max_elapsed_time": "0s",
			"max_interval": "3s"
		},
		"max_retries": 0,
		"output": {}
	}
}

Attempts to write messages to a child output, and if the write fails for any reason the message is retried until it either succeeds or, if the max retries or max elapsed time fields are non-zero, one of those limits is reached.

All messages in C8Pipeline are always retried on an output error, but this would usually involve propagating the error back to the source of the message, whereby it would be reprocessed before reaching the output layer once again.

This output type is useful whenever we wish to avoid reprocessing a message on the event of a failed send. We might, for example, have a dedupe processor that we want to avoid reapplying to the same message more than once in the pipeline.

Rather than retrying the same output you may wish to retry the send using a different output target (a dead letter queue). In which case you should instead use the broker output type with the pattern try.
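
As a sketch, wrapping a hypothetical foo output so that each send is retried up to five times with backoff:

{
	"output": {
		"retry": {
			"max_retries": 5,
			"backoff": {
				"initial_interval": "500ms",
				"max_interval": "3s"
			},
			"output": {
				"type": "foo"
			}
		}
	}
}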

s3

{
	"type": "s3",
	"s3": {
		"bucket": "",
		"content_encoding": "",
		"content_type": "application/octet-stream",
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"endpoint": "",
		"force_path_style_urls": false,
		"path": "${!count:files}-${!timestamp_unix_nano}.txt",
		"region": "eu-west-1",
		"timeout": "5s"
	}
}

Sends message parts as objects to an Amazon S3 bucket. Each object is uploaded to the path specified by the path field.

In order to have a different path for each object you should use function interpolations described here, which are calculated per message of a batch.

The fields content_type and content_encoding can also be set dynamically using function interpolation.

Credentials

Set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

sns

{
	"type": "sns",
	"sns": {
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"endpoint": "",
		"region": "eu-west-1",
		"timeout": "5s",
		"topic_arn": ""
	}
}

Sends messages to an AWS SNS topic.

Credentials

Set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

sqs

{
	"type": "sqs",
	"sqs": {
		"backoff": {
			"initial_interval": "1s",
			"max_elapsed_time": "30s",
			"max_interval": "5s"
		},
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"endpoint": "",
		"max_retries": 0,
		"message_deduplication_id": "",
		"message_group_id": "",
		"region": "eu-west-1",
		"url": ""
	}
}

Sends messages to an SQS queue. Metadata values are sent along with the payload as attributes with the data type String. If the number of metadata values in a message exceeds the message attribute limit (10) then the top ten keys ordered alphabetically will be selected.

The fields message_group_id and message_deduplication_id can be set dynamically using function interpolations.
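
For example, assuming a FIFO queue (the URL below is illustrative) and documents that carry a group field, the group id could be set per message like this:

{
	"output": {
		"sqs": {
			"url": "https://sqs.eu-west-1.amazonaws.com/EXAMPLE/example-queue.fifo",
			"region": "eu-west-1",
			"message_group_id": "${!json_field:group}"
		}
	}
}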

Credentials

Set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

switch

{
	"type": "switch",
	"switch": {
		"outputs": [],
		"retry_until_success": true
	}
}

The switch output type allows you to configure multiple conditional output targets by listing child outputs paired with conditions. Conditional logic is currently applied per whole message batch. In order to multiplex per message of a batch use the broker output with the pattern fan_out.

In the following example, messages containing "foo" will be sent to both the foo and baz outputs. Messages containing "bar" will be sent to both the bar and baz outputs. Messages containing both "foo" and "bar" will be sent to all three outputs. And finally, messages that do not contain "foo" or "bar" will be sent to the baz output only.

{
	"output": {
		"switch": {
			"retry_until_success": true,
			"outputs": [
				{
					"output": {
						"foo": {
							"foo_field_1": "value1"
						}
					},
					"condition": {
						"text": {
							"operator": "contains",
							"arg": "foo"
						}
					},
					"fallthrough": true
				},
				{
					"output": {
						"bar": {
							"bar_field_1": "value2",
							"bar_field_2": "value3"
						}
					},
					"condition": {
						"text": {
							"operator": "contains",
							"arg": "bar"
						}
					},
					"fallthrough": true
				},
				{
					"output": {
						"baz": {
							"baz_field_1": "value4"
						},
						"processors": [
							{
								"type": "baz_processor"
							}
						]
					}
				}
			]
		},
		"processors": [
			{
				"type": "some_processor"
			}
		]
	}
}

The switch output requires a minimum of two outputs. If no condition is defined for an output, it behaves like a static true condition. If fallthrough is set to true, the switch output will continue evaluating additional outputs after finding a match.

Messages that do not match any outputs will be dropped. If an output applies back pressure it will block all subsequent messages.

If an output fails to send a message it will be retried continuously until completion or service shut down. You can change this behaviour so that when an output returns an error the switch output also returns an error by setting retry_until_success to false. This allows you to wrap the switch with a try broker, but care must be taken to ensure duplicate messages aren't introduced during error conditions.

tcp

{
	"type": "tcp",
	"tcp": {
		"address": "localhost:4194"
	}
}

Sends messages as a continuous stream of line delimited data over TCP by connecting to a server.

If batched messages are sent the final message of the batch will be followed by two line breaks in order to indicate the end of the batch.

udp

{
	"type": "udp",
	"udp": {
		"address": "localhost:4194"
	}
}

Sends messages as a continuous stream of line delimited data over UDP by connecting to a server.

websocket

{
	"type": "websocket",
	"websocket": {
		"basic_auth": {
			"enabled": false,
			"password": "",
			"username": ""
		},
		"oauth": {
			"access_token": "",
			"access_token_secret": "",
			"consumer_key": "",
			"consumer_secret": "",
			"enabled": false,
			"request_url": ""
		},
		"url": "ws://localhost:4195/post/ws"
	}
}

Sends messages to an HTTP server via a websocket connection.
