influxdb

InfluxDB is an open source distributed time series database for metrics and events. It is written in Go with no external dependencies, and its design goal is distributed, horizontally scalable deployment.


InfluxDB Startup process:


1 Use Docker to pull down the InfluxDB image

docker pull tutum/influxdb


2 Running influxDB in a Docker environment

docker run -d -p 8083:8083 -p 8086:8086 --expose 8090 --expose 8099 --name influxsrv tutum/influxdb

Meanings of each parameter:

-d: The container is running in the background

-p: maps a container port to a host port, in the format host_port:container_port.

8083 is the Web management tool port of the InfluxDB

8086 is the HTTP API port of the InfluxDB

--expose: enables the container to accept incoming connections on the given port

An image tag (e.g. 0.8.8) specifies the version to run; when no tag is given, the default is latest.


3 After influxDB is enabled, an internal HTTP server management tool is started. Users can access the Web server to operate the InfluxDB.

Of course, you can also access influxDB using the CLI, which is the command line interface.

Open a browser and enter http://127.0.0.1:8083 to access the home page of the management tool


4 Influxdb client See the example

https://github.com/influxdata/influxdb/tree/master/client

Details on the principles of InfluxDB:

https://www.linuxdaxue.com/influxdb-principle.html



Grafana

Grafana is an open source time series analytics and monitoring platform that supports many data sources, such as Elasticsearch, Graphite, and InfluxDB, and is known for its powerful dashboard editor.

Website: https://grafana.com/


Grafana Startup process:

1 Docker pulls the image

docker run -d --name=grafana -p 3000:3000 grafana/grafana


2 Access the home page of the management tool

Grafana listens on port 3000 by default, and the initial username and password are admin/admin. Its configuration file is /etc/grafana/grafana.ini; restart Grafana after changing it.


3. Create a database and bind the influxDB


4. Create a new panel

Home — > New Dashboard — > Graph — > Click Edit


Metrics in Edit is used to construct an SQL query



Dot Golang

The monitoring log program adds required content to the InfluxDB

1. Import github.com/influxdata/influxdb/client/v2


2. Create the InfluxDB client

// Create a new HTTPClient for InfluxDB. addr/username/password are
// assumed to be in scope (parsed from the DSN) — see the full program below.
c, err := client.NewHTTPClient(client.HTTPConfig{
	Addr:     addr,
	Username: username,
	Password: password,
})
if err != nil {
	log.Fatal(err)
}
// Release the client's resources when the enclosing function returns.
defer c.Close()


3. Create the format and type of the dots to be typed

// Create a new point batch; database and precision are assumed to be
// in scope (parsed from the DSN) — see the full program below.
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
	Database:  database,
	Precision: precision,
})
if err != nil {
	log.Fatal(err)
}


4. Create points and add them to the influxDB database

// Create a point and add it to the batch.
// Tags (indexed dimensions): Path, Method, Scheme, Status.
tags := map[string]string{
	"Path":   v.Path,
	"Method": v.Method,
	"Scheme": v.Scheme,
	"Status": v.Status,
}

// Fields (measured values): timings and bytes sent.
fields := map[string]interface{}{
	"UpstreamTime": v.UpstreamTime,
	"RequestTime":  v.RequestTime,
	"BytesSent":    v.BytesSent,
}

pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
if err != nil {
	log.Fatal(err)
}
bp.AddPoint(pt)

// Write the batch to InfluxDB.
if err := c.Write(bp); err != nil {
	log.Fatal(err)
}

Golang complete code

Imooc. log Log file format is as follows:

172.0.0.12 - - [02/May/2018:17:17:35 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854

172.0.0.12 - - [02/May/2018:17:17:36 +0000] http "POST /bar?query=t HTTP/1.0" 300 2133 "-" "KeepAliveClient" "-" 1.025 1.854


The program's logic: the read module tails the imooc.log file line by line; each line is parsed field-by-field with a regular expression; the write module sends the parsed data points to InfluxDB through its client; finally, Grafana displays the data graphically.


package main

import (
	"bufio"
	"fmt"
	"github.com/influxdata/influxdb/client/v2"
	"io"
	"net/url"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"flag"
	"log"
	"net/http"
	"encoding/json"
)

// Monitor event types sent on TypeMonitorChan.
const (
	TypeHandleLine = 0 // one log line was handled
	TypeErrNum = 1 // one processing error occurred
	TpsIntervalTime = 5 // seconds between TPS samples
)

// TypeMonitorChan carries monitor events from the pipeline goroutines
// to the Monitor; buffered so senders rarely block.
var TypeMonitorChan = make(chan int,200)

type Message struct {
	TimeLocal                    time.Time
	BytesSent                    int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime    float64} // System status monitoringtype SystemInfo struct {
	HandleLine    int     `json:"handleLine"'// Total number of log lines processed Tpsfloat64 `json:"tps"ReadChanLen int 'json:"readChanLen"` / /readChannel length WriterChanLen int 'json:"writeChanLen"'// Write Channel Length RunTime String' json:"ruanTime"'// Total run time ErrNum int' json:"errNum"'// error count}type Monitor struct {
	startTime time.Time
	data SystemInfo
	tpsSli []int
	tps float64
}

func (m *Monitor)start(lp *LogProcess)  {

	go func() {
		for n := range TypeMonitorChan  {
			switch n {
			case TypeErrNum:
				m.data.ErrNum += 1

			case TypeHandleLine:
				m.data.HandleLine += 1
			}
		}
	}()


	ticker := time.NewTicker(time.Second *TpsIntervalTime)
	go func() {
		for {
			<-ticker.C
			m.tpsSli = append(m.tpsSli,m.data.HandleLine)
			if len(m.tpsSli) > 2 {
				m.tpsSli = m.tpsSli[1:]
				m.tps =  float64(m.tpsSli[1] - m.tpsSli[0])/TpsIntervalTime
			}
		}
	}()


	http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
		m.data.RunTime = time.Now().Sub(m.startTime).String()
		m.data.ReadChanLen = len(lp.rc)
		m.data.WriterChanLen = len(lp.wc)
		m.data.Tps = m.tps

		ret ,_ := json.MarshalIndent(m.data,""."\t")
		io.WriteString(writer,string(ret))
	})


	http.ListenAndServe(": 9193",nil)
}


// Reader abstracts the read module: implementations stream raw log
// lines (without the trailing newline) into rc.
type Reader interface {
	Read(rc chan []byte)
}

// Writer abstracts the write module: implementations drain parsed
// messages from wc and persist them (e.g. to InfluxDB).
type Writer interface {
	Writer(wc chan *Message)
}

// LogProcess wires the pipeline: read -> rc -> Process -> wc -> write.
type LogProcess struct {
	rc    chan []byte   // raw log lines from the reader
	wc    chan *Message // parsed messages for the writer
	read  Reader
	write Writer
}

typeFunc (r *ReadFromFile) Read(rc chan []byte) {// Struct {path string // struct (r *ReadFromFile) Read(rc chan []byte) { err := os.Open(r.path) fmt.Println(r.path)iferr ! = nil { panic(fmt.Sprintf("open file err :", err.error ())))} // Read the contents of the file line by line from the end of the file. F.seek (0, 2)for {
		line, err := rd.ReadBytes('\n') // Continuously read content until needed'\n'The end of theif err == io.EOF {
			time.Sleep(5000 * time.Microsecond)
			continue
		} else iferr ! = nil { panic(fmt.Sprintf("ReadBytes err :", err.Error()))
		}

		TypeMonitorChan <- TypeHandleLine
		rc <- line[:len(line)-1]
	}

}

type WriteToinfluxDB struct {
	influxDBDsn string //influx data source} // Write module /** 1. Initialize the InfluxDB client. 2. Read monitoring data from a Write Channel 3. Construct data and Write it to the influxDB */ func (W *WriteToinfluxDB) Writer(WC chan *Message) {infSli := strings.Split(w.influxDBDsn,"@") addr := infSli[0] username := infSli[1] password := infSli[2] database := infSli[3] precision := infSli[4] // Create a  new HTTPClient c, err := client.NewHTTPClient(client.HTTPConfig{ Addr: addr, Username: username, Password: password, })iferr ! = nil { log.Fatal(err) } defer c.Close()for v := range wc {
		// Create a new point batch
		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  database,
			Precision: precision,
		})
		iferr ! = nil { log.Fatal(err) } // Create a point and add to batch //Tags:Path,Method,Scheme,Status tags := map[string]string{"Path": v.Path,
			"Method": v.Method,
			"Scheme": v.Scheme,
			"Status": v.Status,
			}

		fields := map[string]interface{}{
			"UpstreamTime": v.UpstreamTime,
			"RequestTime":  v.RequestTime,
			"BytesSent":    v.BytesSent,
		}

		fmt.Println("taps:",tags)
		fmt.Println("fields:",fields)

		pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
		iferr ! = nil { log.Fatal(err) } bp.AddPoint(pt) // Write the batchiferr := c.Write(bp); err ! = nil { log.Fatal(err) } // Close client resourcesiferr := c.Close(); err ! = nil { log.Fatal(err) } log.Println("write success"}} // Func (l *LogProcess)Process() {/ * * 172.0.012 - [04 / Mar 2018:13:49:52 + 0000] HTTP"GET /foo? Query = HTTP / 1.0 t"200, 2133,"-"
	"KeepAliveClient" "-"1.005 1.854 (/ \ \ d +) \ s + ([^ \ [] +) \ s + ([^ \ [] +) \ s + \ [\] ([^ \]] +) \ s + ([a-z] +) + \ \ s"([^"\] +)"\s+(\d{3})\s+(\d+)\s+\"([^"\] +)"\s+\"(.*?) \"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+) */ r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"\] +)"\s+(\d{3})\s+(\d+)\s+\"([^"\] +)"\s+\"(.*?) \"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`) for v := range l.rc { ret := r.FindStringSubmatch(string(v)) if len(ret) ! = 14 { TypeMonitorChan <- TypeErrNum fmt.Println("FindStringSubmatch fail:", string(v)) fmt.println (len(ret)) continue} message := & message {} [04/Mar/2018:13:49:52 +0000] loc, _ := time.LoadLocation("Asia/Shanghai") t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc) if err ! = nil { TypeMonitorChan <- TypeErrNum fmt.Println("ParseInLocation fail:", err.error (), ret[4])} message.timelocal = t 2133 byteSent, _ := strconv.Atoi(ret[8]) message.BytesSent = byteSent //"GET /foo? Query = HTTP / 1.0 t" reqSli := strings.Split(ret[6], " ") if len(reqSli) ! = 3 { TypeMonitorChan <- TypeErrNum fmt.Println("strings.Split fail:", ret[6]) continue } message.Method = reqSli[0] u, err := url.Parse(reqSli[1]) if err ! = nil { TypeMonitorChan <- TypeErrNum fmt.Println("url parse fail:", err) continue } message.Path = u.Path //http message.Scheme = ret[5] //code: 200 message.Status = ret[7] //1.005 upstreamTime, _ := strConv. 
ParseFloat(ret[12], 64) message.UpstreamTime = UpstreamTime //1.854 requestTime, _ := strconv.ParseFloat(ret[13], 64) message.RequestTime = RequestTime // fmt.println (message) l.c < -message}} /** */ func main() {var path, influDsn String flag.stringvar (&path,"path","./imooc.log","read file path") flag.StringVar(&influDsn, "influxDsn","http://127.0.01:8086@imooc@imoocpass@imooc@s","influx data source") flag.Parse() r := &ReadFromFile{ path: path, } w := &WriteToinfluxDB{ influxDBDsn: influDsn, } lp := &LogProcess{ rc: make(chan []byte,200), wc: make(chan *Message), read: r, write: w, } go lp.read.Read(lp.rc) for i:=1; i<2 ; i++ { go lp.Process() } for i:=1; i<4 ; i++ { go lp.write.Writer(lp.wc) } fmt.Println("begin !!!") m:= &Monitor{ startTime:time.Now(), data:SystemInfo{}, } m.start(lp) }Copy the code