Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
modify Producer.php syncMeta access
Browse files Browse the repository at this point in the history
  • Loading branch information
nmred committed Jun 22, 2017
1 parent aa640b8 commit 0924e90
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions src/Kafka/Producer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,37 @@ public function setError($error)
$this->error = $error;
}

// }}}
// {{{ public function syncMeta()

public function syncMeta()
{
$this->debug('Start sync metadata request');
$brokerList = explode(',', \Kafka\ProducerConfig::getInstance()->getMetadataBrokerList());
$brokerHost = array();
foreach ($brokerList as $key => $val) {
if (trim($val)) {
$brokerHost[] = $val;
}
}
if (count($brokerHost) == 0) {
throw new \Kafka\Exception('Not set config `metadataBrokerList`');
}
shuffle($brokerHost);
$broker = \Kafka\Broker::getInstance();
foreach ($brokerHost as $host) {
$socket = $broker->getMetaConnect($host);
if ($socket) {
$params = array();
$this->debug('Start sync metadata request params:' . json_encode($params));
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::METADATA_REQUEST, $params);
$socket->write($requestData);
return;
}
}
throw new \Kafka\Exception('Not has broker can connection `metadataBrokerList`');
}

// }}}
// {{{ protected function processRequest()

Expand Down Expand Up @@ -195,37 +226,6 @@ protected function processRequest($data, $fd)
}
}

// }}}
// {{{ protected function syncMeta()

protected function syncMeta()
{
$this->debug('Start sync metadata request');
$brokerList = explode(',', \Kafka\ProducerConfig::getInstance()->getMetadataBrokerList());
$brokerHost = array();
foreach ($brokerList as $key => $val) {
if (trim($val)) {
$brokerHost[] = $val;
}
}
if (count($brokerHost) == 0) {
throw new \Kafka\Exception('Not set config `metadataBrokerList`');
}
shuffle($brokerHost);
$broker = \Kafka\Broker::getInstance();
foreach ($brokerHost as $host) {
$socket = $broker->getMetaConnect($host);
if ($socket) {
$params = array();
$this->debug('Start sync metadata request params:' . json_encode($params));
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::METADATA_REQUEST, $params);
$socket->write($requestData);
return;
}
}
throw new \Kafka\Exception('Not has broker can connection `metadataBrokerList`');
}

// }}}
// {{{ protected function produce()

Expand Down

0 comments on commit 0924e90

Please sign in to comment.