diff --git a/src/Kafka/Consumer/Assignment.php b/src/Kafka/Consumer/Assignment.php index b8044e68..1165437c 100644 --- a/src/Kafka/Consumer/Assignment.php +++ b/src/Kafka/Consumer/Assignment.php @@ -43,6 +43,8 @@ class Assignment private $offsets = array(); + private $lastOffsets = array(); + private $fetchOffsets = array(); private $consumerOffsets = array(); @@ -166,6 +168,22 @@ public function getOffsets() return $this->offsets; } + // }}} + // {{{ public function setLastOffsets() + + public function setLastOffsets($offsets) + { + $this->lastOffsets = $offsets; + } + + // }}} + // {{{ public function getOffsets() + + public function getLastOffsets() + { + return $this->lastOffsets; + } + // }}} // {{{ public function setFetchOffsets() @@ -211,6 +229,9 @@ public function setConsumerOffset($topic, $part, $offset) public function getConsumerOffset($topic, $part) { + if (!isset($this->consumerOffsets[$topic][$part])) { + return false; + } return $this->consumerOffsets[$topic][$part]; } @@ -262,6 +283,19 @@ public function setPrecommitOffset($topic, $part, $offset) $this->precommitOffsets[$topic][$part] = $offset; } + // }}} + // {{{ public function clearOffset() + + public function clearOffset() + { + $this->offsets = array(); + $this->lastOffsets = array(); + $this->fetchOffsets = array(); + $this->consumerOffsets = array(); + $this->commitOffsets = array(); + $this->precommitOffsets = array(); + } + // }}} // }}} } diff --git a/src/Kafka/Consumer/Process.php b/src/Kafka/Consumer/Process.php index db4c1418..d6066c2a 100644 --- a/src/Kafka/Consumer/Process.php +++ b/src/Kafka/Consumer/Process.php @@ -39,6 +39,8 @@ class Process protected $isRunning = true; + protected $messages = array(); + // }}} // {{{ functions // {{{ public function __construct() @@ -442,7 +444,6 @@ protected function offset() if (!$connect) { return; } - $resetOffset = \Kafka\ConsumerConfig::getInstance()->getOffsetReset(); $data = array(); foreach ($topicList as $topic) { $item = array( @@ -453,7 +454,7 @@ protected function offset() $item['partitions'][] = array( 'partition_id' => $partId, 'offset' => 1, - 'time' => ($resetOffset == 'latest') ? -1 : -2, + 'time' => -1, ); $data[] = $item; } @@ -481,6 +482,7 @@ public function succOffset($result, $fd) //$this->debug($msg); $offsets = \Kafka\Consumer\Assignment::getInstance()->getOffsets(); + $lastOffsets = \Kafka\Consumer\Assignment::getInstance()->getLastOffsets(); foreach ($result as $topic) { foreach ($topic['partitions'] as $part) { if ($part['errorCode'] != 0) { @@ -488,10 +490,12 @@ public function succOffset($result, $fd) break 2; } - $offsets[$topic['topicName']][$part['partition']] = $part['offsets'][0]; + $offsets[$topic['topicName']][$part['partition']] = end($part['offsets']); + $lastOffsets[$topic['topicName']][$part['partition']] = $part['offsets'][0]; } } \Kafka\Consumer\Assignment::getInstance()->setOffsets($offsets); + \Kafka\Consumer\Assignment::getInstance()->setLastOffsets($lastOffsets); $this->state->succRun(\Kafka\Consumer\State::REQUEST_OFFSET, $fd); } @@ -554,8 +558,17 @@ public function succFetchOffset($result) $assign->setFetchOffsets($offsets); $consumerOffsets = $assign->getConsumerOffsets(); + $lastOffsets = $assign->getLastOffsets(); if (empty($consumerOffsets)) { - $assign->setConsumerOffsets($assign->getFetchOffsets()); + $consumerOffsets = $assign->getFetchOffsets(); + foreach ($consumerOffsets as $topic => $value) { + foreach ($value as $partId => $offset) { + if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) { + $consumerOffsets[$topic][$partId] = $offset + 1; + } + } + } + $assign->setConsumerOffsets($consumerOffsets); $assign->setCommitOffsets($assign->getFetchOffsets()); } $this->state->succRun(\Kafka\Consumer\State::REQUEST_FETCH_OFFSET); @@ -566,6 +579,7 @@ public function succFetchOffset($result) protected function fetch() { + $this->messages = array(); $context = array(); $broker = \Kafka\Broker::getInstance(); $topics = \Kafka\Consumer\Assignment::getInstance()->getTopics(); @@ -624,10 +638,14 @@ public function succFetch($result, $fd) } $offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); + if ($offset === false) { + return; // current is rejoin.... + } foreach ($part['messages'] as $message) { - if ($this->consumer != null) { - call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message); - } + $this->messages[$topic['topicName']][$part['partition']][] = $message; + //if ($this->consumer != null) { + // call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message); + //} $offset = $message['offset']; } @@ -694,10 +712,21 @@ public function succCommit($result) foreach ($topic['partitions'] as $part) { if ($part['errorCode'] != 0) { $this->stateConvert($part['errorCode']); - break 2; + return; // not call user consumer function } } } + + foreach ($this->messages as $topic => $value) { + foreach ($value as $part => $messages) { + foreach ($messages as $message) { + if ($this->consumer != null) { + call_user_func($this->consumer, $topic, $part, $message); + } + } + } + } + $this->messages = array(); } // }}} @@ -725,23 +754,29 @@ protected function stateConvert($errorCode, $context = null) \Kafka\Protocol::UNKNOWN_MEMBER_ID, ); + $assign = \Kafka\Consumer\Assignment::getInstance(); if (in_array($errorCode, $recoverCodes)) { $this->state->recover(); + $assign->clearOffset(); return false; } if (in_array($errorCode, $rejoinCodes)) { if ($errorCode == \Kafka\Protocol::UNKNOWN_MEMBER_ID) { - $assign = \Kafka\Consumer\Assignment::getInstance(); $assign->setMemberId(''); } + $assign->clearOffset(); $this->state->rejoin(); return false; } if (\Kafka\Protocol::OFFSET_OUT_OF_RANGE == $errorCode) { - $assign = \Kafka\Consumer\Assignment::getInstance(); - $offsets = $assign->getOffsets(); + $resetOffset = \Kafka\ConsumerConfig::getInstance()->getOffsetReset(); + if ($resetOffset == 'latest') { + $offsets = $assign->getLastOffsets(); + } else { + $offsets = $assign->getOffsets(); + } list($topic, $partId) = $context; if (isset($offsets[$topic][$partId])) { $assign->setConsumerOffset($topic, $partId, $offsets[$topic][$partId]); diff --git a/src/Kafka/Consumer/State.php b/src/Kafka/Consumer/State.php index df32ffc6..6b77dc8c 100644 --- a/src/Kafka/Consumer/State.php +++ b/src/Kafka/Consumer/State.php @@ -69,7 +69,7 @@ class State 'interval' => 2000, ), self::REQUEST_COMMIT_OFFSET => array( - 'interval' => 2000, + 'norepeat' => true, ), ); @@ -130,6 +130,9 @@ public function init() public function start() { foreach ($this->requests as $request => $option) { + if (isset($option['norepeat']) && $option['norepeat']) { + continue; + } $interval = isset($option['interval']) ? $option['interval'] : 200; \Amp\repeat(function ($watcherId) use ($request, $option) { if ($this->checkRun($request) && $option['func'] != null) { @@ -177,6 +180,16 @@ public function succRun($key, $context = null) $this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH); break; case self::REQUEST_OFFSET: + if (!isset($this->callStatus[$key]['context'])) { + $this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH); + break; + } + unset($this->callStatus[$key]['context'][$context]); + $contextStatus = $this->callStatus[$key]['context']; + if (empty($contextStatus)) { + $this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH); + } + break; case self::REQUEST_FETCH: if (!isset($this->callStatus[$key]['context'])) { $this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH); @@ -186,6 +199,7 @@ public function succRun($key, $context = null) $contextStatus = $this->callStatus[$key]['context']; if (empty($contextStatus)) { $this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH); + call_user_func($this->requests[self::REQUEST_COMMIT_OFFSET]['func']); } break; } @@ -350,6 +364,17 @@ protected function checkRun($key) return false; case self::REQUEST_HEARTGROUP: case self::REQUEST_OFFSET: + if (($status & self::STATUS_PROCESS) == self::STATUS_PROCESS) { + return false; + } + $syncStatus = $this->callStatus[self::REQUEST_SYNCGROUP]['status']; + if (($syncStatus & self::STATUS_FINISH) != self::STATUS_FINISH) { + return false; + } + if (($status & self::STATUS_LOOP) == self::STATUS_LOOP) { + return true; + } + return false; case self::REQUEST_FETCH_OFFSET: if (($status & self::STATUS_PROCESS) == self::STATUS_PROCESS) { return false; @@ -358,12 +383,15 @@ protected function checkRun($key) if (($syncStatus & self::STATUS_FINISH) != self::STATUS_FINISH) { return false; } + $offsetStatus = $this->callStatus[self::REQUEST_OFFSET]['status']; + if (($offsetStatus & self::STATUS_FINISH) != self::STATUS_FINISH) { + return false; + } if (($status & self::STATUS_LOOP) == self::STATUS_LOOP) { return true; } return false; case self::REQUEST_FETCH: - case self::REQUEST_COMMIT_OFFSET: if (($status & self::STATUS_PROCESS) == self::STATUS_PROCESS) { return false; } @@ -371,6 +399,10 @@ protected function checkRun($key) if (($fetchOffsetStatus & self::STATUS_FINISH) != self::STATUS_FINISH) { return false; } + $commitOffsetStatus = $this->callStatus[self::REQUEST_COMMIT_OFFSET]['status']; + if (($commitOffsetStatus & self::STATUS_PROCESS) == self::STATUS_PROCESS) { + return false; + } if (($status & self::STATUS_LOOP) == self::STATUS_LOOP) { return true; }