This article is reprinted from the public account StreamCloudNative. The author, Xue Song, is a senior software engineer at New Continent Software.

Editor: Jipai, StreamNative.

About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight functional computing. It adopts an architecture that separates compute from storage, and supports multi-tenancy, persistent storage, and cross-region data replication across multiple datacenters, delivering strong consistency, high throughput, low latency, and high scalability.

Apache Pulsar has been adopted by many large Internet and traditional-industry companies at home and abroad, with use cases spanning artificial intelligence, finance, telecom operators, live and short-form video, the Internet of Things, retail and e-commerce, online education, and other industries. Adopters include Comcast, Yahoo!, Tencent, China Telecom, China Mobile, BIGO, and VIPKID.

Background

As a cloud-native distributed messaging system, Apache Pulsar comprises multiple components, ZooKeeper, bookie, broker, functions-worker, proxy, and so on, all deployed in a distributed fashion across many hosts. As a result, each component's log files are scattered over multiple machines. When a component misbehaves, finding the relevant error message means logging in to each host and checking each service one by one, which is tedious. Our usual approach was to grep the log files directly, or to extract the desired information with commands such as awk. However, as application and service volume grew, so did the number of nodes, and this traditional method exposed many problems: low efficiency, ever-growing log archives, slow full-text search, and no way to query across multiple dimensions. We therefore wanted to aggregate and monitor the logs so that error messages from every Pulsar service could be found and investigated quickly, making operations and maintenance more purposeful, targeted, and direct.

To solve the log retrieval problem, our team considered using a centralized log collection system to collect, manage, and access the logs on all Pulsar nodes in a unified manner.

A complete centralized log system must contain the following main features:

  • Collection – collect log data from multiple sources;
  • Transmission – stably transmit log data to a central system;
  • Storage – store the log data;
  • Analysis – support UI-based analysis;
  • Alerting – provide error reporting and monitoring mechanisms.

ELK provides a complete set of solutions, all open-source software, whose components cooperate seamlessly and efficiently cover many scenarios; it is currently a mainstream log system. Our company has a self-developed big data management platform through which we deploy and manage ELK, and we have already used ELK to provide support services for several business systems in production. ELK is an acronym for three open-source projects: Elasticsearch, Logstash, and Kibana. The stack has since been renamed Elastic Stack, and Beats was added to it, including Filebeat, a lightweight log collection and shipping tool. Filebeat consumes few resources and is well suited to collecting logs on individual servers and forwarding them to Logstash.

As shown in the figure above, this log collection mode poses two problems for Pulsar:

  • Every host running a Pulsar service must also deploy a Filebeat service;
  • Pulsar service logs must first be written to disk as files, consuming the host's disk I/O.

Log4j2 can send logs to Kafka out of the box through its KafkaAppender: by editing the Log4j2 configuration file, logs generated through Log4j2 are sent to Kafka in real time.

As shown below:

Implementation process

Taking Pulsar 2.6.2 as an example, the following sections describe in detail how to implement fast log retrieval for Apache Pulsar with Log4j2 + Kafka + ELK.

I. Preparation

The first thing to determine is which fields will be used to retrieve logs in Kibana, so that logs can be aggregated and queried across multiple dimensions. Elasticsearch then tokenizes the retrieval fields and builds indexes on them.

As shown in the figure above, we defined 8 retrieval fields for the Pulsar logs: cluster name, host name, host IP, component name, log content, system time, log level, and cluster instance.
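Because the appender pattern configured later in log4j2-kafka.yaml emits these fields space-separated on a single line, each log record can be split positionally. A small shell sketch of that idea, using entirely hypothetical values for the cluster, host, and message:

```shell
#!/usr/bin/env bash
# Hypothetical log line in the space-separated layout produced by the pattern:
# cluster hostname hostip component instance date time [thread] [logger] level , message
line='pulsar_cluster host-01 192.168.0.1 broker 1 2021-03-01 12:00:00.000 [main] [o.a.p.PulsarService] INFO , service started'

# Space-delimited fields can be picked out positionally, e.g. with awk:
cluster=$(echo "$line" | awk '{print $1}')
component=$(echo "$line" | awk '{print $4}')
level=$(echo "$line" | awk '{print $10}')
echo "$cluster $component $level"
```

This is the same positional splitting Elasticsearch relies on when it indexes the eight retrieval fields.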

II. Implementation process

Note: In order to keep the structure of Pulsar’s native configuration files and scripts intact, we implemented this solution by adding new configuration files and scripts.

1. Add configuration files

Add the following two configuration files to the {PULSAR_HOME}/conf directory:

1) logenv.sh

This file configures the JVM options that the Pulsar components need at startup and injects them into the Pulsar service Java process, for example:

KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092
PULSAR_CLUSTER=pulsar_cluster
PULSAR_TOPIC=pulsar_topic
HOST_IP=192.168.0.1
PULSAR_MODULE_INSTANCE_ID=1

The meanings of the above fields are:

  • KAFKA_CLUSTER: Kafka broker list address;
  • PULSAR_CLUSTER: cluster name of the Pulsar cluster;
  • PULSAR_TOPIC: the Kafka topic that receives the Pulsar service logs;
  • HOST_IP: IP address of the Pulsar host;
  • PULSAR_MODULE_INSTANCE_ID: instance ID of the Pulsar service (multiple Pulsar clusters may be deployed on one host).
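These variables only take effect because the pulsar-kafka script shown later turns them into JVM system properties, which the `${sys:...}` lookups in log4j2-kafka.yaml then resolve. A minimal sketch of that wiring, with hypothetical values:

```shell
#!/usr/bin/env bash
# Hypothetical values, as they would be set in conf/logenv.sh:
KAFKA_CLUSTER="192.168.0.1:9092,192.168.0.2:9092"
PULSAR_CLUSTER="pulsar_cluster"
PULSAR_TOPIC="pulsar_topic"
HOST_IP="192.168.0.1"
PULSAR_MODULE_INSTANCE_ID=1
COMMAND="broker"                      # component name, taken from the CLI argument
HOSTNAME="${HOSTNAME:-$(hostname)}"   # fall back if the shell does not set HOSTNAME

# Mirror of the OPTS line added to bin/pulsar-kafka:
OPTS="-Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dpulsar.module.type=${COMMAND}"
OPTS="$OPTS -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostname=${HOSTNAME} -Dpulsar.hostip=${HOST_IP}"
OPTS="$OPTS -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"
echo "$OPTS"
```

Echoing $OPTS here is only for illustration; in the real script the string is appended to the java command line.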

2) log4j2-kafka.yaml

This file is copied from log4j2.yaml, with the following changes. (Note: in the figures below, log4j2.yaml is on the left and log4j2-kafka.yaml on the right.)

  • Add the Kafka cluster broker list and define the format of the log messages that Log4j2 sends to Kafka; Elasticsearch splits the message on spaces to extract the eight retrieval fields.

  • Add a Kafka appender;

  • Add a Failover appender;

  • Switch the Root logger and the Loggers to asynchronous mode;

  • The complete log4j2-kafka.yaml configuration file is as follows:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#




Configuration:
  status: INFO
  monitorInterval: 30
  name: pulsar
  packages: io.prometheus.client.log4j2


  Properties:
    Property:
      - name: "pulsar.log.dir"
        value: "logs"
      - name: "pulsar.log.file"
        value: "pulsar.log"
      - name: "pulsar.log.appender"
        value: "RoutingAppender"
      - name: "pulsar.log.root.level"
        value: "info"
      - name: "pulsar.log.level"
        value: "info"
      - name: "pulsar.routing.appender.default"
        value: "Console"
      - name: "kafkaBrokers"
        value: "${sys:kafka.cluster}"
      - name: "pattern"
        value: "${sys:pulsar.cluster} ${sys:pulsar.hostname} ${sys:pulsar.hostip} ${sys:pulsar.module.type} ${sys:pulsar.module.instanceid} %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%c{10}] %level , %msg%n"


  # Example: logger-filter script
  Scripts:
    ScriptFile:
      name: filter.js
      language: JavaScript
      path: ./conf/log4j2-scripts/filter.js
      charset: UTF-8


  Appenders:


    #Kafka
    Kafka:
      name: "pulsar_kafka"
      topic: "${sys:pulsar.topic}"
      ignoreExceptions: "false"
      PatternLayout:
        pattern: "${pattern}"
      Property:
        - name: "bootstrap.servers"
          value: "${kafkaBrokers}"
        - name: "max.block.ms"
          value: "2000"


    # Console
    Console:
      name: Console
      target: SYSTEM_OUT
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"


    Failover:
      name: "Failover"
      primary: "pulsar_kafka"
      retryIntervalSeconds: "600"
      Failovers:
        AppenderRef:
          ref: "RollingFile"


    # Rolling file appender configuration
    RollingFile:
      name: RollingFile
      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
      immediateFlush: false
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
      Policies:
        TimeBasedTriggeringPolicy:
          interval: 1
          modulate: true
        SizeBasedTriggeringPolicy:
          size: 1 GB
      # Delete file older than 30days
      DefaultRolloverStrategy:
          Delete:
            basePath: ${sys:pulsar.log.dir}
            maxDepth: 2
            IfFileName:
              glob: "*/${sys:pulsar.log.file}*log.gz"
            IfLastModified:
              age: 30d


    Prometheus:
      name: Prometheus


    # Routing
    Routing:
      name: RoutingAppender
      Routes:
        pattern: "$${ctx:function}"
        Route:
          -
            Routing:
              name: InstanceRoutingAppender
              Routes:
                pattern: "$${ctx:instance}"
                Route:
                  -
                    RollingFile:
                      name: "Rolling-${ctx:function}"
                      fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/${ctx:functionname}-${ctx:instance}.log"
                      filePattern : "${sys:pulsar.log.dir}/functions/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
                      PatternLayout:
                        Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
                      Policies:
                        TimeBasedTriggeringPolicy:
                          interval: 1
                          modulate: true
                        SizeBasedTriggeringPolicy:
                          size: "20MB"
                        # Trigger every day at midnight that also scan
                        # roll-over strategy that deletes older file
                        CronTriggeringPolicy:
                          schedule: "0 0 0 * * ?"
                      # Delete file older than 30days
                      DefaultRolloverStrategy:
                          Delete:
                            basePath: ${sys:pulsar.log.dir}
                            maxDepth: 2
                            IfFileName:
                              glob: "*/${sys:pulsar.log.file}*log.gz"
                            IfLastModified:
                              age: 30d
                  - ref: "${sys:pulsar.routing.appender.default}"
                    key: "${ctx:function}"
          - ref: "${sys:pulsar.routing.appender.default}"
            key: "${ctx:function}"


  Loggers:


    # Default root logger configuration
    AsyncRoot:
      level: "${sys:pulsar.log.root.level}"
      additivity: true
      AppenderRef:
        - ref: "Failover"
          level: "${sys:pulsar.log.level}"
        - ref: Prometheus
          level: info


    AsyncLogger:
      - name: org.apache.bookkeeper.bookie.BookieShell
        level: info
        additivity: false
        AppenderRef:
          - ref: Console


      - name: verbose
        level: info
        additivity: false
        AppenderRef:
          - ref: Console


    # Logger to inject filter script
#     - name: org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
#       level: debug
#       additivity: false
#       AppenderRef:
#         ref: "${sys:pulsar.log.appender}"
#         ScriptFilter:
#           onMatch: ACCEPT
#           onMisMatch: DENY
#           ScriptRef:
#             ref: filter.js
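The consuming side of the pipeline is not shown in the original text. One possible Logstash pipeline that reads the Kafka topic, splits the space-separated fields, and writes to Elasticsearch might look like the sketch below; the broker list, topic, field names, index name, and Elasticsearch endpoint are all assumptions, not part of the original setup:

```
input {
  kafka {
    bootstrap_servers => "192.168.0.1:9092,192.168.0.2:9092"
    topics => ["pulsar_topic"]
  }
}
filter {
  # Split the space-separated layout produced by the pattern above;
  # the trailing %{msg} captures the rest of the line, spaces included.
  dissect {
    mapping => {
      "message" => "%{cluster} %{hostname} %{hostip} %{module} %{instance} %{date} %{time} [%{thread}] [%{logger}] %{level} , %{msg}"
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.0.3:9200"]
    index => "pulsar-log-%{+YYYY.MM.dd}"
  }
}
```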

Points to note:

  • Log shipping must be asynchronous and must not affect service performance.
  • When a system with strict response-time requirements integrates with a third-party system, the dependency must be decoupled. Here the Failover appender decouples the dependency on Kafka: when Kafka crashes, logging fails over and events are written to local disk instead.
  • The default retryIntervalSeconds of the Log4j2 Failover appender is 1 minute; every retry against a Kafka that is still down produces another exception, so it is worth increasing the interval, for example to 10 minutes (600 seconds, as configured above).
  • The Kafka appender's ignoreExceptions must be set to false, otherwise failover cannot be triggered.
  • When the Kafka cluster is down, a write attempt takes max.block.ms (1 minute by default) to throw an exception, and only then does failover kick in. Under a large request volume the Log4j2 async queue fills up quickly, log writes start to block, and the main service's response time suffers badly. max.block.ms should therefore be kept short; here it is set to 2000 ms.
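To see why max.block.ms must be short, it helps to estimate how quickly the async queue fills while Kafka is unreachable. A back-of-the-envelope sketch, assuming the common Log4j2 async ring-buffer default of 256 * 1024 events and a hypothetical log rate of 5,000 events/sec (both numbers are assumptions, not measurements):

```shell
#!/usr/bin/env bash
# Assumptions: Log4j2 async ring buffer of 256 * 1024 events (a common
# default) and a hypothetical 5000 log events/sec while Kafka is down.
RING_BUFFER=$((256 * 1024))
RATE_PER_SEC=5000
BLOCK_MS=2000               # max.block.ms chosen in log4j2-kafka.yaml

# Events that accumulate behind one append call blocked on Kafka:
BACKLOG=$((RATE_PER_SEC * BLOCK_MS / 1000))
# Rough time until the ring buffer is full at this rate:
FILL_SECONDS=$((RING_BUFFER / RATE_PER_SEC))
echo "backlog behind one blocked append: $BACKLOG events"
echo "ring buffer full after about: $FILL_SECONDS seconds"
```

With the 1-minute default, a single blocked append would hold up the queue thirty times longer, which is exactly the blocking behavior the note above warns about.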

2. Add script files

Add the following two scripts to the {PULSAR_HOME}/bin directory:

1) pulsar-kafka

This script file is copied from the pulsar script, with the following modifications. (Note: in the figure below, pulsar is on the left and pulsar-kafka on the right.)

  • Specify log4j2-kafka.yaml as the default log configuration file;

  • Add content to source logenv.sh;

  • Add OPTS entries so that the JVM options are passed to the Java process when a Pulsar component is started via the pulsar-kafka or pulsar-daemon-kafka script;

  • The complete pulsar-kafka script file is as follows:

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#


BINDIR=$(dirname "$0")
export PULSAR_HOME=`cd -P $BINDIR/..;pwd`


DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf
DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2-kafka.yaml
DEFAULT_PULSAR_PRESTO_CONF=${PULSAR_HOME}/conf/presto


# functions related variables
FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR=$PULSAR_HOME/instances/deps
FUNCTIONS_EXTRA_DEPS_DIR=${PULSAR_FUNCTIONS_EXTRA_DEPS_DIR:-"${DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR}"}
SQL_HOME=$PULSAR_HOME/pulsar-sql
PRESTO_HOME=${PULSAR_HOME}/lib/presto


# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi


# Check pulsar env and load pulser_env.sh
if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi


if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi


# Check for the java to use
if [[ -z $JAVA_HOME ]]; then
    JAVA=$(which java)
    if [ $? != 0 ]; then
        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
        exit 1
    fi
else
    JAVA=$JAVA_HOME/bin/java
fi


# exclude tests jar
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? == 0 ]; then
    PULSAR_JAR=$RELEASE_JAR
fi


# exclude tests jar
BUILT_JAR=`ls $PULSAR_HOME/pulsar-broker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
    echo "\nCouldn't find pulsar jar.";
    echo "Make sure you've run 'mvn package'\n";
    exit 1;
elif [ -e "$BUILT_JAR" ]; then
    PULSAR_JAR=$BUILT_JAR
fi


#
# find the instance locations for pulsar-functions
#


# find the java instance location
if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
    # didn't find a released jar, then search the built jar
    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
    if [ -z "${BUILT_JAVA_INSTANCE_JAR}" ]; then
        echo "\nCouldn't find pulsar-functions java instance jar.";
        echo "Make sure you've run 'mvn package'\n";
        exit 1;
    fi
    JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
fi


# find the python instance location
if [ ! -f "${PY_INSTANCE_FILE}" ]; then
    # didn't find a released python instance, then search the built python instance
    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
    if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
        echo "\nCouldn't find pulsar-functions python instance.";
        echo "Make sure you've run 'mvn package'\n";
        exit 1;
    fi
    PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
fi


# find pulsar sql presto distribution location
check_presto_libraries() {
    if [ ! -d "${PRESTO_HOME}" ]; then


        BUILT_PRESTO_HOME="${SQL_HOME}/presto-distribution/target/pulsar-presto-distribution"
        if [ ! -d "${BUILT_PRESTO_HOME}" ]; then
            echo "\nCouldn't find presto distribution.";
            echo "Make sure you've run 'mvn package'\n";
            exit 1;
        fi
        PRESTO_HOME=${BUILT_PRESTO_HOME}
    fi
}


pulsar_help() {
    cat <<EOF
Usage: pulsar <command>
where command is one of:


    broker              Run a broker server
    bookie              Run a bookie server
    zookeeper           Run a zookeeper server
    configuration-store Run a configuration-store server
    discovery           Run a discovery server
    proxy               Run a pulsar proxy
    websocket           Run a web socket proxy server
    functions-worker    Run a functions worker server
    sql-worker          Run a sql worker server
    sql                 Run sql CLI
    standalone          Run a broker server with local bookies and local zookeeper


    initialize-cluster-metadata     One-time metadata initialization
    delete-cluster-metadata         Delete a cluster's metadata
    initialize-transaction-coordinator-metadata     One-time transaction coordinator metadata initialization
    initialize-namespace     namespace initialization
    compact-topic       Run compaction against a topic
    zookeeper-shell     Open a ZK shell client
    broker-tool         CLI to operate a specific broker
    tokens              Utility to create authentication tokens


    help                This help message


or command is the full name of a class with a defined main() method.


Environment variables:
   PULSAR_LOG_CONF               Log4j configuration file (default $DEFAULT_LOG_CONF)
   PULSAR_BROKER_CONF            Configuration file for broker (default: $DEFAULT_BROKER_CONF)
   PULSAR_BOOKKEEPER_CONF        Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
   PULSAR_ZK_CONF                Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
   PULSAR_CONFIGURATION_STORE_CONF         Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF)
   PULSAR_DISCOVERY_CONF         Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
   PULSAR_WEBSOCKET_CONF         Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
   PULSAR_PROXY_CONF             Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
   PULSAR_WORKER_CONF            Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
   PULSAR_STANDALONE_CONF        Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
   PULSAR_PRESTO_CONF            Configuration directory for Pulsar Presto (default: $DEFAULT_PULSAR_PRESTO_CONF)
   PULSAR_EXTRA_OPTS             Extra options to be passed to the jvm
   PULSAR_EXTRA_CLASSPATH        Add extra paths to the pulsar classpath
   PULSAR_PID_DIR                Folder where the pulsar server PID file should be stored
   PULSAR_STOP_TIMEOUT           Wait time before forcefully kill the pulsar server instance, if the stop is not successful


These variable can also be set in conf/pulsar_env.sh
EOF
}


add_maven_deps_to_classpath() {
    MVN="mvn"
    if [ "$MAVEN_HOME" != "" ]; then
    MVN=${MAVEN_HOME}/bin/mvn
    fi


    # Need to generate classpath from maven pom. This is costly so generate it
    # and cache it. Save the file into our target dir so a mvn clean will get
    # clean it up and force us create a new one.
    f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
    if [ ! -f "${f}" ]
    then
    ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
    fi
    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}


if [ -d "$PULSAR_HOME/lib" ]; then
PULSAR_CLASSPATH=$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*
    ASPECTJ_AGENT_PATH=`ls -1 $PULSAR_HOME/lib/org.aspectj-aspectjweaver-*.jar`
else
    add_maven_deps_to_classpath


    ASPECTJ_VERSION=`grep '<aspectj.version>' $PULSAR_HOME/pom.xml | awk -F'>' '{print $2}' | awk -F'<' '{print $1}'`
    ASPECTJ_AGENT_PATH="$HOME/.m2/repository/org/aspectj/aspectjweaver/$ASPECTJ_VERSION/aspectjweaver-$ASPECTJ_VERSION.jar"
fi


ASPECTJ_AGENT="-javaagent:$ASPECTJ_AGENT_PATH"


# if no args specified, show usage
if [ $# = 0 ]; then
    pulsar_help;
    exit 1;
fi


# get arguments
COMMAND=$1
shift


if [ -z "$PULSAR_WORKER_CONF" ]; then
    PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
fi


if [ -z "$PULSAR_BROKER_CONF" ]; then
    PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
fi


if [ -z "$PULSAR_BOOKKEEPER_CONF" ]; then
    PULSAR_BOOKKEEPER_CONF=$DEFAULT_BOOKKEEPER_CONF
fi


if [ -z "$PULSAR_ZK_CONF" ]; then
    PULSAR_ZK_CONF=$DEFAULT_ZK_CONF
fi


if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
    PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
fi


if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then
    PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF
fi


if [ -z "$PULSAR_DISCOVERY_CONF" ]; then
    PULSAR_DISCOVERY_CONF=$DEFAULT_DISCOVERY_CONF
fi


if [ -z "$PULSAR_PROXY_CONF" ]; then
    PULSAR_PROXY_CONF=$DEFAULT_PROXY_CONF
fi


if [ -z "$PULSAR_WEBSOCKET_CONF" ]; then
    PULSAR_WEBSOCKET_CONF=$DEFAULT_WEBSOCKET_CONF
fi


if [ -z "$PULSAR_STANDALONE_CONF" ]; then
    PULSAR_STANDALONE_CONF=$DEFAULT_STANDALONE_CONF
fi


if [ -z "$PULSAR_LOG_CONF" ]; then
    PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
fi


if [ -z "$PULSAR_PRESTO_CONF" ]; then
    PULSAR_PRESTO_CONF=$DEFAULT_PULSAR_PRESTO_CONF
fi


PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"


# Ensure we can read bigger content from ZK. (It might be
# rarely needed when trying to list many z-nodes under a
# directory)
OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"


OPTS="-cp $PULSAR_CLASSPATH $OPTS"


OPTS="$OPTS $PULSAR_EXTRA_OPTS $PULSAR_MEM $PULSAR_GC"


# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_ROOT_LEVEL=${PULSAR_LOG_ROOT_LEVEL:-"info"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}


#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"


# Functions related logging
OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
# instance
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
OPTS="$OPTS -Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dpulsar.module.type=$COMMAND -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostname=${HOSTNAME} -Dpulsar.hostip=${HOST_IP} -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"


ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true"


#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
if [ $COMMAND == "broker" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "bookie" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}
    # Pass BOOKIE_EXTRA_OPTS option defined in pulsar_env.sh
    OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
elif [ $COMMAND == "zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
    exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "configuration-store" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@
elif [ $COMMAND == "discovery" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
elif [ $COMMAND == "proxy" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-proxy.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.proxy.server.ProxyServiceStarter --config $PULSAR_PROXY_CONF $@
elif [ $COMMAND == "websocket" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
elif [ $COMMAND == "functions-worker" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
elif [ $COMMAND == "standalone" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
    exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
elif [ $COMMAND == "delete-cluster-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "initialize-namespace" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarInitialNamespaceSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
    exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
elif [ $COMMAND == "broker-tool" ]; then
    exec $JAVA $OPTS org.apache.pulsar.broker.tools.BrokerTool $@
elif [ $COMMAND == "compact-topic" ]; then
    exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "sql" ]; then
    check_presto_libraries
    exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.prestosql.cli.Presto --server localhost:8081 "${@}"
elif [ $COMMAND == "sql-worker" ]; then
    check_presto_libraries
    exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
elif [ $COMMAND == "tokens" ]; then
      exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
elif [ $COMMAND == "help" -o $COMMAND == "--help" -o $COMMAND == "-h" ]; then
    pulsar_help;
else
    echo ""
    echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands"
    echo ""
    exit 1
fi

2) pulsar-daemon-kafka

This script file is copied from the pulsar-daemon script, with the following modifications. (Note: in the figure below, pulsar-daemon is on the left and pulsar-daemon-kafka on the right.)

  • Add content to source logenv.sh;

  • Invoke pulsar-kafka instead of pulsar;

  • The complete pulsar-daemon-kafka script is as follows:

```bash
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

usage() {
    cat <<EOF
Usage: pulsar-daemon (start|stop) <command> <args...>
where command is one of:
    broker              Run a broker server
    bookie              Run a bookie server
    zookeeper           Run a zookeeper server
    configuration-store Run a configuration-store server
    discovery           Run a discovery server
    websocket           Run a websocket proxy server
    functions-worker    Run a functions worker server
    standalone          Run a standalone Pulsar service
    proxy               Run a Proxy Pulsar service

where argument is one of:
    -force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
EOF
}

BINDIR=$(dirname "$0")
PULSAR_HOME=$(cd -P $BINDIR/.. ; pwd)

# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi

if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi

if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi

PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RollingFile"}
PULSAR_STOP_TIMEOUT=${PULSAR_STOP_TIMEOUT:-30}
PULSAR_PID_DIR=${PULSAR_PID_DIR:-$PULSAR_HOME/bin}

if [ $# = 0 ]; then
    usage
    exit 1
elif [ $# = 1 ]; then
    if [ $1 == "--help" -o $1 == "-h" ]; then
        usage
        exit 1
    else
        echo "Error: no enough arguments provided."
        usage
        exit 1
    fi
fi

startStop=$1
shift
command=$1
shift

case $command in
    (broker)
        echo "doing $startStop $command ..."
        ;;
    (bookie)
        echo "doing $startStop $command ..."
        ;;
    (zookeeper)
        echo "doing $startStop $command ..."
        ;;
    (global-zookeeper)
        echo "doing $startStop $command ..."
        ;;
    (configuration-store)
        echo "doing $startStop $command ..."
        ;;
    (discovery)
        echo "doing $startStop $command ..."
        ;;
    (websocket)
        echo "doing $startStop $command ..."
        ;;
    (functions-worker)
        echo "doing $startStop $command ..."
        ;;
    (standalone)
        echo "doing $startStop $command ..."
        ;;
    (proxy)
        echo "doing $startStop $command ..."
        ;;
    (*)
        echo "Error: unknown service name $command"
        usage
        exit 1
        ;;
esac

export PULSAR_LOG_DIR=$PULSAR_LOG_DIR
export PULSAR_LOG_APPENDER=$PULSAR_LOG_APPENDER
export PULSAR_LOG_FILE=pulsar-$command-$HOSTNAME.log

pid=$PULSAR_PID_DIR/pulsar-$command.pid
out=$PULSAR_LOG_DIR/pulsar-$command-$HOSTNAME.out
logfile=$PULSAR_LOG_DIR/$PULSAR_LOG_FILE

rotate_out_log ()
{
    log=$1;
    num=5;
    if [ -n "$2" ]; then
        num=$2
    fi
    if [ -f "$log" ]; then
        # rotate logs
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num";
    fi
}

mkdir -p "$PULSAR_LOG_DIR"

case $startStop in
    (start)
        if [ -f $pid ]; then
            if kill -0 `cat $pid` > /dev/null 2>&1; then
                echo $command running as process `cat $pid`. Stop it first.
                exit 1
            fi
        fi

        rotate_out_log $out
        echo starting $command, logging to $logfile
        echo Note: Set immediateFlush to true in conf/log4j2-kafka.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
        pulsar=$PULSAR_HOME/bin/pulsar-kafka
        nohup $pulsar $command "$@" > "$out" 2>&1 < /dev/null &
        echo $! > $pid
        sleep 1; head $out
        sleep 2;
        if ! ps -p $! > /dev/null ; then
            exit 1
        fi
        ;;
    (stop)
        if [ -f $pid ]; then
            TARGET_PID=$(cat $pid)
            if kill -0 $TARGET_PID > /dev/null 2>&1; then
                echo "stopping $command"
                kill $TARGET_PID

                count=0
                location=$PULSAR_LOG_DIR
                while ps -p $TARGET_PID > /dev/null;
                do
                    echo "Shutdown is in progress... Please wait..."
                    sleep 1
                    count=`expr $count + 1`
                    if [ "$count" = "$PULSAR_STOP_TIMEOUT" ]; then
                        break
                    fi
                done
                if [ "$count" != "$PULSAR_STOP_TIMEOUT" ]; then
                    echo "Shutdown completed."
                fi

                if kill -0 $TARGET_PID > /dev/null 2>&1; then
                    fileName=$location/$command.out
                    $JAVA_HOME/bin/jstack $TARGET_PID > $fileName
                    echo "Thread dumps are taken for analysis at $fileName"
                    if [ "$1" == "-force" ]
                    then
                        echo "forcefully stopping $command"
                        kill -9 $TARGET_PID >/dev/null 2>&1
                        echo Successfully stopped the process
                    else
                        echo "WARNNING : $command is not stopped completely."
                        exit 1
                    fi
                fi
            else
                echo "no $command to stop"
            fi
            rm $pid
        else
            echo no "$command to stop"
        fi
        ;;
    (*)
        usage
        exit 1
        ;;
esac
```

Add the jars that Kafka Producer depends on

Add the following three jars to the {PULSAR_HOME}/lib directory on all nodes of the Pulsar cluster:

```
connect-api-2.0.1.jar
disruptor-3.4.2.jar
kafka-clients-2.0.1.jar
```
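As a sketch of how the jars could be distributed (the /opt/pulsar install path is an assumption; the loop only prints the copy commands rather than executing them, so no files are touched):

```shell
# Assumed install path -- adjust for your environment
PULSAR_HOME="${PULSAR_HOME:-/opt/pulsar}"
JARS="connect-api-2.0.1.jar disruptor-3.4.2.jar kafka-clients-2.0.1.jar"

# Print the copy command for each required jar; on a real cluster this
# would be run (or replaced with scp) against every node's lib directory
for jar in $JARS; do
    echo "cp $jar $PULSAR_HOME/lib/"
done
```

The same loop works with scp when copying to remote nodes.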

4. Start Pulsar service

  1. To verify that the Pulsar service logs can be written to Kafka correctly, first start the Pulsar service with the bin/pulsar-kafka command. If no exception occurs, start the Pulsar service with the bin/pulsar-daemon-kafka command instead.
  2. For example, to start the broker, execute the following command:

```
bin/pulsar-daemon-kafka start broker
```

  3. Run the ps command to view the broker process, as follows:

Using the sys properties configured in log4j2-kafka.yaml, Log4j2 instantiates a Kafka Producer, and the broker process logs are sent to the Kafka broker through it.
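Besides ps, the broker can also be checked through the pid file that pulsar-daemon-kafka writes (PULSAR_PID_DIR defaults to $PULSAR_HOME/bin in the script). A minimal sketch, assuming a /opt/pulsar install path:

```shell
# Assumed install path -- adjust for your environment
PULSAR_HOME="${PULSAR_HOME:-/opt/pulsar}"
# pulsar-daemon-kafka records the broker pid here (PULSAR_PID_DIR default)
PID_FILE="$PULSAR_HOME/bin/pulsar-broker.pid"

# Same liveness check the stop path of the script uses: kill -0 probes the pid
if [ -f "$PID_FILE" ] && kill -0 "$(cat "$PID_FILE")" 2>/dev/null; then
    result="broker running as pid $(cat "$PID_FILE")"
else
    result="broker not running"
fi
echo "$result"
```

This mirrors the kill -0 probe already used inside pulsar-daemon-kafka.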

5. Test whether Pulsar logs are successfully written to Kafka Broker

Start a Kafka Consumer, subscribe to the topic that Log4j2 sends messages to, and read messages like the following, with fields separated by spaces:

```
broker 1 2020-12-26 17:40:14.363 [prometheus-stats-44-1] [org.eclipse.jetty.server.RequestLog] INFO - 192.168.0.1 - [26/Dec/2020:17:40:14 +0800] "GET /metrics/ HTTP/1.1" 200 23445 "http://192.168.0.1:8080/metrics" "Prometheus/2.22.1" 4
```
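Since the fields are space separated, individual fields can be pulled out with awk. A small sketch using the sample message above (the field positions shown are taken from that sample, with the module name first and the log level seventh; the message is abbreviated):

```shell
# Sample record as consumed from Kafka (abbreviated)
line='broker 1 2020-12-26 17:40:14.363 [prometheus-stats-44-1] [org.eclipse.jetty.server.RequestLog] INFO - 192.168.0.1 ...'

# Field 1 is the module, field 7 the log level
module=$(echo "$line" | awk '{print $1}')
level=$(echo "$line" | awk '{print $7}')
echo "$module $level"   # prints: broker INFO
```

Logstash applies the same kind of field splitting before the records reach Elasticsearch.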

6. Log retrieval

Open the Kibana page and search by the keyword fields: cluster:"pulsar-cluster" AND hostname:"XXX" AND module:"broker" AND level:"INFO"
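The same filter can also be sent to Elasticsearch directly. A hedged sketch (the pulsar-* index pattern and the localhost:9200 endpoint are assumptions; the curl call is left commented out because it needs a reachable cluster, and the inner double quotes would need escaping when embedded in the JSON body):

```shell
# Assumed index pattern for the Logstash output
ES_INDEX="pulsar-*"

# Same keyword filter as used in Kibana
QUERY='cluster:"pulsar-cluster" AND hostname:"XXX" AND module:"broker" AND level:"INFO"'
echo "$QUERY"

# With Elasticsearch reachable, the search could be run roughly as:
#   curl -s "http://localhost:9200/$ES_INDEX/_search" \
#        -H 'Content-Type: application/json' \
#        -d '{"query":{"query_string":{"query":"<escaped filter here>"}}}'
```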

The figure above shows the log retrieval results over a period of time, and Available fields can be added to the results as needed. In this way, developers and operations personnel can quickly and effectively analyze the causes of Pulsar service anomalies from multiple dimensions through Kibana. This completes a full solution for fast log retrieval in Apache Pulsar based on Log4j2 + Kafka + ELK.

conclusion

Distributed systems and microservices are currently a popular technical direction. In a production system, as the business grows and applications and services multiply, moving from a monolithic/vertical architecture to a distributed/microservice architecture is a natural choice, mainly because it reduces complexity and brings fault tolerance, independent deployment, horizontal scaling, and so on. But it also brings new challenges, such as troubleshooting efficiency and the convenience of operation and maintenance monitoring. Taking Apache Pulsar as an example, this article shares how a Java process can use Log4j2 + Kafka + ELK to achieve fast retrieval of distributed, microservice logs and support service governance.

recommended reading

Focus on StreamCloudNative and discuss the development trend of technologies in various fields with the author 👇

  • Collect logs to Pulsar using Elastic Beats
  • How to use Apache Flume to send log data to Apache Pulsar
  • KoP officially open source: native Kafka protocol supported on Apache Pulsar

Welcome to contribute

Did you get any inspiration from this article?

Do you have any unique experiences to share and grow with your community?

The Apache Pulsar community welcomes contributions. Apache Pulsar and StreamNative hope to provide a platform for people to share their Pulsar experience and knowledge, and to help more people in the community learn about Pulsar. Scan the QR code to add the Bot as a friend and get in touch about contributing 👇

Click the link to read the original article!