HowTo - Using Pipelines for Data Filters & Transforms
  • Updated on 21 Oct 2019


In this example we start with JSON data being written to a stream. Our task is to consume the stream, mutate the data into a new format, filter out certain items, and write the result to a new stream.

For this example the data consists of social media interactions with NLP enrichments such as sentiment and language detection. It looks something like this:

{
  "content": {
    "text": "This ist a public service announcement: I hate Jerry, Kate is okay.",
    "created_at": 1524599100
  },
  "language":[
    {
      "confidence": 0.07,
      "code": "de"
    },
    {
      "confidence": 0.93,
      "code": "en"
    }
  ],
  "sentiment": [
    {
       "entity": "jerry",
       "level": "negative",
       "confidence": 0.45
    },
    {
       "entity": "kate",
       "level": "neutral",
       "confidence": 0.08
    }
  ]
}

We wish to mutate the data so that irrelevant fields are removed. Specifically, we want to reduce the language detection candidates to the single language code with the highest confidence, and we want to remove sentiment entities whose confidence does not exceed a threshold (0.3). The desired output from the above sample would be:

{
  "content": {
    "created_at": 1524599100,
    "text": "This ist a public service announcement: I hate Jerry, Kate is okay."
  },
  "entities": [
    {
      "name": "jerry",
      "sentiment": "negative"
    }
  ],
  "language": "en"
}

We can accomplish this using the [JMESPath processor][jmespath-processor].

Input

Our input is a standard c8stream.

{
	"input": {
		"type": "c8streams",
		"c8streams": {
			"name": "input-stream",
			"local": false
		}
	}
}

Pipeline

{
	"pipeline": {
		"processors": [{
			"jmespath": {
				"query": "{\n  content: content,\n  entities: sentiment[?confidence > `0.3`].{\n    name: entity,\n    sentiment: level\n  },\n  language: max_by(language, &confidence).code\n}\n"
			}
		}]
	}
}

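The pipeline section is where we define the processors that are applied to each message. The only processor defined here is a JMESPath query, which performs the JSON mutation. The query copies content unchanged, keeps only the sentiment entries whose confidence is greater than 0.3 (renaming entity to name and level to sentiment), and uses max_by to select the code of the language candidate with the highest confidence. You can [read more about JMESPath here][jmespath].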
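To make the query's behaviour easier to follow, here is a minimal plain-Python sketch of the same mutation. It is only an approximation for well-formed messages shaped like the sample above: it does not run JMESPath, and the function name `transform` is hypothetical, not part of the pipeline or of any library.

```python
import json


def transform(doc: dict) -> dict:
    """Approximate the JMESPath query in plain Python (illustrative only)."""
    # sentiment[?confidence > `0.3`].{name: entity, sentiment: level}
    entities = [
        {"name": item["entity"], "sentiment": item["level"]}
        for item in doc["sentiment"]
        if item["confidence"] > 0.3  # strictly greater: exactly 0.3 is dropped
    ]
    # max_by(language, &confidence).code
    best = max(doc["language"], key=lambda cand: cand["confidence"])
    return {
        "content": doc["content"],  # content: content (copied unchanged)
        "entities": entities,
        "language": best["code"],
    }


# The sample message from the start of this article.
sample = {
    "content": {
        "text": "This ist a public service announcement: I hate Jerry, Kate is okay.",
        "created_at": 1524599100,
    },
    "language": [
        {"confidence": 0.07, "code": "de"},
        {"confidence": 0.93, "code": "en"},
    ],
    "sentiment": [
        {"entity": "jerry", "level": "negative", "confidence": 0.45},
        {"entity": "kate", "level": "neutral", "confidence": 0.08},
    ],
}

result = transform(sample)
print(json.dumps(result, indent=2, sort_keys=True))
```

Running the sketch on the sample prints the desired output shown earlier. Note that the comparison is strict, so an entity with a confidence of exactly 0.3 would also be removed.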

Output

Our output is a standard c8stream producer output.

{
	"output": {
		"type": "c8streams",
		"c8streams": {
			"name": "output-stream",
			"local": false
		}
	}
}

Full example

{
	"input": {
		"type": "c8streams",
		"c8streams": {
			"name": "input-stream",
			"local": false
		}
	},
	"pipeline": {
		"processors": [{
			"jmespath": {
				"query": "{\n  content: content,\n  entities: sentiment[?confidence > `0.3`].{\n    name: entity,\n    sentiment: level\n  },\n  language: max_by(language, &confidence).code\n}\n"
			}
		}]
	},
	"output": {
		"type": "c8streams",
		"c8streams": {
			"name": "output-stream",
			"local": false
		}
	}
}
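The query in the configuration is a multi-line JMESPath expression stored as a single JSON string, which is why it contains `\n` escapes. As a convenience, the configuration can be generated from a readable query rather than escaped by hand. The following is a sketch using only Python's standard `json` module; the helper name `build_config` and its parameters are hypothetical and not part of the product.

```python
import json

# The JMESPath query, written in its readable multi-line form.
QUERY = """{
  content: content,
  entities: sentiment[?confidence > `0.3`].{
    name: entity,
    sentiment: level
  },
  language: max_by(language, &confidence).code
}
"""


def build_config(input_stream: str, output_stream: str, query: str) -> dict:
    """Assemble the input/pipeline/output structure shown in the full example."""
    return {
        "input": {
            "type": "c8streams",
            "c8streams": {"name": input_stream, "local": False},
        },
        "pipeline": {
            "processors": [{"jmespath": {"query": query}}],
        },
        "output": {
            "type": "c8streams",
            "c8streams": {"name": output_stream, "local": False},
        },
    }


config = build_config("input-stream", "output-stream", QUERY)
# json.dumps escapes the newlines in the query; indent="\t" mirrors the tabs above.
config_json = json.dumps(config, indent="\t")
print(config_json)
```

Parsing the generated JSON yields the original query string again, so the readable form and the escaped form stay in sync.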