replace local instance of producer creation, by the app get existent singleton, to improve performace

This commit is contained in:
Oldemar Gonçalves 2024-09-18 10:01:57 +01:00
parent 5b4d35a909
commit 0200114331
No known key found for this signature in database
GPG Key ID: 436D5C274207477A

View File

@ -11,6 +11,8 @@ use RdKafka\Producer;
class Kafka extends BaseDatastore
{
private const PRODUCER_MAXIMUM_QUEUE_ACUMULATED_MESSAGES = 100;
public function getName()
{
return 'Kafka';
@ -44,7 +46,9 @@ class Kafka extends BaseDatastore
public function put($device, $measurement, $tags, $fields)
{
try {
$producer = Kafka::createFromConfig();
// get the singleton instance of the produced
/** @var Producer $producer */
$producer = app('RdKafka\Producer');
$topic = $producer->newTopic(Kafka::getTopicName());
$device_data = DeviceCache::get($device['device_id']);
@ -78,7 +82,7 @@ class Kafka extends BaseDatastore
}
}
if($excluded_device_fields != null && strlen($excluded_device_fields) > 0) {
if ($excluded_device_fields != null && strlen($excluded_device_fields) > 0) {
// convert into array
$excluded_device_fields_arr = explode(',', $excluded_device_fields);
}
@ -133,12 +137,16 @@ class Kafka extends BaseDatastore
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $dataArr);
$producer->poll(0);
// flush remaining message in the queue
$result = $producer->flush(self::getKafkaFlushTimeout());
// flush when it reaches 100messages
if ($producer->getOutQLen() >= Kafka::PRODUCER_MAXIMUM_QUEUE_ACUMULATED_MESSAGES) {
// flush remaining message in the queue
$result = $producer->flush(self::getKafkaFlushTimeout());
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
Log::warning('KAFKA: Was unable to flush, messages might be lost!');
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
Log::warning('KAFKA: Was unable to flush, messages might be lost!');
}
}
} catch (\Throwable $th) {
Log::warning($th);
}