Error Handling
  • Updated on 21 Oct 2019

Processor Errors

Sometimes things can go wrong. C8Pipelines supports a range of [processors][processors], such as http and lambda, that can fail once their retry attempts are exhausted. When this happens the data is not dropped. Instead, the message continues through the pipeline with its content unchanged, and a metadata flag is added to it. You can check for this flag later in the pipeline using the [processor_failed][processor_failed] condition.

This behaviour allows you to define in your config whether failed messages should be dropped, recovered with more processing, routed to a dead-letter queue, or any combination thereof.

Abandon on Failure

The [try][try] processor takes a list of processors and applies them in order. Once a message fails one of them, the remaining processors in the list are skipped for that message:

[
	{
		"type": "try",
		"try": [
			{
				"type": "foo"
			},
			{
				"type": "bar"
			},
			{
				"type": "baz"
			}
		]
	}
]

Recover Failed Messages

Failed messages can be fed into their own processor steps with a [catch][catch] processor:

[
	{
		"catch": [
			{
				"type": "foo"
			}
		]
	}
]
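
The two processors can also be combined. As a minimal sketch using the same placeholder processors, a try block followed by a catch block runs foo and bar in order, and any message that fails either of them is then handed to baz for recovery:

```json
[
	{
		"try": [
			{
				"type": "foo"
			},
			{
				"type": "bar"
			}
		]
	},
	{
		"catch": [
			{
				"type": "baz"
			}
		]
	}
]
```

Here foo, bar, and baz are placeholders rather than real processor types. Messages that pass both foo and bar carry no failure flag, so the catch block leaves them untouched.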

Once messages finish the catch block they will have their failure flags removed and are treated like regular messages. If this behaviour is not desired then it is possible to simulate a catch block with a [conditional][conditional] processor placed within a [for_each][for_each] processor:

[
	{
		"for_each": [
			{
				"conditional": {
					"condition": {
						"type": "processor_failed"
					},
					"processors": [
						{
							"type": "foo"
						}
					]
				}
			}
		]
	}
]

Logging Errors

When an error occurs, the error flag sometimes carries useful information about the failure. You can read it with the interpolation function error, written as ${!error}, and use it in any processor field that supports interpolation.

For example, you can augment the message payload with the error message:

[
	{
		"catch": [
			{
				"json": {
					"operator": "set",
					"path": "meta.error",
					"value": "${!error}"
				}
			}
		]
	}
]
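
If you would rather record the failure than modify the payload, the same interpolation can be written to the logs. The following is a sketch only: it assumes a log processor with level and message fields is available in your C8Pipelines version, which this page does not confirm, so check the [processors][processors] list before relying on it:

```json
[
	{
		"catch": [
			{
				"log": {
					"level": "ERROR",
					"message": "Processing failed due to: ${!error}"
				}
			}
		]
	}
]
```

Because the log step sits inside a catch block, the failure flag is removed afterwards. Use the for_each and conditional pattern shown earlier if the message should stay flagged for later stages.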

Attempt Until Success

It's possible to reattempt a processor for a particular message until it is successful with a [while][while] processor:

[
	{
		"for_each": [
			{
				"while": {
					"at_least_once": true,
					"max_loops": 0,
					"condition": {
						"type": "processor_failed"
					},
					"processors": [
						{
							"type": "catch"
						},
						{
							"type": "foo"
						}
					]
				}
			}
		]
	}
]

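Each iteration starts with an empty catch processor, which clears the failure flag left by the previous attempt, and then runs foo again. This loop blocks the pipeline and prevents the blocking message from being acknowledged until foo succeeds. In practice it is therefore usually a good idea to build your condition with an exit strategy after N failed attempts so that the pipeline can unblock itself without intervention.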
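
One simple exit strategy is to cap the number of iterations. The following sketch assumes that a max_loops value greater than zero limits the loop to that many attempts, with 0 (as used above) meaning no limit. Confirm this against the [while][while] documentation for your version:

```json
[
	{
		"for_each": [
			{
				"while": {
					"at_least_once": true,
					"max_loops": 3,
					"condition": {
						"type": "processor_failed"
					},
					"processors": [
						{
							"type": "catch"
						},
						{
							"type": "foo"
						}
					]
				}
			}
		]
	}
]
```

Under that assumption, a message that still fails on the third attempt leaves the loop with its failure flag set, so it can then be dropped or routed to a dead-letter queue.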

Drop Failed Messages

To filter failed messages out of your pipeline, use a [filter_parts][filter_parts] processor:

[
	{
		"filter_parts": {
			"not": {
				"type": "processor_failed"
			}
		}
	}
]

This will remove any failed messages from a batch.

Route to a Dead-Letter Queue

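It is possible to send failed messages to different destinations using a [broker][broker] output with [filter_parts][filter_parts] processors. In the following config, the output foo acts as the dead-letter queue and receives only failed messages, while bar receives everything else: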

{
	"output": {
		"broker": {
			"pattern": "fan_out",
			"outputs": [
				{
					"type": "foo",
					"processors": [
						{
							"filter_parts": {
								"type": "processor_failed"
							}
						}
					]
				},
				{
					"type": "bar",
					"processors": [
						{
							"filter_parts": {
								"not": {
									"type": "processor_failed"
								}
							}
						}
					]
				}
			]
		}
	}
}