/**
*
* @module kdns/node
*/
'use strict';
const { Writable: WritableStream } = require('node:stream');
const { EventEmitter } = require('node:events');
const constants = require('./constants');
const keys = require('./keys');
const { Protocol } = require('./protocol');
const { Contact, ContactList } = require('./contact');
const { Router } = require('./router');
/**
* Shuffles an array in place.
* @param {array} array - Array to shuffle
* @returns {array}
*/
function knuthShuffle(array) {
let currentIndex = array.length;
let temporaryValue;
let randomIndex;
while (0 !== currentIndex) {
randomIndex = Math.floor(Math.random() * currentIndex);
currentIndex -= 1;
temporaryValue = array[currentIndex];
array[currentIndex] = array[randomIndex];
array[randomIndex] = temporaryValue;
}
return array;
}
module.exports.knuthShuffle = knuthShuffle;
/**
* Wraps the supplied function in a pseudo-random length timeout to help
* prevent convoy effects. These occur when a number of processes need to use
* a resource in turn. There is a tendency for such bursts of activity to
* drift towards synchronization, which can be disasterous. In Kademlia all
* nodes are requird to republish their contents every hour (T_REPLICATE). A
* convoy effect might lead to this being synchronized across the network,
* which would appear to users as the network dying every hour. The default
* timeout will be between 0 and 30 minutes unless specified.
* @param {function} func - Function to wrap to execution later
* @param {number} [maxtime] - Maximum timeout
* @returns {function}
*/
function preventConvoy(func, timeout) {
return function() {
let t = Math.ceil(
Math.random() * (typeof timeout !== 'number' ? 1.8e+6 : timeout)
);
return setTimeout(func, t);
};
};
module.exports.preventConvoy = preventConvoy;
/**
* Reads objects from a {@link module:kdns/node~ReadableStore} and if an
* item should be replicated, performs an iterative store operation.
* @typedef {WritableStream<module:kdns/node~StoredItem>} ReplicatorStream
* @property {boolean} objectMode
*/
/**
* Reads objects from a {@link module:kdns/node~ReadableStore} and if an
* item should be expired, requests deletion.
* @fires module:kdns/node~Node#storage_delete
* @typedef {WritableStream<module:kdns/node~StoredItem>} ExpirationStream
* @property {boolean} objectMode - true
*/
/**
* Stream consumed by {@link module:kdns/node~ReplicatorStream} and
* {@link module:kdns/node~ReplicatorStream}.
* @typedef {ReadableStream<module:kdns/node~StoredItem>} ReadableStore
* @property {boolean} objectMode - true
*/
/**
* Format of a storage entry.
* @typedef {Object} StoredItem
* @property {buffer} blob - Raw bytes for the stored item
* @property {object} meta - Metadata needed for replication and expiration
* @property {Date} meta.timestamp - The time this item was stored
* @property {string} meta.publisher - {@link module:kdns/contacts~Contact} fingerprint that stored this item
*/
class Node extends EventEmitter {
/**
* RPC message is queued for you to send. This is implementation
* specific based on how you want to handle networking. This event
* contains all the data necessary for you to handle transport.
* @event module:kdns/node~Node#message_queued
* @param {string} method - RPC method name to call
* @param {array<string|object|number|boolean|Stream>} params - Arguments to pass the method call
* @param {module:kdns/contacts~Contact} target - Contact address information
* @param {module:kdns/protocol~HandlerResponse} respond - Resolves the message response
*/
/**
* Indicates that you may delete the item stored by the key. This is
* triggered most likely by the expiration routine.
* @event module:kdns/node~Node#storage_delete
* @param {string} key - Key of storage item to delete
* @param {module:kdns/protocol~HandlerResponse} respond - Resolves the delete request
*/
/**
* Replication routine has started. A writable stream is provided. Only
* items requiring replication will be replicated.
* @event module:kdns/node~Node#storage_replicate
* @param {module:kdns/node~ReplicatorStream} replicator - Reads from a {@link module:kdns/node~ReadableStore}
*/
/**
* Expiration routine has started. A writable stream is provided. Only
* items requiring expiration will be requested for deletion.
* @event module:kdns/node~Node#storage_expire
* @param {module:kdns/node~ExpirationStream} expirator - Reads from a {@link module:kdns/node~ReadableStore}
*/
/**
* Overridable options passed to {@link Node}.
* @typedef {Object} NodeOptions
* @property {module:kdns/router~Router} router
* @property {module:kdns/protocol~Protocol} protocol
*/
/**
* Kademlia protocol implementation. Creates an interface for orchestrating
* a Kademlia network and extending it.
* @constructor
* @extends EventEmitter
* @param {module:kdns/contacts~Contact} contact - Address and fingerprint of this node
* @param {module:kdns/node~NodeOptions} [options] - Internal overrides
*/
constructor(contact, options = {}) {
super();
this._lookups = new Map(); // NB: Track the last lookup time for buckets
this._pings = new Map(); // NB: Track the last ping time for contacts
this.events = new EventEmitter();
this.contact = contact || new Contact();
this.identity = Buffer.from(this.contact.fingerprint, 'hex')
this.router = options.router || new Router(this.identity);
this.protocol = options.protocol || new Protocol(this.router);
setInterval(
preventConvoy(() => this.refresh(0)),
constants.T_REFRESH
);
setInterval(
preventConvoy(() => this.replicate(() => this.expire())),
constants.T_REPLICATE
);
this.router.events
.on('contact_added',
(fingerprint) => this.events.emit('contact_added', fingerprint))
.on('contact_deleted',
(fingerprint) => this.events.emit('contact_deleted', fingerprint));
this.protocol.events
.on('storage_put',
(k, v, r) => this.events.emit('storage_put', k, v, r))
.on('storage_get',
(k, r) => this.events.emit('storage_get', k, r))
}
/**
* Inserts the given contact into the routing table and uses it to perform
* a {@link Node#iterativeFindNode} for this node's identity,
* then refreshes all buckets further than it's closest neighbor, which will
* be in the occupied bucket with the lowest index
* @param {module:kdns/contacts~Contact} peer - Peer to bootstrap from
* @returns {Promise<undefined>}
*/
join(peer) {
return this._join(peer);
}
/**
* @private
*/
_join(contact) {
const identity = contact.fingerprint;
return new Promise(async (resolve, reject) => {
this.router.addContactByNodeId(identity, contact);
try {
await this.iterativeFindNode(this.identity.toString('hex'));
await this.refresh(this.router.getClosestBucket() + 1);
} catch (e) {
return reject(e);
}
resolve();
});
}
/**
* Sends a PING message to the supplied contact, resolves with latency.
* @param {module:kdns/contacts~Contact} peer - Peer to PING
* @fires module:kdns/node~Node#message_queued
* @returns {Promise<number>}
*/
ping(contact) {
return this._ping(contact);
}
/**
* @private
*/
_ping(contact) {
return new Promise((resolve, reject) => {
const start = Date.now();
contact = new Contact(contact.address, contact.fingerprint);
this.events.emit('message_queued', 'PING', [this.contact], contact, (err) => {
if (err) {
return reject(err);
}
resolve(Date.now() - start);
});
});
}
/**
* @private
*/
_createStorageItem(value) {
if (typeof value === 'string') {
value = Buffer.from(value);
} else if (Buffer.isBuffer(value)) {
// noop
} else if (typeof value === 'object') {
const keys = Object.keys(value);
const meta = Object.keys(value.meta);
const alreadyHasMetadata = keys.includes('blob') &&
meta.includes('publisher') &&
meta.includes('timestamp');
if (alreadyHasMetadata) {
value.meta.timestamp = Date.now();
value.meta.publisher = value.meta.publisher.toString('hex');
return value;
}
}
return {
blob: value,
meta: {
timestamp: Date.now(),
publisher: this.identity.toString('hex')
}
};
}
/**
* Performs a lookup to collect K contacts nearest to the given key,
* sending a STORE message to each of them. Note that if there is a
* protocol/validation error, you will not receive it as a rejection.
* Be sure to also check that stored > 0 as part of error handling here.
* @param {buffer|string} key - Key to store data under
* @param {module:kdns/node~StoredItem|Buffer} value - Value to store by key
* @fires module:kdns/node~Node#message_queued
* @returns {Promise<number>}
*/
iterativeStore(key, value) {
return this._iterativeStore(key, value);
}
/**
* @private
*/
_iterativeStore(key, value) {
return new Promise(async (resolve, reject) => {
key = key.toString('hex');
let stored = 0;
const _wrapStore = (key, item, target) => {
return new Promise((resolve, reject) => {
target = new Contact(target.address, target.fingerprint);
this.events.emit('message_queued', 'STORE', [key, item, this.contact], target, (e) => {
if (e) {
return reject(e);
}
resolve();
});
});
};
const dispatchStoreRpcs = async (iterator) => {
for (const [, target] of iterator) {
try {
await _wrapStore(key, this._createStorageItem(value), target);
} catch (err) {
continue;
}
stored++;
}
};
const contacts = await this.iterativeFindNode(key);
const entries = contacts.entries();
const workers = new Array(constants.ALPHA).fill(entries).map(dispatchStoreRpcs);
await Promise.allSettled(workers);
if (stored === 0) {
return reject(new Error('Failed to stored entry with peers'));
}
resolve(stored);
});
}
/**
* Basic kademlia lookup operation that builds a set of K contacts closest
* to the given key
* @param {buffer|string} key - Reference key for node lookup
* @fires module:kdns/node~Node#message_queued
* @returns {Promise<module:kdns/contacts~Contact[]>}
*/
iterativeFindNode(key) {
key = key.toString('hex');
return new Promise((resolve, reject) => {
this._iterativeFind('FIND_NODE', key).then(contacts => {
for (let i = 0; i < contacts.length; i++) {
this.router.addContactByNodeId(contacts[i].fingerprint,
new Contact(contacts[i].address, contacts[i].fingerprint));
}
resolve(contacts.map(c => new Contact(c.address, c.fingerprint)));
}, reject);
});
}
/**
* Kademlia search operation that is conducted as a node lookup and builds
* a list of K closest contacts. If at any time during the lookup the value
* is returned, the search is abandoned. If no value is found, the K closest
* contacts are returned. Upon success, we must store the value at the
* nearest node seen during the search that did not return the value.
* @param {buffer|string} key - Key for value lookup
* @fires module:kdns/node~Node#message_queued
* @returns {Promise<module:kdns/contacts~Contact[]|module:kdns/node~StoredItem>}
*/
iterativeFindValue(key) {
key = key.toString('hex');
return this._iterativeFind('FIND_VALUE', key);
}
/**
* Performs a scan of the storage adapter and performs
* republishing/replication of items stored. Items that we did not publish
* ourselves get republished every T_REPLICATE. Items we did publish get
* republished every T_REPUBLISH.
* @fires module:kdns/node~Node#storage_replicate
* @returns {undefined}
*/
replicate() {
this.events.emit('storage_replicate', this._replicate());
}
/**
* @private
*/
_replicate() {
const self = this;
const now = Date.now();
const replicateStream = new WritableStream({
objectMode: true,
write: maybeReplicate
});
function maybeReplicate({ hash, meta, blob }, enc, next) {
const isPublisher = meta.publisher === self.identity.toString('hex');
const republishDue = (meta.timestamp + constants.T_REPUBLISH) <= now;
const replicateDue = (meta.timestamp + constants.T_REPLICATE) <= now;
const shouldRepublish = isPublisher && republishDue;
const shouldReplicate = !isPublisher && replicateDue;
if (shouldReplicate || shouldRepublish) {
return self.iterativeStore(hash, { meta, blob }, next);
}
next();
}
return replicateStream;
}
/**
* Items expire T_EXPIRE seconds after the original publication. All items
* are assigned an expiration time which is "exponentially inversely
* proportional to the number of nodes between the current node and the node
* whose ID is closest to the key", where this number is "inferred from the
* bucket structure of the current node".
* @fires module:kdns/node~Node#storage_expire
* @returns {undefined}
*/
expire() {
this.events.emit('storage_expire', this._expire());
}
/**
* @private
*/
_expire() {
const self = this;
const now = Date.now();
const expireStream = new WritableStream({
objectMode: true,
write: maybeExpire
});
function maybeExpire({ hash, meta, blob }, enc, next) {
if ((meta.timestamp + constants.T_EXPIRE) <= now) {
self.events.emit('storage_delete', hash);
}
next();
}
return expireStream;
}
/**
* If no node lookups have been performed in any given bucket's range for
* T_REFRESH, the node selects a random number in that range and does a
* refresh, an iterativeFindNode using that number as key.
* @param {number} startIndex - bucket index to start refresh from
* @returns {Promise<undefined>}
*/
refresh(startIndex = 0) {
return this._refresh(startIndex);
}
/**
* @private
*/
_refresh(startIndex) {
let now = Date.now();
let indices = [
...this.router.entries()
].slice(startIndex).map((entry) => entry[0]);
// NB: We want to avoid high churn during refresh and prevent further
// NB: refreshes if lookups in the next bucket do not return any new
// NB: contacts. To do this we will shuffle the bucket indexes we are
// NB: going to check and only continue to refresh if new contacts were
// NB: discovered in the last MAX_UNIMPROVED_REFRESHES consecutive lookups.
let results = new Set(), consecutiveUnimprovedLookups = 0;
function isDiscoveringNewContacts() {
return consecutiveUnimprovedLookups < constants.MAX_UNIMPROVED_REFRESHES;
}
return new Promise(async (resolve, reject) => {
indices = knuthShuffle(indices);
for (let i = 0; i < indices.length; i++) {
let index = indices[i];
if (!isDiscoveringNewContacts()) {
return resolve();
}
const lastBucketLookup = this._lookups.get(index) || 0;
const needsRefresh = lastBucketLookup + constants.T_REFRESH <= now;
if (needsRefresh) {
let contacts;
try {
contacts = await this.iterativeFindNode(
keys.getRandomBufferInBucketRange(this.identity, index)
.toString('hex')
);
for (let i = 0; i < contacts.length; i++) {
try { await this._updateContact(contacts[i]); } catch (e) {}
}
} catch (e) {
return reject(e);
}
let discoveredNewContacts = false;
for (let contact of contacts) {
if (!results.has(contact.fingerprint)) {
discoveredNewContacts = true;
consecutiveUnimprovedLookups = 0;
results.add(contact.fingerprint);
}
}
if (!discoveredNewContacts) {
consecutiveUnimprovedLookups++;
}
}
}
resolve();
});
}
/**
* Builds a list of closest contacts for a particular RPC
* @private
*/
_iterativeFind(method, key) {
return new Promise((resolve) => {
let shortlist = new ContactList(key, [
...this.router.getClosestContactsToKey(key, constants.ALPHA)
].map(([,c]) => {
return new Contact(c.address, c.fingerprint)
}));
let closest = shortlist.closest;
this._lookups.set(keys.getBucketIndex(this.identity, key), Date.now());
const _wrapFindRpc = (contact) => {
return new Promise((resolve, reject) => {
contact = new Contact(contact.address, contact.fingerprint);
this.events.emit('message_queued', method, [key, this.contact], contact, (err, result) => {
if (err) {
return reject(err);
}
if (method === 'FIND_NODE') {
result = result.map((n) => {
const c = new Contact(n.address, n.fingerprint);
return c;
});
resolve(result);
} else if (method === 'FIND_VALUE') {
resolve(result);
}
});
});
};
const iterativeLookup = async (selection, continueLookup = true) => {
if (!selection.length) {
return resolve(shortlist.active.slice(0, constants.K));
}
for (let i = 0; i < selection.length; i++) {
const contact = selection[i];
// NB: mark this node as contacted so as to avoid repeats
shortlist.contacted(contact);
let result;
try {
result = await _wrapFindRpc(contact);
} catch (e) {
continue;
}
// NB: mark this node as active to include it in any return values
shortlist.responded(contact);
// NB: If the result is a contact/node list, just keep track of it
// NB: Otherwise, do not proceed with iteration, just callback
if (Array.isArray(result) || method !== 'FIND_VALUE') {
const added = shortlist.add(Array.isArray(result) ? result : []);
// NB: If it wasn't in the shortlist, we haven't added to the
// NB: routing table, so do that now.
for (let i = 0; i < added.length; i++) {
await this._updateContact(added[i]);
}
continue;
}
// NB: If we did get an item back, get the closest node we contacted
// NB: who is missing the value and store a copy with them
const closestMissingValue = new Contact(shortlist.active[0].address,
shortlist.active[0].fingerprint);
if (closestMissingValue) {
this.events.emit('message_queued', 'STORE', [
key,
this._createStorageItem(result),
this.contact
], closestMissingValue, () => null);
}
// NB: we found a value, so stop searching
return resolve(result, contact);
}
// NB: If we have reached at least K active nodes, or haven't found a
// NB: closer node, even on our finishing trip, return to the caller
// NB: the K closest active nodes.
if (shortlist.active.length >= constants.K ||
(closest[0] === shortlist.closest[0] && !continueLookup)
) {
return resolve(shortlist.active.slice(0, constants.K));
}
// NB: we haven't discovered a closer node, call k uncalled nodes and
// NB: finish up
if (closest[0] === shortlist.closest[0]) {
return iterativeLookup(
shortlist.uncontacted.slice(0, constants.K),
false
);
}
closest = shortlist.closest;
// NB: continue the lookup with ALPHA close, uncontacted nodes
iterativeLookup(shortlist.uncontacted.slice(0, constants.ALPHA), true);
};
iterativeLookup(
shortlist.uncontacted.slice(0, constants.ALPHA),
true
);
});
}
/**
* Worker for updating contact in a routing table bucket
* @private
*/
_updateContact(_contact) {
const contact = new Contact(_contact.address, _contact.fingerprint);
const identity = contact.fingerprint.toString('hex');
if (identity === this.identity.toString('hex')) {
return Promise.resolve();
}
const now = Date.now();
const reset = 600000;
const [, bucket, contactIndex] = this.router.addContactByNodeId(
identity,
contact
);
const [headId, headContact] = bucket.head;
const lastPing = this._pings.get(headId);
if (contactIndex !== -1) {
return Promise.resolve();
}
if (lastPing && lastPing.responded && lastPing.timestamp > (now - reset)) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
this.ping(headContact).then(() => {
this._pings.set(headId, { timestamp: Date.now(), responded: true });
resolve();
}, (e) => {
this._pings.set(headId, { timestamp: Date.now(), responded: false });
this.router.removeContactByNodeId(headId);
this.router.addContactByNodeId(identity, contact);
reject(e);
});
});
}
}
module.exports.Node = Node;