I've set up the Elasticsearch, Logstash, and Kibana (ELK) log viewing tools on my systems. There are currently 2 machines in my configuration (Amazon EC2 instances):
- 54.251.120.171 - Logstash-server where ELK is installed
- 54.249.59.224 - Logstash-forwarder - sends the "/var/log/messages" log to Logstash-server
On logstash-server, this is what my configs (in different files) look like:
input {
  lumberjack {
    port => 5000
    type => "logs"
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}
On logstash-forwarder, this is what my config file looks like. It forwards the /var/log/messages and /var/log/secure logs to logstash-server:
{
  "network": {
    "servers": [ "54.251.120.171:5000" ],
    "timeout": 15,
    "ssl ca": "/etc/pki/tls/certs/logstash-forwarder.crt"
  },
  "files": [
    {
      "paths": [
        "/var/log/messages",
        "/var/log/secure"
      ],
      "fields": { "type": "syslog" }
    }
  ]
}
This is what my Kibana interface looks like after it has fetched the indexed logs from Elasticsearch.
So my question is: I need a way to retrieve the IP address of the logstash-forwarder (i.e. 54.249.59.224) for each log event it sends.
The reason I'm asking is that in a real scenario we might have many logstash-forwarders (say 10), all of them sending logs to our logstash-server. So I need some way to tag every log event so that I can identify which logstash-forwarder sent which event.
I'll need to use the IP address (maybe some other information as well) to search for log events in the Kibana interface.
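To make the idea concrete, here is a rough sketch of the kind of tagging I have in mind, not a confirmed solution. It generates one logstash-forwarder config per machine and adds the sender's address as an extra entry under "fields". The field name `forwarder_ip` and the helper `build_forwarder_config` are names I made up for illustration, and I'm assuming that extra "fields" entries end up attached to each event the way "type" does.

```python
import json


def build_forwarder_config(server, forwarder_ip, paths):
    """Return a logstash-forwarder config dict tagged with the sender's IP.

    `forwarder_ip` is a hypothetical custom field name, not a built-in one.
    """
    return {
        "network": {
            "servers": [server],
            "timeout": 15,
            "ssl ca": "/etc/pki/tls/certs/logstash-forwarder.crt",
        },
        "files": [
            {
                "paths": list(paths),
                # Assumption: extra key/value pairs under "fields" are
                # attached to every event, just like "type" is.
                "fields": {"type": "syslog", "forwarder_ip": forwarder_ip},
            }
        ],
    }


config = build_forwarder_config(
    "54.251.120.171:5000",
    "54.249.59.224",
    ["/var/log/messages", "/var/log/secure"],
)
# Print the JSON that would be written to this forwarder's config file.
print(json.dumps(config, indent=2))
```

If that assumption holds, I'd expect to be able to search in Kibana with something like forwarder_ip:"54.249.59.224", but I haven't verified this.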
Can someone please help me to do this? :)
Or, in case someone has a better idea of how to do this effectively in a different way, you're most welcome!