Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bandwidth calculator. #323

Merged
merged 7 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 0 additions & 57 deletions packages/p2p-media-loader-core/src/bandwidth-approximator.ts

This file was deleted.

63 changes: 63 additions & 0 deletions packages/p2p-media-loader-core/src/bandwidth-calculator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
const CLEAR_THRESHOLD_MS = 3000;

export class BandwidthCalculator {
private simultaneousLoadingsCount = 0;
private readonly bytes: number[] = [];
private readonly timestamps: number[] = [];
private noLoadingsTotalTime = 0;
private allLoadingsStoppedTimestamp = 0;

addBytes(bytesLength: number, now = performance.now()) {
this.bytes.push(bytesLength);
this.timestamps.push(now - this.noLoadingsTotalTime);
}

startLoading(now = performance.now()) {
this.clearStale();
if (this.simultaneousLoadingsCount === 0) {
this.noLoadingsTotalTime += now - this.allLoadingsStoppedTimestamp;
}
this.simultaneousLoadingsCount++;
}

// in bits per second
stopLoading(now = performance.now()) {
if (this.simultaneousLoadingsCount <= 0) return;
this.simultaneousLoadingsCount--;
if (this.simultaneousLoadingsCount !== 0) return;
this.allLoadingsStoppedTimestamp = now;
}

getBandwidthForLastNSeconds(seconds: number) {
if (!this.timestamps.length) return 0;
const milliseconds = seconds * 1000;
const lastItemTimestamp = this.timestamps[this.timestamps.length - 1];
let lastCountedTimestamp = lastItemTimestamp;
const threshold = lastItemTimestamp - milliseconds;
let totalBytes = 0;

for (let i = this.bytes.length - 1; i >= 0; i--) {
const timestamp = this.timestamps[i];
if (timestamp < threshold) break;
lastCountedTimestamp = timestamp;
totalBytes += this.bytes[i];
}

return (totalBytes * 8000) / (lastItemTimestamp - lastCountedTimestamp);
}

clearStale() {
if (!this.timestamps.length) return;
const threshold =
this.timestamps[this.timestamps.length - 1] - CLEAR_THRESHOLD_MS;

let samplesToRemove = 0;
for (const timestamp of this.timestamps) {
if (timestamp > threshold) break;
samplesToRemove++;
}

this.bytes.splice(0, samplesToRemove);
this.timestamps.splice(0, samplesToRemove);
}
}
7 changes: 3 additions & 4 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from "./types";
import * as StreamUtils from "./utils/stream";
import { LinkedMap } from "./linked-map";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { EngineCallbacks } from "./request";
import { SegmentsMemoryStorage } from "./segments-storage";

Expand All @@ -31,7 +31,7 @@ export class Core<TStream extends Stream = Stream> {
httpErrorRetries: 3,
p2pErrorRetries: 3,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private readonly bandwidthCalculator = new BandwidthCalculator();
private segmentStorage?: SegmentsMemoryStorage;
private mainStreamLoader?: HybridLoader;
private secondaryStreamLoader?: HybridLoader;
Expand Down Expand Up @@ -113,7 +113,6 @@ export class Core<TStream extends Stream = Stream> {
this.mainStreamLoader = undefined;
this.secondaryStreamLoader = undefined;
this.segmentStorage = undefined;
this.bandwidthApproximator.destroy();
this.manifestResponseUrl = undefined;
}

Expand Down Expand Up @@ -145,7 +144,7 @@ export class Core<TStream extends Stream = Stream> {
manifestResponseUrl,
segment,
this.settings,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.segmentStorage,
this.eventHandlers
);
Expand Down
18 changes: 6 additions & 12 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Segment, StreamWithSegments } from "./index";
import { HttpRequestExecutor } from "./http-loader";
import { SegmentsMemoryStorage } from "./segments-storage";
import { Settings, CoreEventHandlers, Playback } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { P2PLoadersContainer } from "./p2p/loaders-container";
import { RequestsContainer } from "./request-container";
import { EngineCallbacks } from "./request";
Expand Down Expand Up @@ -30,7 +30,7 @@ export class HybridLoader {
private streamManifestUrl: string,
requestedSegment: Segment,
private readonly settings: Settings,
private readonly bandwidthApproximator: BandwidthApproximator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly segmentStorage: SegmentsMemoryStorage,
private readonly eventHandlers?: Pick<CoreEventHandlers, "onSegmentLoaded">
) {
Expand All @@ -40,7 +40,7 @@ export class HybridLoader {
this.segmentAvgDuration = StreamUtils.getSegmentAvgDuration(activeStream);
this.requests = new RequestsContainer(
this.requestProcessQueueMicrotask,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.playback,
this.settings
);
Expand Down Expand Up @@ -94,7 +94,7 @@ export class HybridLoader {
if (data) {
callbacks.onSuccess({
data,
bandwidth: this.bandwidthApproximator.getBandwidth(),
bandwidth: this.bandwidthCalculator.getBandwidthForLastNSeconds(3),
});
}
} else {
Expand Down Expand Up @@ -344,7 +344,7 @@ export class HybridLoader {
queue: QueueUtils.QueueItem[],
segment: Segment
): boolean {
for (const { segment: itemSegment } of arrayBackwards(queue)) {
for (const { segment: itemSegment } of Utils.arrayBackwards(queue)) {
if (itemSegment === segment) break;
const request = this.requests.get(itemSegment);
if (request?.type === "http" && request.status === "loading") {
Expand All @@ -359,7 +359,7 @@ export class HybridLoader {
queue: QueueUtils.QueueItem[],
segment: Segment
): boolean {
for (const { segment: itemSegment } of arrayBackwards(queue)) {
for (const { segment: itemSegment } of Utils.arrayBackwards(queue)) {
if (itemSegment === segment) break;
const request = this.requests.get(itemSegment);
if (request?.type === "p2p" && request.status === "loading") {
Expand Down Expand Up @@ -403,9 +403,3 @@ export class HybridLoader {
this.logger.destroy();
}
}

function* arrayBackwards<T>(arr: T[]) {
for (let i = arr.length - 1; i >= 0; i--) {
yield arr[i];
}
}
14 changes: 0 additions & 14 deletions packages/p2p-media-loader-core/src/linked-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,6 @@ export class LinkedMap<K, V extends object> {
private _first?: LinkedObject<K, V>;
private _last?: LinkedObject<K, V>;

get first() {
return this._first?.item;
}

get last() {
return this._last?.item;
}

get size() {
return this.map.size;
}
Expand Down Expand Up @@ -53,12 +45,6 @@ export class LinkedMap<K, V extends object> {
this.map.delete(key);
}

clear() {
this._first = undefined;
this._last = undefined;
this.map.clear();
}

*values(key?: K) {
let value = key ? this.map.get(key) : this._first;
if (value === undefined) return;
Expand Down
6 changes: 3 additions & 3 deletions packages/p2p-media-loader-core/src/request-container.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Segment, Settings, Playback } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { Request } from "./request";

export class RequestsContainer {
private readonly requests = new Map<Segment, Request>();

constructor(
private readonly requestProcessQueueCallback: () => void,
private readonly bandwidthApproximator: BandwidthApproximator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly playback: Playback,
private readonly settings: Settings
) {}
Expand Down Expand Up @@ -44,7 +44,7 @@ export class RequestsContainer {
request = new Request(
segment,
this.requestProcessQueueCallback,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.playback,
this.settings
);
Expand Down
24 changes: 10 additions & 14 deletions packages/p2p-media-loader-core/src/request.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Segment, SegmentResponse, Playback } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { BandwidthCalculator } from "./bandwidth-calculator";
import * as StreamUtils from "./utils/stream";
import * as Utils from "./utils/utils";
import * as LoggerUtils from "./utils/logger";
Expand Down Expand Up @@ -73,7 +73,7 @@ export class Request {
constructor(
readonly segment: Segment,
private readonly requestProcessQueueCallback: () => void,
private readonly bandwidthApproximator: BandwidthApproximator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly playback: Playback,
private readonly settings: StreamUtils.PlaybackTimeWindowsSettings
) {
Expand Down Expand Up @@ -179,7 +179,7 @@ export class Request {
loadedBytes: 0,
startTimestamp: performance.now(),
};
this.bandwidthApproximator.addLoading(this.progress);
this.bandwidthCalculator.startLoading();
const { notReceivingBytesTimeoutMs, abort } = controls;
this._abortRequestCallback = abort;

Expand Down Expand Up @@ -207,10 +207,8 @@ export class Request {

resolveEngineCallbacksSuccessfully() {
if (!this.finalData) return;
this._engineCallbacks?.onSuccess({
data: this.finalData,
bandwidth: this.bandwidthApproximator.getBandwidth(),
});
const bandwidth = this.bandwidthCalculator.getBandwidthForLastNSeconds(3);
this._engineCallbacks?.onSuccess({ data: this.finalData, bandwidth });
this._engineCallbacks = undefined;
}

Expand All @@ -236,6 +234,7 @@ export class Request {
this._abortRequestCallback = undefined;
this.currentAttempt = undefined;
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
}

private abortOnTimeout = () => {
Expand All @@ -252,6 +251,7 @@ export class Request {
error,
});
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
this.requestProcessQueueCallback();
};

Expand All @@ -266,19 +266,20 @@ export class Request {
error,
});
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
this.requestProcessQueueCallback();
};

private completeOnSuccess = () => {
this.throwErrorIfNotLoadingStatus();
if (!this.currentAttempt) return;

this.bandwidthCalculator.stopLoading();
this.notReceivingBytesTimeout.clear();
this.finalData = Utils.joinChunks(this.bytes);
this.setStatus("succeed");
this._totalBytes = this._loadedBytes;

this.resolveEngineCallbacksSuccessfully();
this.logger(
`${this.currentAttempt.type} ${this.segment.externalId} succeed`
);
Expand All @@ -290,6 +291,7 @@ export class Request {
if (!this.currentAttempt || !this.progress) return;
this.notReceivingBytesTimeout.restart();

this.bandwidthCalculator.addBytes(chunk.length);
this.bytes.push(chunk);
this.progress.lastLoadedChunkTimestamp = performance.now();
this.progress.loadedBytes += chunk.length;
Expand Down Expand Up @@ -320,11 +322,6 @@ export class Request {

class FailedRequestAttempts {
private attempts: Required<RequestAttempt>[] = [];
private _lastClearTimestamp = performance.now();

get lastClearTimestamp() {
return this._lastClearTimestamp;
}

add(attempt: Required<RequestAttempt>) {
this.attempts.push(attempt);
Expand All @@ -343,7 +340,6 @@ class FailedRequestAttempts {

clear() {
this.attempts = [];
this._lastClearTimestamp = performance.now();
}
}

Expand Down
6 changes: 6 additions & 0 deletions packages/p2p-media-loader-core/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ export function hexToUtf8(hexString: string) {
const decoder = new TextDecoder();
return decoder.decode(bytes);
}

export function* arrayBackwards<T>(arr: T[]) {
for (let i = arr.length - 1; i >= 0; i--) {
yield arr[i];
}
}