From c6833e34236c9fb76d0ed84ed75544a5e9be9618 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Wed, 11 May 2022 21:24:51 -0500 Subject: [PATCH] Refactoring of HTTP/2 connection processor Reduced complexity of sending request body. Fixed unsubscribing from cancellation token if sending request body fails. Simplified Response object creation. Eliminated idle connection ping (was broken before, could restore if deemed necessary, some servers disconnect on client ping). --- src/Connection/ConnectionFactory.php | 2 +- src/Connection/DefaultConnectionFactory.php | 14 +- src/Connection/Http2Connection.php | 5 +- .../Internal/Http2ConnectionProcessor.php | 587 +++++++----------- src/Connection/Internal/Http2Stream.php | 43 +- test/Connection/Http2ConnectionTest.php | 6 +- 6 files changed, 243 insertions(+), 414 deletions(-) diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index 7b92134d..a7ccf231 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -17,5 +17,5 @@ interface ConnectionFactory * Additionally, the factory may invoke {@see EventListener::startDnsResolution()} and * {@see EventListener::completeDnsResolution()}, but is not required to implement such granular events. */ - public function create(Request $request, Cancellation $cancellationToken): Connection; + public function create(Request $request, Cancellation $cancellation): Connection; } diff --git a/src/Connection/DefaultConnectionFactory.php b/src/Connection/DefaultConnectionFactory.php index ca34c4e2..268c9532 100644 --- a/src/Connection/DefaultConnectionFactory.php +++ b/src/Connection/DefaultConnectionFactory.php @@ -29,7 +29,7 @@ public function __construct(?Socket\SocketConnector $connector = null, ?ConnectC public function create( Request $request, - Cancellation $cancellationToken + Cancellation $cancellation ): Connection { foreach ($request->getEventListeners() as $eventListener) { $eventListener->startConnectionCreation($request); @@ -103,7 +103,7 @@ public function create( $socket = $connector->connect( 'tcp://' . $authority, $connectContext->withConnectTimeout($request->getTcpConnectTimeout()), - $cancellationToken + $cancellation ); } catch (Socket\ConnectException $e) { throw new UnprocessedRequestException( @@ -111,7 +111,7 @@ public function create( ); } catch (CancelledException $e) { // In case of a user cancellation request, throw the expected exception - $cancellationToken->throwIfRequested(); + $cancellation->throwIfRequested(); // Otherwise we ran into a timeout of our TimeoutCancellation throw new UnprocessedRequestException(new TimeoutException(\sprintf( @@ -138,7 +138,7 @@ public function create( } $tlsCancellation = new CompositeCancellation( - $cancellationToken, + $cancellation, new TimeoutCancellation($request->getTlsHandshakeTimeout()) ); @@ -159,7 +159,7 @@ public function create( $socket->close(); // In case of a user cancellation request, throw the expected exception - $cancellationToken->throwIfRequested(); + $cancellation->throwIfRequested(); // Otherwise we ran into a timeout of our TimeoutCancellation throw new UnprocessedRequestException(new TimeoutException(\sprintf( @@ -182,7 +182,7 @@ public function create( if ($tlsInfo->getApplicationLayerProtocol() === 'h2') { $http2Connection = new Http2Connection($socket); - $http2Connection->initialize(); + $http2Connection->initialize($cancellation); foreach ($request->getEventListeners() as $eventListener) { $eventListener->completeConnectionCreation($request); @@ -195,7 +195,7 @@ public function create( // Treat the presence of only HTTP/2 as prior knowledge, see https://http2.github.io/http2-spec/#known-http if ($request->getProtocolVersions() === ['2']) { $http2Connection = new Http2Connection($socket); - $http2Connection->initialize(); + $http2Connection->initialize($cancellation); foreach ($request->getEventListeners() as $eventListener) { $eventListener->completeConnectionCreation($request); diff --git a/src/Connection/Http2Connection.php b/src/Connection/Http2Connection.php index 5e755c9c..049ec486 100644 --- a/src/Connection/Http2Connection.php +++ b/src/Connection/Http2Connection.php @@ -11,6 +11,7 @@ use Amp\Socket\EncryptableSocket; use Amp\Socket\SocketAddress; use Amp\Socket\TlsInfo; +use Amp\TimeoutCancellation; final class Http2Connection implements Connection { @@ -36,9 +37,9 @@ public function getProtocolVersions(): array return self::PROTOCOL_VERSIONS; } - public function initialize(): void + public function initialize(?Cancellation $cancellation = null): void { - $this->processor->initialize(); + $this->processor->initialize($cancellation ?? new TimeoutCancellation(5)); } public function getStream(Request $request): ?Stream diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 9d496f5a..9912bfa9 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -11,8 +11,6 @@ use Amp\DeferredCancellation; use Amp\DeferredFuture; use Amp\Future; -use Amp\Http\Client\Connection\Http2ConnectionException as ClientHttp2ConnectionException; -use Amp\Http\Client\Connection\Http2StreamException as ClientHttp2StreamException; use Amp\Http\Client\Connection\HttpStream; use Amp\Http\Client\Connection\Stream; use Amp\Http\Client\Connection\UnprocessedRequestException; @@ -46,18 +44,14 @@ final class Http2ConnectionProcessor implements Http2Processor private const PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; private const DEFAULT_MAX_FRAME_SIZE = 1 << 14; private const DEFAULT_WINDOW_SIZE = (1 << 16) - 1; - private const MINIMUM_WINDOW = 512 * 1024; private const WINDOW_INCREMENT = 1024 * 1024; - // Seconds to wait for pong (PING with ACK) frame before closing the connection. private const PONG_TIMEOUT = 5; /** @var string 64-bit for ping. */ private string $counter = "aaaaaaaa"; - private EncryptableSocket $socket; - /** @var Http2Stream[] */ private array $streams = []; @@ -78,8 +72,9 @@ final class Http2ConnectionProcessor implements Http2Processor /** @var int Currently open or reserved streams. Initially unlimited. */ private int $remainingStreams = 2147483647; - private HPack $hpack; + private readonly HPack $hpack; + /** @var DeferredFuture|null */ private ?DeferredFuture $settings = null; private bool $initializeStarted = false; @@ -88,12 +83,9 @@ final class Http2ConnectionProcessor implements Http2Processor private ?string $pongWatcher = null; + /** @var DeferredFuture|null */ private ?DeferredFuture $pongDeferred = null; - private ?string $idleWatcher = null; - - private int $idlePings = 0; - /** @var list<\Closure():void>|null */ private ?array $onClose = []; @@ -103,15 +95,20 @@ final class Http2ConnectionProcessor implements Http2Processor private int|null $shutdown = null; - private Queue $frameQueue; + private readonly Queue $frameQueue; - public function __construct(EncryptableSocket $socket) - { - $this->socket = $socket; - $this->hpack = new HPack; + public function __construct( + private readonly EncryptableSocket $socket, + ) { + $this->hpack = new HPack(); $this->frameQueue = new Queue(); } + public function __destruct() + { + $this->close(); + } + public function isInitialized(): bool { return $this->initialized; @@ -121,7 +118,7 @@ public function isInitialized(): bool * Returns once the connection has been initialized. A stream cannot be obtained from the * connection until the promise returned by this method resolves. */ - public function initialize(): void + public function initialize(Cancellation $cancellation): void { if ($this->initializeStarted) { throw new \Error('Connection may only be initialized once'); @@ -141,7 +138,13 @@ public function initialize(): void EventLoop::queue($this->runReadFiber(...)); EventLoop::queue($this->runWriteFiber(...)); - $future->await(); + try { + $future->await($cancellation); + } catch (CancelledException $exception) { + $exception = new SocketException('Connecting cancelled', 0, $exception); + $this->shutdown($exception); + throw new UnprocessedRequestException($exception); + } } /** @@ -174,22 +177,8 @@ public function close(): void public function handlePong(string $data): void { - if ($this->pongDeferred === null) { - return; - } - - if ($this->pongWatcher !== null) { - EventLoop::cancel($this->pongWatcher); - $this->pongWatcher = null; - } - + $this->cancelPongWatcher(true); $this->hasTimeout = false; - - $deferred = $this->pongDeferred; - $this->pongDeferred = null; - - \assert($deferred !== null); - $deferred->complete(true); } public function handlePing(string $data): void @@ -206,11 +195,7 @@ public function handleShutdown(int $lastId, int $error): void $error ); - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException($message, $error), $lastId); + $this->shutdown(new SocketException($message, $error), $lastId); } public function handleStreamWindowIncrement(int $streamId, int $windowSize): void @@ -341,8 +326,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool } }); - $this->setupPingIfIdle(); - return; } @@ -386,15 +369,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - $response = new Response( - '2', - $status, - Status::getReason($status), - $headers, - new ReadableBuffer, - $stream->request - ); - if ($status < 200) { $onInformationalResponse = $stream->request->getInformationalResponseHandler(); $preResponseResolution = $stream->preResponseResolution; @@ -402,13 +376,22 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream->preResponseResolution = async(function () use ( $preResponseResolution, $onInformationalResponse, - $response, - $streamId + $streamId, + $stream, + $status, + $headers, ): void { $preResponseResolution?->await(); try { - $onInformationalResponse($response); + $onInformationalResponse(new Response( + '2', + $status, + Status::getReason($status), + $headers, + new ReadableBuffer(), + $stream->request, + )); } catch (\Throwable) { $this->handleStreamException(new Http2StreamException( 'Informational response handler threw an exception', @@ -422,6 +405,36 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } + $stream->body = $body = new Queue(); + $stream->trailers = new DeferredFuture(); + $trailers = $stream->trailers->getFuture(); + + $deferredCancellation = new DeferredCancellation; + $cancellation = $deferredCancellation->getCancellation(); + $body = new ResponseBodyStream( + new ReadableIterableStream($body->iterate()), + $deferredCancellation, + ); + + $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($streamId): void { + if (isset($this->streams[$streamId])) { + $this->releaseStream($streamId, $exception); + } + }); + + $trailers = $trailers->finally(static fn () => $cancellation->unsubscribe($cancellationId)); + $trailers->ignore(); + + $response = new Response( + '2', + $status, + Status::getReason($status), + $headers, + $body, + $stream->request, + $trailers, + ); + \assert($stream->preResponseResolution === null); $stream->preResponseResolution = async(function () use ($stream, $streamId): void { @@ -438,24 +451,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool } }); - $stream->body = $body = new Queue(); - $stream->trailers = $trailers = new DeferredFuture; - $trailers->getFuture()->ignore(); - - $bodyCancellation = new DeferredCancellation; - $cancellationToken = new CompositeCancellation( - $stream->cancellationToken, - $bodyCancellation->getCancellation() - ); - - $response->setBody( - new ResponseBodyStream( - new ReadableIterableStream($body->pipe()), - $bodyCancellation - ) - ); - $response->setTrailers($trailers->getFuture()); - \assert($stream->pendingResponse !== null); $stream->responsePending = false; @@ -499,27 +494,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream->expectedLength = (int) $contentLength; } - - $cancellationToken->subscribe(function (CancelledException $exception) use ($streamId): void { - if (!isset($this->streams[$streamId])) { - return; - } - - if (!$this->streams[$streamId]->originalCancellation->isRequested()) { - $this->hasTimeout = true; - async($this->ping(...))->ignore(); // async ping, if other requests occur, they wait for it - - $transferTimeout = $this->streams[$streamId]->request->getTransferTimeout(); - - $exception = new TimeoutException( - 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' s', - 0, - $exception - ); - } - - $this->releaseStream($streamId, $exception); - }); } public function handlePushPromise(int $streamId, int $pushId, array $pseudo, array $headers): void @@ -649,7 +623,12 @@ public function handlePushPromise(int $streamId, int $pushId, array $pseudo, arr $request->setInactivityTimeout($parentStream->request->getInactivityTimeout()); $request->setTransferTimeout($parentStream->request->getTransferTimeout()); - $tokenSource = new DeferredCancellation(); + $deferredCancellation = new DeferredCancellation(); + + $cancellation = new CompositeCancellation( + $parentStream->cancellation, + $deferredCancellation->getCancellation(), + ); $stream = new Http2Stream( $pushId, @@ -663,11 +642,10 @@ static function () { // nothing to do } ), - $parentStream->cancellationToken, - $tokenSource->getCancellation(), + $cancellation, $this->createStreamInactivityWatcher($pushId, $request->getInactivityTimeout()), self::DEFAULT_WINDOW_SIZE, - 0 + 0, ); $stream->dependency = $streamId; @@ -687,13 +665,8 @@ static function () { return; } - EventLoop::queue(function () use ($pushId, $tokenSource, $stream): void { - $cancellationToken = new CompositeCancellation( - $stream->cancellationToken, - $tokenSource->getCancellation() - ); - - $cancellationId = $cancellationToken->subscribe(function (CancelledException $exception) use ( + EventLoop::queue(function () use ($pushId, $deferredCancellation, $stream, $cancellation): void { + $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ( $pushId ): void { if (isset($this->streams[$pushId])) { @@ -708,13 +681,13 @@ static function () { \assert($stream->pendingResponse !== null); $future = $stream->pendingResponse->getFuture() - ->finally(static fn () => $cancellationToken->unsubscribe($cancellationId)); + ->finally(static fn () => $cancellation->unsubscribe($cancellationId)); $onPush($stream->request, $future); - } catch (HttpException | StreamException | CancelledException $exception) { - $tokenSource->cancel($exception); + } catch (HttpException|StreamException|CancelledException $exception) { + $deferredCancellation->cancel($exception); } catch (\Throwable $exception) { - $tokenSource->cancel($exception); + $deferredCancellation->cancel($exception); throw $exception; } }); @@ -746,12 +719,7 @@ public function handleStreamException(Http2StreamException $exception): void $id = $exception->getStreamId(); $code = $exception->getCode(); - /** - * @psalm-suppress DeprecatedClass - * @psalm-suppress InvalidScalarArgument - * @noinspection PhpDeprecationInspection - */ - $exception = new ClientHttp2StreamException($exception->getMessage(), $id, $code, $exception); + $exception = new SocketException($exception->getMessage(), $code, $exception); if ($code === Http2Parser::REFUSED_STREAM) { $exception = new UnprocessedRequestException($exception); @@ -764,14 +732,7 @@ public function handleStreamException(Http2StreamException $exception): void public function handleConnectionException(Http2ConnectionException $exception): void { - /** - * @psalm-suppress DeprecatedClass - * @psalm-suppress InvalidScalarArgument - * @noinspection PhpDeprecationInspection - */ - $this->shutdown( - new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception) - ); + $this->shutdown(new SocketException($exception->getMessage(), $exception->getCode(), $exception)); } public function handleData(int $streamId, string $data): void @@ -823,7 +784,6 @@ public function handleData(int $streamId, string $data): void return; } - \assert($stream->body !== null); $stream->body->pushAsync($data)->map(function () use ($stream, $streamId, $length): void { // Stream may have closed while waiting for body data to be consumed. if (!isset($this->streams[$streamId])) { @@ -848,10 +808,9 @@ public function handleSettings(array $settings): void $this->writeFrame(Http2Parser::SETTINGS, Http2Parser::ACK)->ignore(); if ($this->settings) { - $deferred = $this->settings; - $this->settings = null; $this->initialized = true; - $deferred->complete($this->remainingStreams); + $this->settings->complete($this->remainingStreams); + $this->settings = null; } } @@ -900,7 +859,6 @@ public function handleStreamEnd(int $streamId): void }); } - $this->setupPingIfIdle(); $this->releaseStream($streamId); } @@ -929,7 +887,7 @@ public function getRemainingStreams(): int return $this->remainingStreams; } - public function request(Request $request, Cancellation $cancellationToken, Stream $stream): Response + public function request(Request $request, Cancellation $cancellation, Stream $stream): Response { if ($this->shutdown !== null) { $exception = new UnprocessedRequestException(new SocketException(\sprintf( @@ -945,7 +903,7 @@ public function request(Request $request, Cancellation $cancellationToken, Strea throw $exception; } - if ($this->hasTimeout && !$this->ping()) { + if ($this->hasTimeout && !$this->ping()->await()) { $exception = new UnprocessedRequestException( new SocketException(\sprintf( "Socket to '%s' missed responding to PINGs", @@ -960,9 +918,6 @@ public function request(Request $request, Cancellation $cancellationToken, Strea throw $exception; } - $this->idlePings = 0; - $this->cancelIdleWatcher(); - RequestNormalizer::normalizeRequest($request); // Remove defunct HTTP/1.x headers. @@ -999,14 +954,47 @@ public function request(Request $request, Cancellation $cancellationToken, Strea throw $exception; } - $originalCancellation = $cancellationToken; - if ($request->getTransferTimeout() > 0) { - $cancellationToken = new CompositeCancellation( - $cancellationToken, - new TimeoutCancellation($request->getTransferTimeout()) + $streamId = $this->streamId += 2; // Client streams should be odd-numbered, starting at 1. + + $originalCancellation = $cancellation; + $transferTimeout = $request->getTransferTimeout(); + if ($transferTimeout) { + $cancellation = new CompositeCancellation( + $cancellation, + new TimeoutCancellation($transferTimeout), ); } + $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ( + $streamId, + $transferTimeout, + $originalCancellation + ): void { + if (!isset($this->streams[$streamId])) { + return; + } + + if (!$originalCancellation->isRequested()) { + $exception = new TimeoutException( + 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' s', + 0, + $exception, + ); + } + + $this->releaseStream($streamId, $exception); + }); + + $this->streams[$streamId] = $http2stream = new Http2Stream( + $streamId, + $request, + $stream, + $cancellation, + $this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()), + self::DEFAULT_WINDOW_SIZE, + $this->initialWindowSize, + ); + try { $headers = $this->generateHeaders($request); $body = $request->getBody()->createBodyStream(); @@ -1017,48 +1005,11 @@ public function request(Request $request, Cancellation $cancellationToken, Strea $chunk = $body->read(); - $streamId = $this->streamId += 2; // Client streams should be odd-numbered, starting at 1. - - $this->streams[$streamId] = $http2stream = new Http2Stream( - $streamId, - $request, - $stream, - $cancellationToken, - $originalCancellation, - $this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()), - self::DEFAULT_WINDOW_SIZE, - $this->initialWindowSize - ); - $this->socket->reference(); - $transferTimeout = $request->getTransferTimeout(); - $cancellationId = $cancellationToken->subscribe(function (CancelledException $exception) use ( - $streamId, - $originalCancellation, - $transferTimeout - ): void { - if (!isset($this->streams[$streamId])) { - return; - } - - if (!$originalCancellation->isRequested()) { - $exception = new TimeoutException( - 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' s', - 0, - $exception - ); - } - - $this->releaseStream($streamId, $exception); - - if (!$originalCancellation->isRequested()) { - $this->hasTimeout = true; - $this->ping(); // async ping, if other requests occur, they wait for it - } - }); - if (!isset($this->streams[$streamId])) { + $cancellation->unsubscribe($cancellationId); + foreach ($request->getEventListeners() as $eventListener) { $eventListener->completeSendingRequest($request, $stream); } @@ -1089,82 +1040,60 @@ public function request(Request $request, Cancellation $cancellationToken, Strea $this->writeFrame(Http2Parser::HEADERS, $flag, $streamId, $headers)->ignore(); } - if ($chunk === null) { - foreach ($request->getEventListeners() as $eventListener) { - $eventListener->completeSendingRequest($request, $stream); - } - - $http2stream->requestBodyCompletion->complete(null); - - \assert($http2stream->pendingResponse !== null); - - return $http2stream->pendingResponse->getFuture()->await(); - } + try { + $buffer = $chunk; + $future = Future::complete(); + while ($buffer !== null) { + $chunk = $body->read($cancellation); + $future->await($cancellation); - $buffer = $chunk; - while (null !== $chunk = $body->read()) { - if (!isset($this->streams[$streamId])) { - foreach ($request->getEventListeners() as $eventListener) { - $eventListener->completeSendingRequest($request, $stream); + if (!isset($this->streams[$streamId])) { + break; } - \assert($http2stream->pendingResponse !== null); + $future = $this->writeData($http2stream, $buffer); + $buffer = $chunk; - return $http2stream->pendingResponse->getFuture()->await(); + if ($chunk === null) { + $http2stream->requestBodyCompletion->complete(); + } } - $this->writeData($http2stream, $buffer)->await(); - - $buffer = $chunk; - } - - if (!isset($this->streams[$streamId])) { - foreach ($request->getEventListeners() as $eventListener) { - $eventListener->completeSendingRequest($request, $stream); + $future->await($cancellation); + } finally { + if (!$http2stream->requestBodyCompletion->isComplete()) { + $http2stream->requestBodyCompletion->complete(); } - - \assert($http2stream->pendingResponse !== null); - - return $http2stream->pendingResponse->getFuture()->await(); } - \assert($http2stream->pendingResponse !== null); - - $responseFuture = $http2stream->pendingResponse->getFuture(); - - $http2stream->requestBodyCompletion->complete(); - - $this->writeData($http2stream, $buffer)->await(); - foreach ($request->getEventListeners() as $eventListener) { $eventListener->completeSendingRequest($request, $stream); } + \assert($http2stream->pendingResponse !== null); + $future = $http2stream->pendingResponse->getFuture(); + /** @var Response $response */ - $response = $responseFuture->await(); + $response = $future->await(); $response->getTrailers() - ->finally(static fn () => $cancellationToken->unsubscribe($cancellationId)) + ->finally(static fn () => $cancellation->unsubscribe($cancellationId)) ->ignore(); return $response; } catch (\Throwable $exception) { - if (isset($cancellationId)) { - $cancellationToken->unsubscribe($cancellationId); - } + $cancellation->unsubscribe($cancellationId); - if (isset($streamId) && isset($this->streams[$streamId])) { - \assert(isset($http2stream)); - - if (!$http2stream->requestBodyCompletion->isComplete()) { - $http2stream->requestBodyCompletion->error($exception); - } + if (!$http2stream->requestBodyCompletion->isComplete()) { + $http2stream->requestBodyCompletion->error($exception); + } + if (isset($this->streams[$streamId])) { $this->releaseStream($streamId, $exception); } if ($exception instanceof StreamException) { - $message = 'Failed to write request (stream ' . ($streamId ?? 'not assigned') . ') to socket: ' . + $message = 'Failed to write request (stream ' . $streamId . ') to socket: ' . $exception->getMessage(); $exception = new SocketException($message, 0, $exception); } @@ -1175,7 +1104,7 @@ public function request(Request $request, Cancellation $cancellationToken, Strea public function isClosed(): bool { - return $this->onClose === null; + return $this->shutdown !== null || $this->socket->isClosed(); } private function runReadFiber(): void @@ -1199,22 +1128,19 @@ private function runReadFiber(): void self::DEFAULT_MAX_FRAME_SIZE ) )->ignore(); - } catch (\Throwable $e) { - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException( + } catch (\Throwable $exception) { + $this->shutdown(new SocketException( "The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''), $this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN, + $exception, ), 0); return; } - $parser = (new Http2Parser($this))->parse(); - try { + $parser = (new Http2Parser($this))->parse(); + while (null !== $chunk = $this->socket->read()) { $parser->send($chunk); @@ -1222,28 +1148,18 @@ private function runReadFiber(): void break; } } - - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException( - "The HTTP/2 connection from '" . $this->socket->getLocalAddress() . "' to '" . $this->socket->getRemoteAddress() . - "' closed" . ($this->shutdown === null ? ' unexpectedly' : ''), - $this->shutdown ?? Http2Parser::INTERNAL_ERROR, - )); } catch (\Throwable $exception) { - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException( + $this->shutdown(new SocketException( "The HTTP/2 connection from '" . $this->socket->getLocalAddress() . "' to '" . $this->socket->getRemoteAddress() . - "' closed unexpectedly: " . $exception->getMessage(), + "' closed due to an exception: " . $exception->getMessage(), Http2Parser::INTERNAL_ERROR, - $exception + $exception, )); + + return; } + + $this->shutdown(); } private function writeFrame( @@ -1355,11 +1271,8 @@ private function writeBufferedData(Http2Stream $stream): Future $length = \strlen($stream->requestBodyBuffer); if ($length <= $windowSize) { - if ($stream->windowSizeIncrease) { - $deferred = $stream->windowSizeIncrease; - $stream->windowSizeIncrease = null; - $deferred->complete(); - } + $stream->windowSizeIncrease?->complete(); + $stream->windowSizeIncrease = null; $this->clientWindow -= $length; $stream->clientWindow -= $length; @@ -1400,9 +1313,8 @@ private function writeBufferedData(Http2Stream $stream): Future if ($windowSize > 0) { // Read next body chunk if less than 8192 bytes will remain in the buffer if ($length - 8192 < $windowSize && $stream->windowSizeIncrease) { - $deferred = $stream->windowSizeIncrease; + $stream->windowSizeIncrease->complete(); $stream->windowSizeIncrease = null; - $deferred->complete(null); } $data = $stream->requestBodyBuffer; @@ -1455,13 +1367,8 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo } if ($stream->responsePending || $stream->body || $stream->trailers) { - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $exception = $exception ?? new ClientHttp2StreamException( + $exception ??= new SocketException( \sprintf("Stream %d closed unexpectedly", $streamId), - $streamId, Http2Parser::INTERNAL_ERROR ); @@ -1469,21 +1376,15 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo $exception = new HttpException($exception->getMessage(), 0, $exception); } - if ($stream->pendingResponse) { - $stream->responsePending = false; - $stream->pendingResponse->error($exception); - $stream->pendingResponse = null; - } + $stream->responsePending = false; + $stream->pendingResponse?->error($exception); + $stream->pendingResponse = null; - if ($stream->body) { - $stream->body->error($exception); - $stream->body = null; - } + $stream->body?->error($exception); + $stream->body = null; - if ($stream->trailers) { - $stream->trailers->error($exception); - $stream->trailers = null; - } + $stream->trailers?->error($exception); + $stream->trailers = null; $this->writeFrame( Http2Parser::RST_STREAM, @@ -1508,120 +1409,76 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo } } - private function setupPingIfIdle(): void - { - if ($this->idleWatcher !== null) { - return; - } - - $this->idleWatcher = EventLoop::defer(function ($watcher): void { - \assert($this->idleWatcher === null || $this->idleWatcher === $watcher); - - $this->idleWatcher = null; - if (!empty($this->streams)) { - return; - } - - $this->idleWatcher = EventLoop::delay(300000, function ($watcher): void { - \assert($this->idleWatcher === null || $this->idleWatcher === $watcher); - \assert(empty($this->streams)); - - $this->idleWatcher = null; - - try { - // Connection idle for 10 minutes - if ($this->idlePings >= 1) { - $this->shutdown(new HttpException('Too many pending pings')); - return; - } - - if ($this->ping()) { - $this->setupPingIfIdle(); - } - } catch (\Throwable $exception) { - $this->shutdown(new HttpException('Exception when handling pings', 0, $exception)); - } - }); - - EventLoop::unreference($this->idleWatcher); - }); - - EventLoop::unreference($this->idleWatcher); - } - - private function cancelIdleWatcher(): void - { - if ($this->idleWatcher !== null) { - EventLoop::cancel($this->idleWatcher); - $this->idleWatcher = null; - } - } - /** - * @return bool Fulfilled with true if a pong is received within the timeout, false if none is received. + * @return Future Resolved with true if a pong is received within the timeout, false if none is received. */ - private function ping(): bool + private function ping(): Future { if ($this->onClose === null) { - return false; + return Future::complete(false); } if ($this->pongDeferred !== null) { - return $this->pongDeferred->getFuture()->await(); + return $this->pongDeferred->getFuture(); } - $this->pongDeferred = new DeferredFuture; - $this->idlePings++; - - $future = $this->pongDeferred->getFuture(); - $this->pongWatcher = EventLoop::delay(self::PONG_TIMEOUT, function (): void { - $this->hasTimeout = false; + $this->pongDeferred = $deferred = new DeferredFuture; + $this->pongWatcher = EventLoop::delay(self::PONG_TIMEOUT, fn () => $this->cancelPongWatcher(false)); - $deferred = $this->pongDeferred; - $this->pongDeferred = null; - - \assert($deferred !== null); - - $deferred->complete(false); + $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++)->ignore(); - // Shutdown connection to stop new requests, but keep it open, as other responses might still arrive - $this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached'), \max(0, $this->streamId)); - }); + return $deferred->getFuture(); + } - $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++)->ignore(); + private function cancelPongWatcher(bool $receivedPong): void + { + if ($this->pongWatcher !== null) { + EventLoop::cancel($this->pongWatcher); + $this->pongWatcher = null; + } - return $future->await(); + $this->pongDeferred?->complete($receivedPong); + $this->pongDeferred = null; } /** - * @param HttpException $reason Shutdown reason. - * @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no + * @param HttpException|null $reason Shutdown reason. + * @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no * streams have been opened. */ - private function shutdown(HttpException $reason, ?int $lastId = null): void + private function shutdown(?HttpException $reason = null, ?int $lastId = null): void { - $code = (int) $reason->getCode(); - $this->shutdown = $code; + if ($this->shutdown !== null) { + return; + } - if ($this->settings !== null) { - $settings = $this->settings; - $this->settings = null; + $reason ??= new SocketException( + "The HTTP/2 connection from '" . $this->socket->getLocalAddress() . "' to '" + . $this->socket->getRemoteAddress() . "' closed unexpectedly", + Http2Parser::INTERNAL_ERROR, + ); + $this->shutdown = (int) $reason->getCode(); + + if ($this->settings !== null) { $message = "Connection closed before HTTP/2 settings could be received"; - $settings->error(new UnprocessedRequestException(new SocketException($message, 0, $reason))); + $this->settings->error(new UnprocessedRequestException(new SocketException($message, 0, $reason))); + $this->settings = null; } if ($this->streams) { - $reason = $lastId !== null ? new UnprocessedRequestException($reason) : $reason; foreach ($this->streams as $id => $stream) { - if ($lastId !== null && $id <= $lastId) { - continue; + if ($lastId !== null && $id >= $lastId + 1) { + $reason = $reason instanceof UnprocessedRequestException + ? $reason + : new UnprocessedRequestException($reason); } $this->releaseStream($id, $reason); } } + $this->cancelPongWatcher(false); $this->socket->close(); if ($this->onClose !== null) { @@ -1632,6 +1489,8 @@ private function shutdown(HttpException $reason, ?int $lastId = null): void EventLoop::queue($callback); } } + + \assert(empty($this->streams), 'Streams array not empty after shutdown'); } /** @@ -1715,9 +1574,7 @@ private function createStreamInactivityWatcher(int $streamId, float $timeout): ? } $watcher = EventLoop::delay($timeout, function () use ($streamId, $timeout): void { - if (!isset($this->streams[$streamId])) { - return; - } + \assert(isset($this->streams[$streamId]), 'Stream inactivity watcher invoked after stream closed'); $this->releaseStream( $streamId, @@ -1739,15 +1596,11 @@ private function runWriteFiber(): void } catch (\Throwable $exception) { $this->hasWriteError = true; - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException( + $this->shutdown(new SocketException( "The HTTP/2 connection closed unexpectedly: " . $exception->getMessage(), Http2Parser::INTERNAL_ERROR, $exception - ), \max(0, $this->streamId)); + )); } } } diff --git a/src/Connection/Internal/Http2Stream.php b/src/Connection/Internal/Http2Stream.php index ba06b259..69e34508 100644 --- a/src/Connection/Internal/Http2Stream.php +++ b/src/Connection/Internal/Http2Stream.php @@ -14,7 +14,7 @@ use Revolt\EventLoop; /** - * Used in Http2Connection. + * Used in Http2ConnectionProcessor. * * @internal */ @@ -23,10 +23,6 @@ final class Http2Stream use ForbidSerialization; use ForbidCloning; - public int $id; - - public Request $request; - public ?Response $response = null; public ?DeferredFuture $pendingResponse; @@ -39,22 +35,14 @@ final class Http2Stream public ?DeferredFuture $trailers = null; - public Cancellation $originalCancellation; - - public Cancellation $cancellationToken; - /** @var int Bytes received on the stream. */ public int $received = 0; - public int $serverWindow; - - public int $clientWindow; - public int $bufferSize; public string $requestBodyBuffer = ''; - public DeferredFuture $requestBodyCompletion; + public readonly DeferredFuture $requestBodyCompletion; /** @var int Integer between 1 and 256 */ public int $weight = 16; @@ -63,30 +51,17 @@ final class Http2Stream public ?int $expectedLength = null; - public Stream $stream; - public ?DeferredFuture $windowSizeIncrease = null; - private ?string $watcher; - public function __construct( - int $id, - Request $request, - Stream $stream, - Cancellation $cancellationToken, - Cancellation $originalCancellation, - ?string $watcher, - int $serverSize, - int $clientSize + public readonly int $id, + public readonly Request $request, + public readonly Stream $stream, + public readonly Cancellation $cancellation, + public readonly ?string $watcher, + public int $serverWindow, + public int $clientWindow, ) { - $this->id = $id; - $this->request = $request; - $this->stream = $stream; - $this->cancellationToken = $cancellationToken; - $this->originalCancellation = $originalCancellation; - $this->watcher = $watcher; - $this->serverWindow = $serverSize; - $this->clientWindow = $clientSize; $this->pendingResponse = new DeferredFuture; $this->requestBodyCompletion = new DeferredFuture; $this->bufferSize = 0; diff --git a/test/Connection/Http2ConnectionTest.php b/test/Connection/Http2ConnectionTest.php index 31ed42fe..4cc8c206 100644 --- a/test/Connection/Http2ConnectionTest.php +++ b/test/Connection/Http2ConnectionTest.php @@ -8,6 +8,7 @@ use Amp\Http\Client\InvalidRequestException; use Amp\Http\Client\Request; use Amp\Http\Client\Response; +use Amp\Http\Client\SocketException; use Amp\Http\Client\TimeoutException; use Amp\Http\Client\Trailers; use Amp\Http\HPack; @@ -85,7 +86,7 @@ public function testSwitchingProtocols(): void ["date", formatDateHeader()], ]), Http2Parser::HEADERS, Http2Parser::END_HEADERS, 1)); - $this->expectException(Http2ConnectionException::class); + $this->expectException(SocketException::class); $this->expectExceptionMessage('Switching Protocols (101) is not part of HTTP/2'); $stream->request($request, new NullCancellation); @@ -466,7 +467,7 @@ public function testServerPushingOddStream(): void [":path", '/static'], ]), Http2Parser::PUSH_PROMISE, Http2Parser::END_HEADERS, 1)); - $this->expectException(Http2ConnectionException::class); + $this->expectException(SocketException::class); $this->expectExceptionMessage('Invalid server initiated stream'); /** @var Response $response */ @@ -478,7 +479,6 @@ public function testServerPushingOddStream(): void * @throws Socket\SocketException * @throws \Amp\ByteStream\ClosedException * @throws \Amp\ByteStream\StreamException - * @throws \Amp\Http\Http2\Http2ConnectionException * @dataProvider providerValidUriPaths */ public function testWritingRequestWithValidUriPathProceedsWithMatchingUriPath(