Fix performance of streaming by parsing message JSON once (#25278)
parent
4aa1c4e2ad
commit
6f819c7071
|
@ -91,18 +91,31 @@ const redisUrlToClient = async (defaultConfig, redisUrl) => {
|
|||
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
|
||||
|
||||
/**
|
||||
* Attempts to safely parse a string as JSON, used when both receiving a message
|
||||
* from redis and when receiving a message from a client over a websocket
|
||||
* connection, this is why it accepts a `req` argument.
|
||||
* @param {string} json
|
||||
* @param {any} req
|
||||
* @return {Object.<string, any>|null}
|
||||
* @param {any?} req
|
||||
* @returns {Object.<string, any>|null}
|
||||
*/
|
||||
const parseJSON = (json, req) => {
|
||||
try {
|
||||
return JSON.parse(json);
|
||||
} catch (err) {
|
||||
if (req.accountId) {
|
||||
log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
|
||||
/* FIXME: This logging isn't great, and should probably be done at the
|
||||
* call-site of parseJSON, not in the method, but this would require changing
|
||||
* the signature of parseJSON to return something akin to a Result type:
|
||||
* [Error|null, null|Object<string,any}], and then handling the error
|
||||
* scenarios.
|
||||
*/
|
||||
if (req) {
|
||||
if (req.accountId) {
|
||||
log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
|
||||
} else {
|
||||
log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
|
||||
}
|
||||
} else {
|
||||
log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
|
||||
log.warn(`Error parsing message from redis: ${err}`);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -168,7 +181,7 @@ const startWorker = async (workerId) => {
|
|||
const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
|
||||
|
||||
/**
|
||||
* @type {Object.<string, Array.<function(string): void>>}
|
||||
* @type {Object.<string, Array.<function(Object<string, any>): void>>}
|
||||
*/
|
||||
const subs = {};
|
||||
|
||||
|
@ -208,7 +221,10 @@ const startWorker = async (workerId) => {
|
|||
return;
|
||||
}
|
||||
|
||||
callbacks.forEach(callback => callback(message));
|
||||
const json = parseJSON(message, null);
|
||||
if (!json) return;
|
||||
|
||||
callbacks.forEach(callback => callback(json));
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -230,6 +246,7 @@ const startWorker = async (workerId) => {
|
|||
|
||||
/**
|
||||
* @param {string} channel
|
||||
* @param {function(Object<string, any>): void} callback
|
||||
*/
|
||||
const unsubscribe = (channel, callback) => {
|
||||
log.silly(`Removing listener for ${channel}`);
|
||||
|
@ -379,7 +396,7 @@ const startWorker = async (workerId) => {
|
|||
|
||||
/**
|
||||
* @param {any} req
|
||||
* @return {string}
|
||||
* @returns {string|undefined}
|
||||
*/
|
||||
const channelNameFromPath = req => {
|
||||
const { path, query } = req;
|
||||
|
@ -488,15 +505,11 @@ const startWorker = async (workerId) => {
|
|||
/**
|
||||
* @param {any} req
|
||||
* @param {SystemMessageHandlers} eventHandlers
|
||||
* @return {function(string): void}
|
||||
* @returns {function(object): void}
|
||||
*/
|
||||
const createSystemMessageListener = (req, eventHandlers) => {
|
||||
return message => {
|
||||
const json = parseJSON(message, req);
|
||||
|
||||
if (!json) return;
|
||||
|
||||
const { event } = json;
|
||||
const { event } = message;
|
||||
|
||||
log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);
|
||||
|
||||
|
@ -607,19 +620,16 @@ const startWorker = async (workerId) => {
|
|||
* @param {function(string, string): void} output
|
||||
* @param {function(string[], function(string): void): void} attachCloseHandler
|
||||
* @param {boolean=} needsFiltering
|
||||
* @return {function(string): void}
|
||||
* @returns {function(object): void}
|
||||
*/
|
||||
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
|
||||
const accountId = req.accountId || req.remoteAddress;
|
||||
|
||||
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
|
||||
|
||||
// Currently message is of type string, soon it'll be Record<string, any>
|
||||
const listener = message => {
|
||||
const json = parseJSON(message, req);
|
||||
|
||||
if (!json) return;
|
||||
|
||||
const { event, payload, queued_at } = json;
|
||||
const { event, payload, queued_at } = message;
|
||||
|
||||
const transmit = () => {
|
||||
const now = new Date().getTime();
|
||||
|
@ -1078,8 +1088,15 @@ const startWorker = async (workerId) => {
|
|||
ws.on('close', onEnd);
|
||||
ws.on('error', onEnd);
|
||||
|
||||
ws.on('message', data => {
|
||||
const json = parseJSON(data, session.request);
|
||||
ws.on('message', (data, isBinary) => {
|
||||
if (isBinary) {
|
||||
log.debug('Received binary data, closing connection');
|
||||
ws.close(1003, 'The mastodon streaming server does not support binary messages');
|
||||
return;
|
||||
}
|
||||
const message = data.toString('utf8');
|
||||
|
||||
const json = parseJSON(message, session.request);
|
||||
|
||||
if (!json) return;
|
||||
|
||||
|
|
Loading…
Reference in New Issue