Difference between revisions of "Logstash"
Line 193: | Line 193: | ||
Anything that falls out the bottom will be marked with a _grokparsefailure tag and appear in logstash otherwise verbatim | Anything that falls out the bottom will be marked with a _grokparsefailure tag and appear in logstash otherwise verbatim | ||
+ | |||
+ | == output == | ||
+ | |||
+ | Below is the custom elasticsearch output method that i use to seperate Juniper flow messages into a different index so that their retention can be managed at a different rate to regular syslog messages. In my experience juniper flow messages number in the thousands per second on a busy firewall and will often need to be dumped after a few days. In contrast to this regular syslog messages could, with adequate management, be retained for several months or years. | ||
+ | |||
+ | <pre> | ||
+ | output { | ||
+ | if "juniper_flow_create" in [tags] { | ||
+ | elasticsearch { | ||
+ | index => [ "firewall-%{+YYYY.MM.dd}" ] | ||
+ | host => localhost | ||
+ | } | ||
+ | } elseif "juniper_flow_close" in [tags] { | ||
+ | elasticsearch { | ||
+ | index => [ "firewall-%{+YYYY.MM.dd}" ] | ||
+ | host => localhost | ||
+ | } | ||
+ | } elseif "juniper_flow_deny" in [tags] { | ||
+ | elasticsearch { | ||
+ | index => [ "firewall-%{+YYYY.MM.dd}" ] | ||
+ | host => localhost | ||
+ | } | ||
+ | } else { | ||
+ | elasticsearch { | ||
+ | host => localhost | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | </pre> |
Revision as of 21:24, 18 November 2014
For various reasons, not least of which is timestamp normalisation and/or everything normalisation for that matter i chose to use a standard udp and tcp input method for logstash and skip the in-built syslog one. This meant that i needed to reinvent the wheel somewhat when it came to grok filters however. So here i have documented the grok filters that I built and why.
Contents
- 1 input
- 2 filter
- 2.1 Delete useless log messages
- 2.2 Match Cisco ACL logs
- 2.3 Match other Cisco logs
- 2.4 Match juniper command input
- 2.5 Match juniper flow session create
- 2.6 Match juniper flow session close
- 2.7 Match juniper flow session deny
- 2.8 Match linux syslog messages
- 2.9 Match anything else that looks syslog-y
- 2.10 Clean up sucessful groks and set the message field properly
- 2.11 Anything else
- 3 output
input
input { udp { port => 514 type => syslog } tcp { port => 514 type => syslog } }
filter
filter { if [type] == "syslog" {
Delete useless log messages
if [message] =~ /Access Server SDK: No log writers/ or [message] =~ /Warning: Could not obtain lock on \/mnt\/oracle\/ohs/ or [message] =~ /last message repeated [0-9]+ times/ { drop { } }
Match Cisco ACL logs
grok { match => { "message" => "%{SYSLOG5424PRI}%{NUMBER:sequence}: %{SYSLOGTIMESTAMP:log_timestamp}: \%SEC-6-IPACCESSLOGP: list %{INT:acl_number} %{WORD:action} %{WORD:protocol} %{IP:src_ip}\(%{NUMBER:src_port}\) \-\> %{IP:dst_ip}\(%{NUMBER:dst_port}\), %{INT:packets} packet" } add_tag => ["grok_match", "cisco_acl_message"] remove_field => [ "message" ] }
Match other Cisco logs
if "_grokparsefailure" in [tags] { grok { match => { "message" => "%{SYSLOG5424PRI}%{NUMBER:sequence}: %{SYSLOGTIMESTAMP:log_timestamp}: %{GREEDYDATA:message_remainder}" } add_tag => ["grok_match", "cisco_message", "replace_message"] remove_tag => [ "_grokparsefailure" ] } }
Match juniper command input
if "_grokparsefailure" in [tags] { grok { match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP} %{HOSTNAME:hostname} %{WORD:process_name}\[%{NUMBER:process_number}\]: UI_CMDLINE_READ_LINE: User '%{USER:user}', command '%{GREEDYDATA:command}'" } add_tag => ["grok_match", "juniper_command"] remove_tag => [ "_grokparsefailure" ] remove_field => [ "message" ] } }
Match juniper flow session create
if "_grokparsefailure" in [tags] { grok { match => { "message" => "%{SYSLOG5424PRI}%{NUMBER} %{TIMESTAMP_ISO8601} %{HOSTNAME:hostname} RT_FLOW - RT_FLOW_SESSION_CREATE \[.*%{SPACE}source-address=\"%{IP:src_ip}\"%{SPACE}source-port=\"%{NUMBER:src_port}\"%{SPACE}destination-address=\"%{IP:dst_ip}\"%{SPACE}destination-port=\"%{NUMBER:dst_port}\"%{SPACE}service-name=\"%{DATA:juniper_service_name}\"%{SPACE}nat-source-address=\"%{IP:nat_src_ip}\"%{SPACE}nat-source-port=\"%{NUMBER:nat_src_port}\"%{SPACE}nat-destination-address=\"%{IP:nat_dst_ip}\"%{SPACE}nat-destination-port=\"%{NUMBER:nat_dst_port}\"%{SPACE}src-nat-rule-name=\"%{DATA:src_nat_rule_name}\"%{SPACE}dst-nat-rule-name=\"%{DATA:dst_nat_rule_name}\"%{SPACE}protocol-id=\"%{DATA:protocol_id}\"%{SPACE}policy-name=\"%{DATA:policy_name}\"%{SPACE}source-zone-name=\"%{DATA:src_zone_name}\"%{SPACE}destination-zone-name=\"%{DATA:dst_zone_name}\"%{SPACE}session-id-32=\"%{NUMBER:session_id}\"%{SPACE}username=\"%{DATA:user}\"%{SPACE}roles=\"%{DATA:roles}\"%{SPACE}packet-incoming-interface=\"%{DATA:incoming_interface}\"\]" } add_tag => ["grok_match", "juniper_flow_create"] remove_tag => [ "_grokparsefailure" ] remove_field => [ "message" ] } }
Match juniper flow session close
if "_grokparsefailure" in [tags] { grok { match => { "message" => "%{SYSLOG5424PRI}%{NUMBER} %{TIMESTAMP_ISO8601} %{HOSTNAME:hostname} RT_FLOW - RT_FLOW_SESSION_CLOSE \[.*%{SPACE}reason=\"%{DATA:reason}\"%{SPACE}source-address=\"%{IP:src_ip}\"%{SPACE}source-port=\"%{NUMBER:src_port}\"%{SPACE}destination-address=\"%{IP:dst_ip}\"%{SPACE}destination-port=\"%{NUMBER:dst_port}\"%{SPACE}service-name=\"%{DATA:juniper_service_name}\"%{SPACE}nat-source-address=\"%{IP:nat_src_ip}\"%{SPACE}nat-source-port=\"%{NUMBER:nat_src_port}\"%{SPACE}nat-destination-address=\"%{IP:nat_dst_ip}\"%{SPACE}nat-destination-port=\"%{NUMBER:nat_dst_port}\"%{SPACE}src-nat-rule-name=\"%{DATA:src_nat_rule_name}\"%{SPACE}dst-nat-rule-name=\"%{DATA:dst_nat_rule_name}\"%{SPACE}protocol-id=\"%{DATA:protocol_id}\"%{SPACE}policy-name=\"%{DATA:policy_name}\"%{SPACE}source-zone-name=\"%{DATA:src_zone_name}\"%{SPACE}destination-zone-name=\"%{DATA:dst_zone_name}\"%{SPACE}session-id-32=\"%{NUMBER:session_id}\"%{SPACE}packets-from-client=\"%{NUMBER:packets_from_client}\"%{SPACE}bytes-from-client=\"%{NUMBER:bytes_from_client}\"%{SPACE}packets-from-server=\"%{NUMBER:packets_from_server}\"%{SPACE}bytes-from-server=\"%{NUMBER:bytes_from_server}\"%{SPACE}elapsed-time=\"%{DATA:elapsed_time}\"%{SPACE}application=\"%{DATA:application}\"%{SPACE}nested-application=\"%{DATA:nested_application}\"%{SPACE}username=\"%{DATA:user}\"%{SPACE}roles=\"%{DATA:roles}\"%{SPACE}packet-incoming-interface=\"%{DATA:incoming_interface}\"\]" } add_tag => ["grok_match", "juniper_flow_close"] remove_tag => [ "_grokparsefailure" ] remove_field => [ "message" ] } }
Match juniper flow session deny
if "_grokparsefailure" in [tags] { grok { match => { "message" => "%{SYSLOG5424PRI}%{NUMBER} %{TIMESTAMP_ISO8601} %{HOSTNAME:hostname} RT_FLOW - RT_FLOW_SESSION_DENY \[.*%{SPACE}source-address=\"%{IP:src_ip}\"%{SPACE}source-port=\"%{NUMBER:src_port}\"%{SPACE}destination-address=\"%{IP:dst_ip}\"%{SPACE}destination-port=\"%{NUMBER:dst_port}\"%{SPACE}service-name=\"%{DATA:juniper_service_name}\"%{SPACE}protocol-id=\"%{DATA:protocol_id}\"%{SPACE}icmp-type=\"%{DATA:icmp_type}\"%{SPACE}policy-name=\"%{DATA:policy_name}\"%{SPACE}source-zone-name=\"%{DATA:src_zone_name}\"%{SPACE}destination-zone-name=\"%{DATA:dst_zone_name}\"%{SPACE}application=\"%{DATA:application}\"%{SPACE}nested-application=\"%{DATA:nested_application}\"%{SPACE}username=\"%{DATA:user}\"%{SPACE}roles=\"%{DATA:roles}\"%{SPACE}packet-incoming-interface=\"%{DATA:incoming_interface}\"\]" } add_tag => ["grok_match", "juniper_flow_deny"] remove_tag => [ "_grokparsefailure" ] remove_field => [ "message" ] } }
Match linux syslog messages
if "_grokparsefailure" in [tags] { grok { match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP} %{HOSTNAME:hostname} %{WORD:process_name}\[%{NUMBER:process_number}\]: %{GREEDYDATA:message_remainder}" } add_tag => ["grok_match", "linux_syslog", "replace_message"] remove_tag => [ "_grokparsefailure" ] } }
Match anything else that looks syslog-y
if "_grokparsefailure" in [tags] { grok { match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP}%{SPACE}%{GREEDYDATA:message_remainder}" } add_tag => ["grok_match", "other_syslog", "replace_message"] remove_tag => [ "_grokparsefailure" ] } }
Clean up sucessful groks and set the message field properly
if "replace_message" in [tags] { mutate { replace => [ "message" , "%{message_remainder}" ] remove_field => [ "message_remainder" ] remove_tag => [ "replace_message" ] } }
Anything else
Anything that falls out the bottom will be marked with a _grokparsefailure tag and appear in logstash otherwise verbatim
output
Below is the custom elasticsearch output method that i use to seperate Juniper flow messages into a different index so that their retention can be managed at a different rate to regular syslog messages. In my experience juniper flow messages number in the thousands per second on a busy firewall and will often need to be dumped after a few days. In contrast to this regular syslog messages could, with adequate management, be retained for several months or years.
output { if "juniper_flow_create" in [tags] { elasticsearch { index => [ "firewall-%{+YYYY.MM.dd}" ] host => localhost } } elseif "juniper_flow_close" in [tags] { elasticsearch { index => [ "firewall-%{+YYYY.MM.dd}" ] host => localhost } } elseif "juniper_flow_deny" in [tags] { elasticsearch { index => [ "firewall-%{+YYYY.MM.dd}" ] host => localhost } } else { elasticsearch { host => localhost } } }