First, preparation

Kafka is written in Java and Scala, but that does not stop it from being used from other languages. Clients exist for C/C++, PHP, Python, Go, and more; see the CLIENTS page.

Using Kafka from PHP requires the librdkafka library and the php-rdkafka extension to be installed.

1. Install librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure
make
sudo make install

2. Install the php-rdkafka extension

$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka/
$ phpize
$ ./configure
$ make
$ sudo make install

Configure the rdkafka extension in the php.ini file
extension=rdkafka.so

# check whether the extension works
php -m | grep kafka
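
The same check can also be made from inside PHP, which is handy at the top of a script. This is only a minimal sketch: the helper name rdkafkaAvailable is made up for illustration, while extension_loaded() and phpversion() are standard PHP functions.

```php
<?php
// Hypothetical helper: reports whether the rdkafka extension is loaded.
function rdkafkaAvailable(): bool
{
    return extension_loaded('rdkafka');
}

if (rdkafkaAvailable()) {
    // phpversion() with an extension name returns that extension's version string.
    echo 'rdkafka ' . phpversion('rdkafka') . " is loaded\n";
} else {
    echo "rdkafka is not loaded; check the extension= line in php.ini\n";
}
```

Note that php -m reports the command-line configuration; a web server may read a different php.ini, so running this script through the web server shows what that environment sees.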

Second, code implementation

The demo comes from github.com/arnaud-lb/p…

A typical producer follows these steps:

1. Configure the producer client parameters and create a producer instance.

/**
 * Create a producer
 */
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");

2. Create a topic.

/**
 * Create a topic instance from the producer
 */
$topic = $rk->newTopic("test");

3. Send a message.

/**
 * Producing messages
 * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for
 *   "unassigned" and lets librdkafka choose the partition.
 * The second argument is the message flags; it should be either 0 or
 *   RD_KAFKA_MSG_F_BLOCK, which makes produce block when the queue is full.
 * The message payload can be anything.
 */
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

4. Close the producer instance.

/**
 * Proper shutdown
 * This should be done prior to destroying a producer instance
 *   to make sure all queued and in-flight produce requests are completed
 *   before terminating.
 * Not calling flush can lead to message loss!
 */
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);
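
flush() returns a librdkafka error code, so a script can tell whether everything was delivered within the timeout instead of ignoring the result. The sketch below retries a few times before giving up. The helper name flushWithRetries and its parameters are made up for illustration, and it takes a callable so the retry logic can be exercised without a running broker.

```php
<?php
// Hypothetical helper: calls $flush until it reports success or the retries run out.
// $flush is expected to return a librdkafka error code, where 0 means "no error"
// (the value of RD_KAFKA_RESP_ERR_NO_ERROR).
function flushWithRetries(callable $flush, int $maxRetries = 3): bool
{
    for ($attempt = 0; $attempt < $maxRetries; $attempt++) {
        if ($flush() === 0) {
            return true;
        }
    }
    return false;
}

// Usage with a producer $rk (requires the rdkafka extension and a reachable broker):
// $ok = flushWithRetries(function () use ($rk) { return $rk->flush(10000); });
// if (!$ok) {
//     throw new RuntimeException('Unable to flush, messages might be lost');
// }
```

A false result means some messages may still be sitting in the local queue, so it is worth logging or raising an error rather than exiting silently.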

To verify that the message was sent successfully, open a console consumer in a terminal:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

In another window, run php producer.php.

You can see that the consumer terminal receives the message.

The complete code is as follows:

<?php
/**
 * Created by PhpStorm.
 * User: liulu
 * Date: 2020/1/1
 * Time: 18:38
 */

/**
 * Create a producer
 */
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");

/**
 * Create a topic instance from the producer
 */
$topic = $rk->newTopic("test");

/**
 * Producing messages
 * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for
 *   "unassigned" and lets librdkafka choose the partition.
 * The second argument is the message flags; it should be either 0 or
 *   RD_KAFKA_MSG_F_BLOCK, which makes produce block when the queue is full.
 * The message payload can be anything.
 */
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

/**
 * Proper shutdown
 * This should be done prior to destroying a producer instance
 *   to make sure all queued and in-flight produce requests are completed
 *   before terminating.
 * Not calling flush can lead to message loss!
 */
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);

echo 'finished';
exit;
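
The payload in the listing is a plain string. Structured data has to be serialized to a string before it is handed to produce(). The sketch below uses JSON, which is an assumption for illustration rather than anything Kafka requires, and encodePayload is a made-up helper name.

```php
<?php
// Hypothetical helper: serializes an event array to a JSON string payload.
function encodePayload(array $event): string
{
    // JSON_THROW_ON_ERROR (PHP 7.3+) raises JsonException instead of returning false.
    return json_encode($event, JSON_THROW_ON_ERROR);
}

echo encodePayload(['id' => 1, 'type' => 'signup']), "\n";
// prints {"id":1,"type":"signup"}

// With a topic created by newTopic() (requires the rdkafka extension):
// $topic->produce(RD_KAFKA_PARTITION_UA, 0, encodePayload(['id' => 1, 'type' => 'signup']));
```

The consumer side then has to apply the matching json_decode(), so both ends need to agree on the format.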

I hope the above content helps you.