For various reasons, not least of which is timestamp normalisation (and normalisation of everything else, for that matter), I chose to use the standard UDP and TCP inputs for Logstash and to skip the built-in syslog input. This meant reinventing the wheel somewhat when it came to grok filters, so here I have documented the grok filters I built and why.

== input ==

<pre>
input {
	udp {
		port => 514
		type => "syslog"
	}
	tcp {
		port => 514
		type => "syslog"
	}
}
</pre>
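When tweaking the grok patterns in the next section it can be handy to bypass the network inputs entirely. A throw-away test pipeline along these lines (a sketch, not part of the running config) lets you paste captured raw lines, complete with their leading syslog priority prefix, onto stdin and inspect the parsed event on stdout:

<pre>
input {
	stdin {
		type => "syslog"
	}
}
# ... same filter section as below ...
output {
	stdout {
		codec => rubydebug
	}
}
</pre>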

== filter ==

Everything below sits inside a check on the type field set by the inputs above; the matching closing braces come after the last snippet in this section.

<pre>
filter {
	if [type] == "syslog" {
</pre>

=== Delete useless log messages ===

<pre>
		if [message] =~ /Access Server SDK: No log writers/ or [message] =~ /Warning: Could not obtain lock on \/mnt\/oracle\/ohs/ or [message] =~ /last message repeated [0-9]+ times/ {
			drop { }
		}
</pre>
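The three separate regex tests could equally be collapsed into one alternation, which avoids repeating [message] =~ for every new piece of noise; a functionally equivalent sketch:

<pre>
		# one regex with alternation instead of three 'or' clauses
		if [message] =~ /Access Server SDK: No log writers|Warning: Could not obtain lock on \/mnt\/oracle\/ohs|last message repeated [0-9]+ times/ {
			drop { }
		}
</pre>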

=== Match Cisco ACL logs ===

<pre>
		grok {
			match => { "message" => "%{SYSLOG5424PRI}%{NUMBER:sequence}: %{SYSLOGTIMESTAMP:log_timestamp}: \%SEC-6-IPACCESSLOGP: list %{INT:acl_number} %{WORD:action} %{WORD:protocol} %{IP:src_ip}\(%{NUMBER:src_port}\) \-\> %{IP:dst_ip}\(%{NUMBER:dst_port}\), %{INT:packets} packet" }
			add_tag => ["grok_match", "cisco_acl_message"]
			remove_field => [ "message" ]
		}
</pre>
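The ACL grok captures the device's own timestamp into log_timestamp, but nothing here folds it into @timestamp, which was part of the normalisation motivation up top. A date filter along these lines could be added after a successful match (a sketch only; the exact format strings depend on how the Cisco boxes are set up to stamp their logs):

<pre>
		# assumes log_timestamp looks like "Nov 18 21:34:12"; extend the format
		# list if your devices log milliseconds or a timezone suffix
		if "cisco_acl_message" in [tags] {
			date {
				match => [ "log_timestamp", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss" ]
			}
		}
</pre>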

=== Match other Cisco logs ===

If the ACL grok above did not match, the event is now carrying a _grokparsefailure tag, so this and each of the following patterns are only attempted when everything before them has failed:

<pre>
		if "_grokparsefailure" in [tags] {
			grok {
				match => { "message" => "%{SYSLOG5424PRI}%{NUMBER:sequence}: %{SYSLOGTIMESTAMP:log_timestamp}: %{GREEDYDATA:message_remainder}" }
				add_tag => ["grok_match", "cisco_message", "replace_message"]
				remove_tag => [ "_grokparsefailure" ]
			}
		}
</pre>

=== Match Juniper command input ===

<pre>
		if "_grokparsefailure" in [tags] {
			grok {
				match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP} %{HOSTNAME:hostname} %{WORD:process_name}\[%{NUMBER:process_number}\]: UI_CMDLINE_READ_LINE: User '%{USER:user}', command '%{GREEDYDATA:command}'" }
				add_tag => ["grok_match", "juniper_command"]
				remove_tag => [ "_grokparsefailure" ]
				remove_field => [ "message" ]
			}
		}
</pre>

=== Match Juniper flow session create ===

<pre>
		if "_grokparsefailure" in [tags] {
			grok {
				match => { "message" => "%{SYSLOG5424PRI}%{NUMBER} %{TIMESTAMP_ISO8601} %{HOSTNAME:hostname} RT_FLOW - RT_FLOW_SESSION_CREATE \[.*%{SPACE}source-address=\"%{IP:src_ip}\"%{SPACE}source-port=\"%{NUMBER:src_port}\"%{SPACE}destination-address=\"%{IP:dst_ip}\"%{SPACE}destination-port=\"%{NUMBER:dst_port}\"%{SPACE}service-name=\"%{DATA:juniper_service_name}\"%{SPACE}nat-source-address=\"%{IP:nat_src_ip}\"%{SPACE}nat-source-port=\"%{NUMBER:nat_src_port}\"%{SPACE}nat-destination-address=\"%{IP:nat_dst_ip}\"%{SPACE}nat-destination-port=\"%{NUMBER:nat_dst_port}\"%{SPACE}src-nat-rule-name=\"%{DATA:src_nat_rule_name}\"%{SPACE}dst-nat-rule-name=\"%{DATA:dst_nat_rule_name}\"%{SPACE}protocol-id=\"%{DATA:protocol_id}\"%{SPACE}policy-name=\"%{DATA:policy_name}\"%{SPACE}source-zone-name=\"%{DATA:src_zone_name}\"%{SPACE}destination-zone-name=\"%{DATA:dst_zone_name}\"%{SPACE}session-id-32=\"%{NUMBER:session_id}\"%{SPACE}username=\"%{DATA:user}\"%{SPACE}roles=\"%{DATA:roles}\"%{SPACE}packet-incoming-interface=\"%{DATA:incoming_interface}\"\]" }
				add_tag => ["grok_match", "juniper_flow_create"]
				remove_tag => [ "_grokparsefailure" ]
				remove_field => [ "message" ]
			}
		}
</pre>

=== Match Juniper flow session close ===

<pre>
		if "_grokparsefailure" in [tags] {
			grok {
				match => { "message" => "%{SYSLOG5424PRI}%{NUMBER} %{TIMESTAMP_ISO8601} %{HOSTNAME:hostname} RT_FLOW - RT_FLOW_SESSION_CLOSE \[.*%{SPACE}reason=\"%{DATA:reason}\"%{SPACE}source-address=\"%{IP:src_ip}\"%{SPACE}source-port=\"%{NUMBER:src_port}\"%{SPACE}destination-address=\"%{IP:dst_ip}\"%{SPACE}destination-port=\"%{NUMBER:dst_port}\"%{SPACE}service-name=\"%{DATA:juniper_service_name}\"%{SPACE}nat-source-address=\"%{IP:nat_src_ip}\"%{SPACE}nat-source-port=\"%{NUMBER:nat_src_port}\"%{SPACE}nat-destination-address=\"%{IP:nat_dst_ip}\"%{SPACE}nat-destination-port=\"%{NUMBER:nat_dst_port}\"%{SPACE}src-nat-rule-name=\"%{DATA:src_nat_rule_name}\"%{SPACE}dst-nat-rule-name=\"%{DATA:dst_nat_rule_name}\"%{SPACE}protocol-id=\"%{DATA:protocol_id}\"%{SPACE}policy-name=\"%{DATA:policy_name}\"%{SPACE}source-zone-name=\"%{DATA:src_zone_name}\"%{SPACE}destination-zone-name=\"%{DATA:dst_zone_name}\"%{SPACE}session-id-32=\"%{NUMBER:session_id}\"%{SPACE}packets-from-client=\"%{NUMBER:packets_from_client}\"%{SPACE}bytes-from-client=\"%{NUMBER:bytes_from_client}\"%{SPACE}packets-from-server=\"%{NUMBER:packets_from_server}\"%{SPACE}bytes-from-server=\"%{NUMBER:bytes_from_server}\"%{SPACE}elapsed-time=\"%{DATA:elapsed_time}\"%{SPACE}application=\"%{DATA:application}\"%{SPACE}nested-application=\"%{DATA:nested_application}\"%{SPACE}username=\"%{DATA:user}\"%{SPACE}roles=\"%{DATA:roles}\"%{SPACE}packet-incoming-interface=\"%{DATA:incoming_interface}\"\]" }
				add_tag => ["grok_match", "juniper_flow_close"]
				remove_tag => [ "_grokparsefailure" ]
				remove_field => [ "message" ]
			}
		}
</pre>
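The packet and byte counters captured by the session-close pattern land as strings, which makes numeric aggregation awkward later on. One option (a sketch, untested) is to convert them once that grok has matched:

<pre>
		if "juniper_flow_close" in [tags] {
			mutate {
				# counters arrive as strings; make them integers for aggregation
				convert => [ "bytes_from_client", "integer",
				             "bytes_from_server", "integer",
				             "packets_from_client", "integer",
				             "packets_from_server", "integer" ]
			}
		}
</pre>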

=== Match Juniper flow session deny ===

<pre>
		if "_grokparsefailure" in [tags] {
			grok {
				match => { "message" => "%{SYSLOG5424PRI}%{NUMBER} %{TIMESTAMP_ISO8601} %{HOSTNAME:hostname} RT_FLOW - RT_FLOW_SESSION_DENY \[.*%{SPACE}source-address=\"%{IP:src_ip}\"%{SPACE}source-port=\"%{NUMBER:src_port}\"%{SPACE}destination-address=\"%{IP:dst_ip}\"%{SPACE}destination-port=\"%{NUMBER:dst_port}\"%{SPACE}service-name=\"%{DATA:juniper_service_name}\"%{SPACE}protocol-id=\"%{DATA:protocol_id}\"%{SPACE}icmp-type=\"%{DATA:icmp_type}\"%{SPACE}policy-name=\"%{DATA:policy_name}\"%{SPACE}source-zone-name=\"%{DATA:src_zone_name}\"%{SPACE}destination-zone-name=\"%{DATA:dst_zone_name}\"%{SPACE}application=\"%{DATA:application}\"%{SPACE}nested-application=\"%{DATA:nested_application}\"%{SPACE}username=\"%{DATA:user}\"%{SPACE}roles=\"%{DATA:roles}\"%{SPACE}packet-incoming-interface=\"%{DATA:incoming_interface}\"\]" }
				add_tag => ["grok_match", "juniper_flow_deny"]
				remove_tag => [ "_grokparsefailure" ]
				remove_field => [ "message" ]
			}
		}
</pre>
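All three RT_FLOW patterns above hand-parse what is essentially key="value" data. An alternative worth knowing about is the kv filter, which can split those pairs with far less regex, at the cost of getting Juniper's own field names rather than the shorter ones used above. A rough sketch:

<pre>
		# sketch: let kv pull apart the key="value" pairs in RT_FLOW messages
		if "_grokparsefailure" in [tags] and [message] =~ /RT_FLOW_SESSION/ {
			kv {
				source => "message"
			}
		}
</pre>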

=== Match Linux syslog messages ===

<pre>
		if "_grokparsefailure" in [tags] {
			grok {
				match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP} %{HOSTNAME:hostname} %{WORD:process_name}\[%{NUMBER:process_number}\]: %{GREEDYDATA:message_remainder}" }
				add_tag => ["grok_match", "linux_syslog", "replace_message"]
				remove_tag => [ "_grokparsefailure" ]
			}
		}
</pre>

=== Match anything else that looks syslog-y ===

<pre>
		if "_grokparsefailure" in [tags] {
			grok {
				match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP}%{SPACE}%{GREEDYDATA:message_remainder}" }
				add_tag => ["grok_match", "other_syslog", "replace_message"]
				remove_tag => [ "_grokparsefailure" ]
			}
		}
</pre>
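As a design note, much of this fall-through chain could be expressed as one grok block instead: grok takes a list of patterns for a field and stops at the first one that matches, so there is no _grokparsefailure tag to juggle between blocks. The trade-off is losing the per-pattern add_tag and remove_field handling that the config above relies on. A rough sketch with just the two generic patterns:

<pre>
		# sketch only: patterns are tried in order and the first match wins
		grok {
			match => { "message" => [
				"%{SYSLOG5424PRI}%{NUMBER:sequence}: %{SYSLOGTIMESTAMP:log_timestamp}: %{GREEDYDATA:message_remainder}",
				"%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP}%{SPACE}%{GREEDYDATA:message_remainder}"
			] }
			add_tag => [ "grok_match" ]
		}
</pre>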

=== Clean up successful groks and set the message field properly ===

<pre>
		if "replace_message" in [tags] {
			mutate {
				replace => [ "message" , "%{message_remainder}" ]
				remove_field => [ "message_remainder" ]
				remove_tag => [ "replace_message" ]
			}
		}
</pre>
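For completeness, the if [type] == "syslog" conditional and the filter block opened at the top of this section still need closing once all of the snippets above are pasted together:

<pre>
	}
}
</pre>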

=== Anything else ===

Anything that falls through to the bottom will still carry a _grokparsefailure tag and will appear in Logstash otherwise verbatim.
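While developing new patterns it can be handy to siphon those failures off somewhere easy to eyeball. A rough sketch using the file output (the path is just an example, pick whatever suits):

<pre>
output {
	if "_grokparsefailure" in [tags] {
		# hypothetical dumping ground for anything no grok pattern matched
		file {
			path => "/var/log/logstash/grok_failures.log"
		}
	}
}
</pre>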

== output ==

Below is the custom elasticsearch output that I use to separate Juniper flow messages into their own index so that their retention can be managed at a different rate to regular syslog messages. In my experience Juniper flow messages number in the thousands per second on a busy firewall and often need to be dumped after a few days, whereas regular syslog messages could, with adequate management, be retained for several months or years. If you run non-default index names then the Kibana dashboard won't display those indexes without [[Kibana dashboard modifications|some modifications]].

<pre>
output {
	if "juniper_flow_create" in [tags] {
		elasticsearch {
			index => [ "firewall-%{+YYYY.MM.dd}" ]
			host => "localhost"
		}
	} else if "juniper_flow_close" in [tags] {
		elasticsearch {
			index => [ "firewall-%{+YYYY.MM.dd}" ]
			host => "localhost"
		}
	} else if "juniper_flow_deny" in [tags] {
		elasticsearch {
			index => [ "firewall-%{+YYYY.MM.dd}" ]
			host => "localhost"
		}
	} else {
		elasticsearch {
			host => "localhost"
		}
	}
}
</pre>
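Since the three flow branches are identical, they could arguably be collapsed into a single conditional. A sketch of the same routing with less repetition:

<pre>
output {
	# same behaviour as above: all three juniper flow tags share the firewall index
	if "juniper_flow_create" in [tags] or "juniper_flow_close" in [tags] or "juniper_flow_deny" in [tags] {
		elasticsearch {
			index => [ "firewall-%{+YYYY.MM.dd}" ]
			host => "localhost"
		}
	} else {
		elasticsearch {
			host => "localhost"
		}
	}
}
</pre>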