Egg.js: An enterprise level framework based on KOA2
Kafka: distributed publish subscribe messaging system with high throughput
This article will integrate the egg + Kafka + mysql logging system example
System requirements: Log recording, message queue control through Kafka
Thinking:
1. Environment preparation
1) Kafka
Download Kafka and unzip it
Start the zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Copy the code
Start the Kafka server
Set num.partitions=5 in config/server.properties so each topic gets 5 partitions
bin/kafka-server-start.sh config/server.properties
Copy the code
2) Egg + MySQL
Build the egg from the scaffolding. Install kafka-node, egg-mysql
Mysql user name root Password 123456
2. Integration
- The root directory creates app.js, which runs every time the project loads
'use strict';
const kafka = require('kafka-node');
module.exports = app= > {
app.beforeStart(async() = > {const ctx = app.createAnonymousContext();
const Producer = kafka.Producer;
const client = new kafka.KafkaClient({ kafkaHost: app.config.kafkaHost });
const producer = new Producer(client, app.config.producerConfig);
producer.on('error'.function(err) {
console.error('ERROR: [Producer] ' + err);
});
app.producer = producer;
const consumer = new kafka.Consumer(client, app.config.consumerTopics, {
autoCommit: false}); consumer.on('message'.async function(message) {
try {
await ctx.service.log.insert(JSON.parse(message.value));
consumer.commit(true, (err, data) => {
console.error('commit:', err, data);
});
} catch (error) {
console.error('ERROR: [GetMessage] ', message, error); }}); consumer.on('error'.function(err) {
console.error('ERROR: [Consumer] ' + err);
});
});
};
Copy the code
The above code creates a producer and a consumer.
The producer is created once at startup and attached to the global app object, so every request reuses it instead of constructing a new instance each time.
The consumer retrieving message accesses the Insert method of the Service layer (database insert data).
For details, please refer to the kafka-Node API. There are producer and consumer configuration parameters below.
- Controller: log.js
Here we get the producer and pass it to the service layer
'use strict';
const Controller = require('egg').Controller;
class LogController extends Controller {
/** * @description Kafka controls log information flow * @host /log/notice * @method POST * @param {log} log Information */
async notice() {
const producer = this.ctx.app.producer;
const Response = new this.ctx.app.Response();
const requestBody = this.ctx.request.body;
const backInfo = await this.ctx.service.log.send(producer, requestBody);
this.ctx.body = Response.success(backInfo); }}module.exports = LogController;
Copy the code
- Service: log.js
The send method calls producer.send to publish the message to Kafka.
The INSERT method inserts data into the database
'use strict';
const Service = require('egg').Service;
const uuidv1 = require('uuid/v1');
class LogService extends Service {
async send(producer, params) {
const payloads = [
{
topic: this.ctx.app.config.topic,
messages: JSON.stringify(params),
},
];
producer.send(payloads, function(err, data) {
console.log('send : ', data);
});
return 'success';
}
async insert(message) {
try {
const logDB = this.ctx.app.mysql.get('log');
const ip = this.ctx.ip;
const Logs = this.ctx.model.Log.build({
id: uuidv1(),
type: message.type || ' '.level: message.level || 0.operator: message.operator || ' '.content: message.content || ' ',
ip,
user_agent: message.user_agent || ' '.error_stack: message.error_stack || ' '.url: message.url || ' '.request: message.request || ' '.response: message.response || ' '.created_at: new Date(),
updated_at: new Date()});const result = await logDB.insert('logs', Logs.dataValues);
if (result.affectedRows === 1) {
console.log(`SUCEESS: [Insert ${message.type}] `);
} else console.error('ERROR: [Insert DB] ', result);
} catch (error) {
console.error('ERROR: [Insert] ', message, error); }}}module.exports = LogService;
Copy the code
- Config: config.default.js
Some of the configuration parameters used in the above code are specified here, note that 5 partitions are opened here.
'use strict';
module.exports = appInfo= > {
const config = (exports = {});
const topic = 'logAction_p5';
// add your config here
config.middleware = [];
config.security = {
csrf: {
enable: false,}};// mysql database configuration
config.mysql = {
clients: {
basic: {
host: 'localhost'.port: '3306'.user: 'root'.password: '123456'.database: 'merchants_basic',},log: {
host: 'localhost'.port: '3306'.user: 'root'.password: '123456'.database: 'merchants_log',}},default: {},
app: true.agent: false};// sequelize config
config.sequelize = {
dialect: 'mysql'.database: 'merchants_log'.host: 'localhost'.port: '3306'.username: 'root'.password: '123456'.dialectOptions: {
requestTimeout: 999999,},pool: {
acquire: 999999,}};// kafka config
config.kafkaHost = 'localhost:9092';
config.topic = topic;
config.producerConfig = {
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
partitionerType: 1}; config.consumerTopics = [ { topic,partition: 0 },
{ topic, partition: 1 },
{ topic, partition: 2 },
{ topic, partition: 3 },
{ topic, partition: 4},];return config;
};
Copy the code
- Entity class:
Model: log.js
Sequelize is used here
'use strict';
module.exports = app= > {
const { STRING, INTEGER, DATE, TEXT } = app.Sequelize;
const Log = app.model.define('log', {
/** * UUID */
id: { type: STRING(36), primaryKey: true },
/** * Log type */
type: STRING(100),
/** * Priority level (a higher number indicates a higher priority) */
level: INTEGER,
/** ** operator */
operator: STRING(50),
/** * Log content */
content: TEXT,
/** * IP */
ip: STRING(36),
/** * Current user agent information */
user_agent: STRING(150),
/** * error stack */
error_stack: TEXT,
/** * URL */
url: STRING(255),
/**
* 请求对象
*/
request: TEXT,
/** * response object */
response: TEXT,
/** * create time */
created_at: DATE,
/** * update time */
updated_at: DATE,
});
return Log;
};
Copy the code
- Test Python scripts:
import requests
from multiprocessing import Pool
from threading import Thread
from multiprocessing import Process
def loop(a):
t = 1000
while t:
url = "http://localhost:7001/log/notice"
payload = "{\n\t\"type\": \"ERROR\",\n\t\"level\": 1,\n\t\"content\": \"URL send ERROR\",\n\t\"operator\": \"Knove\"\n}"
headers = {
'Content-Type': "application/json".'Cache-Control': "no-cache"
}
response = requests.request("POST", url, data=payload, headers=headers)
print(response.text)
if __name__ == '__main__':
for i in range(10):
t = Thread(target=loop)
t.start()
Copy the code
- Construction sentences:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for logs
-- ----------------------------
DROP TABLE IF EXISTS `logs`;
CREATE TABLE `logs` (
`id` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL.`type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Log type'.`level` int(11) NULL DEFAULT NULL COMMENT 'Priority level (higher number, higher priority)'.`operator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'Operator'.`content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT 'Log message'.`ip` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IP\r\nIP'.`user_agent` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'Current user Agent information'.`error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT 'Error stack'.`url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'the current URL'.`request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT 'Request object'.`response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT 'Response object'.`created_at` datetime(0) NULL DEFAULT NULL COMMENT 'Creation time'.`updated_at` datetime(0) NULL DEFAULT NULL COMMENT 'Update Time',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
Copy the code
3. Summary
There are very few similar materials online; I had to dig through all kinds of documentation to work out this implementation.
If you have any questions, please feel free to comment. More than happy to help solve the problem