Skip to content
Advertisement

PHP Async Multi-threaded Curl Application

I’m looking to build a process/script that can handle at least 300-400 transaction per second. Currently i’m using Workerman to do the following job. I can run without any issue with 400 threads , but the tps was around 60-70 tps it can process with latency of less than a second.

Below is the working code :-

main.php:-

<?php
require_once __DIR__ . '/vendor/autoload.php';
use WorkermanWorker;

$http_worker = new Worker('http://0.0.0.0:2345');

$http_worker->count = 400;
$http_worker->onMessage = function ($connection, $request) {
//Config
    $connection->send("");
    $url = 'http://localhost:3000';
    $packageid=11;
    $payload = $request->post();
    $temp_payload = implode("|",$payload);
    list($id,$user,$package_id,$timestamp) = explode('|',$temp_payload);

    $date_request_first=date('Y-m-d H:i:s');// PHP Worker current casting timestamp
    $date_compare1= date("Y-m-d h:i:s a", strtotime($date_request_first));
    
    $xml_get_subscriber='<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" ><SOAP-ENV:Body>
    <ns2:root>
        <msg_head>
            <time>2020-08-20 17:57:29</time>
            <from />
            <to />
            <msg_type />
            <serial />
        </msg_head>
        <interface_msg>
            <msg_response>
                <ResponseClass Name="Response">
                    <GetUserClass Name="AJAX">
                        <ResultCode>0</ResultCode>
                        <ResultDescr>success</ResultDescr>
                        <IBAN>'.$id.'</IBAN>
                        <PREFERNOTIFYMETHOD>1</PREFERNOTIFYMETHOD>
                    </GetUserClass>
                </ResponseClass>
            </msg_response>
        </interface_msg>
    </ns2:root>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>';

$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$package_id,$provisioning_recipe));
//$doc->loadXML($result_provisioning);

$xpath = new DOMXPath($doc);

foreach ($xpath->query("//ResultCode/text()")  as $package) {
    $resultCode = $package->textContent;
}
if($resultCode == 0 && $resultDesc ="success"){


$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$package_id,$xml_get_subscriber_info));

$xpath = new DOMXPath($doc);

foreach ($xpath->query("//PACKAGEID/text()")  as $match1) {
    $PAK_checking = $match1->textContent;
}

$myArrayPak = explode('$', $PAK_checking);
$key_value = array_search($packageid,$myArrayPak);
        if($key_value)
        {
            
        $conf = new RdKafkaConf();
        $conf->set('metadata.broker.list', '192.168.0.16:9092');
        
        $producer = new RdKafkaProducer($conf);
        $topic = $producer->newTopic("successful-request");
        $produce_date =date('Y-m-d H:i:s');
        
        $ar=date_create($myArrayEndDate[$key_value]);
        $final_date = date_format($ar,"Y-m-d H:i:s");
        $toStore="$id;$msisdn;$package_id;1;$final_date";
        echo "PackageID = $myArrayPak[$key_value]  , End-Date = $final_daten";
        $topic->produce(RD_KAFKA_PARTITION_UA,0, "$toStore");
        $producer->poll(0);
        
        for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new RuntimeException('Was unable to flush, messages might be lost!');
        }   


}
}


function getUrlContent($url,$uid,$msisdn,$pckg,$xml_get_subscriber_info){
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 5);
curl_setopt($ch, CURLOPT_TIMEOUT, 5);
curl_setopt($ch, CURLOPT_POSTFIELDS, $xml_get_subscriber_info);
$data = curl_exec($ch);
$httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
//return ($httpcode>=200 && $httpcode<300) ? $data : false;
if($httpcode!= 200)
{
    $conf = new RdKafkaConf();
        $conf->set('metadata.broker.list', '192.168.0.17:9092');
        
        $producer = new RdKafkaProducer($conf);
        $topic = $producer->newTopic("failed-request");
        $produce_date =date('Y-m-d H:i:s');
        
        $toStore="$uid;$msisdn;$pckg;$produce_date";
        $topic->produce(RD_KAFKA_PARTITION_UA,0, "$toStore");
        $producer->poll(0);
        
        for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new RuntimeException('Was unable to flush, messages might be lost!');
        }   
}
else
{
   return $data;
}

}



}


// run all workers
Worker::runAll();
?>

Now when I increase the thread to 800, the issue starts :-

Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69
PHP Notice:  Undefined variable: PAK in test.php.php on line 77

Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185
PHP Notice:  Undefined variable: PAK_checking in test.php.php on line 197

Notice: Undefined variable: PAK_checking in test.php.php on line 197
PHP Notice:  Undefined variable: psetdatelist in test.php.php on line 198

Notice: Undefined variable: psetdatelist in test.php.php on line 198
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

I check and it seems the CURL XML was empty when i increased the worker count. But when it was 400 http_worker there is no issue at all.

I’m running on 8 core CPU and 16GB RAM in a VM. My goal is to process as much request from the northbound and check against a southbound with minimum 300 TPS.

The process flow is as below :-

Client -> Main.php -> Check with southbound -> Produce to Kafka Topic

Problem : The issue that i’m facing when i increased from 400 to 800 threads , my curl response is always empty.

Is there any method i can leverage to send 300 tps minimum and XML parse it without any issue? Or is there any suggest for me to improve my process flow?

Advertisement

Answer

I found out OS Unix has limitation when it comes to handle multiple request above 200-300 tps.

I had found another alternative using guzzleHTTP , that helped me. It helped me to achieve 600 tps + with transaction latency of 2 seconds.

User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement