Skip to content

Commit

Permalink
clone streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ricsam committed May 13, 2020
1 parent 62255bc commit 6bc5ac0
Showing 1 changed file with 49 additions and 22 deletions.
71 changes: 49 additions & 22 deletions src/nafs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,30 @@ import * as path from 'path';
import * as queryString from 'query-string';
import * as stream from 'stream';

class ClonedReadStream extends stream.Readable {
constructor(
readableStream: stream.Readable,
options?: stream.ReadableOptions
) {
super(options);

readableStream.on('data', (chunk) => {
this.push(chunk);
});

readableStream.on('end', () => {
this.push(null);
});

readableStream.on('error', (err) => {
this.emit('error', err);
});
}
_read() {
/* */
}
}

type NAFS = {
writeFile: (fpath: string, body: any) => Promise<any>;
readFile: (fpath: string) => Promise<Buffer | string>;
Expand Down Expand Up @@ -74,14 +98,15 @@ const activeStorageFileServe: NAFSFactory = (url: string) => {

// pass a into b
const forwardStream = (a: stream.Readable, b: stream.PassThrough) => {
a.pipe(b);
a.on('close', () => {
const readStream = new ClonedReadStream(a);
readStream.pipe(b);
readStream.on('close', () => {
b.emit('close');
});
a.on('end', () => {
readStream.on('end', () => {
b.emit('end');
});
a.on('error', (err) => {
readStream.on('error', (err) => {
b.emit('error', err);
});
};
Expand Down Expand Up @@ -150,20 +175,21 @@ const activeStorageS3Serve: NAFSFactory = (url) => {

const writeStreamToCache = async (
fpath: string,
stream: NodeJS.ReadableStream
readStream: stream.Readable
) => {
if (cachePath) {
const cacheFpath = path.join(cachePath, fpath);
await mkdirp(path.dirname(cacheFpath));
const fstream = fs.createWriteStream(cacheFpath);
stream.pipe(fstream);
const clonedReadStream = new ClonedReadStream(readStream);
clonedReadStream.pipe(fstream);
return new Promise((resolve, reject) => {
const onError = (err: any) => {
fs.unlink(cacheFpath, () => {
reject(err);
});
};
stream.once('error', onError);
clonedReadStream.once('error', onError);
fstream.once('end', () => {
resolve();
});
Expand All @@ -183,11 +209,11 @@ const activeStorageS3Serve: NAFSFactory = (url) => {
forwardStream(fs.createReadStream(cacheFpath), passStream);
})
.catch(() => {
const stream = readRemoteStream(fpath);
writeStreamToCache(fpath, stream).catch((err) => {
const remoteStream = readRemoteStream(fpath);
writeStreamToCache(fpath, remoteStream).catch((err) => {
throw err;
});
forwardStream(stream, passStream);
forwardStream(remoteStream, passStream);
});
} else if (fetchPolicy === 'cache-and-network') {
const remoteStream = readRemoteStream(fpath);
Expand Down Expand Up @@ -221,19 +247,20 @@ const activeStorageS3Serve: NAFSFactory = (url) => {

const createWriteStream = (fpath: string) => {
const ptStream = new stream.PassThrough();
const s3Stream = new stream.PassThrough();
const cacheStream = new stream.PassThrough();
ptStream.pipe(s3Stream);
ptStream.pipe(cacheStream);
s3.upload({
Bucket: bucket,
Key: getS3Path(fpath),
Body: s3Stream,
}, (err) => {
if (err) {
throw err;
const s3Stream = new ClonedReadStream(ptStream);
const cacheStream = new ClonedReadStream(ptStream);
s3.upload(
{
Bucket: bucket,
Key: getS3Path(fpath),
Body: s3Stream,
},
(err) => {
if (err) {
throw err;
}
}
});
);
writeStreamToCache(fpath, cacheStream);
return ptStream;
};
Expand Down

0 comments on commit 6bc5ac0

Please sign in to comment.