When a friend in our study group asked how to audit command-line operations on production Linux hosts, I gave a quick answer, "rsyslog + Elasticsearch", without explaining any of the details, and rashly promised to cover it in an upcoming post on this public account. Nearly two months later, I am finally filling in the hole I dug. First, some framing: for most Linux operational-auditing needs, what we really want is to reconstruct which commands were executed, and in what context, when a production server was manually (perhaps mistakenly) operated on. This is much like ordinary business log collection. At the simple end, you push shell history straight to syslog; at the complex end, you capture behavior at the kernel level with auditd or eBPF. This article does not explain those heavier schemes; it aims only at the 80% of day-to-day auditing needs (the classic 80/20 trade-off). And since the title says Shell, today's topic centers on Bash.

To summarize today's topic: hook audit logging into Bash with a custom profile script, send user actions to rsyslog for aggregation, and finally store and query the logs in Elasticsearch.

Linux part

  1. Get the necessary tools
  • Rsyslog: a syslog-compatible log-processing service on Linux
  • jq: a small command-line tool for working with JSON data in the shell
  • logger: a utility for writing messages to syslog

With the exception of jq, most operating system distributions ship with these tools out of the box; anything missing can be installed with the system's package manager.
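Before wiring anything up, it is worth confirming that the three tools are actually present. A minimal check (a sketch; exact package names vary slightly by distribution):

```shell
#!/usr/bin/env bash
# Count how many of the required tools are missing from PATH.
missing=0
for tool in rsyslogd jq logger; do
  if command -v "$tool" >/dev/null 2>&1; then
    echo "$tool: ok"
  else
    echo "$tool: missing (install it via apt-get / yum)"
    missing=$((missing + 1))
  fi
done
echo "missing tools: $missing"
```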

  2. Create a custom bash.audit.sh and copy it to /etc/profile.d/ so every login shell sources it
if [ "${SHELL##*/}" != "bash" ]; then
  return
fi

if [ "${AUDIT_READY}" = "yes" ]; then
    return
fi

declare -rx HISTFILE="$HOME/.bash_history"
declare -rx HISTSIZE=500000
declare -rx HISTFILESIZE=500000
declare -rx HISTCONTROL=""
declare -rx HISTIGNORE=""
declare -rx HISTCMD
declare -rx AUDIT_READY="yes"

shopt -s histappend
shopt -s cmdhist
shopt -s histverify

if shopt -q login_shell && [ -t 0 ]; then
  stty -ixon
fi

if groups | grep -q root; then
  declare -x TMOUT=86400
  # chattr +a "$HISTFILE"
fi

declare -a LOGIN_INFO=( $(who -mu | awk '{print $1,$2,$6}') )
declare -rx AUDIT_LOGINUSER="${LOGIN_INFO[0]}"
declare -rx AUDIT_LOGINPID="${LOGIN_INFO[2]}"
declare -rx AUDIT_USER="$USER"
declare -rx AUDIT_PID="$$"
declare -rx AUDIT_TTY="${LOGIN_INFO[1]}"
declare -rx AUDIT_SSH="$([ -n "$SSH_CONNECTION" ] && echo "$SSH_CONNECTION" | awk '{print $1":"$2"->"$3":"$4}')"
declare -rx AUDIT_STR="$AUDIT_LOGINUSER  $AUDIT_LOGINPID  $AUDIT_TTY  $AUDIT_SSH"
declare -rx AUDIT_TAG=$(echo -n "$AUDIT_STR" | sha1sum | cut -c1-12)
declare -x AUDIT_LASTHISTLINE=""

set +o functrace
shopt -s extglob

function AUDIT_DEBUG() {
  if [ -z "$AUDIT_LASTHISTLINE" ]; then
    local AUDIT_CMD="$(fc -l -1 -1)"
    AUDIT_LASTHISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
  else
    AUDIT_LASTHISTLINE="$AUDIT_HISTLINE"
  fi
  local AUDIT_CMD="$(history 1)"
  AUDIT_HISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
  if [ "${AUDIT_HISTLINE:-0}" -ne "${AUDIT_LASTHISTLINE:-0}" ] || [ "${AUDIT_HISTLINE:-0}" -eq "1" ]; then
    MESSAGE=$(jq -c -n \
	    --arg pwd "$PWD" \
	    --arg cmd "${AUDIT_CMD##*( )?(+([0-9])?(\*)+( ))}" \
	    --arg user "$AUDIT_LOGINUSER" \
	    --arg become "$AUDIT_USER" \
	    --arg pid "$AUDIT_PID" \
	    --arg info "${AUDIT_STR}" \
	    '{cmd: $cmd, user: $user, become: $become, pid: $pid, pwd: $pwd, info: $info}')
    logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE"
  fi
}

function AUDIT_EXIT() {
  local AUDIT_STATUS="$?"
  if [ -n "$AUDIT_TTY" ]; then
    MESSAGE_CLOSED=$(jq -c -n \
        --arg action "session closed" \
        --arg user "$AUDIT_LOGINUSER" \
        --arg become "$AUDIT_USER" \
        --arg pid "$AUDIT_PID" \
        --arg info "${AUDIT_STR}" \
        '{user: $user, become: $become, pid: $pid, action: $action, info: $info}')
    logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_CLOSED"
  fi
  exit "$AUDIT_STATUS"
}

declare -frx +t AUDIT_DEBUG
declare -frx +t AUDIT_EXIT

if [ -n "$AUDIT_TTY" ]; then
  MESSAGE_OPENED=$(jq -c -n \
      --arg action "session opened" \
      --arg user "$AUDIT_LOGINUSER" \
      --arg become "$AUDIT_USER" \
      --arg pid "$AUDIT_PID" \
      --arg info "${AUDIT_STR}" \
      '{user: $user, become: $become, pid: $pid, action: $action, info: $info}')
  logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_OPENED"
fi

declare -rx PROMPT_COMMAND='[ -n "$AUDIT_DONE" ] && echo ""; AUDIT_DONE=; trap "AUDIT_DEBUG && AUDIT_DONE=1; trap DEBUG" DEBUG'
declare -rx BASH_COMMAND
declare -rx SHELLOPTS

trap AUDIT_EXIT EXIT


In brief, this script pins down the shell's history behavior, sets a login timeout for root sessions, and defines the format and delivery of the audit records (sent via logger to the local6 syslog facility).
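The least obvious part of the script is the extglob pattern that splits the output of `history 1` into a line number and the command itself. A standalone illustration (using a hypothetical sample string, not live history):

```shell
#!/usr/bin/env bash
shopt -s extglob

# `history 1` returns something like "  1024  ls -la /var/log"
# (a made-up sample; we are not reading live history here).
AUDIT_CMD="  1024  ls -la /var/log"

# Everything before the first character that is neither a space nor a
# digit, i.e. the padded history line number:
AUDIT_HISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
HISTNUM="${AUDIT_HISTLINE// /}"   # drop the padding spaces

# Strip leading spaces, the number, an optional '*' marker and the
# separator, leaving only the command text:
CMD_ONLY="${AUDIT_CMD##*( )?(+([0-9])?(\*)+( ))}"

echo "histline=$HISTNUM cmd=$CMD_ONLY"
```

This is exactly how AUDIT_DEBUG decides whether the history line advanced (a new command was run) and what command text to put into the JSON payload.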

  3. Configure the Rsyslog client: create /etc/rsyslog.d/40-audit.conf locally so that local6 messages are forwarded to the central rsyslog server for processing
$RepeatedMsgReduction off

# forward to the central rsyslog server over UDP (use @@ for TCP)
local6.info @<your_rsyslog_server>:514
& stop

After the configuration, do not forget to restart the Rsyslog service!
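A quick way to catch typos before restarting is rsyslog's dry-run mode, which parses the whole configuration (including files under /etc/rsyslog.d/) without starting the daemon. A sketch:

```shell
#!/usr/bin/env bash
# `rsyslogd -N1` validates the configuration and exits without starting
# the daemon; a non-zero exit means there is a syntax problem.
if command -v rsyslogd >/dev/null 2>&1; then
  rsyslogd -N1 && echo "rsyslog configuration OK" || echo "configuration has errors"
else
  echo "rsyslogd not found on this host"
fi
checked=yes
```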

Data section

As the name implies, the data part is used to receive and process operating system logs sent by clients. Here we use rsyslog and ElasticSearch.

  1. Install the rsyslog Elasticsearch modules

To get rsyslog to send logs to Elasticsearch, we have to install its ES output module (and the JSON-parsing module):

# Ubuntu 
apt-get install -y rsyslog-elasticsearch rsyslog-mmjsonparse
   
# CentOS
yum install rsyslog-elasticsearch rsyslog-mmjsonparse
  2. Bring up the Elasticsearch service

For easy deployment, this article uses Docker directly to quickly pull up an ES service

docker run -d --name elasticsearch \
  -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  elasticsearch:7.3.1
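Before pointing rsyslog at it, you can confirm that the node answers on its mapped port. A sketch (adjust host/port as needed; the container may take a minute to come up):

```shell
#!/usr/bin/env bash
# Probe the cluster health endpoint; tolerate the node not being up yet.
es_url="http://localhost:9200"
if command -v curl >/dev/null 2>&1; then
  curl -s --max-time 5 "$es_url/_cluster/health" || echo "Elasticsearch not reachable yet"
else
  echo "curl not installed"
fi
probed=yes
```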
  3. Configure the Rsyslog server: create /etc/rsyslog.d/40-audit-server.conf to define how incoming logs are parsed and written
$RepeatedMsgReduction off

$ModLoad imudp
$UDPServerRun 514

module(load="mmjsonparse")          # for parsing CEE-enhanced syslog messages
module(load="omelasticsearch")      # for outputting to Elasticsearch

#try to parse a structured log

# this is for index names to be like: bashaudit-YYYY.MM.DD
template(name="rsyslog-index" type="string" string="bashaudit-%$YEAR%.%$MONTH%.%$DAY%")

# this is for formatting our syslog in JSON with @timestamp
template(name="json-syslog" type="list") {
    constant(value="{")
      constant(value="\"@timestamp\":\"")     property(name="timegenerated" dateFormat="rfc3339" date.inUTC="on")
      constant(value="\",\"host\":\"")        property(name="fromhost-ip")
      constant(value="\",\"severity\":\"")    property(name="syslogseverity-text")
      constant(value="\",\"facility\":\"")    property(name="syslogfacility-text")
      constant(value="\",\"program\":\"")     property(name="programname")
      constant(value="\",\"tag\":\"")         property(name="syslogtag" format="json")
      constant(value="\",")                   property(name="$!all-json" position.from="2")
    # closing brace is in all-json
}

if ($syslogfacility-text == 'local6' and $syslogseverity-text == 'info') then {
	action(type="mmjsonparse")
	action(type="omelasticsearch" template="json-syslog" searchIndex="rsyslog-index" dynSearchIndex="on" server="<your_elasticsearch_address>" serverport="<your_elasticsearch_port>")
        # action(type="omfile" file="/var/log/bashaudit.log")
        stop
}

Two rsyslog modules do the heavy lifting on the collected logs:

  • mmjsonparse: parses the @cee-tagged JSON payload into structured log properties
  • omelasticsearch: ships the formatted result to Elasticsearch
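To make the pipeline concrete: what arrives at the server is an ordinary syslog record whose message body starts with the Lumberjack `@cee:` cookie followed by JSON. mmjsonparse strips the cookie, parses the JSON, and exposes the fields as `$!` properties for the json-syslog template. A minimal illustration of that split (a sample message, not live traffic):

```shell
#!/usr/bin/env bash
# A message as emitted by the audit script via `logger`:
line='@cee: {"cmd":"ls -la","user":"ops","become":"root","pid":"4242","pwd":"/root"}'

# Conceptually, mmjsonparse is this prefix strip plus a JSON parse:
payload="${line#@cee: }"
echo "$payload"
```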

Restart the Rsyslog service

The query part

You can query audit logs using Kibana or secondary development using ElasticSearch API. Here we use Kibana as an example.

cat << EOF > ./kibana.yml
server.port: 15601
elasticsearch.hosts: ["http://<your_elasticsearch_address>:<your_elasticsearch_port>"]
i18n.locale: "zh-CN"
EOF

docker run -d --ulimit nofile=1000000:1000000 --net host --name elasticsearch-audit \
  -v ./kibana.yml:/usr/share/kibana/config/kibana.yml \
  --restart always \
  docker.elastic.co/kibana/kibana-oss:7.3.1

Visit http://localhost:15601 locally and, in Kibana's index management page, create an index pattern named bashaudit-*.

After that, we can open Discover to query the audit logs, including each command's execution time, source user, working directory, and so on.

Further, we can also do some additional secondary development of the audit log by calling the API, for example:

  • Rank the most active (hotspot) users across production servers
  • Rank the most frequently executed (hotspot) operations
  • Alert on dangerous shell operations in production
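For example, an alerting job could periodically search the audit index for dangerous commands. A sketch of such a query (the index pattern bashaudit-* matches the server template above; the address placeholder is the same as elsewhere in this article):

```shell
#!/usr/bin/env bash
# Query body: `rm -rf` commands seen in the last 24 hours.
QUERY='{
  "query": {
    "bool": {
      "must": [
        { "match_phrase_prefix": { "cmd": "rm -rf" } },
        { "range": { "@timestamp": { "gte": "now-24h" } } }
      ]
    }
  }
}'

# Uncomment to run against a real cluster:
# curl -s -H 'Content-Type: application/json' \
#   "http://<your_elasticsearch_address>:9200/bashaudit-*/_search" -d "$QUERY"
echo "$QUERY"
```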

Conclusion

This article showed how to customize Bash so that, once a user logs in and the shell initializes, every subsequent command line is shipped to a rsyslog service and stored, formatted, in Elasticsearch, helping system administrators locate faults on production machines. The same data can also feed visual, second-stage development of Linux command-line auditing.

However, this article’s approach to customizing Bash still has many limitations, such as:

  • Commands executed inside shell scripts cannot be audited;

  • The audit can be bypassed by switching to another shell, such as zsh.

As you can see, auditing takes more than Bash alone; you can try Snoopy to trace what happens inside shell scripts. I will cover Snoopy/auditd-based audit schemes in a future update. Thanks for your continued attention!


More content on “Cloud native Xiao Bai”