From e0198702bb73bc36ae2a51a1584189b758a89176 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Mon, 13 Nov 2023 16:47:58 +0200 Subject: [PATCH] Create Request class. --- .../p2p-media-loader-core/src/http-loader.ts | 129 ++++---- .../src/hybrid-loader.ts | 125 ++++---- .../p2p-media-loader-core/src/p2p/loader.ts | 12 +- .../p2p-media-loader-core/src/p2p/peer.ts | 97 ++---- .../src/request-container.ts | 285 +++++++++++------- .../src/segments-storage.ts | 101 ++----- .../p2p-media-loader-core/src/utils/stream.ts | 34 +++ .../p2p-media-loader-core/src/utils/utils.ts | 35 +-- 8 files changed, 393 insertions(+), 425 deletions(-) create mode 100644 packages/p2p-media-loader-core/src/utils/stream.ts diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts index a30cf2a6..fb58d23f 100644 --- a/packages/p2p-media-loader-core/src/http-loader.ts +++ b/packages/p2p-media-loader-core/src/http-loader.ts @@ -1,12 +1,12 @@ -import { Segment, Settings } from "./types"; -import { HttpRequest, LoadProgress } from "./request-container"; -import * as Utils from "./utils/utils"; +import { Settings } from "./types"; +import { Request } from "./request-container"; -export function getHttpSegmentRequest( - segment: Segment, +export async function fulfillHttpSegmentRequest( + request: Request, settings: Pick -): Readonly { +) { const headers = new Headers(); + const { segment } = request; const { url, byteRange } = segment; if (byteRange) { @@ -16,86 +16,61 @@ export function getHttpSegmentRequest( } const abortController = new AbortController(); - const progress: LoadProgress = { - loadedBytes: 0, - startTimestamp: performance.now(), - chunks: [], - }; - const loadSegmentData = async () => { - const requestAbortTimeout = setTimeout(() => { - const errorType: HttpLoaderError["type"] = "request-timeout"; - abortController.abort(errorType); - }, settings.httpRequestTimeout); - - try { - const response = await window.fetch(url, { - headers, - signal: abortController.signal, - }); - - if (response.ok) { - const data = await getDataPromiseAndMonitorProgress(response, progress); - clearTimeout(requestAbortTimeout); - return data; - } - throw new HttpLoaderError("fetch-error", response.statusText); - } catch (error) { - if (error instanceof Error) { - if ((error.name as HttpLoaderError["type"]) === "manual-abort") { - throw new HttpLoaderError("manual-abort"); - } - if ((error.name as HttpLoaderError["type"]) === "request-timeout") { - throw new HttpLoaderError("request-timeout"); - } - if (!(error instanceof HttpLoaderError)) { - throw new HttpLoaderError("fetch-error", error.message); - } - } - throw error; - } - }; + const requestAbortTimeout = setTimeout(() => { + const errorType: HttpLoaderError["type"] = "request-timeout"; + abortController.abort(errorType); + }, settings.httpRequestTimeout); - return { - type: "http", - promise: loadSegmentData(), - progress, - abort: () => { - const abortErrorType: HttpLoaderError["type"] = "manual-abort"; - abortController.abort(abortErrorType); - }, + const abortManually = () => { + const abortErrorType: HttpLoaderError["type"] = "manual-abort"; + abortController.abort(abortErrorType); }; -} -async function getDataPromiseAndMonitorProgress( - response: Response, - progress: LoadProgress -): Promise { - const totalBytesString = response.headers.get("Content-Length"); - if (!response.body) { - return response.arrayBuffer().then((data) => { - progress.loadedBytes = data.byteLength; - progress.totalBytes = data.byteLength; - progress.lastLoadedChunkTimestamp = performance.now(); - return data; + const requestControls = request.start("http", abortManually); + try { + const fetchResponse = await window.fetch(url, { + headers, + signal: abortController.signal, }); - } - if (totalBytesString) progress.totalBytes = +totalBytesString; + if (fetchResponse.ok) { + const totalBytesString = fetchResponse.headers.get("Content-Length"); + if (!fetchResponse.body) { + fetchResponse.arrayBuffer().then((data) => { + requestControls.addLoadedChunk(data); + requestControls.completeOnSuccess(); + }); + return; + } - const reader = response.body.getReader(); - progress.startTimestamp = performance.now(); + if (totalBytesString) request.setTotalBytes(+totalBytesString); - progress.chunks = []; - for await (const chunk of readStream(reader)) { - progress.chunks.push(chunk); - progress.loadedBytes += chunk.length; - progress.lastLoadedChunkTimestamp = performance.now(); + const reader = fetchResponse.body.getReader(); + for await (const chunk of readStream(reader)) { + requestControls.addLoadedChunk(chunk); + } + requestControls.completeOnSuccess(); + clearTimeout(requestAbortTimeout); + } + throw new HttpLoaderError("fetch-error", fetchResponse.statusText); + } catch (error) { + if (error instanceof Error) { + let httpLoaderError: HttpLoaderError; + if ((error.name as HttpLoaderError["type"]) === "manual-abort") { + httpLoaderError = new HttpLoaderError("manual-abort"); + } else if ( + (error.name as HttpLoaderError["type"]) === "request-timeout" + ) { + httpLoaderError = new HttpLoaderError("request-timeout"); + } else if (!(error instanceof HttpLoaderError)) { + httpLoaderError = new HttpLoaderError("fetch-error", error.message); + } else { + httpLoaderError = error; + } + requestControls.cancelOnError(httpLoaderError); + } } - - progress.totalBytes = progress.loadedBytes; - - return Utils.joinChunks(progress.chunks, progress.totalBytes); } async function* readStream( diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index bd503081..f0f4f415 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -1,5 +1,5 @@ import { Segment, StreamWithSegments } from "./index"; -import { getHttpSegmentRequest, HttpLoaderError } from "./http-loader"; +import { fulfillHttpSegmentRequest } from "./http-loader"; import { SegmentsMemoryStorage } from "./segments-storage"; import { Settings, CoreEventHandlers } from "./types"; import { BandwidthApproximator } from "./bandwidth-approximator"; @@ -8,6 +8,7 @@ import { RequestsContainer, EngineCallbacks, HybridLoaderRequest, + Request, } from "./request-container"; import * as QueueUtils from "./utils/queue"; import * as LoggerUtils from "./utils/logger"; @@ -40,7 +41,10 @@ export class HybridLoader { const activeStream = requestedSegment.stream; this.playback = { position: requestedSegment.startTime, rate: 1 }; this.segmentAvgDuration = getSegmentAvgDuration(activeStream); - this.requests = new RequestsContainer(requestedSegment.stream.type); + this.requests = new RequestsContainer( + requestedSegment.stream.type, + this.bandwidthApproximator + ); if (!this.segmentStorage.isInitialized) { throw new Error("Segment storage is not initialized."); @@ -101,7 +105,8 @@ export class HybridLoader { }); } } else { - this.requests.addEngineCallbacks(segment, callbacks); + const request = this.requests.getOrCreateRequest(segment); + request.engineCallbacks = callbacks; } this.createProcessQueueMicrotask(); @@ -110,9 +115,10 @@ export class HybridLoader { private createProcessQueueMicrotask(force = true) { const now = performance.now(); if ( - !force && - this.lastQueueProcessingTimeStamp !== undefined && - now - this.lastQueueProcessingTimeStamp <= 1000 + (!force && + this.lastQueueProcessingTimeStamp !== undefined && + now - this.lastQueueProcessingTimeStamp <= 1000) || + this.isProcessQueueMicrotaskCreated ) { return; } @@ -133,18 +139,14 @@ export class HybridLoader { skipSegment: (segment) => this.segmentStorage.hasSegment(segment), }); - for (const { - segment, - loaderRequest, - engineCallbacks, - } of this.requests.values()) { + for (const request of this.requests.values()) { if ( - !engineCallbacks && - loaderRequest && - !queueSegmentIds.has(segment.localId) + !request.isSegmentRequestedByEngine && + request.status === "loading" && + !queueSegmentIds.has(request.segment.localId) ) { - loaderRequest.abort(); - this.requests.remove(segment); + request.abort(); + this.requests.remove(request); } } @@ -153,7 +155,7 @@ export class HybridLoader { for (const item of queue) { const { statuses, segment } = item; - const request = this.requests.getHybridLoaderRequest(segment); + const request = this.requests.get(segment); if (statuses.isHighDemand) { if (request?.type === "http") continue; @@ -174,39 +176,39 @@ export class HybridLoader { continue; } } - if (this.requests.httpRequestsCount < simultaneousHttpDownloads) { + if (this.requests.executingHttpCount < simultaneousHttpDownloads) { void this.loadThroughHttp(item); continue; } this.abortLastHttpLoadingAfter(queue, segment); - if (this.requests.httpRequestsCount < simultaneousHttpDownloads) { + if (this.requests.executingHttpCount < simultaneousHttpDownloads) { void this.loadThroughHttp(item); continue; } if (this.requests.isP2PRequested(segment)) continue; - if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) { + if (this.requests.executingP2PCount < simultaneousP2PDownloads) { void this.loadThroughP2P(item); continue; } this.abortLastP2PLoadingAfter(queue, segment); - if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) { + if (this.requests.executingP2PCount < simultaneousP2PDownloads) { void this.loadThroughP2P(item); } break; } if (statuses.isP2PDownloadable) { if (request) continue; - if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) { + if (this.requests.executingP2PCount < simultaneousP2PDownloads) { void this.loadThroughP2P(item); continue; } this.abortLastP2PLoadingAfter(queue, segment); - if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) { + if (this.requests.executingP2PCount < simultaneousP2PDownloads) { void this.loadThroughP2P(item); } } @@ -222,59 +224,48 @@ export class HybridLoader { private async loadThroughHttp(item: QueueItem, isRandom = false) { const { segment } = item; - let data: ArrayBuffer | undefined; - try { - const httpRequest = getHttpSegmentRequest(segment, this.settings); - if (!isRandom) { - this.logger.loader( - `http request: ${LoggerUtils.getQueueItemString(item)}` - ); - } + const request = this.requests.getOrCreateRequest(segment); + request.subscribe("onCompleted", this.onRequestCompleted); + request.subscribe("onError", this.onRequestError); - this.requests.addHybridLoaderRequest(segment, httpRequest); - this.bandwidthApproximator.addLoading(httpRequest.progress); - data = await httpRequest.promise; - if (!data) return; - this.logger.loader(`http responses: ${segment.externalId}`); - this.onSegmentLoaded(item, "http", data); - } catch (error) { - if ( - !(error instanceof HttpLoaderError) || - error.type === "manual-abort" - ) { - return; - } - this.createProcessQueueMicrotask(); + void fulfillHttpSegmentRequest(request, this.settings); + if (!isRandom) { + this.logger.loader( + `http request: ${LoggerUtils.getQueueItemString(item)}` + ); } } private async loadThroughP2P(item: QueueItem) { - const { segment } = item; const p2pLoader = this.p2pLoaders.currentLoader; - try { - const downloadPromise = p2pLoader.downloadSegment(item); - if (downloadPromise === undefined) return; - const data = await downloadPromise; - this.onSegmentLoaded(item, "p2p", data); - } catch (error) { - if ( - !(error instanceof PeerRequestError) || - error.type === "manual-abort" - ) { - return; - } - const request = this.requests.get(segment); - this.createProcessQueueMicrotask(); - } + const request = p2pLoader.downloadSegment(item); + if (request === undefined) return; + + request.subscribe("onCompleted", this.onRequestCompleted); + request.subscribe("onError", this.onRequestError); } + private onRequestCompleted = (request: Request, data: ArrayBuffer) => { + const { segment } = request; + this.logger.loader(`http responses: ${segment.externalId}`); + this.eventHandlers?.onSegmentLoaded?.(data.byteLength, "http"); + this.createProcessQueueMicrotask(); + }; + + private onRequestError = (request: Request, error: Error) => { + if (!(error instanceof PeerRequestError) || error.type === "manual-abort") { + return; + } + this.createProcessQueueMicrotask(); + }; + private loadRandomThroughHttp() { const { simultaneousHttpDownloads } = this.settings; const p2pLoader = this.p2pLoaders.currentLoader; const connectedPeersAmount = p2pLoader.connectedPeersAmount; if ( - this.requests.httpRequestsCount >= simultaneousHttpDownloads || + this.requests.executingHttpCount >= simultaneousHttpDownloads || !connectedPeersAmount ) { return; @@ -314,12 +305,6 @@ export class HybridLoader { this.refreshLevelBandwidth(true); } void this.segmentStorage.storeSegment(segment, data); - - const bandwidth = statuses.isHighDemand - ? this.bandwidthApproximator.getBandwidth() - : this.levelBandwidth.value; - - this.requests.resolveAndRemoveRequest(segment, { data, bandwidth }); this.eventHandlers?.onSegmentLoaded?.(byteLength, type); this.createProcessQueueMicrotask(); } @@ -328,7 +313,7 @@ export class HybridLoader { for (const { segment: itemSegment } of arrayBackwards(queue)) { if (itemSegment.localId === segment.localId) break; if (this.requests.isHttpRequested(segment)) { - this.requests.abortLoaderRequest(segment); + this.requests.get(segment)?.abort(); this.logger.loader( "http aborted: ", LoggerUtils.getSegmentString(segment) @@ -342,7 +327,7 @@ export class HybridLoader { for (const { segment: itemSegment } of arrayBackwards(queue)) { if (itemSegment.localId === segment.localId) break; if (this.requests.isP2PRequested(segment)) { - this.requests.abortLoaderRequest(segment); + this.requests.get(segment)?.abort(); this.logger.loader( "p2p aborted: ", LoggerUtils.getSegmentString(segment) diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index ff81470a..619492c6 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -7,7 +7,7 @@ import { SegmentsMemoryStorage } from "../segments-storage"; import * as Utils from "../utils/utils"; import * as LoggerUtils from "../utils/logger"; import { PeerSegmentStatus } from "../enums"; -import { RequestsContainer } from "../request-container"; +import { Request, RequestsContainer } from "../request-container"; import debug from "debug"; export class P2PLoader { @@ -86,7 +86,7 @@ export class P2PLoader { this.peers.set(connection.id, peer); } - downloadSegment(item: QueueItem): Promise | undefined { + downloadSegment(item: QueueItem): Request | undefined { const { segment, statuses } = item; const untestedPeers: Peer[] = []; let fastestPeer: Peer | undefined; @@ -113,18 +113,18 @@ export class P2PLoader { if (!peer) return; - const request = peer.requestSegment(segment); - this.requests.addHybridLoaderRequest(segment, request); + const request = this.requests.getOrCreateRequest(segment); + peer.fulfillSegmentRequest(request); this.logger( `p2p request ${segment.externalId} | ${LoggerUtils.getStatusesString( statuses )}` ); - request.promise.then(() => { + request.subscribe("onCompleted", () => { this.logger(`p2p loaded: ${segment.externalId}`); }); - return request.promise; + return request; } isLoadingOrLoadedBySomeone(segment: Segment): boolean { diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index c051d35c..cb9da03f 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -8,7 +8,7 @@ import { } from "../internal-types"; import { PeerCommandType, PeerSegmentStatus } from "../enums"; import * as PeerUtil from "../utils/peer"; -import { P2PRequest } from "../request-container"; +import { Request, RequestControls } from "../request-container"; import { Segment, Settings } from "../types"; import * as Utils from "../utils/utils"; import debug from "debug"; @@ -33,14 +33,6 @@ type PeerEventHandlers = { onSegmentRequested: (peer: Peer, segmentId: string) => void; }; -type PeerRequest = { - segment: Segment; - p2pRequest: P2PRequest; - resolve: (data: ArrayBuffer) => void; - reject: (error: PeerRequestError) => void; - responseTimeoutId: number; -}; - type PeerSettings = Pick< Settings, "p2pSegmentDownloadTimeout" | "webRtcMaxMessageSize" @@ -51,7 +43,7 @@ export class Peer { private connection?: PeerConnection; private connections = new Set(); private segments = new Map(); - private request?: PeerRequest; + private requestData?: { request: Request; controls: RequestControls }; private readonly logger = debug("core:peer"); private readonly bandwidthMeasurer = new BandwidthMeasurer(); private isUploadingSegment = false; @@ -109,7 +101,7 @@ export class Peer { } get downloadingSegment(): Segment | undefined { - return this.request?.segment; + return this.requestData?.request.segment; } get bandwidth(): number | undefined { @@ -138,14 +130,13 @@ export class Peer { break; case PeerCommandType.SegmentData: - if (this.request?.segment.externalId === command.i) { - const { progress } = this.request.p2pRequest; - progress.totalBytes = command.s; + if (this.requestData?.request.segment.externalId === command.i) { + this.requestData.request.setTotalBytes(command.s); } break; case PeerCommandType.SegmentAbsent: - if (this.request?.segment.externalId === command.i) { + if (this.requestData?.request.segment.externalId === command.i) { this.cancelSegmentRequest("segment-absent"); this.segments.delete(command.i); } @@ -162,18 +153,21 @@ export class Peer { this.connection.send(JSON.stringify(command)); } - requestSegment(segment: Segment) { - if (this.request) { + fulfillSegmentRequest(request: Request) { + if (this.requestData) { throw new Error("Segment already is downloading"); } - const { externalId } = segment; + this.requestData = { + request, + controls: request.start("p2p", () => + this.cancelSegmentRequest("manual-abort") + ), + }; const command: PeerSegmentCommand = { c: PeerCommandType.SegmentRequest, - i: externalId, + i: request.segment.externalId, }; this.sendCommand(command); - this.request = this.createPeerRequest(segment); - return this.request.p2pRequest; } sendSegmentsAnnouncement(announcement: JsonSegmentAnnouncement) { @@ -235,60 +229,27 @@ export class Peer { this.sendCommand(command); } - private createPeerRequest(segment: Segment): PeerRequest { - const { promise, resolve, reject } = - Utils.getControlledPromise(); - return { - segment, - resolve, - reject, - responseTimeoutId: this.setRequestTimeout(), - p2pRequest: { - type: "p2p", - progress: { - loadedBytes: 0, - startTimestamp: performance.now(), - chunks: [], - }, - promise, - abort: () => this.cancelSegmentRequest("manual-abort"), - }, - }; - } - private receiveSegmentChunk(chunk: Uint8Array): void { - const { request } = this; - if (!request) return; - - const { progress } = request.p2pRequest; - progress.loadedBytes += chunk.byteLength; - progress.lastLoadedChunkTimestamp = performance.now(); - progress.chunks.push(chunk); - - if (progress.loadedBytes === progress.totalBytes) { - const segmentData = Utils.joinChunks( - progress.chunks, - progress.totalBytes - ); - const { lastLoadedChunkTimestamp, startTimestamp, loadedBytes } = - progress; - const loadingDuration = lastLoadedChunkTimestamp - startTimestamp; - this.bandwidthMeasurer.addMeasurement(loadedBytes, loadingDuration); - request.resolve(segmentData); + if (!this.requestData) return; + const { request, controls } = this.requestData; + controls.addLoadedChunk(chunk); + + if (request.loadedBytes === request.totalBytes) { + controls.completeOnSuccess(); this.clearRequest(); } else if ( - progress.totalBytes !== undefined && - progress.loadedBytes > progress.totalBytes + request.totalBytes !== undefined && + request.loadedBytes > request.totalBytes ) { this.cancelSegmentRequest("response-bytes-mismatch"); } } private cancelSegmentRequest(type: PeerRequestError["type"]) { - if (!this.request) return; - this.logger( - `cancel segment request ${this.request?.segment.externalId} (${type})` - ); + if (!this.requestData) return; + const { request, controls } = this.requestData; + const { segment } = request; + this.logger(`cancel segment request ${segment.externalId} (${type})`); const error = new PeerRequestError(type); const sendCancelCommandTypes: PeerRequestError["type"][] = [ "destroy", @@ -299,10 +260,10 @@ export class Peer { if (sendCancelCommandTypes.includes(type)) { this.sendCommand({ c: PeerCommandType.CancelSegmentRequest, - i: this.request.segment.externalId, + i: segment.externalId, }); } - this.request.reject(error); + controls.cancelOnError(error); this.clearRequest(); } diff --git a/packages/p2p-media-loader-core/src/request-container.ts b/packages/p2p-media-loader-core/src/request-container.ts index 9926fb0f..2185bf26 100644 --- a/packages/p2p-media-loader-core/src/request-container.ts +++ b/packages/p2p-media-loader-core/src/request-container.ts @@ -1,8 +1,10 @@ import { Segment, SegmentResponse, StreamType } from "./types"; -import { Subscriptions } from "./segments-storage"; import { PeerRequestError } from "./p2p/peer"; import { HttpLoaderError } from "./http-loader"; import Debug from "debug"; +import { EventDispatcher } from "./event-dispatcher"; +import * as Utils from "./utils/utils"; +import { BandwidthApproximator } from "./bandwidth-approximator"; export type EngineCallbacks = { onSuccess: (response: SegmentResponse) => void; @@ -14,11 +16,9 @@ export type LoadProgress = { lastLoadedChunkTimestamp?: number; loadedBytes: number; totalBytes?: number; - chunks: Uint8Array[]; }; type HybridLoaderRequestBase = { - promise: Promise; abort: () => void; progress: LoadProgress; }; @@ -35,38 +35,179 @@ export type P2PRequest = HybridLoaderRequestBase & { export type HybridLoaderRequest = HttpRequest | P2PRequest; -type RequestItem = { - segment: Readonly; - loaderRequest?: HybridLoaderRequest; - engineCallbacks?: Readonly; - prevAttempts: HybridLoaderRequest[]; +type RequestEvents = { + onCompleted: (request: Request, data: ArrayBuffer) => void; + onError: (request: Request, data: Error) => void; +}; + +type RequestStatus = + | "not-started" + | "loading" + | "succeed" + | "failed" + | "aborted"; + +export class Request extends EventDispatcher { + readonly id: string; + private _engineCallbacks?: EngineCallbacks; + private hybridLoaderRequest?: HybridLoaderRequest; + private prevAttempts: HybridLoaderRequest[] = []; + private chunks: Uint8Array[] = []; + private _loadedBytes = 0; + private _totalBytes?: number; + private _status: RequestStatus = "not-started"; + + constructor( + readonly segment: Segment, + private readonly bandwidthApproximator: BandwidthApproximator + ) { + super(); + this.id = getRequestItemId(segment); + } + + get status() { + return this._status; + } + + get isSegmentRequestedByEngine(): boolean { + return !!this._engineCallbacks; + } + + get type() { + return this.hybridLoaderRequest?.type; + } + + get loadedBytes() { + return this._loadedBytes; + } + + set engineCallbacks(callbacks: EngineCallbacks) { + if (this._engineCallbacks) { + throw new Error("Segment is already requested by engine"); + } + this._engineCallbacks = callbacks; + } + + get totalBytes(): number | undefined { + return this._totalBytes; + } + + setTotalBytes(value: number) { + if (this._totalBytes !== undefined) { + throw new Error("Request total bytes value is already set"); + } + this._totalBytes = value; + } + + get loadedPercent() { + if (!this._totalBytes) return; + return Utils.getPercent(this.loadedBytes, this._totalBytes); + } + + start(type: "http" | "p2p", abortLoading: () => void): RequestControls { + if (this._status === "loading") { + throw new Error("Request has been already started."); + } + + this._status = "loading"; + this.hybridLoaderRequest = { + type, + abort: abortLoading, + progress: { + loadedBytes: 0, + startTimestamp: performance.now(), + }, + }; + + return { + addLoadedChunk: this.addLoadedChunk, + completeOnSuccess: this.completeOnSuccess, + cancelOnError: this.cancelOnError, + }; + } + + abort() { + if (!this.hybridLoaderRequest) return; + this.hybridLoaderRequest.abort(); + this._status = "aborted"; + } + + private completeOnSuccess = () => { + this.throwErrorIfNotLoadingStatus(); + const data = Utils.joinChunks(this.chunks); + this._status = "succeed"; + this._engineCallbacks?.onSuccess({ + data, + bandwidth: this.bandwidthApproximator.getBandwidth(), + }); + this.dispatch("onCompleted", this, data); + }; + + private addLoadedChunk = (chunk: Uint8Array) => { + this.throwErrorIfNotLoadingStatus(); + const { hybridLoaderRequest: request } = this; + if (!request) return; + this.chunks.push(chunk); + request.progress.lastLoadedChunkTimestamp = performance.now(); + this._loadedBytes += chunk.length; + }; + + private cancelOnError = (error: Error) => { + this.throwErrorIfNotLoadingStatus(); + if (!this.hybridLoaderRequest) return; + this._status = "failed"; + this.hybridLoaderRequest.error = error; + this.prevAttempts.push(this.hybridLoaderRequest); + this.dispatch("onError", this, error); + }; + + private throwErrorIfNotLoadingStatus() { + if (this._status !== "loading") { + throw new Error("Request has been already completed/aborted/failed."); + } + } +} + +export type RequestControls = { + addLoadedChunk: Request["addLoadedChunk"]; + completeOnSuccess: Request["completeOnSuccess"]; + cancelOnError: Request["cancelOnError"]; }; function getRequestItemId(segment: Segment) { return segment.localId; } +type RequestsContainerEvents = { + httpRequestsUpdated: () => void; +}; + export class RequestsContainer { - private readonly requests = new Map(); - private readonly onHttpRequestsHandlers = new Subscriptions(); + private readonly requests = new Map(); private readonly logger: Debug.Debugger; + private readonly events = new EventDispatcher(); - constructor(streamType: StreamType) { + constructor( + streamType: StreamType, + private readonly bandwidthApproximator: BandwidthApproximator + ) { this.logger = Debug(`core:requests-container-${streamType}`); this.logger.color = "LightSeaGreen"; } - get httpRequestsCount() { + get executingHttpCount() { let count = 0; - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for (const request of this.httpRequests()) count++; + for (const request of this.httpRequests()) { + if (request.status === "loading") count++; + } return count; } - get p2pRequestsCount() { + get executingP2PCount() { let count = 0; - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for (const request of this.p2pRequests()) count++; + for (const request of this.p2pRequests()) { + if (request.status === "loading") count++; + } return count; } @@ -75,126 +216,60 @@ export class RequestsContainer { return this.requests.get(id); } - getHybridLoaderRequest(segment: Segment) { + getOrCreateRequest(segment: Segment) { const id = getRequestItemId(segment); - return this.requests.get(id)?.loaderRequest; + let request = this.requests.get(id); + if (!request) { + request = new Request(segment, this.bandwidthApproximator); + request.subscribe("onCompleted", this.onRequestCompleted); + this.requests.set(request.id, request); + } + return request; } - remove(segment: Segment) { - const id = getRequestItemId(segment); - this.requests.delete(id); - } + private onRequestCompleted: RequestEvents["onCompleted"] = (request) => { + this.requests.delete(request.id); + }; - addHybridLoaderRequest(segment: Segment, loaderRequest: HybridLoaderRequest) { - const segmentId = getRequestItemId(segment); - const existingRequest = this.requests.get(segmentId); - if (existingRequest) { - existingRequest.loaderRequest = loaderRequest; - } else { - this.requests.set(segmentId, { - segment, - loaderRequest, - prevAttempts: [], - }); - } - this.logger( - `add loader request: ${loaderRequest.type} ${segment.externalId}` - ); - if (loaderRequest.type === "http") this.onHttpRequestsHandlers.fire(); - } - - addEngineCallbacks(segment: Segment, engineCallbacks: EngineCallbacks) { - const segmentId = getRequestItemId(segment); - const requestItem = this.requests.get(segmentId); - - if (requestItem) { - requestItem.engineCallbacks = engineCallbacks; - } else { - this.requests.set(segmentId, { - segment, - engineCallbacks, - prevAttempts: [], - }); - } - this.logger(`add engine request ${segment.externalId}`); + remove(value: Segment | Request) { + const id = value instanceof Request ? value.id : getRequestItemId(value); + this.requests.delete(id); } values() { return this.requests.values(); } - *httpRequests(): Generator { + *httpRequests(): Generator { for (const request of this.requests.values()) { - if (request.loaderRequest?.type === "http") yield request; + if (request.type === "http") yield request; } } - *p2pRequests(): Generator { + *p2pRequests(): Generator { for (const request of this.requests.values()) { - if (request.loaderRequest?.type === "p2p") yield request; + if (request.type === "p2p") yield request; } } - resolveAndRemoveRequest(segment: Segment, response: SegmentResponse) { - const id = getRequestItemId(segment); - const request = this.requests.get(id); - if (!request) return; - request.engineCallbacks?.onSuccess(response); - this.requests.delete(id); - } - isHttpRequested(segment: Segment): boolean { const id = getRequestItemId(segment); - return this.requests.get(id)?.loaderRequest?.type === "http"; + return this.requests.get(id)?.type === "http"; } isP2PRequested(segment: Segment): boolean { const id = getRequestItemId(segment); - return this.requests.get(id)?.loaderRequest?.type === "p2p"; + return this.requests.get(id)?.type === "p2p"; } isHybridLoaderRequested(segment: Segment): boolean { const id = getRequestItemId(segment); - return !!this.requests.get(id)?.loaderRequest; - } - - abortEngineRequest(segment: Segment) { - const id = getRequestItemId(segment); - const request = this.requests.get(id); - if (!request) return; - - // request.engineCallbacks?.onError(new RequestAbortError()); - request.loaderRequest?.abort(); - } - - abortLoaderRequest(segment: Segment) { - const id = getRequestItemId(segment); - this.requests.get(id)?.loaderRequest?.abort(); - } - - abortAllNotRequestedByEngine(isLocked?: (segment: Segment) => boolean) { - const isSegmentLocked = isLocked ? isLocked : () => false; - for (const { - loaderRequest, - engineCallbacks, - segment, - } of this.requests.values()) { - if (engineCallbacks || !loaderRequest) continue; - if (!isSegmentLocked(segment)) loaderRequest.abort(); - } - } - - subscribeOnHttpRequestsUpdate(handler: () => void) { - this.onHttpRequestsHandlers.add(handler); - } - - unsubscribeFromHttpRequestsUpdate(handler: () => void) { - this.onHttpRequestsHandlers.remove(handler); + return !!this.requests.get(id)?.type; } destroy() { for (const request of this.requests.values()) { - request.loaderRequest?.abort(); + request.abort(); request.engineCallbacks?.onError("failed"); } this.requests.clear(); diff --git a/packages/p2p-media-loader-core/src/segments-storage.ts b/packages/p2p-media-loader-core/src/segments-storage.ts index e8b1a1db..545ee965 100644 --- a/packages/p2p-media-loader-core/src/segments-storage.ts +++ b/packages/p2p-media-loader-core/src/segments-storage.ts @@ -1,4 +1,6 @@ import { Segment, Settings, Stream } from "./types"; +import { EventDispatcher } from "./event-dispatcher"; +import * as StreamUtils from "./utils/stream"; import Debug from "debug"; type StorageSettings = Pick< @@ -6,62 +8,29 @@ type StorageSettings = Pick< "cachedSegmentExpiration" | "cachedSegmentsCount" >; -function getStreamShortExternalId(stream: Readonly) { - const { type, index } = stream; - return `${type}-${index}`; -} - function getStorageItemId(segment: Segment) { - const streamExternalId = getStreamShortExternalId(segment.stream); + const streamExternalId = StreamUtils.getStreamShortId(segment.stream); return `${streamExternalId}|${segment.externalId}`; } -export class Subscriptions< - T extends (...args: unknown[]) => void = () => void -> { - private readonly list: Set; - - constructor(handlers?: T | T[]) { - if (handlers) { - this.list = new Set(Array.isArray(handlers) ? handlers : [handlers]); - } else { - this.list = new Set(); - } - } - - add(handler: T) { - this.list.add(handler); - } - - remove(handler: T) { - this.list.delete(handler); - } - - fire(...args: Parameters) { - for (const handler of this.list) { - handler(...args); - } - } - - get isEmpty() { - return this.list.size === 0; - } -} - type StorageItem = { segment: Segment; data: ArrayBuffer; lastAccessed: number; }; +type StorageEventHandlers = { + [key in `onStorageUpdated${string}`]: (steam: Stream) => void; +}; + export class SegmentsMemoryStorage { private cache = new Map(); private _isInitialized = false; private readonly isSegmentLockedPredicates: (( segment: Segment ) => boolean)[] = []; - private onUpdateHandlers = new Map(); private readonly logger: Debug.Debugger; + private readonly events = new EventDispatcher(); constructor( private readonly masterManifestUrl: string, @@ -90,14 +59,13 @@ export class SegmentsMemoryStorage { async storeSegment(segment: Segment, data: ArrayBuffer) { const id = getStorageItemId(segment); - const streamId = getStreamShortExternalId(segment.stream); this.cache.set(id, { segment, data, lastAccessed: performance.now(), }); this.logger(`add segment: ${id}`); - this.fireOnUpdateSubscriptions(streamId); + this.dispatchStorageUpdatedEvent(segment.stream); void this.clear(); } @@ -116,10 +84,10 @@ export class SegmentsMemoryStorage { } getStoredSegmentExternalIdsOfStream(stream: Stream) { - const streamId = getStreamShortExternalId(stream); + const streamId = StreamUtils.getStreamShortId(stream); const externalIds: string[] = []; for (const { segment } of this.cache.values()) { - const itemStreamId = getStreamShortExternalId(segment.stream); + const itemStreamId = StreamUtils.getStreamShortId(segment.stream); if (itemStreamId === streamId) externalIds.push(segment.externalId); } return externalIds; @@ -128,7 +96,7 @@ export class SegmentsMemoryStorage { private async clear(): Promise { const itemsToDelete: string[] = []; const remainingItems: [string, StorageItem][] = []; - const streamIdsOfChangedItems = new Set(); + const streamsOfChangedItems = new Set(); // Delete old segments const now = performance.now(); @@ -138,9 +106,8 @@ export class SegmentsMemoryStorage { const { lastAccessed, segment } = item; if (now - lastAccessed > this.settings.cachedSegmentExpiration) { if (!this.isSegmentLocked(segment)) { - const streamId = getStreamShortExternalId(segment.stream); itemsToDelete.push(itemId); - streamIdsOfChangedItems.add(streamId); + streamsOfChangedItems.add(segment.stream); } } else { remainingItems.push(entry); @@ -155,9 +122,8 @@ export class SegmentsMemoryStorage { for (const [itemId, { segment }] of remainingItems) { if (!this.isSegmentLocked(segment)) { - const streamId = getStreamShortExternalId(segment.stream); itemsToDelete.push(itemId); - streamIdsOfChangedItems.add(streamId); + streamsOfChangedItems.add(segment.stream); countOverhead--; if (countOverhead === 0) break; } @@ -167,40 +133,39 @@ export class SegmentsMemoryStorage { if (itemsToDelete.length) { this.logger(`cleared ${itemsToDelete.length} segments`); itemsToDelete.forEach((id) => this.cache.delete(id)); - for (const streamId of streamIdsOfChangedItems) { - this.fireOnUpdateSubscriptions(streamId); + for (const stream of streamsOfChangedItems) { + this.dispatchStorageUpdatedEvent(stream); } } return itemsToDelete.length > 0; } - subscribeOnUpdate(stream: Stream, handler: () => void) { - const streamId = getStreamShortExternalId(stream); - const handlers = this.onUpdateHandlers.get(streamId); - if (!handlers) { - this.onUpdateHandlers.set(streamId, new Subscriptions(handler)); - } else { - handlers.add(handler); - } + subscribeOnUpdate( + stream: Stream, + listener: StorageEventHandlers["onStorageUpdated"] + ) { + const localId = StreamUtils.getStreamShortId(stream); + this.events.subscribe(`onStorageUpdated-${localId}`, listener); } - unsubscribeFromUpdate(stream: Stream, handler: () => void) { - const streamId = getStreamShortExternalId(stream); - const handlers = this.onUpdateHandlers.get(streamId); - if (handlers) { - handlers.remove(handler); - if (handlers.isEmpty) this.onUpdateHandlers.delete(streamId); - } + unsubscribeFromUpdate( + stream: Stream, + listener: StorageEventHandlers["onStorageUpdated"] + ) { + const localId = StreamUtils.getStreamShortId(stream); + this.events.unsubscribe(`onStorageUpdated-${localId}`, listener); } - private fireOnUpdateSubscriptions(streamId: string) { - this.onUpdateHandlers.get(streamId)?.fire(); + private dispatchStorageUpdatedEvent(stream: Stream) { + this.events.dispatch( + `onStorageUpdated${StreamUtils.getStreamShortId(stream)}`, + stream + ); } public async destroy() { this.cache.clear(); - this.onUpdateHandlers.clear(); this._isInitialized = false; } } diff --git a/packages/p2p-media-loader-core/src/utils/stream.ts b/packages/p2p-media-loader-core/src/utils/stream.ts new file mode 100644 index 00000000..5845e0ac --- /dev/null +++ b/packages/p2p-media-loader-core/src/utils/stream.ts @@ -0,0 +1,34 @@ +import { Segment, Stream, StreamWithSegments } from "../types"; + +const PEER_PROTOCOL_VERSION = "V1"; + +export function getStreamExternalId( + manifestResponseUrl: string, + stream: Readonly +): string { + const { type, index } = stream; + return `${PEER_PROTOCOL_VERSION}:${manifestResponseUrl}-${type}-${index}`; +} + +export function getSegmentFromStreamsMap( + streams: Map, + segmentId: string +): Segment | undefined { + for (const stream of streams.values()) { + const segment = stream.segments.get(segmentId); + if (segment) return segment; + } +} + +export function getSegmentFromStreamByExternalId( + stream: StreamWithSegments, + segmentExternalId: string +): Segment | undefined { + for (const segment of stream.segments.values()) { + if (segment.externalId === segmentExternalId) return segment; + } +} + +export function getStreamShortId(stream: Stream) { + return `${stream.type}-${stream.index}`; +} diff --git a/packages/p2p-media-loader-core/src/utils/utils.ts b/packages/p2p-media-loader-core/src/utils/utils.ts index 7267ac7a..8db9b2bb 100644 --- a/packages/p2p-media-loader-core/src/utils/utils.ts +++ b/packages/p2p-media-loader-core/src/utils/utils.ts @@ -1,34 +1,3 @@ -import { Segment, Stream, StreamWithSegments } from "../index"; - -const PEER_PROTOCOL_VERSION = "V1"; - -export function getStreamExternalId( - manifestResponseUrl: string, - stream: Readonly -): string { - const { type, index } = stream; - return `${PEER_PROTOCOL_VERSION}:${manifestResponseUrl}-${type}-${index}`; -} - -export function getSegmentFromStreamsMap( - streams: Map, - segmentId: string -): Segment | undefined { - for (const stream of streams.values()) { - const segment = stream.segments.get(segmentId); - if (segment) return segment; - } -} - -export function getSegmentFromStreamByExternalId( - stream: StreamWithSegments, - segmentExternalId: string -): Segment | undefined { - for (const segment of stream.segments.values()) { - if (segment.externalId === segmentExternalId) return segment; - } -} - export function getControlledPromise() { let resolve: (value: T) => void; let reject: (reason?: unknown) => void; @@ -62,3 +31,7 @@ export function joinChunks( return buffer; } + +export function getPercent(numerator: number, denominator: number): number { + return (numerator / denominator) * 100; +}