ingest pipelines

split an event into multiple

use the script processor:

"script": {
  "lang": "painless",
  "source": """
    if (ctx.containsKey('my_array_field') && ctx['my_array_field' instanceof List) {
      List new_events = new ArrayList();
      for (def item : ctx['my_array_field']) {
        Map new_doc = new HashMap(ctx); // copy orig document
        new_doc.remove('my_array_field'); // remove the orig array field
        new_doc.put('single_item_field', item); // add the individual item
        new_events.add(new_doc);
      }
      ctx.remove('my_array_field'); // remove the original array field from the current document
      ctx._ingest.on_failure = (e) => { /* handle potential failures? */ };
      ctx._ingest.new_documents = new_events;
    }
  """
}
PUT _ingest/pipeline/logs-agent_status
{
  "version": 2,
  "processors": [
    {
      "set": {
        "tag": "debug_set",
        "field": "event.original",
        "copy_from": "message"
      }
    },
    {
      "remove": {
        "ignore_missing": true,
        "tag": "debug_remove",
        "field": "message"
      }
    },
    {
      "json": {
        "tag": "debug_json",
        "field": "event.original",
        "target_field": "temp"
      }
    },
    {
      "set": {
        "field": "agent_overview",
        "copy_from": "temp.statusSummary"
      }
    },
    {
      "set": {
        "field": "message",
        "copy_from": "temp.items"
      }
    },
    {
      "script": {
        "source": "if (ctx.containsKey('message') && ctx['message'] instanceof List) {\n  List new_events = new ArrayList();\n  for (def item : ctx['message']) {\n    Map new_doc = new HashMap(ctx);\n    new_doc.remove('message');\n    new_doc.put('agent_status', item);\n    new_events.add(new_doc);\n  }\n  ctx._ingest.new_documents = new_events;\n}",
        "tag": "split debug_script",
        "lang": "painless",
        "if": "ctx?.message != null"
      }
    },
    {
      "remove": {
        "field": [
          "event.original",
          "message",
          "temp"
        ]
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "tag": "ingest_pipeline_failure",
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    },
    {
      "set": {
        "field": "event.kind",
        "value": "pipeline_error"
      }
    }
  ]
}