This article is a brief analysis of the code of Kong-plugin-kafka-log. Because the Kong version that Kong-plugin-kafka-log targets is quite old, I have updated the source code and adapted it to Kong 2.0.x. Reference (github.com/tzssangglas…).

handler.lua

  1. The local variable
local mt_cache = { __mode = "k" }
local producers_cache = setmetatable({}, mt_cache)

setmetatable

A metatable is a concept unique to Lua that behaves similarly to operator overloading. For example, we can override __add to compute the union of two Lua arrays, or override __tostring to define how a table is converted to a string. Lua provides two functions for handling metatables:

  • The first is setmetatable(table, metatable), which sets the metatable for a table;
  • The second is getmetatable(table), which returns the metatable of a table.
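As an illustration of the two metamethods mentioned above, here is a minimal sketch in plain Lua. The Set type and the new_set helper are made up for this example and do not come from the plug-in.

```lua
-- A small set type whose behaviour is customised through a metatable.
local Set = {}
Set.__index = Set

local function new_set(items)
  local s = setmetatable({ items = {} }, Set)
  for _, v in ipairs(items) do
    s.items[v] = true
  end
  return s
end

-- __add is invoked for `a + b`; here it returns the union of both sets.
Set.__add = function(a, b)
  local result = new_set({})
  for k in pairs(a.items) do result.items[k] = true end
  for k in pairs(b.items) do result.items[k] = true end
  return result
end

-- __tostring is invoked by tostring(); keys are sorted for a stable output.
Set.__tostring = function(s)
  local keys = {}
  for k in pairs(s.items) do keys[#keys + 1] = tostring(k) end
  table.sort(keys)
  return "{" .. table.concat(keys, ", ") .. "}"
end

local union = new_set({ 1, 2 }) + new_set({ 2, 3 })
print(tostring(union))              -- {1, 2, 3}
print(getmetatable(union) == Set)   -- true
```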

Weak tables, another concept unique to Lua, are related to garbage collection. A table is weak if its metatable has a __mode field.

  • If the value of __mode is "k", the keys of the table are weak references.
  • If the value of __mode is "v", the values of the table are weak references.
  • It can also be set to "kv", which means that both the keys and the values of the table are weak references.

In any weak table, once a key or a value is reclaimed, the entire key-value pair is removed from the table. mt_cache changes the collection policy of producers_cache: its keys are weak references, so once a key is reclaimed, the whole entry (key and value) is removed from producers_cache.
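The behaviour of a weak-key table can be checked with a few lines of plain Lua. This is only a sketch: the count helper and the key values are invented for the example, and the two cases are there to show that the kind of key matters.

```lua
-- Weak-key table, declared the same way as producers_cache in handler.lua.
local cache = setmetatable({}, { __mode = "k" })

local function count(t)
  local n = 0
  for _ in pairs(t) do n = n + 1 end
  return n
end

-- Case 1: the key is a table (a collectable object).
local key = {}
cache[key] = "producer for table key"
key = nil                 -- drop the only strong reference to the key
collectgarbage()
collectgarbage()
local after_table_key = count(cache)

-- Case 2: the key is a string built at run time.
local uuid = "uuid-" .. tostring(os.time())
cache[uuid] = "producer for string key"
uuid = nil                -- no variable refers to the string any more
collectgarbage()
collectgarbage()
local after_string_key = count(cache)

print(after_table_key, after_string_key)   -- 0  1
```

The second case is worth keeping in mind: according to the Lua reference manual, strings behave like values and are not removed from weak tables. Since conf.uuid is a string, it may be worth verifying whether stale producers are actually released in practice.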

  1. The local function log
if premature then
    return
end

This code sits at the top of the timer callback. premature is a flag indicating whether the timer expired prematurely, which only happens when the Nginx worker is exiting (basically ‘I didn’t execute this timer but cancelled it due to shutdown/reload’).

Github.com/Kong/kong/i…

  1. The cache_key function and the full log function
--- Computes a cache key for a given configuration.
local function cache_key(conf)
  -- here we rely on validation logic in schema that automatically assigns a unique id
  -- on every configuration update
  return conf.uuid
end

--- Publishes a message to Kafka.
-- Must run in the context of `ngx.timer.at`.
local function log(premature, conf, message)
  if premature then
    return
  end

  local cache_key = cache_key(conf)
  if not cache_key then
    ngx.log(ngx.ERR, "[kafka-log] cannot log a given request because configuration has no uuid")
    return
  end

  local producer = producers_cache[cache_key]
  if not producer then
    kong.log.notice("creating a new Kafka Producer for cache key: ", cache_key)

    local err
    producer, err = producers.new(conf)
    if not producer then
      ngx.log(ngx.ERR, "[kafka-log] failed to create a Kafka Producer for a given configuration: ", err)
      return
    end

    producers_cache[cache_key] = producer
  end

  local ok, err = producer:send(conf.topic, nil, cjson_encode(message))
  if not ok then
    ngx.log(ngx.ERR, "[kafka-log] failed to send a message on topic ", conf.topic, ":", err)
    return
  end
end

The original logic of kafka-log is:

  1. When a plug-in is added or updated, a UUID is randomly generated and used as the key in producers_cache; the value is a producer created from the current configuration;
  2. Because producers_cache is a table whose keys are weak references, every plug-in update changes the UUID, the old producer in producers_cache is meant to be garbage collected, and the updated UUID is then used as the key, with a producer built from the updated configuration stored as the value;
  3. The purpose of this is to keep producers_cache in sync with the latest plug-in configuration when logs are pushed, while avoiding the overhead of creating a producer on every push.
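The get-or-create logic described in the three points above can be sketched in plain Lua. new_producer is a hypothetical stand-in for producers.new(conf), and get_producer is a name invented for this example; the real plug-in inlines this logic in the log function.

```lua
-- Plain-Lua sketch of the get-or-create pattern used in the log function.
-- `new_producer` is a hypothetical stand-in for producers.new(conf).
local created = 0
local function new_producer(conf)
  created = created + 1
  return { topic = conf.topic, id = created }
end

local producers_cache = setmetatable({}, { __mode = "k" })

local function get_producer(conf)
  local key = conf.uuid
  if not key then
    return nil, "configuration has no uuid"
  end
  local producer = producers_cache[key]
  if not producer then
    -- cache miss: build a producer for this configuration and remember it
    producer = new_producer(conf)
    producers_cache[key] = producer
  end
  return producer
end

local conf = { uuid = "uuid-1", topic = "logs" }
local p1 = get_producer(conf)
local p2 = get_producer(conf)                  -- same uuid: cache hit

conf = { uuid = "uuid-2", topic = "logs-v2" }  -- simulated configuration update
local p3 = get_producer(conf)                  -- new uuid: a new producer is built

print(p1 == p2, p1 == p3, created)             -- true  false  2
```

The sketch shows why the uuid has to change on every configuration update: as long as the key stays the same, the cached producer built from the old configuration keeps being returned.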

Once this logic is clear, the adaptation is straightforward: the UUID must still be regenerated whenever the plug-in is added or updated. When kafka-log first came out, Kong was at version 0.1.x, and it was possible to set an attribute that is not declared in the schema (uuid) on the plug-in by hand:

--- (Re)assigns a unique id on every configuration update.
-- since `uuid` is not a part of the `fields`, clients won't be able to change it
local function regenerate_uuid(schema, plugin_t, dao, is_updating)
  plugin_t.uuid = utils.uuid()
  return true
end

This adds a uuid attribute to the plug-in configuration during self_check. The user never sees the attribute, but every update runs self_check, which quietly regenerates the uuid and thereby refreshes producers_cache. In Kong 2.0.x, self_check has been removed, but there is an entity_checks property, which I used as follows:

entity_checks = {
    { custom_entity_check = {
        field_sources = { "config" },
        fn = function(entity)
            local config = entity.config
            -- ...
            -- update the uuid attribute whenever the configuration is updated
            config.uuid = utils.uuid()
            return true
        end
    } },
},

Back in the log function, producers.new(conf) comes from another file in the plug-in, loaded with local producers = require "kong.plugins.kafka-log.producers". The log function takes the producer from the cache (creating it first if necessary) and calls its send function. If sending fails, an error is written to the local log.

  1. function KafkaLogHandler:log
function KafkaLogHandler:log(conf, other)
    KafkaLogHandler.super.log(self)
    local message = basic_serializer.serialize(ngx)
    local ok, err = ngx.timer.at(0, log, conf, message)
    if not ok then
        ngx.log(ngx.ERR, "[kafka-log] failed to create timer: ", err)
    end
end

This is the log phase, one of the phases in the plug-in execution lifecycle that Kong provides. It runs after the last byte of the response has been sent to the downstream client. The message that Kong pushes comes from basic_serializer.serialize(ngx), which serializes the request context held by nginx and is later encoded as JSON. The code that triggers the local function log is:

ngx.timer.at(0, log, conf, message)

ngx.timer.at

In OpenResty, we sometimes need to perform tasks periodically in the background, such as synchronizing data or cleaning up logs. If you were to design this, what would you do? The easiest way is to expose an API and have the system crontab call it periodically with curl, which meets the requirement in a roundabout way. However, this not only feels fragmented but also adds complexity to operations and maintenance. So OpenResty provides ngx.timer to address this need. You can think of ngx.timer as a client request simulated by OpenResty that triggers the corresponding callback function. There are two types of scheduled tasks in OpenResty:

  • ngx.timer.at, which is used to perform one-off scheduled tasks;
  • ngx.timer.every, which is used to perform periodic tasks. (The above is quoted from Wen Ming.)

Why use ngx.timer.at to run the local function log? Because the cosocket API is not available in set_by_lua*, log_by_lua*, header_filter_by_lua* and body_filter_by_lua*, and is not yet available in init_by_lua* and init_worker_by_lua*. The local function log is triggered from the log_by_lua* phase, where the cosocket API cannot be used directly. Calling ngx.timer.at(0, log, conf, message) gets around this restriction, because the callback runs in a timer context where cosockets are available. This workaround is also the mainstream approach for similar cases in OpenResty application development.

producers.lua

  1. The module export
--- Creates a new Kafka Producer.
local function create_producer(conf) ... end
return { new = create_producer }

return { new = create_producer } exports create_producer under the name new, which is the producers.new(conf) function called in handler.lua.

  1. The create_producer function
local function create_producer(conf)
  local broker_list = {}
  for idx, value in ipairs(conf.bootstrap_servers) do
    local server = types.bootstrap_server(value)
    if not server then
      return nil, "invalid bootstrap server value: " .. value
    end
    broker_list[idx] = server
  end

The function loops over the bootstrap_servers parameter in the plug-in configuration and validates each Kafka broker address (host:port), which is the address that messages are pushed to.

  1. producer_config
local producer_config = {
    -- settings affecting all Kafka APIs (including Metadata API, Produce API, etc)
    socket_timeout = conf.timeout,
    keepalive_timeout = conf.keepalive,
    -- settings specific to Kafka Produce API
    required_acks = conf.producer_request_acks,
    request_timeout = conf.producer_request_timeout,
    batch_num = conf.producer_request_limits_messages_per_request,
    batch_size = conf.producer_request_limits_bytes_per_request,
    max_retry = conf.producer_request_retries_max_attempts,
    retry_backoff = conf.producer_request_retries_backoff_timeout,
    producer_type = conf.producer_async and "async" or "sync",
    flush_time = conf.producer_async_flush_timeout,
    max_buffering = conf.producer_async_buffering_limits_messages_in_memory,
  }
  local cluster_name = conf.uuid
  return kafka_producer:new(broker_list, producer_config, cluster_name)

producer_config is a table that maps the TCP connection settings and the Kafka-related settings from the plug-in configuration onto the options expected by the producer.

local cluster_name = conf.uuid
return kafka_producer:new(broker_list, producer_config, cluster_name)

conf.uuid is used here as the cluster name. This may need optimizing: ideally there would be a way to derive a unique identifier from conf itself instead of relying on the uuid. The call creates a resty.kafka.producer object from the lua-resty-kafka library, which implements the low-level logic that actually pushes messages to Kafka.

schema.lua

local types = require "kong.plugins.kafka-log.types"
local utils = require "kong.tools.utils"
--- Validates value of `bootstrap_servers` field.
local function check_bootstrap_servers(values)
  if values and 0 < #values then
    for _, value in ipairs(values) do
      local server = types.bootstrap_server(value)
      if not server then
        return false, "invalid bootstrap server value: " .. value
      end
    end
    return true
  end
  return false, "bootstrap_servers is required"
end
--- (Re)assigns a unique id on every configuration update.
-- since `uuid` is not a part of the `fields`, clients won't be able to change it
local function regenerate_uuid(schema, plugin_t, dao, is_updating)
  plugin_t.uuid = utils.uuid()
  return true
end
return {
  fields = {
    bootstrap_servers = { type = "array", required = true, func = check_bootstrap_servers },
    topic = { type = "string", required = true },
    timeout = { type = "number", default = 10000 },
    keepalive = { type = "number", default = 60000 },
    producer_request_acks = { type = "number", default = 1, enum = { -1, 0, 1 } },
    producer_request_timeout = { type = "number", default = 2000 },
    producer_request_limits_messages_per_request = { type = "number", default = 200 },
    producer_request_limits_bytes_per_request = { type = "number", default = 1048576 },
    producer_request_retries_max_attempts = { type = "number", default = 10 },
    producer_request_retries_backoff_timeout = { type = "number", default = 100 },
    producer_async = { type = "boolean", default = true },
    producer_async_flush_timeout = { type = "number", default = 1000 },
    producer_async_buffering_limits_messages_in_memory = { type = "number", default = 50000 },
  },
  self_check = regenerate_uuid,
}

This is the code behind the plug-in configuration page. The part that matters here is:

local function regenerate_uuid(schema, plugin_t, dao, is_updating)
  plugin_t.uuid = utils.uuid()
  return true
end

The self_check property has been removed in newer versions of Kong and replaced with entity_checks, so the regenerate_uuid function is never executed. This is therefore the best place to solve the conf.uuid problem.

types.lua

--- Parses `host:port` string into a `{host: ... , port: ... }` table.
function _M.bootstrap_server(string)
  local m = re_match(string, bootstrap_server_regex, "jo")
  if not m then
    return nil, "invalid bootstrap server value: " .. string
  end
  return { host = m[1], port = m[2] }
end
return _M

This is the types.bootstrap_server(value) function called earlier in producers.lua and schema.lua to verify that each configured bootstrap server is a valid host:port value.
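To make the validation concrete, here is a sketch of a host:port parser in plain Lua. It is not the plug-in's implementation: the plug-in uses re_match with bootstrap_server_regex, whose definition is not shown in this article, so the Lua pattern and the parse_bootstrap_server name below are assumptions made for this example only.

```lua
-- Sketch of a `host:port` parser using a plain Lua pattern (an assumption;
-- the plug-in itself matches against bootstrap_server_regex via re_match).
local function parse_bootstrap_server(str)
  if type(str) ~= "string" then
    return nil, "invalid bootstrap server value: " .. tostring(str)
  end
  -- host: one or more characters that are neither ':' nor whitespace
  -- port: one or more digits
  local host, port = str:match("^([^:%s]+):(%d+)$")
  if not host then
    return nil, "invalid bootstrap server value: " .. str
  end
  -- Unlike the original, the port is converted to a number here.
  return { host = host, port = tonumber(port) }
end

local server = parse_bootstrap_server("10.0.0.1:9092")
print(server.host, server.port)    -- 10.0.0.1  9092

local bad, err = parse_bootstrap_server("10.0.0.1")
print(bad, err)                    -- nil  invalid bootstrap server value: 10.0.0.1
```

Returning nil plus an error message mirrors the convention used by _M.bootstrap_server, so callers such as create_producer can keep the same `if not server then` check.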