Skip to content

Commit

Permalink
Create universal bandwidth-approximator.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Oct 15, 2023
1 parent 0bd2822 commit 4c01697
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 105 deletions.
87 changes: 41 additions & 46 deletions packages/p2p-media-loader-core/src/bandwidth-approximator.ts
Original file line number Diff line number Diff line change
@@ -1,58 +1,53 @@
const SMOOTH_INTERVAL = 15 * 1000;
const MEASURE_INTERVAL = 60 * 1000;

type NumberWithTime = {
readonly value: number;
readonly timeStamp: number;
};
import { LoadProgress } from "./request";

export class BandwidthApproximator {
private lastBytes: NumberWithTime[] = [];
private currentBytesSum = 0;
private lastBandwidth: NumberWithTime[] = [];

addBytes(bytes: number): void {
const timeStamp = performance.now();
this.lastBytes.push({ value: bytes, timeStamp });
this.currentBytesSum += bytes;

while (timeStamp - this.lastBytes[0].timeStamp > SMOOTH_INTERVAL) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.currentBytesSum -= this.lastBytes.shift()!.value;
}
private readonly loadings: LoadProgress[] = [];

const interval = Math.min(SMOOTH_INTERVAL, timeStamp);
this.lastBandwidth.push({
value: (this.currentBytesSum * 8000) / interval,
timeStamp,
});
addLoading(progress: LoadProgress) {
this.loadings.push(progress);
}

// in bits per seconds
getBandwidth(): number {
const timeStamp = performance.now();
while (
this.lastBandwidth.length !== 0 &&
timeStamp - this.lastBandwidth[0].timeStamp > MEASURE_INTERVAL
) {
this.lastBandwidth.shift();
}
getBandwidth() {
this.clearStale();
return getBandwidthByProgressList(this.loadings);
}

let maxBandwidth = 0;
for (const bandwidth of this.lastBandwidth) {
if (bandwidth.value > maxBandwidth) {
maxBandwidth = bandwidth.value;
}
private clearStale() {
const now = performance.now();
for (const { startTimestamp } of this.loadings) {
if (now - startTimestamp <= 15000) break;
this.loadings.shift();
}

return maxBandwidth;
}
}

getSmoothInterval(): number {
return SMOOTH_INTERVAL;
}
function getBandwidthByProgressList(loadings: LoadProgress[]) {
let currentRange: { from: number; to: number } | undefined;
let totalLoadingTime = 0;
let totalBytes = 0;
const now = performance.now();

for (let {
// eslint-disable-next-line prefer-const
startTimestamp: from,
lastLoadedChunkTimestamp: to,
// eslint-disable-next-line prefer-const
loadedBytes,
} of loadings) {
totalBytes += loadedBytes;
if (to === undefined) to = now;

if (!currentRange || from > currentRange.to) {
currentRange = { from, to };
totalLoadingTime += to - from;
continue;
}

getMeasureInterval(): number {
return MEASURE_INTERVAL;
if (from <= currentRange.to && to > currentRange.to) {
totalLoadingTime += to - currentRange.to;
currentRange.to = to;
}
}

return (totalBytes * 8000) / totalLoadingTime;
}
87 changes: 48 additions & 39 deletions packages/p2p-media-loader-core/src/http-loader.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import { RequestAbortError, FetchError } from "./errors";
import { Segment } from "./types";
import { HttpRequest, LoadProgress } from "./request";
import * as process from "process";

export function getHttpSegmentRequest(segment: Segment): Readonly<HttpRequest> {
const { promise, abortController, progress, startTimestamp } =
fetchSegmentData(segment);
const { promise, abortController, progress } = fetchSegmentData(segment);
return {
type: "http",
promise,
progress,
startTimestamp,
abort: () => abortController.abort(),
};
}
Expand All @@ -25,7 +24,13 @@ function fetchSegmentData(segment: Segment) {
}
const abortController = new AbortController();

let progress: LoadProgress | undefined;
const progress: LoadProgress = {
canBeTracked: false,
totalBytes: 0,
loadedBytes: 0,
percent: 0,
startTimestamp: performance.now(),
};
const loadSegmentData = async () => {
try {
const response = await window.fetch(url, {
Expand All @@ -34,12 +39,10 @@ function fetchSegmentData(segment: Segment) {
});

if (response.ok) {
const result = getDataPromiseAndMonitorProgress(response);
progress = result.progress;
const data = await getDataPromiseAndMonitorProgress(response, progress);
// Don't return dataPromise immediately
// should await it for catch correct working
const resultData = await result.dataPromise;
return resultData;
return data;
}
throw new FetchError(
response.statusText ?? `Network response was not for ${segmentId}`,
Expand All @@ -58,48 +61,54 @@ function fetchSegmentData(segment: Segment) {
promise: loadSegmentData(),
abortController,
progress,
startTimestamp: performance.now(),
};
}

function getDataPromiseAndMonitorProgress(response: Response): {
progress?: LoadProgress;
dataPromise: Promise<ArrayBuffer>;
} {
async function getDataPromiseAndMonitorProgress(
response: Response,
progress: LoadProgress
): Promise<ArrayBuffer> {
const totalBytesString = response.headers.get("Content-Length");
if (totalBytesString === null || !response.body) {
return { dataPromise: response.arrayBuffer() };
if (!response.body) {
return response.arrayBuffer().then((data) => {
progress.loadedBytes = data.byteLength;
progress.totalBytes = data.byteLength;
progress.lastLoadedChunkTimestamp = performance.now();
progress.percent = 100;
return data;
});
}

if (totalBytesString) {
progress.totalBytes = +totalBytesString;
progress.canBeTracked = true;
}

const totalBytes = +totalBytesString;
const progress: LoadProgress = {
percent: 0,
loadedBytes: 0,
totalBytes,
};
const reader = response.body.getReader();

const getDataPromise = async () => {
const chunks: Uint8Array[] = [];
for await (const chunk of readStream(reader)) {
chunks.push(chunk);
progress.loadedBytes += chunk.length;
progress.percent = (progress.loadedBytes / totalBytes) * 100;
progress.lastLoadedChunkTimestamp = performance.now();
const chunks: Uint8Array[] = [];
for await (const chunk of readStream(reader)) {
chunks.push(chunk);
progress.loadedBytes += chunk.length;
progress.lastLoadedChunkTimestamp = performance.now();
if (progress.canBeTracked) {
progress.percent = (progress.loadedBytes / progress.totalBytes) * 100;
}
}

const resultBuffer = new ArrayBuffer(progress.loadedBytes);
const view = new Uint8Array(resultBuffer);

let offset = 0;
for (const chunk of chunks) {
view.set(chunk, offset);
offset += chunk.length;
}
if (!progress.canBeTracked) {
progress.totalBytes = progress.loadedBytes;
progress.percent = 100;
}
const resultBuffer = new ArrayBuffer(progress.loadedBytes);
const view = new Uint8Array(resultBuffer);

return resultBuffer;
};
return { progress, dataPromise: getDataPromise() };
let offset = 0;
for (const chunk of chunks) {
view.set(chunk, offset);
offset += chunk.length;
}
return resultBuffer;
}

async function* readStream(
Expand Down
19 changes: 12 additions & 7 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ export class HybridLoader {
const request = this.requests.get(segment);
const timeToPlayback = getTimeToSegmentPlayback(segment, this.playback);

// console.log(this.bandwidthApp.getAverageBandwidth() / 1024 ** 2);
if (statuses.isHighDemand) {
if (request?.type === "http") continue;
console.log("timeToPlayback", timeToPlayback);
console.log(this.bandwidthApproximator.getBandwidth() / 1024 ** 2);

if (request?.type === "p2p") {
const remainingDownloadTime =
Expand Down Expand Up @@ -217,10 +216,11 @@ export class HybridLoader {
}

this.requests.addLoaderRequest(segment, httpRequest);
this.bandwidthApproximator.addLoading(httpRequest.progress);
data = await httpRequest.promise;

if (!data) return;
this.logger.loader(`http responses: ${segment.externalId}`);
if (data) this.onSegmentLoaded(segment, data, "http");
this.onSegmentLoaded(segment, data, "http");
} catch (err) {
if (err instanceof FetchError) {
// TODO: handle error
Expand Down Expand Up @@ -280,8 +280,12 @@ export class HybridLoader {
type: "http" | "p2p"
) {
const byteLength = data.byteLength;
this.bandwidthApproximator.addBytes(data.byteLength);
console.log(
"approx: ",
this.bandwidthApproximator.getBandwidth() / 1024 ** 2
);
void this.segmentStorage.storeSegment(segment, data);

this.requests.resolveEngineRequest(segment, {
data,
bandwidth: this.bandwidthApproximator.getBandwidth(),
Expand Down Expand Up @@ -365,14 +369,15 @@ function getTimeToSegmentPlayback(segment: Segment, playback: Playback) {
}

function getPredictedRemainingDownloadTime(request: HybridLoaderRequest) {
const { startTimestamp, progress } = request;
const { progress } = request;
if (!progress || progress.lastLoadedChunkTimestamp === undefined) {
return undefined;
}

const now = performance.now();
const bandwidth =
progress.percent / (progress.lastLoadedChunkTimestamp - startTimestamp);
progress.percent /
(progress.lastLoadedChunkTimestamp - progress.startTimestamp);
const remainingDownloadPercent = 100 - progress.percent;
const predictedRemainingTimeFromLastDownload =
remainingDownloadPercent / bandwidth;
Expand Down
10 changes: 6 additions & 4 deletions packages/p2p-media-loader-core/src/p2p-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,17 @@ export class P2PLoader {
const peer =
peerWithSegment[Math.floor(Math.random() * peerWithSegment.length)];
const request = peer.requestSegment(segment);
this.requests.addLoaderRequest(segment, request);
this.logger(
`p2p request ${segment.externalId} | ${LoggerUtils.getStatusesString(
statuses
)}`
);
const data = await request.promise;
this.requests.addLoaderRequest(segment, request);
this.logger(`p2p loaded: ${segment.externalId}`);
return data;
request.promise.then(() => {
this.logger(`p2p loaded: ${segment.externalId}`);
});

return request.promise;
}

isLoadingOrLoadedBySomeone(segment: Segment): boolean {
Expand Down
16 changes: 10 additions & 6 deletions packages/p2p-media-loader-core/src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ export class Peer {

case PeerCommandType.SegmentData:
if (this.request?.segment.externalId === command.i) {
this.request.p2pRequest.progress = {
percent: 0,
loadedBytes: 0,
totalBytes: command.s,
};
const { progress } = this.request.p2pRequest;
progress.totalBytes = command.s;
progress.canBeTracked = true;
}
break;

Expand Down Expand Up @@ -202,7 +200,13 @@ export class Peer {
chunks: [],
p2pRequest: {
type: "p2p",
startTimestamp: performance.now(),
progress: {
canBeTracked: false,
totalBytes: 0,
loadedBytes: 0,
percent: 0,
startTimestamp: performance.now(),
},
promise,
abort: () => this.cancelSegmentRequest("abort"),
},
Expand Down
7 changes: 4 additions & 3 deletions packages/p2p-media-loader-core/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ export type EngineCallbacks = {
};

export type LoadProgress = {
startTimestamp: number;
lastLoadedChunkTimestamp?: number;
percent: number;
loadedBytes: number;
totalBytes: number;
lastLoadedChunkTimestamp?: number;
canBeTracked: boolean;
};

type RequestBase = {
promise: Promise<ArrayBuffer>;
abort: () => void;
progress?: LoadProgress;
startTimestamp: number;
progress: LoadProgress;
};

export type HttpRequest = RequestBase & {
Expand Down

0 comments on commit 4c01697

Please sign in to comment.