Tips¶
How to add tags based on field content with pipelines?¶
If the field's name is known, it can be used directly in the conditional. If not, use the message field, which holds the whole event.
filter {
    if [message] =~ /regexp/ {
        mutate {
            add_tag => [ "tag1", "tag2" ]
        }
    }
}
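For the known-field case, the conditional can test that field directly — a sketch, assuming the events carry a syslog-style program field (an assumed field name here):

```
filter {
    if [program] =~ /sshd/ {
        mutate {
            add_tag => [ "ssh" ]
        }
    }
}
```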
Integrate Kafka¶
Kafka can be integrated into the middle of an Elastic Stack. The simplest implementation leverages the kafka input/output plugins of Logstash directly. With that, the data flow looks like this:
- data source -> logstash -> kafka
- kafka -> logstash -> elasticsearch
More information on scaling Logstash can be found in Deploying and Scaling Logstash.
Send into Kafka¶
Create a logstash pipeline as below:
input {
    tcp {
        port => 5000
        tags => ["syslog", "topic1"]
    }
}
output {
    kafka {
        id => "topic1" # It is recommended to use a different id for each Logstash pipeline
        topic_id => "topic1"
        codec => json
        bootstrap_servers => "kafka_server1:9092,kafka_server2:9092,kafka_server3:9092"
    }
}
Send some test messages:
telnet logstash_server 5000
message1
message2
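The same test can also be scripted instead of typed into telnet — a minimal Python sketch (the host name logstash_server is a placeholder, substitute your own) that sends newline-terminated messages, which is what the tcp input expects:

```python
import socket

def send_lines(host, port, messages):
    """Send newline-terminated messages to a TCP listener (e.g. a Logstash tcp input)."""
    with socket.create_connection((host, port)) as sock:
        for msg in messages:
            sock.sendall(msg.encode("utf-8") + b"\n")

# Assumed usage against the pipeline above (logstash_server is a placeholder):
# send_lines("logstash_server", 5000, ["message1", "message2"])
```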
Verify the messages have been sent to Kafka successfully:
bin/kafka-console-consumer.sh --bootstrap-server "kafka_server1:9092,kafka_server2:9092,kafka_server3:9092" --topic topic1 --from-beginning
Read from Kafka and Send to Elasticsearch¶
Create a logstash pipeline as below:
input {
    kafka {
        client_id => "logstash_server" # It is recommended to use a different client_id for each Logstash pipeline
        group_id => "logstash_server"
        topics => ["topic1"]
        codec => "json"
        bootstrap_servers => "kafka_server1:9092,kafka_server2:9092,kafka_server3:9092"
    }
}
output {
    elasticsearch {
        hosts => ["http://elasticsearch1:9200", "http://elasticsearch2:9200", "http://elasticsearch3:9200"]
        index => "topic1-%{+YYYY.MM.dd}"
    }
}
The information should now be visible in Kibana.
Add Tags to Different Kafka Topics¶
Note: [@metadata][kafka][topic] will be empty unless decorate_events => true is set on the kafka input, since the plugin only populates the [@metadata][kafka] fields when event decoration is enabled. This tip is listed here for reference.
input {
    kafka {
        client_id => "logstash_server"
        group_id => "logstash"
        topics => ["unity", "xio"]
        codec => "json"
        bootstrap_servers => "kafka_server1:9092,kafka_server2:9092,kafka_server3:9092"
        decorate_events => true # required, otherwise [@metadata][kafka] is not populated
    }
}
filter {
    if [@metadata][kafka][topic] == "unity" {
        mutate { add_tag => ["unity"] }
    }
    if [@metadata][kafka][topic] == "xio" {
        mutate { add_tag => ["xio"] }
    }
}
output {
    elasticsearch {
        hosts => ["http://elasticsearch1:9200", "http://elasticsearch2:9200", "http://elasticsearch3:9200"]
        index => "storagebox-%{+YYYY.MM.dd}"
    }
}
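Alternatively — a sketch, assuming decorate_events => true is enabled on the kafka input so the metadata fields are populated — the topic name can be interpolated directly into the index name, which makes the per-topic filters unnecessary:

```
output {
    elasticsearch {
        hosts => ["http://elasticsearch1:9200", "http://elasticsearch2:9200", "http://elasticsearch3:9200"]
        index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
    }
}
```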
Rename the Host Field while Sending Filebeat Events to Logstash¶
If Filebeat sends events directly to Elasticsearch, everything works fine. However, if Filebeat sends events to an index that Logstash's syslog (TCP/UDP input) events are also written to, an error on the host field will be raised:
- The TCP/UDP input plugins of Logstash add a host field recording where the event was generated. This field is a string;
- Filebeat sends events with a host field which is an object (dict);
- Because of this difference, Elasticsearch cannot map the host field consistently, and indexing the conflicting events fails.
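For illustration, the two conflicting shapes of the host field look roughly like this (the field values here are made up):

```
# host from the Logstash TCP/UDP input: a plain string
"host": "server1.example.com"

# host from Filebeat: an object
"host": { "name": "server1", "os": { "platform": "linux" } }
```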
To fix this, the mutate filter plugin can be used to rename the host field of Filebeat to a new name as below:
filter {
    mutate {
        rename => { "host" => "server" }
    }
}