fluentd not parsing JSON log file entry

I've seen a number of similar questions on Stack Overflow, including this one, but none address my particular issue.

The application is deployed in a Kubernetes (v1.15) cluster. I'm using a docker image based on the fluent/fluentd-docker-image GitHub repo, v1.9/armhf, modified to include the elasticsearch plugin. Elasticsearch and Kibana are both version 7.6.0.

The logs are going to stdout and look like:

{"Application":"customer","HTTPMethod":"GET","HostName":"","RemoteAddr":"10.244.4.154:51776","URLPath":"/customers","level":"info","msg":"HTTP request received","time":"2020-03-10T20:17:32Z"}

In Kibana I'm seeing something like this:

{
  "_index": "logstash-2020.03.10",
  "_type": "_doc",
  "_id": "p-UZxnABBcooPsDQMBy_",
  "_version": 1,
  "_score": null,
  "_source": {
    "log": "{\"Application\":\"customer\",\"HTTPMethod\":\"GET\",\"HostName\":\"\",\"RemoteAddr\":\"10.244.4.154:46160\",\"URLPath\":\"/customers\",\"level\":\"info\",\"msg\":\"HTTP request received\",\"time\":\"2020-03-10T20:18:18Z\"}\n",
    "stream": "stdout",
    "docker": {
      "container_id": "cd1634b0ce410f3c89fe63f508fe6208396be87adf1f27fa9d47a01d81ff7904"
    },
    "kubernetes": {

I'm expecting to see the JSON pulled out of the log: value, something like this (abbreviated):

{
  "_index": "logstash-2020.03.10",
  ...
  "_source": {
    "log": "...",   
    "Application":"customer",
    "HTTPMethod":"GET",
    "HostName":"",
    "RemoteAddr":"10.244.4.154:46160",
    "URLPath":"/customers",
    "level":"info",
    "msg":"HTTP request received",
    "time":"2020-03-10T20:18:18Z",
    "stream": "stdout",
    "docker": {
      "container_id": "cd1634b0ce410f3c89fe63f508fe6208396be87adf1f27fa9d47a01d81ff7904"
    },
    "kubernetes": {

My fluentd config is:

<match fluent.**>
  @type null
</match>

<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  tag kubernetes.*
  format json
  read_from_head true
</source>

<match kubernetes.var.log.containers.**fluentd**.log>
  @type null
</match>
<match kubernetes.var.log.containers.**kube-system**.log>
  @type null
</match>
<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

<match **>
   @type elasticsearch
   @id out_es
   @log_level info
   include_tag_key true
   host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
   port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
   path "#{ENV['FLUENT_ELASTICSEARCH_PATH']}"
   <format>
      @type json
   </format>
</match>

I'm sure I'm missing something. Can anyone point me in the right direction?

Thanks, Rich

Stomato answered 10/3, 2020 at 21:32

This config worked for me:

<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /opt/bitnami/fluentd/logs/buffers/fluentd-docker.pos
  tag kubernetes.*
  read_from_head true
  <parse>
    @type json
    time_key time
    time_format %iso8601
  </parse>
</source>

<filter kubernetes.**>
  @type parser
  key_name "$.log"
  hash_value_field "log"
  reserve_data true
  <parse>
    @type json
  </parse> 
</filter>

<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

Make sure to edit path so that it matches your use case.

This happens because the Docker log files under /var/log/containers/*.log store each container's STDOUT line as a string under the log key, so JSON emitted by your application arrives as an escaped string rather than as structured fields. What you need is an additional step that parses the string under the log key (a sample raw line is shown after the filter):

<filter kubernetes.**>
  @type parser
  key_name "$.log"
  hash_value_field "log"
  reserve_data true
  <parse>
    @type json
  </parse> 
</filter>
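
For reference, with Docker's json-file logging driver each line in /var/log/containers/*.log is itself a small JSON document, with the container's stdout serialized as a string under the log key. Using the record from the question (the outer timestamp is made up for illustration), a raw line looks roughly like this:

{"log":"{\"Application\":\"customer\",\"HTTPMethod\":\"GET\",...,\"time\":\"2020-03-10T20:18:18Z\"}\n","stream":"stdout","time":"2020-03-10T20:18:18.000000000Z"}

The tail source's json parser decodes only this outer object, which is why the application's own JSON remains a single escaped string until the parser filter above unpacks it.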
Bayou answered 22/6, 2020 at 14:05
Btw, I got it working only with key_name log (docs.fluentd.org/filter/parser#key_name). – Shalloon
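
For completeness, here is a minimal sketch of the variant that comment describes, using a plain key_name log instead of the record-accessor syntax (remove_key_name_field is optional and simply drops the raw string once it has been parsed):

<filter kubernetes.**>
  @type parser
  key_name log
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>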

I solved it with the parser filter below.

Check with the http input first to make sure the record is actually parsed, then apply the same filter to your container logs.

fluentd.conf

<source>
  @type http
  port 5170
  bind 0.0.0.0
</source>

<filter **>
  @type parser
  key_name "$.log"
  hash_value_field "log"
  reserve_data true
  <parse>
    @type json
  </parse> 
</filter>

<match **>
  @type stdout
</match>

Then exercise the http input from your terminal with curl:

curl -i -X POST -d 'json={"source":"stderr","log":"{\"applicationName\":\"api-producer-go\",\"level\":\"info\",\"msg\":\"Development is Running\",\"time\":\"2020-09-04T14:32:29Z\"}","container_id":"f9975c6a7bc6dcc21dbdacca8ff98152cd04ae28b3bc36707eba5453f6ff9960","container_name":"/api-producer-golang"}' http://localhost:5170/test.cycle
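
Once the parse checks out over http, the same filter block can be pointed at your container log tag; a sketch, assuming the kubernetes.* tag used by the tail source in the other answers:

<filter kubernetes.**>
  @type parser
  key_name "$.log"
  hash_value_field "log"
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>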
Bullen answered 4/9, 2020 at 14:47

I had JSON being emitted from my container like this:

{"asctime": "2020-06-28 23:40:37,184", "filename": "streaming_pull_manager.py", "funcName": "_should_recover", "lineno": 648, "processName": "MainProcess", "threadName": "Thread-6", "message": "Observed recoverable stream error 504 Deadline Exceeded", "severity": "INFO"}

Kibana was showing "failed to find message". After some googling, I fixed it by appending the following to my kubernetes.conf:

<filter **>
  @type record_transformer
  <record>
    log_json ${record["log"]}
  </record>
</filter>


<filter **>
  @type parser
  @log_level debug
  key_name log_json
  reserve_data true
  remove_key_name_field true
  emit_invalid_record_to_error false
  <parse>
    @type json
  </parse>
</filter>
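
In effect, the record_transformer copies the raw string from log into log_json, and the parser filter then replaces log_json with its parsed fields (remove_key_name_field drops log_json, while reserve_data keeps the original keys). Roughly, with the record shape assumed and values abbreviated:

# before the two filters
{"log": "{\"asctime\": \"2020-06-28 23:40:37,184\", ..., \"severity\": \"INFO\"}", "stream": "stdout", ...}

# after the two filters
{"log": "{\"asctime\": ...}", "stream": "stdout", "asctime": "2020-06-28 23:40:37,184", "filename": "streaming_pull_manager.py", ..., "severity": "INFO"}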

The final kubernetes.conf file looks like this:

<label @FLUENT_LOG>
  <match fluent.**>
    @type null
  </match>
</label>

<source>
  @type tail
  @id in_tail_container_logs
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
  exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
  read_from_head true
  <parse>
    @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

<source>
  @type tail
  @id in_tail_minion
  path /var/log/salt/minion
  pos_file /var/log/fluentd-salt.pos
  tag salt
  <parse>
    @type regexp
    expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
    time_format %Y-%m-%d %H:%M:%S
  </parse>
</source>

<source>
  @type tail
  @id in_tail_startupscript
  path /var/log/startupscript.log
  pos_file /var/log/fluentd-startupscript.log.pos
  tag startupscript
  <parse>
    @type syslog
  </parse>
</source>

<source>
  @type tail
  @id in_tail_docker
  path /var/log/docker.log
  pos_file /var/log/fluentd-docker.log.pos
  tag docker
  <parse>
    @type regexp
    expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
  </parse>
</source>

<source>
  @type tail
  @id in_tail_etcd
  path /var/log/etcd.log
  pos_file /var/log/fluentd-etcd.log.pos
  tag etcd
  <parse>
    @type none
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kubelet
  multiline_flush_interval 5s
  path /var/log/kubelet.log
  pos_file /var/log/fluentd-kubelet.log.pos
  tag kubelet
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kube_proxy
  multiline_flush_interval 5s
  path /var/log/kube-proxy.log
  pos_file /var/log/fluentd-kube-proxy.log.pos
  tag kube-proxy
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kube_apiserver
  multiline_flush_interval 5s
  path /var/log/kube-apiserver.log
  pos_file /var/log/fluentd-kube-apiserver.log.pos
  tag kube-apiserver
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kube_controller_manager
  multiline_flush_interval 5s
  path /var/log/kube-controller-manager.log
  pos_file /var/log/fluentd-kube-controller-manager.log.pos
  tag kube-controller-manager
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kube_scheduler
  multiline_flush_interval 5s
  path /var/log/kube-scheduler.log
  pos_file /var/log/fluentd-kube-scheduler.log.pos
  tag kube-scheduler
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_rescheduler
  multiline_flush_interval 5s
  path /var/log/rescheduler.log
  pos_file /var/log/fluentd-rescheduler.log.pos
  tag rescheduler
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_glbc
  multiline_flush_interval 5s
  path /var/log/glbc.log
  pos_file /var/log/fluentd-glbc.log.pos
  tag glbc
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_cluster_autoscaler
  multiline_flush_interval 5s
  path /var/log/cluster-autoscaler.log
  pos_file /var/log/fluentd-cluster-autoscaler.log.pos
  tag cluster-autoscaler
  <parse>
    @type kubernetes
  </parse>
</source>

# Example:
# 2017-02-09T00:15:57.992775796Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" ip="104.132.1.72" method="GET" user="kubecfg" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"
# 2017-02-09T00:15:57.993528822Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" response="200"
<source>
  @type tail
  @id in_tail_kube_apiserver_audit
  multiline_flush_interval 5s
  path /var/log/kubernetes/kube-apiserver-audit.log
  pos_file /var/log/kube-apiserver-audit.log.pos
  tag kube-apiserver-audit
  <parse>
    @type multiline
    format_firstline /^\S+\s+AUDIT:/
    # Fields must be explicitly captured by name to be parsed into the record.
    # Fields may not always be present, and order may change, so this just looks
    # for a list of key="\"quoted\" value" pairs separated by spaces.
    # Unknown fields are ignored.
    # Note: We can't separate query/response lines as format1/format2 because
    #       they don't always come one after the other for a given query.
    format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
    time_format %Y-%m-%dT%T.%L%Z
  </parse>
</source>

<filter kubernetes.**>
  @type kubernetes_metadata
  @id filter_kube_metadata
  kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
  verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
  ca_file "#{ENV['KUBERNETES_CA_FILE']}"
  skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
  skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
  skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
  skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
</filter>

<filter **>
  @type record_transformer
  <record>
    log_json ${record["log"]}
  </record>
</filter>


<filter **>
  @type parser
  @log_level debug
  key_name log_json
  reserve_data true
  remove_key_name_field true
  emit_invalid_record_to_error false
  <parse>
    @type json
  </parse>
</filter>

EDIT: If anyone is looking for how to override the default fluentd .conf files, especially kubernetes.conf, there is an amazing tutorial here.
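
A minimal sketch of the usual approach, assuming the stock fluent/fluentd-kubernetes-daemonset image (which loads /fluentd/etc/kubernetes.conf): put your customized file in a ConfigMap and mount it over the default with subPath. The ConfigMap name below is made up for illustration.

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-kubernetes-conf   # hypothetical name
  namespace: kube-system
data:
  kubernetes.conf: |
    # ... the customized kubernetes.conf shown above ...

# in the fluentd DaemonSet, on the fluentd container:
#   volumeMounts:
#     - name: fluentd-kubernetes-conf
#       mountPath: /fluentd/etc/kubernetes.conf   # path assumed from the stock image
#       subPath: kubernetes.conf
# and at the pod spec level:
#   volumes:
#     - name: fluentd-kubernetes-conf
#       configMap:
#         name: fluentd-kubernetes-conf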

Koressa answered 30/6, 2020 at 3:18 Comment(0)
