Event Management
Before it was SIEM
Back in the dawn of time, we called it ‘Central Logging’ and it looked kind of like this:
# The classical way you'd implement this is via a tiered system.
Log Shipper --\ /--> Log Parser --\
Log Shipper ---+--> Log Broker --+---> Log Parser ---+--> Log Storage --> Log Visualizer
Log Shipper --/ \--> Log Parser --/
# The modern way is more distributed. The clients are more powerful so you spread the load out and they can connect to distributed storage directly.
Log Parser Shipper --\ /-- Log Storage <-\
Log Parser Shipper ---+--- Log Storage <--+- Visualizer
Log Parser Shipper --/ \-- Log Storage <-/
# ELK (Elasticsearch Logstash and Kibana) is a good example.
Logstash --\ /-- Elasticsearch <-\
Logstash ---+--- Elasticsearch <--+--> Kibana
Logstash --/ \-- Elasticsearch <-/
More recently, there’s a move toward shippers like NXLog and Elasticsearch’s beats client. A native client saves you from deploying Java and is better suited for thin or micro instances.
# NXLog has an output module for Elasticsearch now. Beats is an Elasticsearch product.
nxlog --\
nxlog ---+--> Elasticsearch <-- Kibana
beats --/
Windows has its own log forwarding technology. You can put it to work without installing anything on the clients. This makes Windows admins a lot happier.
# It's built-in and fine for windows events - just doesn't do text files. Beats can read the events and push to elasticsearch.
Windows Event Forwarding --\
Windows Event Forwarding ---+--> Central Windows Event Manager -> Beats/Elasticsearch --> Kibana
Windows Event Forwarding --/
Unix has several ways to do it, but the most modern/least-overhead way is to use the native journald
system.
# Built-in to systemd
journald send --> central journald receive --> Beats/Elasticsearch --> Kibana
But Why?
The original answer used to be ‘reporting’. It was easier to get all the data together and do an analysis in one place.
Now the answer is ‘correlation’. If someone is probing your systems, they’ll do it very slowly and from multiple IPs to evade thresholds if they can, trying to break up patterns of attack. These patterns can become clear however, when you have a complete picture in one place.
1 - Elastic Stack
This is also referred to as ELK, an acronym that stands for Elasticsearch, Logstash and Kibana
This is a trio of tools that <www.elasticsearch.org> has packaged up into a simple and flexible way to handle, store and visualize data. Logstash collects the logs, parses them and stores them in Elasticsearch. Kibana is a web application that knows how to talk to Elasticsearch and visualizes the data.
Quite simple and powerful
To make use of this trio, start by deploying in this order:
- Elasticsearch (first, so you have some place to put things)
- Kibana (so you can see what’s going on in elasticsearch easily)
- Logstash (to start collecting data)
More recently, you can use the Elasticsearch Beats client in place of Logstash. These are natively compiled clients that have less capability, but are easier on the infrastructure than Logstash, a Java application.
1.1 - Elasticsearch
1.1.1 - Installation (Linux)
This is circa 2014 - use with a grain of salt.
This is generally the first step, as you need a place to collect your logs. Elasticsearch itself is a NoSQL database and well suited for pure-web style integrations.
Java is required, and you may wish to deploy Oracle’s java per the Elasticsearch team’s recommendation. You may also want to dedicate a data partition. By default, data is stored in /var/lib/elasticsearch and that can fill up. We will also install the ‘kopf’ plugin that makes it easier to manage your data.
Install Java and Elasticsearch
# (add a java repo)
sudo yum install java
# (add the elasticsearch repo)
sudo yum install elasticsearch
# Change the storage location
sudo mkdir /opt/elasticsearch
sudo chown elasticsearch:elasticsearch /opt/elasticsearch
sudo vim /etc/elasticsearch/elasticsearch.yml
...
path.data: /opt/elasticsearch/data
...
# Allow connections on ports 9200, 9300-9400 and set the cluster IP
# By design, Elasticsearch is open so control access with care
sudo iptables --insert INPUT --protocol tcp --source 10.18.0.0/16 --dport 9200 --jump ACCEPT
sudo iptables --insert INPUT --protocol tcp --source 10.18.0.0/16 --dport 9300:9400 --jump ACCEPT
sudo vim /etc/elasticsearch/elasticsearch.yml
...
# Failing to set 'publish_host' can result in the cluster auto-detecting an interface clients or other
# nodes can't reach. If you only have one interface you can leave commented out.
network.publish_host: 10.18.3.1
...
# Increase the heap size
sudo vim /etc/sysconfig/elasticsearch
# Heap size defaults to 256m min, 1g max
# Set ES_HEAP_SIZE to 50% of available RAM, but no more than 31g
ES_HEAP_SIZE=2g
# Install the kopf plugin and access it via your browser
sudo /usr/share/elasticsearch/bin/plugin -install lmenezes/elasticsearch-kopf
sudo service elasticsearch restart
In your browser, navigate to
http://10.18.3.1:9200/_plugin/kopf/
If everything is working correctly you should see a web page with KOPF at the top.
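You can also sanity-check the node from the command line with the cluster health endpoint (same address as above):
# 'green' or 'yellow' status means the node is up; yellow just means some replicas aren't allocated
curl http://10.18.3.1:9200/_cluster/health?pretty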
1.1.2 - Installation (Windows)
You may need to install on Windows to ensure the 'maximum amount of serviceability with existing support staff'. I've used it on both Windows and Linux and it's fine either way. Windows just requires a few more steps.
Requirements and Versions
The current version of Elasticsearch at time of writing these notes is 7.6. It requires an OS and Java. The latest of those supported are:
- Windows Server 2016
- OpenJDK 13
Installation
The installation instructions are at https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-elastic-stack.html
Note: Elasticsearch has both a zip and an MSI. The former comes with a Java distro, but the MSI includes a service installer.
Java
The OpenJDK 13 GA Releases at https://jdk.java.net/13/ no longer include installers or the JRE. But you can install via a MSI from https://github.com/ojdkbuild/ojdkbuild
Download the latest java-13-openjdk-jre-13.X and execute. Use the advanced settings to include the configuration of the JAVA_HOME and other useful variables.
To test the install, open a command prompt and check the version
C:\Users\allen>java --version
openjdk 13.0.2 2020-01-14
OpenJDK Runtime Environment 19.9 (build 13.0.2+8)
OpenJDK 64-Bit Server VM 19.9 (build 13.0.2+8, mixed mode, sharing)
Elasticsearch
Download the MSI installer from https://www.elastic.co/downloads/elasticsearch. It may be tagged as beta, but it installs the GA product well. Importantly, it also installs a windows service for Elasticsearch.
Verify the installation by checking your services for ‘Elasticsearch’, which should be running.
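A quick way to verify from an elevated PowerShell, assuming the default service name and port:
# Should show Status 'Running' for the Elasticsearch service
Get-Service Elasticsearch
# Should return a small JSON blob with the cluster name and version
Invoke-RestMethod http://localhost:9200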
Troubleshooting
Elasticsearch only listening on localhost
By default, this is the case. You must edit the config file.
# In an elevated command prompt
notepad C:\ProgramData\Elastic\Elasticsearch\config\elasticsearch.yml
# add
discovery.type: single-node
network.host: 0.0.0.0
https://stackoverflow.com/questions/59350069/elasticsearch-start-up-error-the-default-discovery-settings-are-unsuitable-for
failure while checking if template exists: 405 Method Not Allowed
You can't run newer versions of Filebeat against older versions of Elasticsearch. Download the older, matching deb and install it with sudo apt install ./some.deb
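For example, if your cluster is still on 7.x, grab a matching Filebeat package rather than the latest (the exact version number here is only illustrative):
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.0-amd64.deb
sudo apt install ./filebeat-7.17.0-amd64.deb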
https://discuss.elastic.co/t/filebeat-receives-http-405-from-elasticsearch-after-7-x-8-1-upgrade/303821
https://discuss.elastic.co/t/cant-start-filebeat/181050
1.1.3 - Common Tasks
This is circa 2014 - use with a grain of salt.
Configuration of elasticsearch itself is seldom needed. You will have to maintain the data in your indexes however. This is done by either using the kopf tool, or at the command line.
After you have some data in elasticsearch, you'll see that your 'documents' are organized into 'indexes'. This is simply a container for your data that was specified when logstash originally sent it, and the naming is arbitrarily defined by the client.
Deleting Data
The first thing you’re likely to need is to delete some badly-parsed data from your testing.
Delete all indexes with the name test*
curl -XDELETE http://localhost:9200/test*
Delete from all indexes documents of type ‘WindowsEvent’
curl -XDELETE http://localhost:9200/_all/WindowsEvent
Delete from all indexes documents that have the attribute 'path' equal to '/var/log/httpd/ssl_request.log'
curl -XDELETE 'http://localhost:9200/_all/_query?q=path:/var/log/httpd/ssl_request.log'
Delete from the index ’logstash-2014.10.29’ documents of type ‘shib-access’
curl -XDELETE http://localhost:9200/logstash-2014.10.29/shib-access
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
Curator
All the maintenance by hand has to stop at some point and Curator is a good tool to automate some of it. This is a script that will do some curls for you, so to speak.
Install
wget https://bootstrap.pypa.io/get-pip.py
sudo python get-pip.py
sudo pip install elasticsearch-curator
sudo pip install argparse
Use
curator --help
curator delete --help
And in your crontab
# Note: you must escape % characters with a \ in crontabs
20 0 * * * curator delete indices --time-unit days --older-than 14 --timestring '\%Y.\%m.\%d' --regex '^logstash-bb-.*'
20 0 * * * curator delete indices --time-unit days --older-than 14 --timestring '\%Y.\%m.\%d' --regex '^logstash-adfsv2-.*'
20 0 * * * curator delete indices --time-unit days --older-than 14 --timestring '\%Y.\%m.\%d' --regex '^logstash-20.*'
Sometimes you’ll need to do an inverse match.
0 20 * * * curator delete indices --regex '^((?!logstash).)*$'
A good way to test your regex is by using the show indices method
curator show indices --regex '^((?!logstash).)*$'
Here are some OLD posts and links, but be aware the syntax has changed and it's been several versions since these were written.
http://www.ragingcomputer.com/2014/02/removing-old-records-for-logstash-elasticsearch-kibana
http://www.elasticsearch.org/blog/curator-tending-your-time-series-indices/
http://stackoverflow.com/questions/406230/regular-expression-to-match-line-that-doesnt-contain-a-word
Replication and Yellow Cluster Status
By default, elasticsearch assumes you want two nodes and will replicate your data; new indexes get 1 replica each. You may not want that to start with, however, so change the default and update the replica settings on your existing data in bulk with:
http://stackoverflow.com/questions/24553718/updating-the-default-index-number-of-replicas-setting-for-new-indices
Set all existing replica requirements to just one copy
curl -XPUT 'localhost:9200/_settings' -d '
{
"index" : { "number_of_replicas" : 0 }
}'
Change the default settings for new indexes to have just one copy
curl -XPUT 'localhost:9200/_template/logstash_template' -d '
{
"template" : "*",
"settings" : {"number_of_replicas" : 0 }
} '
Unassigned Shards
You will occasionally have a hiccup where you run out of disk space or something similar and be left with indexes that have no data in them or have shards unassigned. Generally, you will have to delete them but you can also manually reassign them.
http://stackoverflow.com/questions/19967472/elasticsearch-unassigned-shards-how-to-fix
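If you do want to reassign a shard by hand, the cluster reroute API is the tool. A sketch against the era's API (the index, shard number, and node name are placeholders; newer versions replace 'allocate' with 'allocate_replica' or 'allocate_empty_primary'):
curl -XPOST 'localhost:9200/_cluster/reroute' -d '
{
  "commands": [
    { "allocate": { "index": "logstash-2014.10.29", "shard": 0, "node": "some-node-name", "allow_primary": true } }
  ]
}'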
Listing Index Info
You can get a decent human readable list of your indexes using the cat api
curl localhost:9200/_cat/indices
If you want to list by size, the docs use this example:
curl localhost:9200/_cat/indices?bytes=b | sort -rnk8
1.2 - Kibana
1.2.1 - Installation (Windows)
Kibana is a Node.js app using the Express Web framework - meaning to us it looks like a web server running on port 5601. If you’re running elasticsearch on the same box, it will connect with the defaults.
https://www.elastic.co/guide/en/kibana/current/windows.html
Download and Extract
No MSI or installer is available for windows so you must download the .zip from https://www.elastic.co/downloads/kibana. Uncompress (this will take a while), rename it to ‘Kibana’ and move it to Program Files.
So that you may access it later, edit the config file at {location}/config/kibana.yml with WordPad and set the server.host entry to:
server.host: "0.0.0.0"
Create a Service
Download the service manager NSSM from https://nssm.cc/download and extract. Start an admin powershell, navigate to the extracted location and run the installation command like so:
C:\Users\alleng\Downloads\nssm-2.24\nssm-2.24\win64> .\nssm.exe install Kibana
In the Pop-Up, set the application path to the below. The start up path will auto populate.
C:\Program Files\Kibana\kibana-7.6.2-windows-x86_64\bin\kibana.bat
Click 'Install service' and it should indicate success. Go to the service manager to find and start it. After a minute (check the process manager for the CPU to drop), you should be able to access it at:
http://localhost:5601/app/kibana#/home
1.2.2 - Troubleshooting
Rounding Errors
Kibana rounds to 16 significant digits
Turns out, if you have a value of type integer, that’s just the limit. While elasticsearch shows you this:
curl http://localhost:9200/logstash-db-2016/isim-process/8163783564660983218?pretty
{
"_index" : "logstash-db-2016",
"_type" : "isim-process",
"_id" : "8163783564660983218",
"_version" : 1,
"found" : true,
"_source":{"requester_name":"8163783564660983218","request_num":8163783618037078861,"started":"2016-04-07 15:16:16:139 GMT","completed":"2016-04-07 15:16:16:282 GMT","subject_service":"Service","request_type":"EP","result_summary":"AA","requestee_name":"Mr. Requester","subject":"mrRequest","@version":"1","@timestamp":"2016-04-07T15:16:16.282Z"}
}
Kibana shows you this
View: Table / JSON / Raw
Field Action Value
request_num 8163783618037079000
Looking at the JSON will give you the clue - it’s being treated as an integer and not a string.
"_source": {
"requester_name": "8163783564660983218",
"request_num": 8163783618037079000,
"started": "2016-04-07 15:16:16:139 GMT",
"completed": "2016-04-07 15:16:16:282 GMT",
Mutate it to string in logstash to get your precision back.
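A minimal sketch of that mutate, assuming the field name from the example above:
filter {
  mutate {
    # Store the ID as a string so the browser-side javascript can't round it
    convert => [ "request_num", "string" ]
  }
}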
https://github.com/elastic/kibana/issues/4356
1.3 - Logstash
Logstash is a parser and shipper. It reads from (usually) a file, parses the data into JSON, then connects to something else and sends the data. That something else can be Elasticsearch, a syslog server, or others.
Logstash v/s Beats
But for most things these days, Beats is a better choice. Give that a look first.
1.3.1 - Installation
Note: Before you install logstash, take a look at Elasticsearch’s Beats. It’s lighter-weight for most tasks.
Quick Install
This is a summary of the current install page. Visit and adjust versions as needed.
# Install java
apt install default-jre-headless
apt-get install apt-transport-https
apt install gnupg2
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -
# Check for the current version - 7 is no longer the current version by now
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | tee -a /etc/apt/sources.list.d/elastic-7.x.list
apt update
apt-get install logstash
Logstash has a NetFlow module, but it has been deprecated. One should instead use the Filebeat NetFlow module.
The rest of this page is circa 2014 - use with a grain of salt.
Installation - Linux Clients
Install Java
If you don't already have it, install it. You'll need at least 1.7, and Oracle's is recommended. However, on older systems do yourself a favor and use the OpenJDK, as older Sun and IBM versions do things with cryptography that lead to strange bugs in recent releases of logstash.
# On RedHat flavors, install the OpenJDK and select it for use (in case there are others) with the system alternatives utility
sudo yum install java-1.7.0-openjdk
sudo /usr/sbin/alternatives --config java
Install Logstash
This is essentially:
( Look at https://www.elastic.co/downloads/logstash to get the latest version or add the repo)
wget (some link from the above page)
sudo yum --nogpgcheck localinstall logstash*
# You may want to grab a plugin, like the syslog output, though elasticsearch installs by default
cd /opt/logstash/
sudo bin/plugin install logstash-output-syslog
# If you're ready to configure the service
sudo vim /etc/logstash/conf.d/logstash.conf
sudo service logstash start
https://www.elastic.co/guide/en/logstash/current/index.html
Operating
The most common use of logstash is to tail and parse log files. You do this by specifying a file and filter like so
[gattis@someHost ~]$ vim /etc/logstash/conf.d/logstash.conf
input {
file {
path => "/var/log/httpd/request.log"
}
}
filter {
grok {
match => [ "message", "%{COMBINEDAPACHELOG}"]
}
}
output {
stdout {
codec => rubydebug
}
}
Filter
There are many different types of filters, but the main one you’ll be using is grok. It’s all about parsing the message into fields. Without this, you just have a bunch of un-indexed text in your database. It ships with some handy macros such as %{COMBINEDAPACHELOG} that takes this:
10.138.120.138 - schmoej [01/Apr/2016:09:39:04 -0400] "GET /some/url.do?action=start HTTP/1.1" 200 10680 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36"
And turns it into
agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36"
auth schmoej
bytes 10680
clientip 10.138.120.138
httpversion 1.1
path /var/pdweb/www-default/log/request.log
referrer "-"
request /some/url.do?action=start
response 200
timestamp 01/Apr/2016:09:39:04 -0400
verb GET
See the grok’ing for more details
Output
We're outputting to the console so we can see what's going on with our config. If you get some output, but it's not parsed fully because of an error in the parsing, you'll see something like the below with a "_grokparsefailure" tag. That means you have to dig into a custom pattern as described in grok'ing.
Note: by default, logstash is ’tailing’ your logs, so you’ll only see new entries. If you’ve got no traffic you’ll have to generate some
{
"message" => "test message",
"@version" => "1",
"@timestamp" => "2014-10-31T17:39:28.925Z",
"host" => "some.app.private",
"tags" => [
[0] "_grokparsefailure"
]
}
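If you need to generate some traffic, a quick option is to hit the web server yourself (this assumes the Apache instance from the input above is running on this host):
# Each request adds a line to request.log, which logstash then picks up
curl -s http://localhost/ > /dev/null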
If it looks good, you’ll want to send it on to your database. Change your output to look like so which will put your data in a default index that kibana (the visualizer) can show by default.
output {
elasticsearch {
hosts => ["10.17.153.1:9200"]
}
}
Troubleshooting
If you don’t get any output at all, check that the logstash user can actually read the file in question. Check your log files and try running logstash as yourself with the output going to the console.
cat /var/log/logstash/*
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
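A quick way to test the read permission as the service account (path from the earlier example):
sudo -u logstash head -n 1 /var/log/httpd/request.log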
1.3.2 - Operation
Basic Operation
Generally, you create a config with three sections: input, filter, and output. This example uses the grok filter to parse the message.
sudo vi /etc/logstash/conf.d/logstash.conf
input {
file {
path => "/var/pdweb/www-default/log/request.log"
}
}
filter {
grok {
match => [ "message", "%{COMBINEDAPACHELOG}"]
}
}
output {
stdout { }
}
Then you test it at the command line
# Test the config file itself
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf --configtest
# Test the parsing of data
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
You should get some nicely parsed lines. If that’s the case, you can edit your config to add a sincedb
and an actual destination.
input {
file {
path => "/var/pdweb/www-default/log/request.log"
sincedb_path => "/opt/logstash/sincedb"
}
}
filter {
grok {
match => [ "message", "%{COMBINEDAPACHELOG}"]
}
}
output {
elasticsearch {
host => "some.server.private"
protocol => "http"
}
}
If instead you see output with a _grokparsefailure
like below, you need to change the filter. Take a look at the common gotchas, then the parse failure section below it.
{
"message" => "test message",
"@version" => "1",
"@timestamp" => "2014-10-31T17:39:28.925Z",
"host" => "some.app.private",
"tags" => [
[0] "_grokparsefailure"
]
}
Common Gotchas
No New Data
Logstash reads new lines by default. If you don't have anyone actually hitting your webserver, but you do have some log entries in the file itself, you can tell logstash to process the existing entries and not save its place in the file.
file {
path => "/var/log/httpd/request.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
Multiple Conf files
Logstash uses all the files in the conf.d directory - even if they don’t end in .conf. Make sure to remove any you don’t want as they can conflict.
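A quick way to see exactly what will be loaded:
# Everything in this directory is concatenated into one config
ls -l /etc/logstash/conf.d/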
Default Index
Logstash creates Elasticsearch indexes that look like:
logstash-%{+YYYY.MM.dd}
The logstash folks have some great material on how to get started. Really top notch.
http://logstash.net/docs/1.4.2/configuration#fieldreferences
Parse Failures
The Greedy Method
The best way to start is to change your match to a simple pattern and work out from there. Try the 'GREEDYDATA' pattern and assign it to a field named 'Test'. The filter takes the form of:
filter {
grok {
match => [ "message", "%{GREEDYDATA:Test}" ]
}
}
"message" => "test message",
"@version" => "1",
"@timestamp" => "2014-10-31T17:39:28.925Z",
"host" => "some.app.private",
"Test" => "The rest of your message
That should give you some output. You can then start cutting it up with the patterns (also called macros) found here;
You can also use the online grok debugger and the list of default patterns.
Combining Patterns
There may not be a standard pattern for what you want, but it’s easy to pull together several existing ones. Here’s an example that pulls in a custom timestamp.
Example:
Sun Oct 26 22:20:55 2014 File does not exist: /var/www/html/favicon.ico
Pattern:
match => { "message" => "(?<timestamp>%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR})"}
Notice the '?<timestamp>' at the beginning of the parenthetical enclosure. That's a named capture group: the matched text is stored in the 'timestamp' field, rather than in an anonymous group you'd have to reference like \1 in sed.
Optional Fields
Some log formats simply skip columns when they don’t have data. This will cause your parse to fail unless you make some fields optional with a ‘?’, like this:
match => [ "message", "%{HOSTNAME:VHost}? %{COMBINEDAPACHELOG} %{IP:XForwardedFor}?"]
http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html
Dropping Events
Oftentimes, you’ll have messages that you don’t care about and you’ll want to drop those. Best practice is to do coarse actions first, so you’ll want to compare and drop with a general conditional like:
filter {
if [message] =~ /File does not exist/ {
drop { }
}
grok {
...
...
You can also directly reference fields once you have grok’d the message
filter {
grok {
match => { "message" => "%{HOSTNAME:VHost}? %{COMBINEDAPACHELOG} %{IP:XForwardedFor}?"}
}
if [request] == "/status" {
drop { }
}
}
http://logstash.net/docs/1.4.2/configuration#conditionals
Dating Messages
By default, logstash date-stamps messages when it sees them. However, there can be a delay between when an action happens and when it gets logged to a file. To remedy this - and allow you to suck in old files without the date on every event being the same - you add a date filter.
Note - you actually have to grok out the date into its own variable; you can't just attempt to match on the whole message. The combined apache macro below does this for us.
filter {
grok {
match => { "message" => "%{HOSTNAME:VHost}? %{COMBINEDAPACHELOG} %{IP:XForwardedFor}?"}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
In the above case, ’timestamp’ is a parsed field and you’re using the date language to tell it what the component parts are
http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html
Sending to Multiple Servers
In addition to an elasticsearch server, you may want to send it to a syslog server at the same time.
input {
file {
path => "/var/pdweb/www-default/log/request.log"
sincedb_path => "/opt/logstash/sincedb"
}
}
filter {
grok {
match => [ "message", "%{HOSTNAME:VHost}? %{COMBINEDAPACHELOG} %{IP:XForwardedFor}?"]
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch {
host => "some.server.private"
protocol => "http"
}
syslog {
host => "some.syslog.server"
port => "514"
severity => "notice"
facility => "daemon"
}
}
Deleting Sent Events
Sometimes you'll accidentally send a bunch of events to the server and need to delete and resend corrected versions.
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-delete-mapping.html
curl -XDELETE http://localhost:9200/_all/SOMEINDEX
curl -XDELETE 'http://localhost:9200/_all/SOMEINDEX?q=path:"/var/log/httpd/ssl_request_log"'
1.3.3 - Index Routing
When using logstash as a broker, you will want to route events to different indexes according to their type. You have two basic ways to do this;
- Using Mutates with a single output
- Using multiple Outputs
The latter is significantly better for performance. The less you touch the event, the better it seems. When testing these two different configs in the lab, the multiple output method was about 40% faster when under CPU constraint. (i.e. you can always add more CPU if you want to mutate the events.)
Multiple Outputs
input {
...
...
}
filter {
...
...
}
output {
if [type] == "RADIUS" {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-radius-%{+YYYY.MM.dd}"
}
}
else if [type] == "RADIUSAccounting" {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-radius-accounting-%{+YYYY.MM.dd}"
}
}
else {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-test-%{+YYYY.MM.dd}"
}
}
}
Mutates
If your source system includes a field that tells you what index to place it in, you might be able to skip mutating altogether, but often you must look at the contents to make that determination. Doing so does reduce performance.
input {
...
...
}
filter {
...
...
# Add a metadata field with the destination index based on the type of event this was
if [type] == "RADIUS" {
mutate { add_field => { "[@metadata][index-name]" => "logstash-radius" } }
}
else if [type] == "RADIUSAccounting" {
mutate { add_field => { "[@metadata][index-name]" => "logstash-radius-accounting" } }
}
else {
mutate { add_field => { "[@metadata][index-name]" => "logstash-test" } }
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[@metadata][index-name]}-%{+YYYY.MM.dd}"
}
}
https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#metadata
1.3.4 - Database Connections
You can connect Logstash to a database to poll events almost as easily as tailing a log file.
Installation
The JDBC plug-in ships with logstash so no installation of that is needed. However, you do need the JDBC driver for the DB in question.
Here’s an example for DB2, for which you can get the jar from either the server itself or the DB2 fix-pack associated with the DB Version you’re running. The elasticsearch docs say to just put it in your path. I’ve put it in the logstash folder (based on some old examples) and we’ll see if it survives upgrades.
sudo mkdir /opt/logstash/vendor/jars
sudo cp /home/gattis/db2jcc4.jar /opt/logstash/vendor/jars
sudo chown -R logstash:logstash /opt/logstash/vendor/jars
Configuration
Configuring the input
Edit the config file like so
sudo vim /etc/logstash/conf.d/logstash.conf
input {
jdbc {
jdbc_driver_library => "/opt/logstash/vendor/jars/db2jcc4.jar"
jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
jdbc_connection_string => "jdbc:db2://db1.tim.private:50000/itimdb"
jdbc_user => "itimuser"
jdbc_password => "somePassword"
statement => "select * from someTable"
}
}
Filtering
You don’t need to do any pattern matching, as the input emits the event pre-parsed based on the DB columns. You may however, want to match a timestamp in the database.
# A sample value in the 'completed' column is 2016-04-07 00:41:03:291 GMT
filter {
date {
match => [ "completed" , "yyyy-MM-dd HH:mm:ss:SSS zzz" ]
}
}
Output
One recommended trick is to link the primary keys between the database and elasticsearch. That way, if you run the query again you update the existing elasticsearch records rather than create duplicate ones. Simply tell the output plugin to use the existing primary key from the database for the document_id when it sends it to elasticsearch.
# Database key is the column 'id'
output {
elasticsearch {
hosts => ["10.17.153.1:9200"]
index => "logstash-db-%{+YYYY}"
document_id => "%{id}"
type => "isim-process"
}
}
Other Notes
If any of your columns are non-string type, logstash and elasticsearch will happily store them as such. But be warned that kibana will round them to 16 digits due to a limitation of javascript.
https://github.com/elastic/kibana/issues/4356
Sources
https://www.elastic.co/blog/logstash-jdbc-input-plugin
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
1.3.5 - Multiline Matching
Here’s an example that uses the multiline codec (preferred over the multiline filter, as it’s more appropriate when you might have more than one input)
input {
file {
path => "/opt/IBM/tivoli/common/CTGIM/logs/access.log"
type => "itim-access"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => multiline {
pattern => "^<Message Id"
negate => true
what => previous
}
}
}
Getting a match can be difficult, as grok by default does not match against multiple lines. You can mutate to remove all the new lines, or use a seemingly secret preface, the ‘(?m)’ directive as shown below
filter {
grok {
match => { "message" => "(?m)(?<timestamp>%{YEAR}.%{MONTHNUM}.%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{ISO8601_TIMEZONE})%{DATA}com.ibm.itim.security.%{WORD:catagory}%{DATA}CDATA\[%{DATA:auth}\]%{DATA}CDATA\[%{DATA:clientip}\]"}
  }
}
https://logstash.jira.com/browse/LOGSTASH-509
1.4 - Beats
Beats are a family of lightweight shippers that you should consider as a first-solution for sending data to Elasticsearch. The two most common ones to use are:
Filebeat is used both for files, and for other general types, like syslog and NetFlow data.
Winlogbeat is used to load Windows events into Elasticsearch and works well with Windows Event Forwarding.
1.4.1 - Linux Installation
On Linux
A summary from the general docs. View and adjust versions as needed.
If you haven’t already added the repo:
apt-get install apt-transport-https
apt install gnupg2
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | tee -a /etc/apt/sources.list.d/elastic-7.x.list
apt update
apt install filebeat
systemctl enable filebeat
Filebeat uses a default config file at /etc/filebeat/filebeat.yml. If you don't want to edit that, you can use the 'modules' to configure it for you. Module setup will also load dashboard elements into Kibana, so you must have already installed Kibana to make use of it.
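For example, to let the system module handle /var/log and load its dashboards (the setup step needs Kibana reachable):
sudo filebeat modules enable system
sudo filebeat setup
sudo systemctl restart filebeat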
Here’s a simple test
mv /etc/filebeat/filebeat.yml /etc/filebeat/filebeat.yml.orig
vi /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log
output.file:
path: "/tmp/filebeat"
filename: filebeat
#rotate_every_kb: 10000
#number_of_files: 7
#permissions: 0600
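Restart the service and check that events start landing under the test location:
sudo systemctl restart filebeat
ls -l /tmp/filebeat/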
1.4.2 - Windows Installation
Installation
Download the .zip version (the MSI doesn't include the service install script) from the URL below. Extract, rename it to Filebeat and move it to the C:\Program Files directory.
https://www.elastic.co/downloads/beats/filebeat
Start an admin powershell, change to that directory and run the service install command. (Keep the shell up for later when done)
PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1
Basic Configuration
Edit the filebeat config file.
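For example, from the same admin PowerShell (assuming the install location used above):
notepad "C:\Program Files\Filebeat\filebeat.yml"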
You need to configure the input and output sections. The output is already set to elasticsearch localhost so you only have to change the input from the unix to the windows style.
paths:
#- /var/log/*.log
- c:\programdata\elasticsearch\logs\*
Test as per normal
./filebeat test config -e
Filebeat specific dashboards must be added to Kibana. Do that with the setup argument:
.\filebeat.exe setup --dashboards
To start Filebeat in the foreground (to see any interesting messages):
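A minimal way to do that from the same admin PowerShell (the -e flag logs to the console instead of the log file):
.\filebeat.exe -e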
If you’re happy with the results, you can stop the application then start the service
Ctrl-C
Start-Service filebeat
Adapted from the guide at
https://www.elastic.co/guide/en/beats/filebeat/7.6/filebeat-getting-started.html
1.4.3 - NetFlow Forwarding
The NetFlow protocol is now implemented in Filebeat. Assuming you’ve installed Filebeat and configured Elasticsearch and Kibana, you can use this input module to auto configure the inputs, indexes and dashboards.
./filebeat modules enable netflow
filebeat setup -e
If you are just testing and don’t want to add the full stack, you can set up the netflow input which the module is a wrapper for.
filebeat.inputs:
- type: netflow
max_message_size: 10KiB
host: "0.0.0.0:2055"
protocols: [ v5, v9, ipfix ]
expiration_timeout: 30m
queue_size: 8192
output.file:
path: "/tmp/filebeat"
filename: filebeat
Consider dropping all the fields you don’t care about as there are a lot of them. Use the include_fields
processor to limit what you take in
- include_fields:
fields: ["destination.port", "destination.ip", "source.port", "source.mac", "source.ip"]
1.4.4 - Palo Example
# This filebeat config accepts TRAFFIC and SYSTEM syslog messages from a Palo Alto,
# tags and parses them
# This is an arbitrary port. The normal port for syslog is UDP 514
filebeat.inputs:
- type: syslog
protocol.udp:
host: ":9000"
processors:
# The message field will have "TRAFFIC" for netflow logs and we can
# extract the details with a CSV decoder and array extractor
- if:
contains:
message: ",TRAFFIC,"
then:
- add_tags:
tags: "netflow"
- decode_csv_fields:
fields:
message: csv
- extract_array:
field: csv
overwrite_keys: true
omit_empty: true
fail_on_error: false
mappings:
source.ip: 7
destination.ip: 8
source.nat.ip: 9
network.application: 14
source.port: 24
destination.port: 25
source.nat.port: 26
- drop_fields:
fields: ["csv", "message"]
else:
# The message field will have "SYSTEM,dhcp" for dhcp logs and we can
# do a similar process to above
- if:
contains:
message: ",SYSTEM,dhcp"
then:
- add_tags:
tags: "dhcp"
- decode_csv_fields:
fields:
message: csv
- extract_array:
field: csv
overwrite_keys: true
omit_empty: true
fail_on_error: false
mappings:
message: 14
# The DHCP info can be further pulled apart using space as a delimiter
- decode_csv_fields:
fields:
message: csv2
separator: " "
- extract_array:
field: csv2
overwrite_keys: true
omit_empty: true
fail_on_error: false
mappings:
source.ip: 4
source.mac: 7
hostname: 10
- drop_fields:
fields: ["csv","csv2"] # Can drop message too like above when we have watched a few
- drop_fields:
fields: ["agent.ephemeral_id", "agent.hostname", "agent.id", "agent.type", "agent.version", "ecs.version","host.name","event.severity","input.type","hostname","log.source.address","syslog.facility", "syslog.facility_label", "syslog.priority", "syslog.priority_label","syslog.severity_label"]
ignore_missing: true
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
output.elasticsearch:
hosts: ["localhost:9200"]
1.4.5 - RADIUS Forwarding
Here’s an example of sending FreeRADIUS logs to Elasticsearch.
cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
paths:
- /var/log/freeradius/radius.log
include_lines: ['\) Login OK','incorrect']
tags: ["radius"]
processors:
- drop_event:
when:
contains:
message: "previously"
- if:
contains:
message: "Login OK"
then:
- dissect:
tokenizer: "%{key1} [%{source.user.id}/%{key3}cli %{source.mac})"
target_prefix: ""
- drop_fields:
fields: ["key1","key3"]
- script:
lang: javascript
source: >
function process(event) {
var mac = event.Get("source.mac");
if(mac != null) {
mac = mac.toLowerCase();
mac = mac.replace(/-/g,":");
event.Put("source.mac", mac);
}
}
else:
- dissect:
tokenizer: "%{key1} [%{source.user.id}/<via %{key3}"
target_prefix: ""
- drop_fields:
fields: ["key1","key3"]
output.elasticsearch:
hosts: ["http://logcollector.yourorg.local:9200"]
allow_older_versions: true
setup.ilm.enabled: false
1.4.6 - Syslog Forwarding
You may have an older system or appliance that can transmit syslog data. You can use filebeat to accept that data and store it in Elasticsearch.
Install filebeat and test the reception by writing to /tmp.
vi /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: syslog
protocol.udp:
host: ":9000"
output.file:
path: "/tmp"
filename: filebeat
sudo systemctl restart filebeat
pfSense Example
The instructions are in Netgate's remote logging example.
Status -> System Logs -> Settings
Enable and configure. Internet rumor has it that it’s UDP only so the config above reflects that. Interpreting the output requires parsing the message section detailed in the filter log format docs.
'5,,,1000000103,bge1.1099,match,block,in,4,0x0,,64,0,0,DF,17,udp,338,10.99.147.15,255.255.255.255,2048,30003,318'
'5,,,1000000103,bge2,match,block,in,4,0x0,,84,1,0,DF,17,udp,77,157.240.18.15,205.133.125.165,443,61343,57'
'222,,,1000029965,bge2,match,pass,out,4,0x0,,128,27169,0,DF,6,tcp,52,205.133.125.142,205.133.125.106,5225,445,0,S,1248570004,,8192,,mss;nop;wscale;nop;nop;sackOK'
'222,,,1000029965,bge2,match,pass,out,4,0x0,,128,11613,0,DF,6,tcp,52,205.133.125.142,211.24.111.75,15305,445,0,S,2205942835,,8192,,mss;nop;wscale;nop;nop;sackOK'
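A minimal sketch of pulling a few of those fields apart with the same decode_csv_fields/extract_array processors used in the Palo example. The array positions below are read off the sample IPv4 lines above and are assumptions; check the filter log format docs for your pfSense version before trusting them.
processors:
  - decode_csv_fields:
      fields:
        message: csv
  - extract_array:
      field: csv
      omit_empty: true
      fail_on_error: false
      mappings:
        network.direction: 7
        network.transport: 16
        source.ip: 18
        destination.ip: 19
        source.port: 20
        destination.port: 21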
2 - Loki
Loki is a system for handling logs (unstructured data) but is lighter-weight than Elasticsearch. It also has fewer add-ons. But if you’re already using Prometheus and Grafana and you want to do it yourself, it can be a better solution.
Installation
Install Loki and Promtail together. These are available in the Debian stable repos at a current version; no need to go to backports or testing.
sudo apt install loki promtail
curl localhost:3100/metrics
Configuration
Default config files are created in /etc/loki and /etc/promtail. Promtail tails the /var/log/*log files, pushing them to the local loki on the default port (3100), and loki saves data in the /tmp directory. This is fine for testing.
Promtail runs as the promtail user (not root) and can't read anything useful, so add that user to the adm group.
sudo usermod -a -G adm promtail
sudo systemctl restart promtail
Grafana Integration
In grafana, add a datasource.
Configuration –> Add new data source –> Loki
Set the URL to http://localhost:3100
Then view the logs
Explore –> Select label (filename) –> Select value (daemon)
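The equivalent LogQL you can paste into the Explore query box looks like this (the filename label and path assume the default promtail scrape config tailing /var/log):
{filename="/var/log/daemon.log"} |= "error"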
Troubleshooting
error notifying frontend about finished query
Edit the timeout setting in your loki datasource. The default may be too short so set it to 30s or some such
Failed to load log volume for this query
If you added a logfmt parser like the GUI suggested, you may find not all your entries can be parsed, leading to this error.
3 - Network Traffic
Recording traffic on the network is critical for troubleshooting and compliance. For the latter, the most common strategy is to record the "flows". These are the connections each host makes or accepts, and how much data is involved.
You can collect this information at the LAN on individual switches, but the WAN (at the router) is usually more important. And if the router is performing NAT, it’s the only place to record the mappings of internal to external IPs and ports.
Network Log System
A network flow log system usually has three main parts.
Exporter --> Collector --> Analyzer
The Exporter, which records the data, the Collector, which is where the data is stored, and the Analyzer which makes the data more human-readable.
Example
We’ll use a Palo Alto NG Firewall as our exporter, and an Elasticsearch back-end. The data we are collecting is essentially log data, and Elasticsearch is probably the best at handling unstructured information.
At small scale, you can combine all of the collection and analysis parts on a single system. We'll use Windows servers in our example as well.
graph LR
A(Palo)
B(Beats)
C(ElasticSearch)
D(Kibana)
subgraph Exporter
A
end
subgraph Collector and Analyzer
B --> C --> D
end
A --> B
Installation
Start with Elasticsearch and Kibana, then install Beats.
Configuration
Beats and Palo have a couple of protocols in common. NetFlow is the traditional protocol, but when you're using NAT the best choice is syslog, as the Palo will directly give you the NAT info in one record and you don't have to correlate multiple interface flows to see who did what.
Beats
On the Beats server, start an admin powershell session, change to the Filebeat directory, edit the config file and restart the service.
There is a bunch of example text in the config so tread carefully and keep in mind that indentation matters. Stick this block right under the filebeat.inputs:
line and you should be OK.
This config stanza has a processor block that decodes the CSV content sent over in the message field, extracts a few select fields, then discards the rest. There's quite a bit left over though, so see tuning below if you'd like to reduce the data load even more.
cd "C:\Program Files\Filebeat"
write.exe filebeat.yml
filebeat.inputs:
- type: syslog
protocol.udp:
host: ":9000"
processors:
- decode_csv_fields:
fields:
message: csv
- extract_array:
field: csv
overwrite_keys: true
omit_empty: true
fail_on_error: false
mappings:
source.ip: 7
destination.ip: 8
source.nat.ip: 9
network.application: 14
source.port: 24
destination.port: 25
source.nat.port: 26
- drop_fields:
fields: ["csv", "message"]
A larger example is under the Beats documentation.
Palo Alto Setup
Perform steps 1 and 2 of the Palo setup guide with the notes below.
https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/configure-syslog-monitoring
- In step 1 - The panw module defaults to 9001
- In step 2 - Make sure to choose Traffic as the type of log
Tuning
You can reduce the amount of data even more by adding a few more Beats directives.
# At the very top level of the file, you can add this processor to affect global fields
processors:
- drop_fields:
fields: ["agent.ephemeral_id","agent.id","agent.hostname","agent.type","agent.version","ecs.version","host.name"]
# You can also drop syslog fields that aren't that useful (you may need to put this under the syslog input)
- drop_fields:
fields: ["event.severity","input.type","hostname","syslog.facility", "syslog.facility_label", "syslog.priority", "syslog.priority_label","syslog.severity_label"]
You may want even more data. See the Full Palo Syslog data for an example of what's available.
Conclusion
At this point you can navigate to the Kibana web console and explore the logs. There is no dashboard as this is just for log retention and covers the minimum required. If you’re interested in more, check out the SIEM and Netflow dashboards Elasticsearch offers.
Sources
Palo Shipping
https://docs.logz.io/shipping/security-sources/palo-alto-networks.html
4 - NXLog
This info on NXLog is circa 2014 - use with caution.
NXLog is best used when Windows Event Forwarding can't be used and Filebeat isn't sufficient.
Background
There are several solutions for capturing logs in Windows, but NXLog has some advantages;
- Cross-platform and Open Source
- Captures windows events pre-parsed
- Native windows installer and service
You could just run logstash everywhere. But in practice, Logstash's memory requirements are several times NXLog's, and not everyone likes to install Java everywhere.
Deploy on Windows
Download from http://nxlog.org/download. This will take you to the sourceforge site and the MSI you can install from. This installation is clean and the service installs automatically.
NXLog uses a config file with blocks in the basic pattern of:
- Input Block
- Output Block
- Routing Block
The latter is what ties together your inputs and outputs. You start out with one variable, $raw_event, with everything in it. As you call modules, that variable gets parsed out into more useful individual variables.
Event Viewer Example
Here's an example of invoking the module that pulls in data from the Windows event log.
- Navigate to C:\Program Files (x86)\nxlog\conf
- Edit the security settings on the file nxlog.conf. Change the ‘Users’ to have modify rights. This allows you to actually edit the config file.
- Open that file in notepad and simply change it to look like so
# Set the ROOT to the folder your nxlog was installed into
define ROOT C:\Program Files (x86)\nxlog
## Default required locations based on the above
Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log
# Increase to DEBUG if needed for diagnosis
LogLevel INFO
# Input the windows event logs
<Input in>
Module im_msvistalog
</Input>
# Output the logs to a file for testing
<Output out>
Module om_file
File "C:/Program Files (x86)/nxlog/data/log-test-output.txt"
</Output>
# Define the route by mapping the input to an output
<Route 1>
Path in => out
</Route>
With any luck, you've now got some lines in your output file. You can also test by reading from a flat file instead of the event log, swapping the input for the im_file module:
# Set the ROOT to the folder your nxlog was installed into
define ROOT C:\Program Files (x86)\nxlog
## Default required locations based on the above
Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log
# Increase to DEBUG if needed for diagnosis
LogLevel INFO
# Input a test file
<Input in>
Module im_file
File ""C:/Program Files (x86)/nxlog/data/test-in.txt"
SavePos FALSE
ReadFromLast FALSE
</Input>
# Output the logs to a file for testing
<Output out>
Module om_file
File "C:/Program Files (x86)/nxlog/data/log-test-output.txt"
</Output>
# Define the route by mapping the input to an output
<Route 1>
Path in => out
</Route>
Sending Events to a Remote Logstash Receiver
To be useful, you need to send your logs somewhere. Here’s an example of sending them to a Logstash receiver.
# Set the ROOT to the folder your nxlog was installed into
define ROOT C:\Program Files (x86)\nxlog
## Default required locations based on the above
Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log
# Increase to DEBUG if needed for diagnosis
LogLevel INFO
# Load the JSON module needed by the output module
<Extension json>
Module xm_json
</Extension>
# Input the windows event logs
<Input in>
Module im_msvistalog
</Input>
# Output the logs out using the TCP module, convert to JSON format (important)
<Output out>
Module om_tcp
Host some.server
Port 6379
Exec to_json();
</Output>
# Define the route by mapping the input to an output
<Route 1>
Path in => out
</Route>
Restart the service in the windows services, and you are in business.
Note about JSON
You're probably shipping logs to a logstash broker (or similar JSON-based TCP receiver). In that case, make sure to specify JSON on the way out, as in the example above, or you'll spend hours trying to figure out why you're getting a glob of plain text and lose all the pre-parsed Windows event messages, which are nearly impossible to parse back from plain text.
Using that to_json() replaces the contents of the variable we mentioned earlier, $raw_event, with all of the already-parsed fields. If you hadn't invoked a module to parse that data out, you'd just get a bunch of empty events, as the data would be replaced with nothing.
4.1 - Drop Events
Exec
You can use the ‘Exec’ statement in any block and some pattern matching to drop events you don’t care about.
<Input in>
Module im_file
File "E:/Imports/get_accessplans/log-test.txt"
Exec if $raw_event =~ /someThing/ drop();
</Input>
Or the inverse, with the operator !~
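For example, keeping only the lines that match someThing and dropping everything else (the same pattern shown in the Inverse Matching section below):
<Input in>
Module im_file
File "E:/Imports/get_accessplans/log-test.txt"
Exec if $raw_event !~ /someThing/ drop();
</Input>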
Dropping Events with pm_pattern
The alternative is the patternDB approach as it has some parallelization advantages you’ll read about in the docs should you dig into it further. This matters when you have lots of patterns to check against.
# Set the ROOT to the folder your nxlog was installed into
define ROOT C:\Program Files (x86)\nxlog
## Default required locations based on the above
Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log
# Increase to DEBUG if needed for diagnosis
LogLevel INFO
# Load the JSON module needed by the output module
<Extension json>
Module xm_json
</Extension>
# Input the windows event logs
<Input in>
Module im_msvistalog
</Input>
# Process log events
<Processor pattern>
Module pm_pattern
PatternFile %ROOT%/conf/patterndb.xml
</Processor>
# Output the logs out using the TCP module, convert to JSON format (important)
<Output out>
Module om_tcp
Host some.server
Port 6379
Exec to_json();
</Output>
# Define the route by mapping the input to an output
<Route 1>
Path in => pattern => out
</Route>
And create an XML file like so:
<?xml version="1.0" encoding="UTF-8"?>
<patterndb>
<group>
<name>eventlog</name>
<id>1</id>
<pattern>
<id>2</id>
<name>500s not needed</name>
<matchfield>
<name>EventID</name>
<type>exact</type>
<value>500</value>
</matchfield>
<exec>drop();</exec>
</pattern>
</group>
</patterndb>
4.2 - Event Log
Limiting Log Messages
You may not want ALL the event logs. You can add a query to that module however, and limit logs to the security logs, like so
<Input in>
Module im_msvistalog
Query <QueryList><Query Id="0" Path="Security"><Select Path="Security">*</Select></Query></QueryList>
</Input>
You can break that into multiple lines for easier reading by escaping the returns. Here’s an example that ships the ADFS Admin logs.
<Input in>
Module im_msvistalog
Query <QueryList>\
<Query Id="0">\
<Select Path='AD FS 2.0/Admin'>*</Select>\
</Query>\
</QueryList>
</Input>
Pulling out Custom Logs
If you're interested in very specific logs, you can create a custom view in the Windows event viewer, and after selecting the criteria with the graphical tool, click on the XML tab to see what the query is. For example, to ship all the ADFS 2 logs (assuming you've turned on auditing), take the output of the XML tab (shown below) and modify it to be compliant with the nxlog format.
<QueryList>
<Query Id="0" Path="AD FS 2.0 Tracing/Debug">
<Select Path="AD FS 2.0 Tracing/Debug">*[System[Provider[@Name='AD FS 2.0' or @Name='AD FS 2.0 Auditing' or @Name='AD FS 2.0 Tracing']]]</Select>
<Select Path="AD FS 2.0/Admin">*[System[Provider[@Name='AD FS 2.0' or @Name='AD FS 2.0 Auditing' or @Name='AD FS 2.0 Tracing']]]</Select>
<Select Path="Security">*[System[Provider[@Name='AD FS 2.0' or @Name='AD FS 2.0 Auditing' or @Name='AD FS 2.0 Tracing']]]</Select>
</Query>
</QueryList>
Here’s the query from a MS NPS
<QueryList>
<Query Id="0" Path="System">
<Select Path="System">*[System[Provider[@Name='NPS']]]</Select>
<Select Path="System">*[System[Provider[@Name='HRA']]]</Select>
<Select Path="System">*[System[Provider[@Name='Microsoft-Windows-HCAP']]]</Select>
<Select Path="System">*[System[Provider[@Name='RemoteAccess']]]</Select>
<Select Path="Security">*[System[Provider[@Name='Microsoft-Windows-Security-Auditing'] and Task = 12552]]</Select>
</Query>
</QueryList>
4.3 - Input File Rotation
NXLog has a decent ability to rotate its own output files, but it doesn't come with many methods to rotate input files - e.g. you're reading in accounting logs from a Windows RADIUS server and it would be nice to archive those with NXLog, because Windows won't do it. You could bust out some perl (if you're on unix) and use the xm_perl module, but there's a simpler way.
On windows, the solution is to use an exec block with a scheduled command. The forfiles
executable is already present in windows and does the trick. The only gotcha is that ALL the parameters must be delimited like below.
So the command
forfiles /P "E:\IAS_Logs" /D -1 /C "cmd /c move @file \\server\share"
Becomes
<Extension exec>
Module xm_exec
<Schedule>
When @daily
Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/D','-1','/C','"cmd','/c','move','@file','\\server\share"');
</Schedule>
</Extension>
A slightly more complex example with added compression and removal of old files (there isn’t a great command line zip utility for windows in advance of powershell 5)
# Add log rotation for the windows input files
<Extension exec>
Module xm_exec
<Schedule>
When @daily
# Make a compressed copy of .log files older than 1 day
Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/M','*.log','/D','-1','/C','"cmd','/c','makecab','@file"')
# Delete original files after 2 days, leaving the compressed copies
Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/M','*.log','/D','-2','/C','"cmd','/c','del','@file"')
# Move compressed files to the depot after 2 days
Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/M','*.lo_','/D','-2','/C','"cmd','/c','move','@file','\\shared.ohio.edu\appshare\radius\logs\radius1.oit.ohio.edu"');
</Schedule>
</Extension>
The @daily runs right at 0 0 0 0 0 (midnight every night). Check the manual for more precise cron controls
4.4 - Inverse Matching
You can use the ‘Exec’ statement to match inverse like so
<Input in>
Module im_file
File "E:/Imports/get_accessplans/log-test.txt"
Exec if $raw_event !~ /someThing/ drop();
</Input>
However, when you’re using a pattern db this is harder as the REGEXP doesn’t seem to honor inverses like you’d expect. Instead, you must look for matches in your pattern db like normal;
<?xml version="1.0" encoding="UTF-8"?>
<patterndb>
<group>
<name>eventlog</name>
<id>1</id>
<pattern>
<id>2</id>
<name>Identify user login success usernames</name>
<matchfield>
<name>EventID</name>
<type>exact</type>
<value>501</value>
</matchfield>
<matchfield>
<name>Message</name>
<type>REGEXP</type>
<value>windowsaccountname \r\n(\S+)</value>
<capturedfield>
<name>ADFSLoginSuccessID</name>
<type>STRING</type>
</capturedfield>
</matchfield>
</pattern>
</group>
</patterndb>
Then, add a section to your nxlog.conf to take action when the above captured field doesn't exist (meaning there wasn't a regexp match).
...
# Process log events
<Processor pattern>
Module pm_pattern
PatternFile %ROOT%/conf/patterndb.xml
</Processor>
# Using a null processor just to have a place to put the exec statement
<Processor filter>
Module pm_null
Exec if (($EventID == 501) and ($ADFSLoginSuccessID == undef)) drop();
</Processor>
# Output the logs out using the TCP module, convert to JSON format (important)
<Output out>
Module om_tcp
Host some.server
Port 6379
Exec to_json();
</Output>
# Define the route by mapping the input to an output
<Route 1>
Path in => pattern => filter => out
</Route>
4.5 - Logstash Broker
When using logstash as a Broker/Parser to receive events from nxlog, you’ll need to explicitly tell it that the message is in json format with a filter, like so:
input {
tcp {
port => 6379
type => "WindowsEventLog"
}
}
filter {
json {
source => message
}
}
output {
stdout { codec => rubydebug }
}
4.6 - Manipulating Data
Core Fields
NXLog makes a handful of attributes about the event available to you. Some of these are from the 'core' module:
$raw_event
$EventReceivedTime
$SourceModuleName
$SourceModuleType
Additional Fields
These are always present and added to by the input module or processing module you use. For example, the mseventlog module adds all the attributes from the windows event logs as attributes to the nxlog event. So your event contains:
$raw_event
$EventReceivedTime
$SourceModuleName
$SourceModuleType
$Message
$EventTime
$Hostname
$SourceName
$EventID
...
You can also create new attributes by using a processing module, such as parsing an input file's XML. This will translate all the tags (within limits) into attributes.
<Extension xml>
Module xm_xml
</Extension>
<Input IAS_Accounting_Logs>
Module im_file
File "E:\IAS_Logs\IN*.log"
Exec parse_xml();
</Input>
And you can also add an Exec at any point to create or replace attributes as desired
<Input IAS_Accounting_Logs>
Module im_file
File "E:\IAS_Logs\IN*.log"
Exec $type = "RADIUSAccounting";
</Input>
Rewriting Data
Rather than manipulate everything in the input and output modules, use the pm_null module to group a block together.
<Processor rewrite>
Module pm_null
Exec parse_syslog_bsd();\
if $Message =~ /error/ \
{\
$SeverityValue = syslog_severity_value("error");\
to_syslog_bsd(); \
}
</Processor>
<Route 1>
Path in => rewrite => fileout
</Route>
4.7 - NPS Example
define ROOT C:\Program Files (x86)\nxlog
Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log
# Load the modules needed by the outputs
<Extension json>
Module xm_json
</Extension>
<Extension xml>
Module xm_xml
</Extension>
# Inputs. Add the field '$type' so the receiver can easily tell what type they are.
<Input IAS_Event_Logs>
Module im_msvistalog
Query \
<QueryList>\
<Query Id="0" Path="System">\
<Select Path="System">*[System[Provider[@Name='NPS']]]</Select>\
<Select Path="System">*[System[Provider[@Name='HRA']]]</Select>\
<Select Path="System">*[System[Provider[@Name='Microsoft-Windows-HCAP']]]</Select>\
<Select Path="System">*[System[Provider[@Name='RemoteAccess']]]</Select>\
<Select Path="Security">*[System[Provider[@Name='Microsoft-Windows-Security-Auditing'] and Task = 12552]]</Select>\
</Query>\
</QueryList>
Exec $type = "RADIUS";
</Input>
<Input IAS_Accounting_Logs>
Module im_file
File "E:\IAS_Logs\IN*.log"
Exec parse_xml();
Exec $type = "RADIUSAccounting";
</Input>
# Send the logs out using the TCP module, converting to JSON format (important)
<Output broker>
Module om_tcp
Host 192.168.1.1
Port 8899
Exec to_json();
</Output>
# Routes
<Route 1>
Path IAS_Event_Logs,IAS_Accounting_Logs => broker
</Route>
# Rotate the input logs while we're at it, so we don't need a separate tool
<Extension exec>
Module xm_exec
<Schedule>
When @daily
#Note - the Exec statement is one line but may appear wrapped
Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/D','-1','/C','"cmd','/c','move','@file','\\some.windows.server\share\logs\radius1"');
</Schedule>
</Extension>
4.8 - Parsing
You can also extract and set values with a pattern db, like this:
(Note: nxlog uses Perl-style regular expression syntax, if you need to look anything up.)
<?xml version="1.0" encoding="UTF-8"?>
<patterndb>
<group>
<name>ADFS Logs</name>
<id>1</id>
<pattern>
<id>2</id>
<name>Identify user login fails</name>
<matchfield>
<name>EventID</name>
<type>exact</type>
<value>111</value>
</matchfield>
<matchfield>
<name>Message</name>
<type>REGEXP</type>
<value>LogonUser failed for the '(\S+)'</value>
<capturedfield>
<name>ADFSLoginFailUsername</name>
<type>STRING</type>
</capturedfield>
</matchfield>
<set>
<field>
<name>ADFSLoginFail</name>
<value>failure</value>
<type>string</type>
</field>
</set>
</pattern>
</group>
</patterndb>
And here’s a more complex example, where we’re matching against a string like:
2015-03-03T19:45:03 get_records 58 DailyAddAcct completed (Success) with: 15 Records Processed 0 adds 0 removes 0 modified 15 unchanged
<?xml version="1.0" encoding="UTF-8"?>
<patterndb>
<group>
<name>Bbts Logs</name>
<id>1</id>
<pattern>
<id>2</id>
<name>Get TS Records</name>
<matchfield>
<name>raw_event</name>
<type>REGEXP</type>
<value>^(\S+) get_records (\S+)\s+(\S+) completed \((\S+)\) with: (\S+) Records Processed (\S+) adds (\S+) removes (\S+) modified (\S+) unchanged</value>
<capturedfield>
<name>timestamp</name>
<type>STRING</type>
</capturedfield>
<capturedfield>
<name>Transaction_ID</name>
<type>STRING</type>
</capturedfield>
<capturedfield>
<name>Job_Subtype</name>
<type>STRING</type>
</capturedfield>
<capturedfield>
<name>Job_Status</name>
<type>STRING</type>
</capturedfield>
<capturedfield>
<name>Record_Total</name>
<type>STRING</type>
</capturedfield>
<capturedfield>
<name>Record_Add</name>
<type>STRING</type>
</capturedfield>
<capturedfield>
<name>Record_Remove</name>
<type>STRING</type>
</capturedfield>
<capturedfield>
<name>Record_Mod</name>
<type>STRING</type>
</capturedfield>
<capturedfield>
<name>Record_NoChange</name>
<type>STRING</type>
</capturedfield>
</matchfield>
<set>
<field>
<name>Job_Type</name>
<value>Get_Records</value>
<type>string</type>
</field>
</set>
</pattern>
</group>
</patterndb>
4.9 - Reprocessing
Sometimes you hit a parse error while testing and need to feed all your source files back in. The problem is that, by default, nxlog saves its file position and reads only new entries.
Defeat this by adding a line to the nxlog config so it starts reading files from the beginning, and by deleting the ConfigCache file (so there’s no saved position to resume from).
<Input IAS_Accounting_Logs>
Module im_file
ReadFromLast FALSE
File "E:\IAS_Logs\IN*.log"
Exec parse_xml();
Exec $type = "RADIUSAccounting";
</Input>
del "C:\Program Files (x86)\nxlog\data\configcache.dat"
Restart and it will begin reprocessing all the data. When you’re done, remove the ReadFromLast line and restart.
Note: If you had just deleted the cache file, nxlog would have resumed at the tail of the file. You could also have told it not to save position at all, but you do want position saving once you’re ready to resume normal operation.
https://www.mail-archive.com/[email protected]/msg00158.html
4.10 - Syslog
There are two components: adding the syslog extension module and adding the output and route for it.
<Extension syslog>
Module xm_syslog
</Extension>
<Input IAS_Accounting_Logs>
Module im_file
File "E:\IAS_Logs\IN*.log"
Exec $type = "RADIUSAccounting";
</Input>
<Output siem>
Module om_udp
Host 192.168.1.1
Port 514
Exec to_syslog_ietf();
</Output>
<Route 1>
Path IAS_Accounting_Logs => siem
</Route>
4.11 - Troubleshooting
NXLOG Couldn’t read next event
If you see this error message from nxlog:
ERROR Couldn't read next event, corrupted eventlog?; The data is invalid.
Congrats - you’ve hit a bug.
https://nxlog.org/support-tickets/immsvistalog-maximum-event-log-count-support
The work-around is to limit your log event subscriptions on the input side by using a query. Example:
<Input in>
Module im_msvistalog
Query <QueryList><Query Id="0" Path="Microsoft-Windows-PrintService/Operational"><Select Path="Microsoft-Windows-PrintService/Operational">*</Select></Query></QueryList>
Exec if $EventID != 307 drop();
Exec $type = "IDWorks";
</Input>
Parse failure on Windows to Logstash
We found that nxlog made for the best Windows log shipper, but it didn’t seem to parse the events in the event log. Output to Logstash wasn’t in JSON format, which we confirmed by writing directly to disk. This happens even though the event log input module explicitly emits the log attributes as individual fields.
It turns out you have to explicitly tell the output module to use JSON. This isn’t well documented.
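Here’s a minimal sketch of the fix, mirroring the broker output used elsewhere in this doc; the output name, host, and port are placeholders, and the output still needs to be referenced in your Route’s Path.
# Load the JSON extension so to_json() is available
<Extension json>
Module xm_json
</Extension>
# Serialize the event fields to JSON before sending to Logstash
<Output logstash>
Module om_tcp
Host 192.168.1.1
Port 6379
Exec to_json();
</Output>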
4.12 - UNC Paths
When using Windows UNC paths, don’t forget that the backslash is also used for escaping characters, so the path
\\server\radius
looks mangled in your error log message, because the backslash sequences (like \r) are treated as escape characters. You’ll want to escape your backslashes like this:
\\\\server\\radius\\file.log
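For example, here’s a minimal sketch of a file input using an escaped UNC path; the input name, server, and share are hypothetical.
<Input radius_unc>
Module im_file
# In double-quoted strings the backslash is an escape character, so each one must be doubled
File "\\\\server\\radius\\file.log"
</Input>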
4.13 - Unicode Normalization
Files you’re reading may be in any character set, and this can cause strange behavior when you modify or pass the data on, as an example at Stack Exchange shows. This isn’t a problem with Windows event logs, but Windows applications use several different charsets.
Best practice is to convert everything to UTF-8. This is especially true when invoking modules such as xm_json that don’t handle other encodings well.
NXLog has the ability to convert, and can even do this automatically. However, there is some room for error. If you can, identify the encoding by looking at the file in a hex editor and comparing against Microsoft’s identification chart.
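For instance, on a Unix box you can peek at the leading bytes like this (the filename is a placeholder); a byte-order mark of FF FE suggests UTF-16LE, FE FF suggests UTF-16BE, and EF BB BF suggests UTF-8.
# Dump the first 16 bytes of the file as hex
xxd log-test.txt | head -n 1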
Here’s a snippet of a manual conversion of a PowerShell-generated log, having looked at the first part and identified it as UTF-16LE:
...
<Extension charconv>
Module xm_charconv
AutodetectCharsets utf-8, utf-16, utf-32, iso8859-2, ucs-2le
</Extension>
<Input in1>
Module im_file
File "E:/Imports/log.txt"
Exec $raw_event = convert($raw_event,"UTF-16LE","UTF-8");
</Input>
...
Notice, however, that the charconv module has an automatic detection option. You can use that as long as the encoding you have (utf-16le in this case) is included in the AutodetectCharsets list, as shown here:
<Extension charconv>
Module xm_charconv
AutodetectCharsets utf-8, utf-16, utf-16le, utf-32, iso8859-2
</Extension>
<Input sql-ERlogs>
Module im_file
File 'C:\Program Files\Microsoft SQL Server\MSSQL11.SQL\MSSQL\Log\ER*'
ReadFromLast TRUE
Exec convert_fields("AUTO", "utf-8");
</Input>
If you’re curious which charsets are supported, you can run this command on any Unix system to see the names:
iconv -l
4.14 - Windows Files
Windows uses UTF-16 by default, and other services may use derivations thereof. In any event, it’s recommended to normalize things to UTF-8. Here’s a good example of what will happen if you don’t:
<http://stackoverflow.com/questions/27596676/nxlog-logs-are-in-unicode-charecters>
The answer to that question is to specify the exact encoding, as “AUTO” doesn’t seem to detect it properly.
<Input in>
Module im_file
File "E:/Imports/get_accessplans/log-test.txt"
Exec if $raw_event == '' drop();
Exec $Event = convert($raw_event,"UCS-2LE","UTF-8"); to_json();
SavePos FALSE
ReadFromLast FALSE
</Input>
From the manual on SQL Server
Microsoft SQL Server
Microsoft SQL Server stores its logs in UTF-16 encoding using a line-based format. It is recommended to normalize the encoding to UTF-8. The following config snippet will do that.
<Extension _charconv>
Module xm_charconv
</Extension>
<Input in>
Module im_file
File "C:\\MSSQL\\ERRORLOG"
Exec convert_fields('UCS-2LE','UTF-8'); if $raw_event == '' drop();
</Input>
As of this writing, the LineBased parser (the default InputType for im_file) is not able to properly read double-byte UTF-16 encoded files and will read an additional empty line (because of the double-byte CRLF). The drop() call above is intended to fix this. Using convert_fields('UTF-16','UTF-8'); might also work instead of UCS-2LE.
5 - Windows Event Forwarding
If you’re in a Windows shop, this is the best way to keep the Windows admins happy. No installation of extra tools. ‘Keeps it in the MS family’ so to speak.
Configure your servers to push logs to a central location, and use a client there to send them on. Beats works well for this.
The key steps seem to be:
- Create a domain service account, or use the machine account
- Add that account to the appropriate group on the client (typically Event Log Readers)
- Check the runtime status on the collector (see the sketch below)
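Here’s a hedged sketch of the commands involved, assuming a source-initiated setup and a subscription named ServerLogs (the name is a placeholder).
REM On the collector - configure the Windows Event Collector service
wecutil qc
REM On each source computer - enable WinRM so it can reach the collector
winrm quickconfig
REM Back on the collector - check the runtime status of the subscription
wecutil gr ServerLogs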
For printing, open Event Viewer, navigate to Microsoft-Windows-PrintService/Operational, and enable it, as it’s not on by default.
Make sure to set the subscription’s event delivery optimization to ‘Minimize Latency’, or you’ll spend a long time wondering why there is no data.
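If you prefer to script enabling that log, here’s a sketch using wevtutil from an elevated prompt.
REM Turn on the PrintService operational log, which is disabled by default
wevtutil sl Microsoft-Windows-PrintService/Operational /e:true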
Sources
https://hackernoon.com/the-windows-event-forwarding-survival-guide-2010db7a68c4
https://www.ibm.com/docs/en/netcoolomnibus/8?topic=acquisition-forwarded-event-log
https://www.youtube.com/watch?v=oyPuRE51k3o&t=158s