Author | Tianrun Rongtong – Qian Wenjin

Introduction: Tianrun Rongtong is a cloud call center service provider; its CTI-Cloud delivers efficient, stable call center services to a large number of major customers. With the T-Phone SDK, Tianrun now extends CTI-Cloud's capabilities to mobile terminals, providing customers with mobile call services.

Application scenarios

In Tianrun's T-Phone SDK we need to collect WebRTC statistics for data analysis and optimization suggestions, so the SDK reports the logs it collects. To keep the reported volume manageable, we upload the transfer statistics only once every 5 seconds after the two parties are connected. Network jitter and connection status during a call can then be presented numerically, and per-call analysis provides data to support the SDK's subsequent evolution. In addition, we keep an operation log for each phone call, recording which interface was called and when, so that we can replay the operation history of a given call for a user, analyze possible misoperations, and offer customers a better interactive experience.

Since the project is still at an early stage, we care more about usage over a given period of time and whether call quality holds up under heavy use. At the same time, we want to be able to analyze each agent individually, so each agent should have its own data table.

For example, suppose we want to query the WebRTC logs of a call made between 12:00 and 12:10 on February 14, 2020 by agent 9001 of enterprise 7000001. If tables are not split by agent, the SQL statement looks like this:

```sql
select log_time, audio_bytes_sent, ...
  from aladdin.webrtc_log
 where device_id = '70000019001'
   and log_time between 1581652800000 and 1581653400000;
```

To speed up this query, we would first need to build indexes on the device_id and log_time fields. But when the data volume is large, index storage itself becomes a problem, so we had to consider splitting the table (our previous database was AWS RDS, so sharding across databases was not an option).

There were two options: split tables by time or split tables by agent. Why did we choose agents? If we split by time, the amount of data would vary wildly from table to table, and some tables might even be empty, because few people make outbound calls at midnight. Yet we could not arbitrarily skip creating tables for the midnight hours; what if someone is making international calls? More importantly, for a mobile application, most problems are reported to us by a specific agent, and we troubleshoot for that agent, so a device_id filter appears in almost every query. If tables are split by device_id, we no longer need an index on that field at query time. Therefore, we chose to split the tables by agent.
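With tables split by agent, the earlier query no longer needs a device_id filter or index. Assuming a per-agent table named after the device_id (the naming scheme the article adopts later), the query would reduce to something like:

```sql
-- Hypothetical per-agent table named after the device_id;
-- only the time-range condition remains.
select log_time, audio_bytes_sent, ...
  from aladdin.webrtc_log_70000019001
 where log_time between 1581652800000 and 1581653400000;
```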

With a traditional database, splitting tables means checking whether the target table exists before every insert, and creating tables in advance. That kind of procedure strikes me as fragile. It would be much easier with a database that lets you specify the table name at insert time: insert if the table exists, and create it automatically if it does not.

Overall process of log reporting

The entire process requires the T-Phone SDK, the CTI-Cloud Interface module (an interface open to customers), and the log reporting module to work together.

Design

Logs are reported at high frequency, so I/O throughput requirements are high, and we collect data asynchronously. We chose Vert.x as the toolkit for this fully asynchronous project.

For data storage, we chose TDengine based on the following considerations:

1. WebRTC logs and operation logs are data streams generated over time, and TDengine happens to be a time-series database designed specifically for structured data streams from the Internet of Things.

2. WebRTC logs and operation logs are stored in the same format, and to make sure every agent can be analyzed, each agent should have its own data table. TDengine provides super tables: the data structure is defined once, sub-tables are distinguished by tags, and a sub-table is created automatically as long as you specify its name when inserting data. This neatly solves the awkward table-management problem mentioned above. According to the TDengine website:

To take full advantage of the time-series and other characteristics of its data, TDengine requires a separate table for each data collection point.

In fact, each of our agents is an independent data collection point, so TDengine fits our business scenario well.
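To make the automatic sub-table creation concrete, here is a minimal sketch (not the project's actual code) of building such an insert statement in Java. The class name `InsertSqlSketch` and the reduced three-column value list are illustrative; only the `INSERT INTO ... USING ... TAGS(...)` pattern comes from TDengine.

```java
// Sketch: build a TDengine insert that auto-creates the per-agent sub-table.
// Table and column names follow the article's schema; the column list here
// is trimmed to three fields for readability.
public class InsertSqlSketch {

    // deviceId doubles as the sub-table name suffix and the tag value
    static String insertSql(String deviceId, long createTime, long audioBytesSent) {
        return String.format(
            "INSERT INTO aladdin.webrtc_log_%s USING aladdin.webrtc_log "
                + "TAGS('%s') VALUES(%d, '%s', %d)",
            deviceId, deviceId, createTime, deviceId, audioBytesSent);
    }

    public static void main(String[] args) {
        // Example: agent 9001 of enterprise 7000001
        System.out.println(insertSql("70000019001", 1581652800000L, 2048L));
    }
}
```

If the sub-table `aladdin.webrtc_log_70000019001` does not exist, TDengine creates it from the super table definition on first insert; no existence check is needed in application code.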

3. Time-based partitioning. In a traditional database we would index fields to speed up queries, but we preferred to avoid indexes, since they take up storage space. Could something like table splitting replace the index? The answer is yes:

Data written to TDengine is sharded by time range on disk. Data from tables in the same vnode within the same time range is stored in the same file group. This sharding method greatly simplifies queries along the time dimension and improves query speed. By default, each data file on disk stores 10 days of data; you can change this with the system configuration parameter daysPerFile.
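As a sketch of how that parameter surfaces in practice (TDengine 2.x syntax assumed; check your version's documentation), the file span can also be set per database at creation time rather than globally in the server configuration:

```sql
-- Hypothetical: keep the default 10-day file span explicit for this database
CREATE DATABASE aladdin DAYS 10;
```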

4. Fast, stable inserts and queries.

We tried TDengine on our development server. Just as the official site claims, queries and writes are indeed fast, and TDengine does not depend on any other file system, so we adopted it as the storage engine for this module. Note that columns in TDengine have a length limit (at most 4096), and we report many fields, so the length allotted to each field has to be budgeted carefully.

During data collection, the T-Phone SDK does not interact with us directly: it first puts the data into SQS; we then pull the data from SQS, process it, and store it.

First, create the super table. TDengine's super table is, in my view, very convenient: with it, data is routed to the right sub-table automatically on insert.

```sql
create database aladdin;
use aladdin;
create table webrtc_log(
  createTime timestamp,
  deviceId binary(100),
  audioBytesSent bigint,
  audioBytesReceived bigint,
  ...
  ssrcSendGoogCurrentDelayMs int,
  ssrcSendGoogJitterBufferMs int
) tags (
  deviceIdTag binary(100)
);
```

TDengine provides many connection methods. To cooperate better with Vert.x's asynchronous model, we use the REST interface for database operations.
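As a minimal illustration of the REST approach, the sketch below builds (but does not send) an HTTP request against TDengine's `/rest/sql` endpoint using only the JDK's `java.net.http` client; the Vert.x WebClient used in the project works the same way conceptually. The class name is illustrative, and `taosdata` (TDengine's default root password) is used here only as a placeholder.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class RestSketch {

    // TDengine's REST endpoint authenticates with HTTP Basic auth
    static String basicAuth(String user, String password) {
        return "Basic " + Base64.getEncoder()
            .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
    }

    // Build a POST to /rest/sql carrying the SQL statement as the request body
    static HttpRequest sqlRequest(String host, int port, String user, String password, String sql) {
        return HttpRequest.newBuilder(URI.create("http://" + host + ":" + port + "/rest/sql"))
            .header("Authorization", basicAuth(user, password))
            .POST(HttpRequest.BodyPublishers.ofString(sql))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest req = sqlRequest("localhost", 6020, "root", "taosdata", "show databases;");
        System.out.println(req.uri());
    }
}
```

Because every operation is just an HTTP POST of a SQL string, this composes naturally with any asynchronous HTTP client, which is exactly why it pairs well with Vert.x.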

Getting started

With the overall design in place, we started development:

1. Application Configuration:

```json
{
  "aws.region": "<your aws region>",
  "aws.accessKey": "<your aws ak>",
  "aws.secretAccessKey": "<your aws sk>",
  "aladdin.maxPool": 100,
  "aladdin.maxWaitQueue": 1500,
  "aladdin.queue.name": ["queuename1", "queuename2"],
  "aladdin.cache.expireAfterWrite": 30,
  "aladdin.cache.expireAfterAccess": 30,
  "tdengine.host": "<your tdengine host>",
  "tdengine.port": 6020,
  "tdengine.user": "root",
  "tdengine.password": "<your tdengine password>"
}
```

2. Rewrite the Launcher

```java
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.SLF4JLogDelegateFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author qianwj
 * @since v1.0
 */
public class AladdinLauncher extends Launcher {

  private static Configurer configurer = new Configurer();
  private Logger logger = LoggerFactory.getLogger(AladdinLauncher.class);

  public static void main(String[] args) {
    System.setProperty("vertx.logger-delegate-factory-class-name",
        SLF4JLogDelegateFactory.class.getName());
    new AladdinLauncher().dispatch(args);
  }

  @Override
  public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
    logger.info("Loading config starting...");
    JsonObject config = configurer.load();
    JsonObject local = deploymentOptions.getConfig();
    if (!config.isEmpty()) {
      // Inject the Consul configuration into the context
      local.mergeIn(config);
      deploymentOptions.setConfig(local);
    }
    super.beforeDeployingVerticle(deploymentOptions);
    logger.info("Loading config completed, config: {}", deploymentOptions.getConfig());
  }

  @Override
  public void afterConfigParsed(JsonObject config) {
    logger.info("Loading local config complete, local config: {}", config.encodePrettily());
  }

  @Override
  public void handleDeployFailed(Vertx vertx, String mainVerticle,
      DeploymentOptions deploymentOptions, Throwable cause) {
    logger.error("Deploy verticle occur exception: {}, App will be closed immediately!",
        cause.getLocalizedMessage(), cause);
    vertx.close();
  }
}
```

Actually, after this second step, a local configuration file is no longer strictly necessary: we use Consul as the configuration center for centralized configuration. This step mainly injects the Consul configuration and sets up logging.

3. Pull data from SQS

```java
import com.amazonaws.AmazonServiceException;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.service.AwsSQSService;
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;

public class DataCollectVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(DataCollectVerticle.class);
  private volatile boolean shutdown = false;

  @Override
  public void start() throws Exception {
    logger.info("DataCollectVerticle starting...");
    AwsSQSService sqsService = Configurer.sqsService();
    EventBus bus = vertx.eventBus();
    vertx.setPeriodic(1000, id -> {
      try {
        if (shutdown) {
          vertx.cancelTimer(id);
        }
        JsonArray array = config().getJsonArray(Configurer.QUEUE_URL);
        List<YunMessage> msgs = sqsService.receiveMessageAndDelete(array.getString(0));
        List<YunMessage> userActionMsgs = sqsService.receiveMessageAndDelete(array.getString(1));
        bus.send(Configurer.CHANNEL_ADDRESS, Json.encode(msgs));
      } catch (AmazonServiceException e) {
        logger.warn("msgs received failed, cause: {}", e.getLocalizedMessage(), e);
      }
    });
  }

  @Override
  public void stop() throws Exception {
    shutdown = true;
    logger.info("DataCollectVerticle closing...");
  }
}
```

4. Store data in TDengine

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.DataOperator;
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.model.WebRTCLog;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;

public class SaveVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(SaveVerticle.class);

  @Override
  public void start() throws Exception {
    logger.info("SaveVerticle starting....");
    EventBus bus = vertx.eventBus();
    bus.consumer(Configurer.CHANNEL_ADDRESS, (Handler<Message<String>>) msg -> {
      JsonArray coming = new JsonArray(msg.body());
      if (coming != null) save(coming);
    });
  }

  private void save(JsonArray array) {
    WebClient client = Configurer.tdClient();
    List<WebRTCLog> data = new ArrayList<>();
    Cache<String, WebRTCLog> cache = Configurer.cache();
    if (array.size() > 0) {
      final WebRTCLog empty = new WebRTCLog();
      for (int i = 0; i < array.size(); i++) {
        String message = array.getJsonObject(i).mapTo(YunMessage.class).getBody();
        try {
          JsonObject json = DataOperator.toJsonObject(message);
          WebRTCLog log = json.mapTo(WebRTCLog.class);
          String cacheKey = log.getDeviceId();
          WebRTCLog org = cache.get(cacheKey, k -> empty);
          if (!Objects.equals(org, empty)) {
            // Merge with the cached record if one exists
            DataOperator.merge(log, org);
          }
          cache.put(cacheKey, log);
          data.add(log);
        } catch (Exception e) {
          logger.error("log saved failed, cause: {}", e.getLocalizedMessage(), e);
        }
      }
      client.post("/rest/sql")
        .basicAuthentication(config().getString("tdengine.user"), config().getString("tdengine.password"))
        .sendBuffer(insert(data), ar -> {
          if (ar.succeeded()) {
            HttpResponse<Buffer> response = ar.result();
            if (response != null) {
              JsonObject res = response.bodyAsJsonObject();
              if (!"succ".equals(res.getString("status"))) {
                logger.warn("data insert failed! data: {}, cause: {}",
                    Json.encode(data), res.getString("desc"));
              }
            }
          } else {
            logger.error("data insert failed! {}", Json.encode(data), ar.cause());
          }
        });
    }
  }

  private Buffer insert(WebRTCLog log) throws Exception {
    String formatter = "INSERT INTO ALADDIN.WEBRTC_LOG_%s " +
                       " USING ALADDIN.WEBRTC_LOG TAGS(%s) " +
                       "VALUES(%s)";
    String sql = String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log));
    return Buffer.buffer(sql);
  }

  private Buffer insert(List<WebRTCLog> data) throws IllegalAccessException {
    StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ");
    String formatter = "ALADDIN.WEBRTC_LOG_%s USING ALADDIN.WEBRTC_LOG TAGS(%s) VALUES(%s) ";
    for (WebRTCLog log : data) {
      sqlBuilder.append(String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log)));
    }
    return Buffer.buffer(sqlBuilder.toString());
  }

  @Override
  public void stop() throws Exception {
    logger.info("SaveVerticle closing....");
  }
}
```

5. Verticle deployment

```java
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.verticle.DataCollectVerticle;
import com.tinet.twatch.aladdin.verticle.SaveVerticle;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.client.WebClientOptions;

public class MainVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(MainVerticle.class);

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    logger.info("MainVerticle starting...");
    // Initialize SQS
    String region = config().getString("aws.region");
    String accessKey = config().getString("aws.accessKey");
    String secretKey = config().getString("aws.secretAccessKey");
    Configurer.initSQSService(region, accessKey, secretKey, config().getJsonArray(Configurer.QUEUE_URL));

    DeploymentOptions dataCollectDeploymentOptions = new DeploymentOptions();
    dataCollectDeploymentOptions.setInstances(1);
    dataCollectDeploymentOptions.setConfig(config());
    dataCollectDeploymentOptions.setWorker(true);
    Configurer.initCache(config().getInteger("aladdin.cache.expireAfterWrite"),
        config().getInteger("aladdin.cache.expireAfterAccess"));
    vertx.deployVerticle(DataCollectVerticle.class.getName(), dataCollectDeploymentOptions, ar -> {
      if (ar.succeeded()) {
        logger.info("DataCollectVerticle started!");
      } else {
        logger.warn("DataCollectVerticle deploy failed! {}", ar.cause().getLocalizedMessage(), ar.cause());
      }
    });

    // Initialize the WebClient
    WebClientOptions options = new WebClientOptions();
    options.setMaxWaitQueueSize(config().getInteger("aladdin.maxWaitQueue"));
    options.setMaxPoolSize(config().getInteger("aladdin.maxPool"));
    options.setDefaultHost(config().getString("tdengine.host"));
    options.setDefaultPort(config().getInteger("tdengine.port"));
    Configurer.initTDClient(vertx, options);

    DeploymentOptions saveDeploymentOptions = new DeploymentOptions();
    saveDeploymentOptions.setInstances(1);
    saveDeploymentOptions.setConfig(config());
    vertx.deployVerticle(SaveVerticle.class.getName(), saveDeploymentOptions, ar -> {
      if (ar.succeeded()) {
        logger.info("SaveVerticle started!");
      } else {
        logger.warn("SaveVerticle deploy failed!");
      }
    });
  }
}
```

With this, a log reporting module can be implemented quickly, and multiple deployed instances do not interfere with one another, although a real production environment would require considering much more.

Of course, log reporting is just the beginning. As the project develops, I will continue to write about TDengine's application in data analysis. Thank you for reading.

About the author: Qian Wenjin, R&D engineer of Basic R&D Department of Tianrun Rongtong, a fan of open source community. Currently, he is mainly responsible for the development and application of T-Phone SDK/CTI-Cloud.

This article was originally published at: blog.ti-net.com.cn/tdenginezwe…