Processors

Pipeline processors are functions applied to messages passing through a pipeline. The function signature allows a processor to mutate or drop messages depending on their content.

Processors are set via config, and depending on where in the config they are placed they will be run either immediately after a specific input (set in the input section), on all messages (set in the pipeline section) or before a specific output (set in the output section). Most processors apply to all messages and can be placed in the pipeline section:

{
	"pipeline": {
		"processors": [
			{
				"foo": { "bar": "baz" }
			}
		]
	}
}

By organising processors you can configure complex behaviours in your pipeline. You can find some examples here.

Error Handling

Some processors have conditions under which they can fail. C8Pipelines has mechanisms for detecting and recovering from these failures, which can be read about here.

Contents

cache*

{
	"type": "cache",
	"cache": {
		"cache": "",
		"key": "",
		"operator": "set",
		"parts": [],
		"value": ""
	}
}

Performs operations against C8Cache for each message, allowing you to store or retrieve data within message payloads.

This processor will interpolate functions within the key and value fields individually for each message of the batch. This allows you to specify dynamic keys and values based on the contents of the message payloads and metadata. You can find a list of functions here.

Operators

set

Set a key in the cache to a value. If the key already exists the contents are overridden.

add

Set a key in the cache to a value. If the key already exists the action fails with a 'key already exists' error, which can be detected with processor error handling.

get

Retrieve the contents of a cached key and replace the original message payload with the result. If the key does not exist the action fails with an error, which can be detected with processor error handling.

Examples

The cache processor can be used in combination with other processors in order to solve a variety of data stream problems.

Deduplication

Deduplication can be done using the add operator with a key extracted from the message payload. Since the operator fails when a key already exists, duplicates can be removed with a processor_failed condition:

[
	{
		"cache": {
			"cache": "TODO",
			"operator": "add",
			"key": "${!json_field:message.id}",
			"value": "storeme"
		}
	},
	{
		"filter_parts": {
			"type": "processor_failed"
		}
	}
]

Hydration

It's possible to enrich payloads with content previously stored in a cache by using the process_map processor:

[
	{
		"process_map": {
			"processors": [
				{
					"cache": {
						"cache": "TODO",
						"operator": "get",
						"key": "${!json_field:message.document_id}"
					}
				}
			],
			"postmap": {
				"message.document": "."
			}
		}
	}
]

catch

{
	"type": "catch",
	"catch": []
}

Behaves similarly to the for_each processor, where a list of child processors is applied to each message. However, the processors are only applied to messages that failed a processing step prior to the catch.

For example, with the following config:

[
	{
		"type": "foo"
	},
	{
		"catch": [
			{
				"type": "bar"
			},
			{
				"type": "baz"
			}
		]
	}
]

If the processor foo fails for a particular message, that message will be fed into the processors bar and baz. Messages that do not fail for the processor foo will skip these processors.

When messages leave the catch block their fail flags are cleared. This processor is useful for when it's possible to recover failed messages, or when special actions (such as logging/metrics) are required before dropping them.

More information about error handling can be found here.

conditional

{
	"type": "conditional",
	"conditional": {
		"condition": {
			"type": "text",
			"text": {
				"arg": "",
				"operator": "equals_cs",
				"part": 0
			}
		},
		"else_processors": [],
		"processors": []
	}
}

Conditional is a processor that has a condition, a list of child processors, and a list of else_processors. For each message, if the condition passes the child processors are applied, otherwise the else_processors are applied. This processor is useful for applying processors based on the content of a message.

You can find a full list of conditions here.
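
For example, the following sketch (where foo and bar are placeholder processor types, as in the other examples in this document) applies foo to messages whose first part equals "start" and bar to all other messages:

{
	"conditional": {
		"condition": {
			"type": "text",
			"text": {
				"operator": "equals_cs",
				"part": 0,
				"arg": "start"
			}
		},
		"processors": [
			{
				"type": "foo"
			}
		],
		"else_processors": [
			{
				"type": "bar"
			}
		]
	}
}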

filter

{
	"type": "filter",
	"filter": {
		"type": "text",
		"text": {
			"arg": "",
			"operator": "equals_cs",
			"part": 0
		}
	}
}

Tests each message against a condition; if the condition fails, the message is dropped. You can find a full list of conditions here.
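
For example, assuming the text condition supports a contains operator (see the conditions list for the definitive set), the following sketch drops any message that does not contain the string "foo":

{
	"filter": {
		"type": "text",
		"text": {
			"operator": "contains",
			"arg": "foo"
		}
	}
}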

hash

{
	"type": "hash",
	"hash": {
		"algorithm": "sha256",
		"parts": []
	}
}

Hashes messages according to the selected algorithm. Supported algorithms are: sha256, sha512, sha1, xxhash64.

This processor is mostly useful when combined with the process_field processor as it allows you to hash a specific field of a document like this:

{
	"process_field": {
		"path": "foo.bar",
		"processors": [
			{
				"hash": {
					"algorithm": "sha256"
				}
			}
		]
	}
}

http

{
	"type": "http",
	"http": {
		"max_parallel": 0,
		"parallel": false,
		"request": {
			"backoff_on": [
				429
			],
			"basic_auth": {
				"enabled": false,
				"password": "",
				"username": ""
			},
			"copy_response_headers": false,
			"drop_on": [],
			"headers": {
				"Content-Type": "application/octet-stream"
			},
			"max_retry_backoff": "300s",
			"oauth": {
				"access_token": "",
				"access_token_secret": "",
				"consumer_key": "",
				"consumer_secret": "",
				"enabled": false,
				"request_url": ""
			},
			"rate_limit": "",
			"retries": 3,
			"retry_period": "1s",
			"timeout": "5s",
			"tls": {
				"client_certs": [],
				"enabled": false,
				"root_cas_file": "",
				"skip_cert_verify": false
			},
			"url": "http://localhost:4195/post",
			"verb": "POST"
		}
	}
}

Performs an HTTP request using the message as the request body, and replaces the original message with the body of the response.

The rate_limit field can be used to specify a rate limit resource to cap the rate of requests across all parallel components service wide.

The URL and header values of this type can be dynamically set using function interpolations described here.
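
For example, a sketch using the json_field interpolation seen elsewhere in this document (the URL and field name here are hypothetical) to derive the endpoint from each message's payload:

{
	"http": {
		"request": {
			"url": "http://localhost:4195/${!json_field:document.type}",
			"verb": "POST"
		}
	}
}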

In order to map or encode the payload to a specific request body, and map the response back into the original payload instead of replacing it entirely, you can use the process_map or process_field processors.

Metadata

If the request returns a response code this processor sets a metadata field http_status_code on the resulting message.

If the field copy_response_headers is set to true then any headers in the response will also be set in the resulting message as metadata.

Error Handling

When all retry attempts for a message are exhausted the processor cancels the attempt. These failed messages will continue through the pipeline unchanged, but can be dropped or placed in a dead letter queue according to your config. You can read about these patterns here.

jmespath

{
	"type": "jmespath",
	"jmespath": {
		"parts": [],
		"query": ""
	}
}

Parses a message as a JSON document and attempts to apply a JMESPath expression to it, replacing the contents of the part with the result. Please refer to the JMESPath website for information and tutorials regarding the syntax of expressions.

For example, with the following config:

{
	"jmespath": {
		"query": "locations[?state == 'WA'].name | sort(@) | {Cities: join(', ', @)}"
	}
}

If the initial contents of a message were:

{
  "locations": [
    {"name": "Seattle", "state": "WA"},
    {"name": "New York", "state": "NY"},
    {"name": "Bellevue", "state": "WA"},
    {"name": "Olympia", "state": "WA"}
  ]
}

Then the resulting contents would be:

{"Cities": "Bellevue, Olympia, Seattle"}

It is possible to create boolean queries with JMESPath; in order to filter messages with a boolean query, use the jmespath condition instead.

json

{
	"type": "json",
	"json": {
		"operator": "clean",
		"parts": [],
		"path": "",
		"value": ""
	}
}

Parses messages as a JSON document, performs a mutation on the data, and then overwrites the previous contents with the new value.

The field path is a dot separated path which, for most operators, determines the field within the payload to be targeted. If the path is empty or "." the root of the data will be targeted.

This processor will interpolate functions within the value field. You can find a list of functions here.

Operators

append

Appends a value to an array at a target dot path. If the path does not exist all objects in the path are created (unless there is a collision).

If a non-array value already exists in the target path it will be replaced by an array containing the original value as well as the new value.

If the value is an array the elements of the array are expanded into the new array. E.g. if the target is an array [0,1] and the value is also an array [2,3], the result will be [0,1,2,3] as opposed to [0,1,[2,3]].
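
As a sketch with a hypothetical payload: given {"foo":{"bar":[0,1]}}, the following config would produce {"foo":{"bar":[0,1,2,3]}}:

{
	"json": {
		"operator": "append",
		"path": "foo.bar",
		"value": [
			2,
			3
		]
	}
}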

clean

Walks the JSON structure and deletes any fields where the value is:

  • An empty array
  • An empty object
  • An empty string
  • null

copy

Copies the value of a target dot path (if it exists) to a location. The destination path is specified in the value field. If the destination path does not exist all objects in the path are created (unless there is a collision).

delete

Removes a key identified by the dot path. If the path does not exist this is a no-op.

move

Moves the value of a target dot path (if it exists) to a new location. The destination path is specified in the value field. If the destination path does not exist all objects in the path are created (unless there is a collision).

select

Reads the value found at a dot path and replaces the original contents entirely with that value.

set

Sets the value of a field at a dot path. If the path does not exist all objects in the path are created (unless there is a collision).

The value can be any type, including objects and arrays. When using YAML configuration files a YAML object will be converted into a JSON object, i.e. with the config:

{
	"json": {
		"operator": "set",
		"parts": [
			0
		],
		"path": "some.path",
		"value": {
			"foo": {
				"bar": 5
			}
		}
	}
}

The value will be converted into '{"foo":{"bar":5}}'. If the YAML object contains keys that aren't strings those fields will be ignored.

split

Splits a string field by a value and replaces the original string with an array containing the results of the split. This operator requires both the path value and the contents of the value field to be strings.
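
As a sketch with a hypothetical payload: given {"foo":"a,b,c"}, the following config would produce {"foo":["a","b","c"]}:

{
	"json": {
		"operator": "split",
		"path": "foo",
		"value": ","
	}
}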

lambda

{
	"type": "lambda",
	"lambda": {
		"credentials": {
			"id": "",
			"profile": "",
			"role": "",
			"role_external_id": "",
			"secret": "",
			"token": ""
		},
		"endpoint": "",
		"function": "",
		"parallel": false,
		"rate_limit": "",
		"region": "eu-west-1",
		"retries": 3,
		"timeout": "5s"
	}
}

Invokes an AWS Lambda function for each message. The contents of the message are the payload of the request, and the result of the invocation becomes the new contents of the message.

It is possible to perform requests per message of a batch in parallel by setting the parallel flag to true. The rate_limit field can be used to specify a rate limit resource to cap the rate of requests across parallel components service wide.

In order to map or encode the payload to a specific request body, and map the response back into the original payload instead of replacing it entirely, you can use the process_map or process_field processors.

Error Handling

When all retry attempts for a message are exhausted the processor cancels the attempt. These failed messages will continue through the pipeline unchanged, but can be dropped or placed in a dead letter queue according to your config. You can read about these patterns here.

metadata

{
	"type": "metadata",
	"metadata": {
		"key": "example",
		"operator": "set",
		"parts": [],
		"value": "${!hostname}"
	}
}

Performs operations on the metadata of a message. Metadata are key/value pairs that are associated with a message. Metadata values can be referred to using configuration interpolation functions, which allow you to set fields in certain outputs using these dynamic values.

This processor will interpolate functions within both the key and value fields. You can find a list of functions here. This allows you to set the contents of a metadata field using values taken from the message payload.

{
	"for_each": [
		{
			"metadata": {
				"operator": "set",
				"key": "foo",
				"value": "${!json_field:document.foo}"
			}
		}
	]
}

Operators

set

Sets the value of a metadata key.

delete

Removes all metadata values from the message where the key matches the value provided. If the value field is left empty the key field will be used instead.

delete_all

Removes all metadata values from the message.

delete_prefix

Removes all metadata values from the message where the key is prefixed with the value provided. If the value field is left empty the key value will instead be used as the prefix.
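
For example, the following sketch (the tmp_ prefix is hypothetical) removes every metadata key beginning with tmp_:

{
	"metadata": {
		"operator": "delete_prefix",
		"value": "tmp_"
	}
}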

noop

{
	"type": "noop"
}

Noop is a processor that does nothing; the message passes through unchanged.

number

{
	"type": "number",
	"number": {
		"operator": "add",
		"parts": [],
		"value": 0
	}
}

Parses message contents into a 64-bit floating point number and performs an operator on it. In order to execute this processor on a sub field of a document use it with the process_field processor.

The value field can either be a number or a string type. If it is a string type then this processor will interpolate functions within it. You can find a list of functions here.

For example, if we wanted to subtract the current unix timestamp from the field 'foo' of a JSON document {"foo":1561219142} we could use the following config:

{
	"process_field": {
		"path": "foo",
		"result_type": "float",
		"processors": [
			{
				"number": {
					"operator": "subtract",
					"value": "${!timestamp_unix}"
				}
			}
		]
	}
}

Operators

add

Adds a value.

subtract

Subtracts a value.

process_dag

{
	"type": "process_dag",
	"process_dag": {}
}

A processor that manages a map of process_map processors and calculates a Directed Acyclic Graph (DAG) of their dependencies by referring to their postmap targets for provided fields and their premap targets for required fields.

The names of workflow stages may only contain alphanumeric, underscore and dash characters (they must match the regular expression [a-zA-Z0-9_-]+).

The DAG is then used to execute the children in the necessary order with the maximum parallelism possible. You can read more about workflows in this document.

The field dependencies is an optional array of fields that a child depends on. This is useful for when fields are required but don't appear within a premap such as those used in conditions.
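
As a sketch (the stage and field names are hypothetical, and the exact condition semantics should be checked against the conditions list), a stage might declare an explicit dependency on a field that it only references within a condition:

{
	"process_dag": {
		"qux": {
			"dependencies": [
				"foo_result"
			],
			"conditions": [
				{
					"type": "jmespath",
					"jmespath": {
						"query": "foo_result.ok"
					}
				}
			],
			"processors": [
				{
					"type": "noop"
				}
			],
			"postmap": {
				"qux_result": "."
			}
		}
	}
}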

This processor is extremely useful for performing a complex mesh of enrichments, where the cost of network requests makes maximum parallelism across those enrichments desirable.

For example, if we had three target HTTP services that we wished to enrich each document with - foo, bar and baz - where baz relies on the result of both foo and bar, we might express that relationship here like so:

{
	"process_dag": {
		"foo": {
			"premap": {
				".": "."
			},
			"processors": [
				{
					"http": {
						"request": {
							"url": "http://foo/enrich"
						}
					}
				}
			],
			"postmap": {
				"foo_result": "."
			}
		},
		"bar": {
			"premap": {
				".": "msg.sub.path"
			},
			"processors": [
				{
					"http": {
						"request": {
							"url": "http://bar/enrich"
						}
					}
				}
			],
			"postmap": {
				"bar_result": "."
			}
		},
		"baz": {
			"premap": {
				"foo_obj": "foo_result",
				"bar_obj": "bar_result"
			},
			"processors": [
				{
					"http": {
						"request": {
							"url": "http://baz/enrich"
						}
					}
				}
			],
			"postmap": {
				"baz_obj": "."
			}
		}
	}
}

With this config the DAG would determine that the children foo and bar can be executed in parallel, and once they are both finished we may proceed onto baz.

process_field

{
	"type": "process_field",
	"process_field": {
		"codec": "json",
		"parts": [],
		"path": "",
		"processors": [],
		"result_type": "string"
	}
}

A processor that extracts the value of a field dot path within payloads according to a specified codec, applies a list of processors to the extracted value and finally sets the field within the original payloads to the processed result.

Codecs

json (default)

Parses the payload as a JSON document, extracts and sets the field using a dot notation path.

The result, according to the config field result_type, can be marshalled into any of the following types: string (default), int, float, bool, object (including null), array and discard. The discard type is a special case that discards the result of the processing steps entirely.

It's therefore possible to use this codec without any child processors as a way of casting string values into other types. For example, with an input JSON document {"foo":"10"} it's possible to cast the value of the field foo to an integer type with:

{
	"process_field": {
		"path": "foo",
		"result_type": "int"
	}
}

metadata

Extracts and sets a metadata value identified by the path field. If the field result_type is set to discard then the result of the processing stages is discarded and the original metadata value is left unchanged.
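
For example, a sketch (the metadata key foo is hypothetical) that uppercases a metadata value using the text processor documented below:

{
	"process_field": {
		"codec": "metadata",
		"path": "foo",
		"processors": [
			{
				"text": {
					"operator": "to_upper"
				}
			}
		]
	}
}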

Usage

For example, with an input JSON document {"foo":"hello world"} it's possible to uppercase the value of the field 'foo' by using the JSON codec and a text child processor:

{
	"process_field": {
		"codec": "json",
		"path": "foo",
		"processors": [
			{
				"text": {
					"operator": "to_upper"
				}
			}
		]
	}
}

If the number of messages resulting from the processing steps does not match the original count then this processor fails and the messages continue unchanged. Therefore, you should avoid using batch and filter type processors in this list.

process_map

{
	"type": "process_map",
	"process_map": {
		"conditions": [],
		"parts": [],
		"postmap": {},
		"postmap_optional": {},
		"premap": {},
		"premap_optional": {},
		"processors": []
	}
}

A processor that extracts and maps fields identified via dot path from the original payload into new objects, applies a list of processors to the newly constructed objects, and finally maps the result back into the original payload.

This processor is useful for applying processors to subsections of a payload. For example, you could extract sections of a JSON object in order to construct a request object for an http processor, then map the result back into a field within the original object.

The order of stages of this processor are as follows:

  • Conditions are applied to each individual message part in the batch, determining whether the part will be mapped. If the conditions are empty all message parts will be mapped. If the field parts is populated the message parts not in this list are also excluded from mapping.
  • Message parts that are flagged for mapping are mapped according to the premap fields, creating a new object. If the premap stage fails (targets are not found) the message part will not be processed.
  • Message parts that are mapped are processed as a batch. You may safely break the batch into individual parts during processing with the split processor.
  • After all child processors are applied to the mapped messages they are mapped back into the original message parts they originated from as per your postmap. If the postmap stage fails the mapping is skipped and the message payload remains as it started.

Map paths are arbitrary dot paths, and target path hierarchies are constructed if they do not yet exist. Processing is skipped for message parts where the premap targets aren't found; for optional premap targets use premap_optional.

Map target paths that are parents of other map target paths will always be mapped first, therefore it is possible to map subpath overrides.

If postmap targets are not found the merge is abandoned; for optional postmap targets use postmap_optional.

If the premap is empty then the full payload is sent to the processors; if the postmap is empty then the processed result replaces the original contents entirely.

Maps can reference the root of objects with either an empty string or '.'. For example, the maps:

{
	"premap": {
		".": "foo.bar"
	},
	"postmap": {
		"foo.bar": "."
	}
}

Would create a new object where the root is the value of foo.bar and would map the full contents of the result back into foo.bar.
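
Putting these pieces together, a sketch (the paths and URL here are hypothetical) that extracts a sub-document, sends it to an HTTP enrichment service, and maps the response into a new field of the original payload:

{
	"process_map": {
		"premap": {
			".": "document.user"
		},
		"processors": [
			{
				"http": {
					"request": {
						"url": "http://localhost:4195/enrich"
					}
				}
			}
		],
		"postmap": {
			"document.user_info": "."
		}
	}
}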

If the number of total message parts resulting from the processing steps does not match the original count then this processor fails and the messages continue unchanged. Therefore, you should avoid using batch and filter type processors in this list.

c8ql*

{
	"type": "c8ql",
	"c8ql": {
		"args": "",
		"query": ""
	}
}

C8QL is a processor that runs a query against the C8 database for each message and, for queries that return rows, replaces the message with the result.

If a query contains arguments they can be set as an array of strings supporting interpolation functions in the args field.
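
As a purely illustrative sketch (the query, collection name, and the way args bind to query parameters are assumptions, not confirmed by this document), a lookup keyed on a value interpolated from the message might look like:

{
	"c8ql": {
		"query": "FOR doc IN users FILTER doc.id == @id RETURN doc",
		"args": [
			"${!json_field:user.id}"
		]
	}
}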

Results

When a query returns rows they are serialised according to the codec below, and the message contents are replaced with the serialised result.

json_array

The resulting rows are serialised into an array of JSON objects, where each object represents a row: the keys are the column names and the values are that row's values for those columns.

switch

{
	"type": "switch",
	"switch": []
}

Switch is a processor that lists child case objects each containing a condition and processors. Each message is tested against the condition of each child case until a condition passes, whereby the processors of that case will be executed on the message.

Each case may specify a boolean fallthrough field indicating whether the next case should be executed after it (the default is false).

A case takes this form:

[
	{
		"condition": {
			"type": "foo"
		},
		"processors": [
			{
				"type": "foo"
			}
		],
		"fallthrough": false
	}
]

You can find a full list of conditions here.

text

{
	"type": "text",
	"text": {
		"arg": "",
		"operator": "trim_space",
		"parts": [],
		"value": ""
	}
}

Performs text-based mutations on payloads.

This processor will interpolate functions within the value field. You can find a list of functions here.

Operators

append

Appends text to the end of the payload.

escape_url_query

Escapes text so that it is safe to place within the query section of a URL.

unescape_url_query

Unescapes text that has been url escaped.

find_regexp

Extracts the section of a message that matches the argument regular expression.

prepend

Prepends text to the beginning of the payload.

quote

Returns a double-quoted string, using escape sequences (\t, \n, \xFF, \u0100) for control characters and other non-printable characters.

replace

Replaces all occurrences of the argument in a message with a value.

replace_regexp

Replaces all occurrences of the argument regular expression in a message with a value. Inside the value $ signs are interpreted as submatch expansions, e.g. $1 represents the text of the first submatch.
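
As a sketch with a hypothetical pattern: given the payload "foobar", the following config would produce "bar":

{
	"text": {
		"operator": "replace_regexp",
		"arg": "foo(ba[rz])",
		"value": "$1"
	}
}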

set

Replace the contents of a message entirely with a value.

strip_html

Removes all HTML tags from a message.

to_lower

Converts all text into lower case.

to_upper

Converts all text into upper case.

trim

Removes all leading and trailing occurrences of characters within the arg field.

trim_space

Removes all leading and trailing whitespace from the payload.

unquote

Unquotes a single, double, or back-quoted string literal.

try

{
	"type": "try",
	"try": []
}

A list of child processors is applied to each message. If a processor fails for a message then that message will skip all following processors.

For example, with the following config:

[
	{
		"try": [
			{
				"type": "foo"
			},
			{
				"type": "bar"
			},
			{
				"type": "baz"
			}
		]
	}
]

If the processor foo fails for a particular message, that message will skip the processors bar and baz.

This processor is useful for when child processors depend on the successful output of previous processors. This processor can be followed with a catch processor for defining child processors to be applied only to failed messages.

More information about error handling can be found here.
