Tips

Add Tags Based on Field Content

If the field’s name is known, it can be referenced directly in a conditional. If not, use the message field, which holds the entire event.

filter {
  if [message] =~ /regexp/ {
    mutate {
      add_tag => [ "tag1", "tag2" ]
    }
  }
}
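
For comparison, a conditional can target a known field directly. A minimal sketch, assuming a hypothetical log_level field:

filter {
  # tag events whose (hypothetical) log_level field matches ERROR or FATAL
  if [log_level] =~ /ERROR|FATAL/ {
    mutate {
      add_tag => [ "error" ]
    }
  }
}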

Add Tags to Different Kafka Topics

Notes: [@metadata][kafka][topic] is only populated when the Kafka input plugin’s decorate_events option is enabled (it defaults to false), which is a likely cause of the field showing up empty. The pipeline below enables the option; this tip is listed here for reference.

input {
  kafka {
    client_id => "logstash_server"
    group_id => "logstash"
    topics => ["unity", "xio"]
    codec => "json"
    bootstrap_servers => "kafka_server1:9092,kafka_server2:9092,kafka_server3:9092"
    # required for [@metadata][kafka][topic] to be populated
    decorate_events => true
  }
}

filter {
  if [@metadata][kafka][topic] == "unity" {
    mutate { add_tag => ["unity"] }
  } else if [@metadata][kafka][topic] == "xio" {
    mutate { add_tag => ["xio"] }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch1:9200", "http://elasticsearch2:9200", "http://elasticsearch3:9200"]
    index => "storagebox-%{+YYYY.MM.dd}"
  }
}

Rename the Host Field while Sending Filebeat Events to Logstash

If Filebeat sends events to Elasticsearch directly, everything works fine. However, if Filebeat sends events to an index that Logstash also writes syslog (TCP/UDP input) events to, a mapping error will be raised on the host field:

  • The TCP/UDP input plugins of Logstash add a host field indicating where the information was generated; this field is a string;
  • Filebeat sends events with a host field which is an object (dict);
  • Because of this type conflict, Elasticsearch cannot map the host field consistently, and indexing fails for events whose host type does not match the existing mapping.

To fix this, the mutate filter plugin can be used to rename the host field coming from Filebeat, as below:

filter {
  mutate {
    rename => { "host" => "server" }
  }
}
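
Note that the filter above renames host on every event in the pipeline, including syslog ones. If both inputs share one pipeline, the rename can be restricted to Beats events with a conditional. A sketch, assuming the beats input is configured with tags => ["beats_input"] (the tag name is an assumption):

filter {
  # rename host only for events that arrived via the tagged beats input
  if "beats_input" in [tags] {
    mutate {
      rename => { "host" => "server" }
    }
  }
}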

Consolidate Unix Logs with NXLog

On Unix systems (AIX, Solaris, FreeBSD, HP-UX), system logs can be consolidated into Elasticsearch directly by configuring syslog forwarding. However, there is no comparable way to forward non-system logs, such as logs generated by applications, because Filebeat is not available on these Unix platforms.

NXLog can be leveraged on Unix systems to consolidate such logs; please refer to the NXLog User Guide for details.
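
A minimal sketch of such an NXLog configuration, assuming the Community Edition’s stock im_file and om_tcp modules; the log path, host, and port below are placeholders:

# nxlog.conf (sketch): forward an application log file to Logstash over TCP
<Input app_log>
    Module  im_file
    File    "/var/log/myapp/app.log"
</Input>

<Output to_logstash>
    Module  om_tcp
    Host    logstash.example.com
    Port    5140
</Output>

<Route app_route>
    Path    app_log => to_logstash
</Route>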

Reveal metadata of an event

The @metadata field of an event is not shown at output time. However, its content sometimes needs to be checked:

  • An output definition as below can be added in Logstash pipeline to print out the content of @metadata:

    stdout { codec => rubydebug { metadata => true } }
    
  • Create a copy of @metadata: the newly created object meta will be displayed normally at output time:

    filter {
      mutate {
        copy => {
          "@metadata" => "meta"
        }
      }
    }
    
  • It is also possible to check only specific fields of @metadata:

    filter {
      mutate {
        add_field => { "src_ip" => "%{[@metadata][ip_address]}" }
      }
    }
    

Use syslog timestamp as @timestamp

When an event is sent from syslog to Logstash, it will have a structure like the one below after processing:

{
          "host" => "localhost",
       "message" => "Feb 21 07:16:20 devbox avahi-daemon[324]: Registering new address record for 10.0.2.15 on enp0s3.IPv4\r",
      "@version" => "1",
          "port" => 58574,
          "type" => "syslog",
    "@timestamp" => 2020-02-21T07:16:25.507Z
}

There are two pieces of timestamp information:

  • At the beginning of the message field (Feb 21 07:16:20): the exact time the event happened;
  • The @timestamp field (2020-02-21T07:16:25.507Z): the time, in UTC, when Logstash saw the event.

Most of the time, the two timestamps are close to each other; a small difference is expected, since transferring the event to Logstash takes some time (about 5 seconds in this example). For most use cases the difference is acceptable, but not for those that require strict time accuracy. In such cases, the date filter can be used to make the @timestamp field use the same value as the timestamp in the message field:

filter {
  grok {
    match => {
      "message" => "(?<logtime>^\w+?.*?\d{2}:\d{2}:\d{2}) %{GREEDYDATA:event}"
    }
  }
  date {
    locale => "en_US"
    timezone => "UTC"
    match => [ "logtime", "MMM dd HH:mm:ss", "MMM dd yyyy HH:mm:ss" ]
  }
  mutate { remove_field => [ "logtime", "event" ] }
}

Explanations:

  • grok: this filter extracts the timestamp information from the message field based on the defined pattern and splits the message into two new fields (logtime and event). After processing, the event will look like this:

    {
              "host" => "localhost",
           "logtime" => "Feb 21 07:16:20",
           "message" => "Feb 21 07:16:20 devbox avahi-daemon[324]: Registering new address record for 10.0.2.15 on enp0s3.IPv4\r",
             "event" => "devbox avahi-daemon[324]: Registering new address record for 10.0.2.15 on enp0s3.IPv4\r",
          "@version" => "1",
              "port" => 58656,
              "type" => "syslog",
        "@timestamp" => 2020-02-21T07:16:25.507Z
    }
    
  • date: this filter parses the logtime field (created by the grok filter) based on the patterns specified in its match option, and adjusts the result using the locale and timezone options to produce a UTC timestamp, which replaces the original @timestamp field. After processing, the event will look like this (the two timestamps now match):

    {
              "host" => "localhost",
           "logtime" => "Feb 21 07:16:20",
           "message" => "Feb 21 07:16:20 devbox avahi-daemon[324]: Registering new address record for 10.0.2.15 on enp0s3.IPv4\r",
             "event" => "devbox avahi-daemon[324]: Registering new address record for 10.0.2.15 on enp0s3.IPv4\r",
          "@version" => "1",
              "port" => 58662,
              "type" => "syslog",
        "@timestamp" => 2020-02-21T07:16:20.000Z
    }
    
  • mutate: this filter drops the logtime and event fields (created by grok). After processing, the final event will look like this:

    {
              "host" => "localhost",
           "message" => "Feb 21 07:16:20 devbox avahi-daemon[324]: Registering new address record for 10.0.2.15 on enp0s3.IPv4\r",
          "@version" => "1",
              "port" => 58668,
              "type" => "syslog",
        "@timestamp" => 2020-02-21T07:16:20.000Z
    }
    

Consolidate journal logs

With the adoption of systemd on Linux, service logs can be checked easily with the consolidated journalctl tool. The classic log files (/var/log/messages, etc.) are mainly used for system-related information logging (journald actually covers this as well).

Due to the powerful features of journald (and journalctl), more and more modern Linux distributions do not even install the syslog-related packages, leaving journalctl as the only way to check logs. Under such conditions, consolidating journals into the Elastic Stack becomes a common task.

  • Define a systemd service as below (/etc/systemd/system/remote-syslog.service):

    [Unit]
    Description=Remote-syslog
    After=systemd-journald.service
    Requires=systemd-journald.service
    
    [Service]
    ExecStartPre=/usr/sbin/iptables -A INPUT -p tcp --dport <syslog server port> -j ACCEPT
    ExecStart=/bin/sh -c "journalctl -f | ncat <syslog server address> <syslog server port>"
    TimeoutStartSec=0
    Restart=on-failure
    RestartSec=5s
    
    [Install]
    WantedBy=multi-user.target
    
  • Start the service

    sudo systemctl daemon-reload
    sudo systemctl enable remote-syslog.service
    sudo systemctl start remote-syslog.service
    
  • Notes

    • ncat is a tool provided by nmap which works like the Linux cat command for network sockets;
    • The syslog server needs to open a TCP port for the connection (define a corresponding Logstash pipeline, as sketched below); if UDP is preferred, use "ncat -u".
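
A minimal sketch of such a Logstash pipeline; the port and index name are assumptions, and the port must match the <syslog server port> used by ncat:

input {
  tcp {
    # must match the port the remote-syslog service connects to
    port => 5514
    type => "journal"
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch1:9200"]
    index => "journal-%{+YYYY.MM.dd}"
  }
}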

Alerting Management with Elastalert

The Elastic Stack ships with a powerful built-in alerting capability through X-Pack, which is not free of charge. Thanks to OSS contributors, there exists a powerful open-source alternative named Elastalert.

Elastalert is a rule-based alerting implementation that supports a large number of alerting channels, such as Microsoft Teams and Jira. Please refer to its official documentation for details.
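
As an illustrative sketch of the rule format (the rule name, index pattern, thresholds, and email address are all assumptions), a minimal frequency rule looks roughly like this:

# reboot_frequency.yaml (sketch): alert when 5+ matching events occur in 1 minute
name: too-many-reboots
type: frequency
index: storagebox-*
num_events: 5
timeframe:
  minutes: 1
filter:
- query:
    query_string:
      query: "message: *reboot*"
alert:
- "email"
email:
- "ops@example.com"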

KQL

Query a field which contains a specified string

Notes:

  • Use wildcards (*) to match whatever precedes/follows the string;
  • Do not put quotes around the string.
message: *reboot*
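
By contrast, wildcards inside a quoted string are treated as literal characters, so the query below would only match messages containing the literal text *reboot*, asterisks included:

message: "*reboot*"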