151 lines
5.5 KiB
JavaScript
151 lines
5.5 KiB
JavaScript
|
"use strict";
|
||
|
Object.defineProperty(exports, "__esModule", { value: true });
|
||
|
exports.FsaNodeWriteStream = void 0;
|
||
|
const stream_1 = require("stream");
|
||
|
const Defer_1 = require("../thingies/Defer");
|
||
|
const concurrency_1 = require("../thingies/concurrency");
|
||
|
const util_1 = require("../node/util");
|
||
|
const queueMicrotask_1 = require("../queueMicrotask");
|
||
|
/**
|
||
|
* This WriteStream implementation does not build on top of the `fs` module,
|
||
|
* but instead uses the lower-level `FileSystemFileHandle` interface. The reason
|
||
|
* is the different semantics in `fs` and FSA (File System Access API) write streams.
|
||
|
*
|
||
|
* When data is written to an FSA file, a new FSA stream is created, it copies
|
||
|
* the file to a temporary swap file. After each written chunk, that swap file
|
||
|
* is closed and the original file is replaced with the swap file. This means,
|
||
|
* if WriteStream was built on top of `fs`, each chunk write would result in
|
||
|
* a file copy, write, close, rename operations, which is not what we want.
|
||
|
*
|
||
|
* Instead this implementation hooks into the lower-level and closes the swap
|
||
|
* file only once the stream is closed. The downside is that the written data
|
||
|
* is not immediately visible to other processes (because it is written to the
|
||
|
* swap file), but that is the trade-off we have to make.
|
||
|
*
|
||
|
* @todo Could make this flush the data to the original file periodically, so that
|
||
|
* the data is visible to other processes.
|
||
|
* @todo This stream could work through `FileSystemSyncAccessHandle.write` in a
|
||
|
* Worker thread instead.
|
||
|
*/
|
||
|
class FsaNodeWriteStream extends stream_1.Writable {
|
||
|
constructor(handle, path, options) {
|
||
|
super();
|
||
|
this.path = path;
|
||
|
this.options = options;
|
||
|
this.__pending__ = true;
|
||
|
this.__closed__ = false;
|
||
|
this.__bytes__ = 0;
|
||
|
this.__mutex__ = (0, concurrency_1.concurrency)(1);
|
||
|
if (options.start !== undefined) {
|
||
|
if (typeof options.start !== 'number') {
|
||
|
throw new TypeError('"start" option must be a Number');
|
||
|
}
|
||
|
if (options.start < 0) {
|
||
|
throw new TypeError('"start" must be >= zero');
|
||
|
}
|
||
|
}
|
||
|
const stream = new Defer_1.Defer();
|
||
|
this.__stream__ = stream.promise;
|
||
|
(async () => {
|
||
|
var _a, _b;
|
||
|
const fsaHandle = await handle;
|
||
|
const fileWasOpened = !options.fd;
|
||
|
if (fileWasOpened)
|
||
|
this.emit('open', fsaHandle.fd);
|
||
|
const flags = (0, util_1.flagsToNumber)((_a = options.flags) !== null && _a !== void 0 ? _a : 'w');
|
||
|
const keepExistingData = flags & 1024 /* FLAG.O_APPEND */ ? true : false;
|
||
|
const writable = await fsaHandle.file.createWritable({ keepExistingData });
|
||
|
if (keepExistingData) {
|
||
|
const start = Number((_b = options.start) !== null && _b !== void 0 ? _b : 0);
|
||
|
if (start)
|
||
|
await writable.seek(start);
|
||
|
}
|
||
|
this.__pending__ = false;
|
||
|
stream.resolve(writable);
|
||
|
})().catch(error => {
|
||
|
stream.reject(error);
|
||
|
});
|
||
|
}
|
||
|
async ___write___(buffers) {
|
||
|
await this.__mutex__(async () => {
|
||
|
if (this.__closed__)
|
||
|
return;
|
||
|
// if (this.__closed__) throw new Error('WriteStream is closed');
|
||
|
const writable = await this.__stream__;
|
||
|
for (const buffer of buffers) {
|
||
|
await writable.write(buffer);
|
||
|
this.__bytes__ += buffer.byteLength;
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
async __close__() {
|
||
|
const emitClose = this.options.emitClose;
|
||
|
await this.__mutex__(async () => {
|
||
|
if (this.__closed__ && emitClose) {
|
||
|
(0, queueMicrotask_1.default)(() => this.emit('close'));
|
||
|
return;
|
||
|
}
|
||
|
try {
|
||
|
const writable = await this.__stream__;
|
||
|
this.__closed__ = true;
|
||
|
await writable.close();
|
||
|
if (emitClose)
|
||
|
this.emit('close');
|
||
|
}
|
||
|
catch (error) {
|
||
|
this.emit('error', error);
|
||
|
if (emitClose)
|
||
|
this.emit('close', error);
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
// ------------------------------------------------------------- IWriteStream
|
||
|
get bytesWritten() {
|
||
|
return this.__bytes__;
|
||
|
}
|
||
|
get pending() {
|
||
|
return this.__pending__;
|
||
|
}
|
||
|
close(cb) {
|
||
|
if (cb)
|
||
|
this.once('close', cb);
|
||
|
this.__close__().catch(() => { });
|
||
|
}
|
||
|
// ----------------------------------------------------------------- Writable
|
||
|
_write(chunk, encoding, callback) {
|
||
|
this.___write___([chunk])
|
||
|
.then(() => {
|
||
|
if (callback)
|
||
|
callback(null);
|
||
|
})
|
||
|
.catch(error => {
|
||
|
if (callback)
|
||
|
callback(error);
|
||
|
});
|
||
|
}
|
||
|
_writev(chunks, callback) {
|
||
|
const buffers = chunks.map(({ chunk }) => chunk);
|
||
|
this.___write___(buffers)
|
||
|
.then(() => {
|
||
|
if (callback)
|
||
|
callback(null);
|
||
|
})
|
||
|
.catch(error => {
|
||
|
if (callback)
|
||
|
callback(error);
|
||
|
});
|
||
|
}
|
||
|
_final(callback) {
|
||
|
this.__close__()
|
||
|
.then(() => {
|
||
|
if (callback)
|
||
|
callback(null);
|
||
|
})
|
||
|
.catch(error => {
|
||
|
if (callback)
|
||
|
callback(error);
|
||
|
});
|
||
|
}
|
||
|
}
|
||
|
exports.FsaNodeWriteStream = FsaNodeWriteStream;
|
||
|
//# sourceMappingURL=FsaNodeWriteStream.js.map
|