ingest pipelines¶
split an event into multiple¶
use the script processor:
"script": {
"lang": "painless",
"source": """
if (ctx.containsKey('my_array_field') && ctx['my_array_field' instanceof List) {
List new_events = new ArrayList();
for (def item : ctx['my_array_field']) {
Map new_doc = new HashMap(ctx); // copy orig document
new_doc.remove('my_array_field'); // remove the orig array field
new_doc.put('single_item_field', item); // add the individual item
new_events.add(new_doc);
}
ctx.remove('my_array_field'); // remove the original array field from the current document
ctx._ingest.on_failure = (e) => { /* handle potential failures? */ };
ctx._ingest.new_documents = new_events;
}
"""
}
PUT _ingest/pipeline/logs-agent_status
{
"version": 2,
"processors": [
{
"set": {
"tag": "debug_set",
"field": "event.original",
"copy_from": "message"
}
},
{
"remove": {
"ignore_missing": true,
"tag": "debug_remove",
"field": "message"
}
},
{
"json": {
"tag": "debug_json",
"field": "event.original",
"target_field": "temp"
}
},
{
"set": {
"field": "agent_overview",
"copy_from": "temp.statusSummary"
}
},
{
"set": {
"field": "message",
"copy_from": "temp.items"
}
},
{
"script": {
"source": "if (ctx.containsKey('message') && ctx['message'] instanceof List) {\n List new_events = new ArrayList();\n for (def item : ctx['message']) {\n Map new_doc = new HashMap(ctx);\n new_doc.remove('message');\n new_doc.put('agent_status', item);\n new_events.add(new_doc);\n }\n ctx._ingest.new_documents = new_events;\n}",
"tag": "split debug_script",
"lang": "painless",
"if": "ctx?.message != null"
}
},
{
"remove": {
"field": [
"event.original",
"message",
"temp"
]
}
}
],
"on_failure": [
{
"set": {
"tag": "ingest_pipeline_failure",
"field": "error.message",
"value": "{{ _ingest.on_failure_message }}"
}
},
{
"set": {
"field": "event.kind",
"value": "pipeline_error"
}
}
]
}