/**
* a JSON-RPC variant with support for streams.
* @module scarf
*/
'use strict';
const { Stream, Transform, Readable } = require('node:stream');
const { EventEmitter } = require('node:events');
const { randomUUID: uuid } = require('node:crypto');
const net = require('node:net');
const jsonrpc = require('jsonrpc-lite');
const { URL } = require('node:url');
/**
* @private
*/
class Serializer extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(data, encoding, callback) {
callback(null, JSON.stringify(data) + '\r\n');
}
}
/**
* @private
*/
class Deserializer extends Transform {
constructor() {
super({ objectMode: true });
this._buffer = '';
}
_transform(data, encoding, callback) {
this._buffer += data.toString();
if (this._buffer.indexOf('\r\n') === -1) {
return callback();
}
let valid = true;
let parts = this._buffer.split('\r\n');
while (valid && parts.length) {
let rpc = jsonrpc.parse(parts[0]);
if (rpc.type !== 'invalid') {
this.push(rpc);
parts.shift();
} else {
valid = false;
}
}
this._buffer = parts.join('\r\n');
callback();
}
}
function _createStreamPointer(ctx, stream, serializer) {
let id = uuid();
let readable = typeof stream.read === 'function';
let type = readable ? 'readable' : 'writable';
let pointer = `scarf://${id}.${type}`;
ctx.streams.set(pointer, stream);
if (readable) {
_bindReadable(pointer, stream, serializer);
}
return pointer;
}
function _bindReadable(pointer, stream, serializer) {
stream.on('data', (data) => {
serializer.write(jsonrpc.notification(pointer, [data]));
});
stream.on('end', () => {
serializer.write(jsonrpc.notification(pointer, [null]));
});
stream.on('error', () => {
serializer.write(jsonrpc.notification(pointer, [null]));
});
}
class Server extends EventEmitter {
/**
* @typedef {Function} Server~RpcHandler
* @param {...(number|string|boolean|object|array|Stream)} argument - Implementation specific arguments
* @param {Server~rpcHandlerCallback} callback - Completion callback that must be called
*/
/**
* @callback Server~rpcHandlerCallback
* @param {?Error} [err] - Error object if the handler fails
* @param {Array.<(string|boolean|number|object|array|Stream)>} [result] - Success result if any
*/
/**
* @typedef {Function} Server~createServer
* @param {Server~connectionHandlerCallback} callback - Client connection callback
* @returns {Server~AbstractServer}
*/
/**
* @callback Server~connectionHandlerCallback
* @param {stream.Duplex} clientConnection - Stream that represents a connected client
*/
/**
* @typedef {stream.Duplex} Server~AbstractServer
* @property {Function} listen - Returned server must also implement this
*/
/**
* Implementation of a scarf server. Takes an API definition
* and routes JSON-RPC messages to their handlers, automatically
* transforming scarf:// links into streams.
* @extends EventEmitter
* @constructor
* @param {Object.<string, Server~RpcHandler>} api - Map of methods to message handler functions
* @param {?Server~createServer} [createServer=net.createServer] - Function returning a duplex stream
*/
constructor(api = {}, createServer) {
super();
this.api = api;
this.clients = new Map();
this.server = createServer
? createServer(stream => this._registerClient(stream))
: net.createServer(stream => this._registerClient(stream));
this.streams = new Map();
}
/**
* Passthrough function that calls the underlying {@link Server~AbstractServer}'s listen() method
*/
listen() {
this.server.listen(...arguments);
}
_registerClient(stream) {
const id = uuid();
const serializer = new Serializer();
const deserializer = new Deserializer();
this.clients.set(id, { stream, serializer, deserializer });
stream.on('error', () => this.clients.delete(id));
stream.on('close', () => this.clients.delete(id));
stream.pipe(deserializer).on('data', rpc => this._execMethod(rpc, id));
serializer.pipe(stream);
}
_execMethod(rpc, client) {
const { serializer } = this.clients.get(client);
const { type, payload } = rpc;
if (type === 'notification') {
let parsedUrl;
try {
parsedUrl = new URL(payload.method);
} catch (e) {
return serializer.write(
jsonrpc.error(payload.id, new jsonrpc.JsonRpcError(
`Invalid stream reference: "${payload.method}"`
))
);
}
let stream = this.streams.get(payload.method);
if (stream) {
return payload.params.forEach((data) => {
if (data === null) {
stream.end();
} else {
stream.write(data);
}
});
}
this.emit('unhandled', rpc);
}
if (typeof this.api[payload.method] !== 'function') {
return serializer.write(
jsonrpc.error(payload.id, new jsonrpc.JsonRpcError(
`Invalid method: "${payload.method}"`
))
);
}
const srv = this;
function handleResultCallback(err) {
if (err) {
return serializer.write(
jsonrpc.error(payload.id, new jsonrpc.JsonRpcError(err.message))
);
}
const args = [...arguments];
for (let a = 0; a < args.length; a++) {
if (args[a] instanceof Stream) {
args[a] = _createStreamPointer(srv, args[a], serializer);
}
}
serializer.write(jsonrpc.success(payload.id, args.slice(1)));
}
try {
this.api[payload.method](...payload.params, handleResultCallback);
} catch (err) {
serializer.write(
jsonrpc.error(payload.id, new jsonrpc.JsonRpcError(err.message))
);
}
}
}
class Client extends EventEmitter {
/**
* @typedef {Function} Client~createClient
* @returns {Client~AbstractClient}
*/
/**
* @typedef {stream.Duplex} Client~AbstractClient
* @property {Function} connect - Returned client must also implement this
*/
/**
* Implementation of a scarf client. Connects to a scarf {@link Server} and
* automatically transforms stream arguments into scarf:// links and vice-versa.
* @extends EventEmitter
* @constructor
* @param {Client~createClient} [createClient=new net.Socket()] - Function returning a Duplex stream
*/
constructor(createClient) {
super();
this.stream = createClient
? createClient()
: new net.Socket();
this.deserializer = new Deserializer();
this.serializer = new Serializer();
this._callbacks = new Map();
this.streams = new Map();
this.stream.pipe(this.deserializer);
this.serializer.pipe(this.stream);
this.stream.on('error', (err) => this.emit('error', err));
this.deserializer.on('data', (rpc) => this._process(rpc));
}
/**
* Passthrough function that calls the underlying {@link Client~AbstractClient}'s connect() method
*/
connect() {
this.stream.connect(...arguments);
}
_process(rpc) {
const { type, payload } = rpc;
const handleResponse = () => {
let callback = this._callbacks.get(payload.id);
let { result } = payload;
if (result) {
for (let p = 0; p < result.length; p++) {
if (typeof result[p] === 'string' && result[p].includes('scarf://')) {
const parsedUrl = new URL(result[p]);
let [id, streamType] = parsedUrl.hostname.split('.');
let pointer = `scarf://${id}.${streamType}`;
let stream = null;
if (streamType === 'writable') {
stream = result[p] = new Transform({
write: (data, encoding, callback) => {
this.serializer.write(
jsonrpc.notification(pointer, [data])
);
callback();
},
flush: (callback) => {
this.serializer.write(
jsonrpc.notification(pointer, [null])
);
callback();
},
objectMode: true
});
stream.on('finish', () => this.streams.delete(id));
} else {
stream = result[p] = new Readable({
read: () => null,
objectMode: true
});
stream.on('end', () => this.streams.delete(id));
}
this.streams.set(pointer, stream);
}
}
callback(null, ...payload.result);
} else {
callback(new Error(payload.error.message));
}
this._callbacks.delete(payload.id);
};
const handleNotification = () => {
let parsedUrl;
try {
parsedUrl = new URL(payload.method);
} catch (e) {
return this.emit('unhandled', rpc);
}
let stream = this.streams.get(payload.method);
if (!stream) {
return this.emit('unhandled', rpc);
}
payload.params.forEach((param) => stream.push(param));
};
if (['success', 'error'].includes(type)) {
handleResponse();
} else if (type === 'notification') {
handleNotification();
} else {
this.emit('unhandled', rpc);
}
}
_invoke(method, params) {
return new Promise((resolve, reject) => {
const id = uuid();
this._callbacks.set(id, function(err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
});
this.serializer.write(jsonrpc.request(id, method, params));
});
}
/**
* @callback Client~invokeCallback
* @param {?Error} [err] - Error object if the request fails
* @param {Array.<(string|boolean|number|object|array|Stream)>} [result] - Success result if any
*/
/**
* Constructs a scarf message from the given arguents and handles the response
* @param {string} method - JSON-RPC method name
* @param {Array.<(string|array|boolean|object|number|Stream)>} params - Data to send the server
* @param {Client~invokeCallback} [callback] - Handle the results returned from the server
* @returns {Promise}
*/
invoke(method, params, callback) {
if (typeof callback === 'function') {
return this._invoke(method, params).then(function(result) {
callback(null, result);
}, callback);
} else {
return this._invoke(method, params);
}
}
}
module.exports.Server = Server;
module.exports.Client = Client;