diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index 7b92134d..a7ccf231 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -17,5 +17,5 @@ interface ConnectionFactory * Additionally, the factory may invoke {@see EventListener::startDnsResolution()} and * {@see EventListener::completeDnsResolution()}, but is not required to implement such granular events. */ - public function create(Request $request, Cancellation $cancellationToken): Connection; + public function create(Request $request, Cancellation $cancellation): Connection; } diff --git a/src/Connection/DefaultConnectionFactory.php b/src/Connection/DefaultConnectionFactory.php index ca34c4e2..268c9532 100644 --- a/src/Connection/DefaultConnectionFactory.php +++ b/src/Connection/DefaultConnectionFactory.php @@ -29,7 +29,7 @@ public function __construct(?Socket\SocketConnector $connector = null, ?ConnectC public function create( Request $request, - Cancellation $cancellationToken + Cancellation $cancellation ): Connection { foreach ($request->getEventListeners() as $eventListener) { $eventListener->startConnectionCreation($request); @@ -103,7 +103,7 @@ public function create( $socket = $connector->connect( 'tcp://' . $authority, $connectContext->withConnectTimeout($request->getTcpConnectTimeout()), - $cancellationToken + $cancellation ); } catch (Socket\ConnectException $e) { throw new UnprocessedRequestException( @@ -111,7 +111,7 @@ public function create( ); } catch (CancelledException $e) { // In case of a user cancellation request, throw the expected exception - $cancellationToken->throwIfRequested(); + $cancellation->throwIfRequested(); // Otherwise we ran into a timeout of our TimeoutCancellation throw new UnprocessedRequestException(new TimeoutException(\sprintf( @@ -138,7 +138,7 @@ public function create( } $tlsCancellation = new CompositeCancellation( - $cancellationToken, + $cancellation, new TimeoutCancellation($request->getTlsHandshakeTimeout()) ); @@ -159,7 +159,7 @@ public function create( $socket->close(); // In case of a user cancellation request, throw the expected exception - $cancellationToken->throwIfRequested(); + $cancellation->throwIfRequested(); // Otherwise we ran into a timeout of our TimeoutCancellation throw new UnprocessedRequestException(new TimeoutException(\sprintf( @@ -182,7 +182,7 @@ public function create( if ($tlsInfo->getApplicationLayerProtocol() === 'h2') { $http2Connection = new Http2Connection($socket); - $http2Connection->initialize(); + $http2Connection->initialize($cancellation); foreach ($request->getEventListeners() as $eventListener) { $eventListener->completeConnectionCreation($request); @@ -195,7 +195,7 @@ public function create( // Treat the presence of only HTTP/2 as prior knowledge, see https://http2.github.io/http2-spec/#known-http if ($request->getProtocolVersions() === ['2']) { $http2Connection = new Http2Connection($socket); - $http2Connection->initialize(); + $http2Connection->initialize($cancellation); foreach ($request->getEventListeners() as $eventListener) { $eventListener->completeConnectionCreation($request); diff --git a/src/Connection/Http2Connection.php b/src/Connection/Http2Connection.php index 5e755c9c..049ec486 100644 --- a/src/Connection/Http2Connection.php +++ b/src/Connection/Http2Connection.php @@ -11,6 +11,7 @@ use Amp\Socket\EncryptableSocket; use Amp\Socket\SocketAddress; use Amp\Socket\TlsInfo; +use Amp\TimeoutCancellation; final class Http2Connection implements Connection { @@ -36,9 +37,9 @@ public function getProtocolVersions(): array return self::PROTOCOL_VERSIONS; } - public function initialize(): void + public function initialize(?Cancellation $cancellation = null): void { - $this->processor->initialize(); + $this->processor->initialize($cancellation ?? new TimeoutCancellation(5)); } public function getStream(Request $request): ?Stream diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 9d496f5a..9912bfa9 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -11,8 +11,6 @@ use Amp\DeferredCancellation; use Amp\DeferredFuture; use Amp\Future; -use Amp\Http\Client\Connection\Http2ConnectionException as ClientHttp2ConnectionException; -use Amp\Http\Client\Connection\Http2StreamException as ClientHttp2StreamException; use Amp\Http\Client\Connection\HttpStream; use Amp\Http\Client\Connection\Stream; use Amp\Http\Client\Connection\UnprocessedRequestException; @@ -46,18 +44,14 @@ final class Http2ConnectionProcessor implements Http2Processor private const PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; private const DEFAULT_MAX_FRAME_SIZE = 1 << 14; private const DEFAULT_WINDOW_SIZE = (1 << 16) - 1; - private const MINIMUM_WINDOW = 512 * 1024; private const WINDOW_INCREMENT = 1024 * 1024; - // Seconds to wait for pong (PING with ACK) frame before closing the connection. private const PONG_TIMEOUT = 5; /** @var string 64-bit for ping. */ private string $counter = "aaaaaaaa"; - private EncryptableSocket $socket; - /** @var Http2Stream[] */ private array $streams = []; @@ -78,8 +72,9 @@ final class Http2ConnectionProcessor implements Http2Processor /** @var int Currently open or reserved streams. Initially unlimited. */ private int $remainingStreams = 2147483647; - private HPack $hpack; + private readonly HPack $hpack; + /** @var DeferredFuture|null */ private ?DeferredFuture $settings = null; private bool $initializeStarted = false; @@ -88,12 +83,9 @@ final class Http2ConnectionProcessor implements Http2Processor private ?string $pongWatcher = null; + /** @var DeferredFuture|null */ private ?DeferredFuture $pongDeferred = null; - private ?string $idleWatcher = null; - - private int $idlePings = 0; - /** @var list<\Closure():void>|null */ private ?array $onClose = []; @@ -103,15 +95,20 @@ final class Http2ConnectionProcessor implements Http2Processor private int|null $shutdown = null; - private Queue $frameQueue; + private readonly Queue $frameQueue; - public function __construct(EncryptableSocket $socket) - { - $this->socket = $socket; - $this->hpack = new HPack; + public function __construct( + private readonly EncryptableSocket $socket, + ) { + $this->hpack = new HPack(); $this->frameQueue = new Queue(); } + public function __destruct() + { + $this->close(); + } + public function isInitialized(): bool { return $this->initialized; @@ -121,7 +118,7 @@ public function isInitialized(): bool * Returns once the connection has been initialized. A stream cannot be obtained from the * connection until the promise returned by this method resolves. */ - public function initialize(): void + public function initialize(Cancellation $cancellation): void { if ($this->initializeStarted) { throw new \Error('Connection may only be initialized once'); @@ -141,7 +138,13 @@ public function initialize(): void EventLoop::queue($this->runReadFiber(...)); EventLoop::queue($this->runWriteFiber(...)); - $future->await(); + try { + $future->await($cancellation); + } catch (CancelledException $exception) { + $exception = new SocketException('Connecting cancelled', 0, $exception); + $this->shutdown($exception); + throw new UnprocessedRequestException($exception); + } } /** @@ -174,22 +177,8 @@ public function close(): void public function handlePong(string $data): void { - if ($this->pongDeferred === null) { - return; - } - - if ($this->pongWatcher !== null) { - EventLoop::cancel($this->pongWatcher); - $this->pongWatcher = null; - } - + $this->cancelPongWatcher(true); $this->hasTimeout = false; - - $deferred = $this->pongDeferred; - $this->pongDeferred = null; - - \assert($deferred !== null); - $deferred->complete(true); } public function handlePing(string $data): void @@ -206,11 +195,7 @@ public function handleShutdown(int $lastId, int $error): void $error ); - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException($message, $error), $lastId); + $this->shutdown(new SocketException($message, $error), $lastId); } public function handleStreamWindowIncrement(int $streamId, int $windowSize): void @@ -341,8 +326,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool } }); - $this->setupPingIfIdle(); - return; } @@ -386,15 +369,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - $response = new Response( - '2', - $status, - Status::getReason($status), - $headers, - new ReadableBuffer, - $stream->request - ); - if ($status < 200) { $onInformationalResponse = $stream->request->getInformationalResponseHandler(); $preResponseResolution = $stream->preResponseResolution; @@ -402,13 +376,22 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream->preResponseResolution = async(function () use ( $preResponseResolution, $onInformationalResponse, - $response, - $streamId + $streamId, + $stream, + $status, + $headers, ): void { $preResponseResolution?->await(); try { - $onInformationalResponse($response); + $onInformationalResponse(new Response( + '2', + $status, + Status::getReason($status), + $headers, + new ReadableBuffer(), + $stream->request, + )); } catch (\Throwable) { $this->handleStreamException(new Http2StreamException( 'Informational response handler threw an exception', @@ -422,6 +405,36 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } + $stream->body = $body = new Queue(); + $stream->trailers = new DeferredFuture(); + $trailers = $stream->trailers->getFuture(); + + $deferredCancellation = new DeferredCancellation; + $cancellation = $deferredCancellation->getCancellation(); + $body = new ResponseBodyStream( + new ReadableIterableStream($body->iterate()), + $deferredCancellation, + ); + + $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($streamId): void { + if (isset($this->streams[$streamId])) { + $this->releaseStream($streamId, $exception); + } + }); + + $trailers = $trailers->finally(static fn () => $cancellation->unsubscribe($cancellationId)); + $trailers->ignore(); + + $response = new Response( + '2', + $status, + Status::getReason($status), + $headers, + $body, + $stream->request, + $trailers, + ); + \assert($stream->preResponseResolution === null); $stream->preResponseResolution = async(function () use ($stream, $streamId): void { @@ -438,24 +451,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool } }); - $stream->body = $body = new Queue(); - $stream->trailers = $trailers = new DeferredFuture; - $trailers->getFuture()->ignore(); - - $bodyCancellation = new DeferredCancellation; - $cancellationToken = new CompositeCancellation( - $stream->cancellationToken, - $bodyCancellation->getCancellation() - ); - - $response->setBody( - new ResponseBodyStream( - new ReadableIterableStream($body->pipe()), - $bodyCancellation - ) - ); - $response->setTrailers($trailers->getFuture()); - \assert($stream->pendingResponse !== null); $stream->responsePending = false; @@ -499,27 +494,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream->expectedLength = (int) $contentLength; } - - $cancellationToken->subscribe(function (CancelledException $exception) use ($streamId): void { - if (!isset($this->streams[$streamId])) { - return; - } - - if (!$this->streams[$streamId]->originalCancellation->isRequested()) { - $this->hasTimeout = true; - async($this->ping(...))->ignore(); // async ping, if other requests occur, they wait for it - - $transferTimeout = $this->streams[$streamId]->request->getTransferTimeout(); - - $exception = new TimeoutException( - 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' s', - 0, - $exception - ); - } - - $this->releaseStream($streamId, $exception); - }); } public function handlePushPromise(int $streamId, int $pushId, array $pseudo, array $headers): void @@ -649,7 +623,12 @@ public function handlePushPromise(int $streamId, int $pushId, array $pseudo, arr $request->setInactivityTimeout($parentStream->request->getInactivityTimeout()); $request->setTransferTimeout($parentStream->request->getTransferTimeout()); - $tokenSource = new DeferredCancellation(); + $deferredCancellation = new DeferredCancellation(); + + $cancellation = new CompositeCancellation( + $parentStream->cancellation, + $deferredCancellation->getCancellation(), + ); $stream = new Http2Stream( $pushId, @@ -663,11 +642,10 @@ static function () { // nothing to do } ), - $parentStream->cancellationToken, - $tokenSource->getCancellation(), + $cancellation, $this->createStreamInactivityWatcher($pushId, $request->getInactivityTimeout()), self::DEFAULT_WINDOW_SIZE, - 0 + 0, ); $stream->dependency = $streamId; @@ -687,13 +665,8 @@ static function () { return; } - EventLoop::queue(function () use ($pushId, $tokenSource, $stream): void { - $cancellationToken = new CompositeCancellation( - $stream->cancellationToken, - $tokenSource->getCancellation() - ); - - $cancellationId = $cancellationToken->subscribe(function (CancelledException $exception) use ( + EventLoop::queue(function () use ($pushId, $deferredCancellation, $stream, $cancellation): void { + $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ( $pushId ): void { if (isset($this->streams[$pushId])) { @@ -708,13 +681,13 @@ static function () { \assert($stream->pendingResponse !== null); $future = $stream->pendingResponse->getFuture() - ->finally(static fn () => $cancellationToken->unsubscribe($cancellationId)); + ->finally(static fn () => $cancellation->unsubscribe($cancellationId)); $onPush($stream->request, $future); - } catch (HttpException | StreamException | CancelledException $exception) { - $tokenSource->cancel($exception); + } catch (HttpException|StreamException|CancelledException $exception) { + $deferredCancellation->cancel($exception); } catch (\Throwable $exception) { - $tokenSource->cancel($exception); + $deferredCancellation->cancel($exception); throw $exception; } }); @@ -746,12 +719,7 @@ public function handleStreamException(Http2StreamException $exception): void $id = $exception->getStreamId(); $code = $exception->getCode(); - /** - * @psalm-suppress DeprecatedClass - * @psalm-suppress InvalidScalarArgument - * @noinspection PhpDeprecationInspection - */ - $exception = new ClientHttp2StreamException($exception->getMessage(), $id, $code, $exception); + $exception = new SocketException($exception->getMessage(), $code, $exception); if ($code === Http2Parser::REFUSED_STREAM) { $exception = new UnprocessedRequestException($exception); @@ -764,14 +732,7 @@ public function handleStreamException(Http2StreamException $exception): void public function handleConnectionException(Http2ConnectionException $exception): void { - /** - * @psalm-suppress DeprecatedClass - * @psalm-suppress InvalidScalarArgument - * @noinspection PhpDeprecationInspection - */ - $this->shutdown( - new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception) - ); + $this->shutdown(new SocketException($exception->getMessage(), $exception->getCode(), $exception)); } public function handleData(int $streamId, string $data): void @@ -823,7 +784,6 @@ public function handleData(int $streamId, string $data): void return; } - \assert($stream->body !== null); $stream->body->pushAsync($data)->map(function () use ($stream, $streamId, $length): void { // Stream may have closed while waiting for body data to be consumed. if (!isset($this->streams[$streamId])) { @@ -848,10 +808,9 @@ public function handleSettings(array $settings): void $this->writeFrame(Http2Parser::SETTINGS, Http2Parser::ACK)->ignore(); if ($this->settings) { - $deferred = $this->settings; - $this->settings = null; $this->initialized = true; - $deferred->complete($this->remainingStreams); + $this->settings->complete($this->remainingStreams); + $this->settings = null; } } @@ -900,7 +859,6 @@ public function handleStreamEnd(int $streamId): void }); } - $this->setupPingIfIdle(); $this->releaseStream($streamId); } @@ -929,7 +887,7 @@ public function getRemainingStreams(): int return $this->remainingStreams; } - public function request(Request $request, Cancellation $cancellationToken, Stream $stream): Response + public function request(Request $request, Cancellation $cancellation, Stream $stream): Response { if ($this->shutdown !== null) { $exception = new UnprocessedRequestException(new SocketException(\sprintf( @@ -945,7 +903,7 @@ public function request(Request $request, Cancellation $cancellationToken, Strea throw $exception; } - if ($this->hasTimeout && !$this->ping()) { + if ($this->hasTimeout && !$this->ping()->await()) { $exception = new UnprocessedRequestException( new SocketException(\sprintf( "Socket to '%s' missed responding to PINGs", @@ -960,9 +918,6 @@ public function request(Request $request, Cancellation $cancellationToken, Strea throw $exception; } - $this->idlePings = 0; - $this->cancelIdleWatcher(); - RequestNormalizer::normalizeRequest($request); // Remove defunct HTTP/1.x headers. @@ -999,14 +954,47 @@ public function request(Request $request, Cancellation $cancellationToken, Strea throw $exception; } - $originalCancellation = $cancellationToken; - if ($request->getTransferTimeout() > 0) { - $cancellationToken = new CompositeCancellation( - $cancellationToken, - new TimeoutCancellation($request->getTransferTimeout()) + $streamId = $this->streamId += 2; // Client streams should be odd-numbered, starting at 1. + + $originalCancellation = $cancellation; + $transferTimeout = $request->getTransferTimeout(); + if ($transferTimeout) { + $cancellation = new CompositeCancellation( + $cancellation, + new TimeoutCancellation($transferTimeout), ); } + $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ( + $streamId, + $transferTimeout, + $originalCancellation + ): void { + if (!isset($this->streams[$streamId])) { + return; + } + + if (!$originalCancellation->isRequested()) { + $exception = new TimeoutException( + 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' s', + 0, + $exception, + ); + } + + $this->releaseStream($streamId, $exception); + }); + + $this->streams[$streamId] = $http2stream = new Http2Stream( + $streamId, + $request, + $stream, + $cancellation, + $this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()), + self::DEFAULT_WINDOW_SIZE, + $this->initialWindowSize, + ); + try { $headers = $this->generateHeaders($request); $body = $request->getBody()->createBodyStream(); @@ -1017,48 +1005,11 @@ public function request(Request $request, Cancellation $cancellationToken, Strea $chunk = $body->read(); - $streamId = $this->streamId += 2; // Client streams should be odd-numbered, starting at 1. - - $this->streams[$streamId] = $http2stream = new Http2Stream( - $streamId, - $request, - $stream, - $cancellationToken, - $originalCancellation, - $this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()), - self::DEFAULT_WINDOW_SIZE, - $this->initialWindowSize - ); - $this->socket->reference(); - $transferTimeout = $request->getTransferTimeout(); - $cancellationId = $cancellationToken->subscribe(function (CancelledException $exception) use ( - $streamId, - $originalCancellation, - $transferTimeout - ): void { - if (!isset($this->streams[$streamId])) { - return; - } - - if (!$originalCancellation->isRequested()) { - $exception = new TimeoutException( - 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' s', - 0, - $exception - ); - } - - $this->releaseStream($streamId, $exception); - - if (!$originalCancellation->isRequested()) { - $this->hasTimeout = true; - $this->ping(); // async ping, if other requests occur, they wait for it - } - }); - if (!isset($this->streams[$streamId])) { + $cancellation->unsubscribe($cancellationId); + foreach ($request->getEventListeners() as $eventListener) { $eventListener->completeSendingRequest($request, $stream); } @@ -1089,82 +1040,60 @@ public function request(Request $request, Cancellation $cancellationToken, Strea $this->writeFrame(Http2Parser::HEADERS, $flag, $streamId, $headers)->ignore(); } - if ($chunk === null) { - foreach ($request->getEventListeners() as $eventListener) { - $eventListener->completeSendingRequest($request, $stream); - } - - $http2stream->requestBodyCompletion->complete(null); - - \assert($http2stream->pendingResponse !== null); - - return $http2stream->pendingResponse->getFuture()->await(); - } + try { + $buffer = $chunk; + $future = Future::complete(); + while ($buffer !== null) { + $chunk = $body->read($cancellation); + $future->await($cancellation); - $buffer = $chunk; - while (null !== $chunk = $body->read()) { - if (!isset($this->streams[$streamId])) { - foreach ($request->getEventListeners() as $eventListener) { - $eventListener->completeSendingRequest($request, $stream); + if (!isset($this->streams[$streamId])) { + break; } - \assert($http2stream->pendingResponse !== null); + $future = $this->writeData($http2stream, $buffer); + $buffer = $chunk; - return $http2stream->pendingResponse->getFuture()->await(); + if ($chunk === null) { + $http2stream->requestBodyCompletion->complete(); + } } - $this->writeData($http2stream, $buffer)->await(); - - $buffer = $chunk; - } - - if (!isset($this->streams[$streamId])) { - foreach ($request->getEventListeners() as $eventListener) { - $eventListener->completeSendingRequest($request, $stream); + $future->await($cancellation); + } finally { + if (!$http2stream->requestBodyCompletion->isComplete()) { + $http2stream->requestBodyCompletion->complete(); } - - \assert($http2stream->pendingResponse !== null); - - return $http2stream->pendingResponse->getFuture()->await(); } - \assert($http2stream->pendingResponse !== null); - - $responseFuture = $http2stream->pendingResponse->getFuture(); - - $http2stream->requestBodyCompletion->complete(); - - $this->writeData($http2stream, $buffer)->await(); - foreach ($request->getEventListeners() as $eventListener) { $eventListener->completeSendingRequest($request, $stream); } + \assert($http2stream->pendingResponse !== null); + $future = $http2stream->pendingResponse->getFuture(); + /** @var Response $response */ - $response = $responseFuture->await(); + $response = $future->await(); $response->getTrailers() - ->finally(static fn () => $cancellationToken->unsubscribe($cancellationId)) + ->finally(static fn () => $cancellation->unsubscribe($cancellationId)) ->ignore(); return $response; } catch (\Throwable $exception) { - if (isset($cancellationId)) { - $cancellationToken->unsubscribe($cancellationId); - } + $cancellation->unsubscribe($cancellationId); - if (isset($streamId) && isset($this->streams[$streamId])) { - \assert(isset($http2stream)); - - if (!$http2stream->requestBodyCompletion->isComplete()) { - $http2stream->requestBodyCompletion->error($exception); - } + if (!$http2stream->requestBodyCompletion->isComplete()) { + $http2stream->requestBodyCompletion->error($exception); + } + if (isset($this->streams[$streamId])) { $this->releaseStream($streamId, $exception); } if ($exception instanceof StreamException) { - $message = 'Failed to write request (stream ' . ($streamId ?? 'not assigned') . ') to socket: ' . + $message = 'Failed to write request (stream ' . $streamId . ') to socket: ' . $exception->getMessage(); $exception = new SocketException($message, 0, $exception); } @@ -1175,7 +1104,7 @@ public function request(Request $request, Cancellation $cancellationToken, Strea public function isClosed(): bool { - return $this->onClose === null; + return $this->shutdown !== null || $this->socket->isClosed(); } private function runReadFiber(): void @@ -1199,22 +1128,19 @@ private function runReadFiber(): void self::DEFAULT_MAX_FRAME_SIZE ) )->ignore(); - } catch (\Throwable $e) { - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException( + } catch (\Throwable $exception) { + $this->shutdown(new SocketException( "The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''), $this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN, + $exception, ), 0); return; } - $parser = (new Http2Parser($this))->parse(); - try { + $parser = (new Http2Parser($this))->parse(); + while (null !== $chunk = $this->socket->read()) { $parser->send($chunk); @@ -1222,28 +1148,18 @@ private function runReadFiber(): void break; } } - - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException( - "The HTTP/2 connection from '" . $this->socket->getLocalAddress() . "' to '" . $this->socket->getRemoteAddress() . - "' closed" . ($this->shutdown === null ? ' unexpectedly' : ''), - $this->shutdown ?? Http2Parser::INTERNAL_ERROR, - )); } catch (\Throwable $exception) { - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException( + $this->shutdown(new SocketException( "The HTTP/2 connection from '" . $this->socket->getLocalAddress() . "' to '" . $this->socket->getRemoteAddress() . - "' closed unexpectedly: " . $exception->getMessage(), + "' closed due to an exception: " . $exception->getMessage(), Http2Parser::INTERNAL_ERROR, - $exception + $exception, )); + + return; } + + $this->shutdown(); } private function writeFrame( @@ -1355,11 +1271,8 @@ private function writeBufferedData(Http2Stream $stream): Future $length = \strlen($stream->requestBodyBuffer); if ($length <= $windowSize) { - if ($stream->windowSizeIncrease) { - $deferred = $stream->windowSizeIncrease; - $stream->windowSizeIncrease = null; - $deferred->complete(); - } + $stream->windowSizeIncrease?->complete(); + $stream->windowSizeIncrease = null; $this->clientWindow -= $length; $stream->clientWindow -= $length; @@ -1400,9 +1313,8 @@ private function writeBufferedData(Http2Stream $stream): Future if ($windowSize > 0) { // Read next body chunk if less than 8192 bytes will remain in the buffer if ($length - 8192 < $windowSize && $stream->windowSizeIncrease) { - $deferred = $stream->windowSizeIncrease; + $stream->windowSizeIncrease->complete(); $stream->windowSizeIncrease = null; - $deferred->complete(null); } $data = $stream->requestBodyBuffer; @@ -1455,13 +1367,8 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo } if ($stream->responsePending || $stream->body || $stream->trailers) { - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $exception = $exception ?? new ClientHttp2StreamException( + $exception ??= new SocketException( \sprintf("Stream %d closed unexpectedly", $streamId), - $streamId, Http2Parser::INTERNAL_ERROR ); @@ -1469,21 +1376,15 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo $exception = new HttpException($exception->getMessage(), 0, $exception); } - if ($stream->pendingResponse) { - $stream->responsePending = false; - $stream->pendingResponse->error($exception); - $stream->pendingResponse = null; - } + $stream->responsePending = false; + $stream->pendingResponse?->error($exception); + $stream->pendingResponse = null; - if ($stream->body) { - $stream->body->error($exception); - $stream->body = null; - } + $stream->body?->error($exception); + $stream->body = null; - if ($stream->trailers) { - $stream->trailers->error($exception); - $stream->trailers = null; - } + $stream->trailers?->error($exception); + $stream->trailers = null; $this->writeFrame( Http2Parser::RST_STREAM, @@ -1508,120 +1409,76 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo } } - private function setupPingIfIdle(): void - { - if ($this->idleWatcher !== null) { - return; - } - - $this->idleWatcher = EventLoop::defer(function ($watcher): void { - \assert($this->idleWatcher === null || $this->idleWatcher === $watcher); - - $this->idleWatcher = null; - if (!empty($this->streams)) { - return; - } - - $this->idleWatcher = EventLoop::delay(300000, function ($watcher): void { - \assert($this->idleWatcher === null || $this->idleWatcher === $watcher); - \assert(empty($this->streams)); - - $this->idleWatcher = null; - - try { - // Connection idle for 10 minutes - if ($this->idlePings >= 1) { - $this->shutdown(new HttpException('Too many pending pings')); - return; - } - - if ($this->ping()) { - $this->setupPingIfIdle(); - } - } catch (\Throwable $exception) { - $this->shutdown(new HttpException('Exception when handling pings', 0, $exception)); - } - }); - - EventLoop::unreference($this->idleWatcher); - }); - - EventLoop::unreference($this->idleWatcher); - } - - private function cancelIdleWatcher(): void - { - if ($this->idleWatcher !== null) { - EventLoop::cancel($this->idleWatcher); - $this->idleWatcher = null; - } - } - /** - * @return bool Fulfilled with true if a pong is received within the timeout, false if none is received. + * @return Future Resolved with true if a pong is received within the timeout, false if none is received. */ - private function ping(): bool + private function ping(): Future { if ($this->onClose === null) { - return false; + return Future::complete(false); } if ($this->pongDeferred !== null) { - return $this->pongDeferred->getFuture()->await(); + return $this->pongDeferred->getFuture(); } - $this->pongDeferred = new DeferredFuture; - $this->idlePings++; - - $future = $this->pongDeferred->getFuture(); - $this->pongWatcher = EventLoop::delay(self::PONG_TIMEOUT, function (): void { - $this->hasTimeout = false; + $this->pongDeferred = $deferred = new DeferredFuture; + $this->pongWatcher = EventLoop::delay(self::PONG_TIMEOUT, fn () => $this->cancelPongWatcher(false)); - $deferred = $this->pongDeferred; - $this->pongDeferred = null; - - \assert($deferred !== null); - - $deferred->complete(false); + $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++)->ignore(); - // Shutdown connection to stop new requests, but keep it open, as other responses might still arrive - $this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached'), \max(0, $this->streamId)); - }); + return $deferred->getFuture(); + } - $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++)->ignore(); + private function cancelPongWatcher(bool $receivedPong): void + { + if ($this->pongWatcher !== null) { + EventLoop::cancel($this->pongWatcher); + $this->pongWatcher = null; + } - return $future->await(); + $this->pongDeferred?->complete($receivedPong); + $this->pongDeferred = null; } /** - * @param HttpException $reason Shutdown reason. - * @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no + * @param HttpException|null $reason Shutdown reason. + * @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no * streams have been opened. */ - private function shutdown(HttpException $reason, ?int $lastId = null): void + private function shutdown(?HttpException $reason = null, ?int $lastId = null): void { - $code = (int) $reason->getCode(); - $this->shutdown = $code; + if ($this->shutdown !== null) { + return; + } - if ($this->settings !== null) { - $settings = $this->settings; - $this->settings = null; + $reason ??= new SocketException( + "The HTTP/2 connection from '" . $this->socket->getLocalAddress() . "' to '" + . $this->socket->getRemoteAddress() . "' closed unexpectedly", + Http2Parser::INTERNAL_ERROR, + ); + $this->shutdown = (int) $reason->getCode(); + + if ($this->settings !== null) { $message = "Connection closed before HTTP/2 settings could be received"; - $settings->error(new UnprocessedRequestException(new SocketException($message, 0, $reason))); + $this->settings->error(new UnprocessedRequestException(new SocketException($message, 0, $reason))); + $this->settings = null; } if ($this->streams) { - $reason = $lastId !== null ? new UnprocessedRequestException($reason) : $reason; foreach ($this->streams as $id => $stream) { - if ($lastId !== null && $id <= $lastId) { - continue; + if ($lastId !== null && $id >= $lastId + 1) { + $reason = $reason instanceof UnprocessedRequestException + ? $reason + : new UnprocessedRequestException($reason); } $this->releaseStream($id, $reason); } } + $this->cancelPongWatcher(false); $this->socket->close(); if ($this->onClose !== null) { @@ -1632,6 +1489,8 @@ private function shutdown(HttpException $reason, ?int $lastId = null): void EventLoop::queue($callback); } } + + \assert(empty($this->streams), 'Streams array not empty after shutdown'); } /** @@ -1715,9 +1574,7 @@ private function createStreamInactivityWatcher(int $streamId, float $timeout): ? } $watcher = EventLoop::delay($timeout, function () use ($streamId, $timeout): void { - if (!isset($this->streams[$streamId])) { - return; - } + \assert(isset($this->streams[$streamId]), 'Stream inactivity watcher invoked after stream closed'); $this->releaseStream( $streamId, @@ -1739,15 +1596,11 @@ private function runWriteFiber(): void } catch (\Throwable $exception) { $this->hasWriteError = true; - /** - * @psalm-suppress DeprecatedClass - * @noinspection PhpDeprecationInspection - */ - $this->shutdown(new ClientHttp2ConnectionException( + $this->shutdown(new SocketException( "The HTTP/2 connection closed unexpectedly: " . $exception->getMessage(), Http2Parser::INTERNAL_ERROR, $exception - ), \max(0, $this->streamId)); + )); } } } diff --git a/src/Connection/Internal/Http2Stream.php b/src/Connection/Internal/Http2Stream.php index ba06b259..69e34508 100644 --- a/src/Connection/Internal/Http2Stream.php +++ b/src/Connection/Internal/Http2Stream.php @@ -14,7 +14,7 @@ use Revolt\EventLoop; /** - * Used in Http2Connection. + * Used in Http2ConnectionProcessor. * * @internal */ @@ -23,10 +23,6 @@ final class Http2Stream use ForbidSerialization; use ForbidCloning; - public int $id; - - public Request $request; - public ?Response $response = null; public ?DeferredFuture $pendingResponse; @@ -39,22 +35,14 @@ final class Http2Stream public ?DeferredFuture $trailers = null; - public Cancellation $originalCancellation; - - public Cancellation $cancellationToken; - /** @var int Bytes received on the stream. */ public int $received = 0; - public int $serverWindow; - - public int $clientWindow; - public int $bufferSize; public string $requestBodyBuffer = ''; - public DeferredFuture $requestBodyCompletion; + public readonly DeferredFuture $requestBodyCompletion; /** @var int Integer between 1 and 256 */ public int $weight = 16; @@ -63,30 +51,17 @@ final class Http2Stream public ?int $expectedLength = null; - public Stream $stream; - public ?DeferredFuture $windowSizeIncrease = null; - private ?string $watcher; - public function __construct( - int $id, - Request $request, - Stream $stream, - Cancellation $cancellationToken, - Cancellation $originalCancellation, - ?string $watcher, - int $serverSize, - int $clientSize + public readonly int $id, + public readonly Request $request, + public readonly Stream $stream, + public readonly Cancellation $cancellation, + public readonly ?string $watcher, + public int $serverWindow, + public int $clientWindow, ) { - $this->id = $id; - $this->request = $request; - $this->stream = $stream; - $this->cancellationToken = $cancellationToken; - $this->originalCancellation = $originalCancellation; - $this->watcher = $watcher; - $this->serverWindow = $serverSize; - $this->clientWindow = $clientSize; $this->pendingResponse = new DeferredFuture; $this->requestBodyCompletion = new DeferredFuture; $this->bufferSize = 0; diff --git a/test/Connection/Http2ConnectionTest.php b/test/Connection/Http2ConnectionTest.php index 31ed42fe..4cc8c206 100644 --- a/test/Connection/Http2ConnectionTest.php +++ b/test/Connection/Http2ConnectionTest.php @@ -8,6 +8,7 @@ use Amp\Http\Client\InvalidRequestException; use Amp\Http\Client\Request; use Amp\Http\Client\Response; +use Amp\Http\Client\SocketException; use Amp\Http\Client\TimeoutException; use Amp\Http\Client\Trailers; use Amp\Http\HPack; @@ -85,7 +86,7 @@ public function testSwitchingProtocols(): void ["date", formatDateHeader()], ]), Http2Parser::HEADERS, Http2Parser::END_HEADERS, 1)); - $this->expectException(Http2ConnectionException::class); + $this->expectException(SocketException::class); $this->expectExceptionMessage('Switching Protocols (101) is not part of HTTP/2'); $stream->request($request, new NullCancellation); @@ -466,7 +467,7 @@ public function testServerPushingOddStream(): void [":path", '/static'], ]), Http2Parser::PUSH_PROMISE, Http2Parser::END_HEADERS, 1)); - $this->expectException(Http2ConnectionException::class); + $this->expectException(SocketException::class); $this->expectExceptionMessage('Invalid server initiated stream'); /** @var Response $response */ @@ -478,7 +479,6 @@ public function testServerPushingOddStream(): void * @throws Socket\SocketException * @throws \Amp\ByteStream\ClosedException * @throws \Amp\ByteStream\StreamException - * @throws \Amp\Http\Http2\Http2ConnectionException * @dataProvider providerValidUriPaths */ public function testWritingRequestWithValidUriPathProceedsWithMatchingUriPath(