Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228)

lolsob-rspec
Emelia Smith 2023-07-28 12:06:29 +02:00 committed by GitHub
parent afb246aff7
commit 813acdb431
1 changed files with 17 additions and 9 deletions

View File

@ -222,9 +222,15 @@ const startServer = async () => {
callbacks.forEach(callback => callback(json)); callbacks.forEach(callback => callback(json));
}; };
/**
* @callback SubscriptionListener
* @param {ReturnType<parseJSON>} json of the message
* @returns void
*/
/** /**
* @param {string} channel * @param {string} channel
* @param {function(string): void} callback * @param {SubscriptionListener} callback
*/ */
const subscribe = (channel, callback) => { const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`); log.silly(`Adding listener for ${channel}`);
@ -241,7 +247,7 @@ const startServer = async () => {
/** /**
* @param {string} channel * @param {string} channel
* @param {function(Object<string, any>): void} callback * @param {SubscriptionListener} callback
*/ */
const unsubscribe = (channel, callback) => { const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`); log.silly(`Removing listener for ${channel}`);
@ -613,9 +619,9 @@ const startServer = async () => {
* @param {string[]} ids * @param {string[]} ids
* @param {any} req * @param {any} req
* @param {function(string, string): void} output * @param {function(string, string): void} output
* @param {function(string[], function(string): void): void} attachCloseHandler * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
* @param {boolean=} needsFiltering * @param {boolean=} needsFiltering
* @returns {function(object): void} * @returns {SubscriptionListener}
*/ */
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
const accountId = req.accountId || req.remoteAddress; const accountId = req.accountId || req.remoteAddress;
@ -633,6 +639,7 @@ const startServer = async () => {
// The listener used to process each message off the redis subscription, // The listener used to process each message off the redis subscription,
// message here is an object with an `event` and `payload` property. Some // message here is an object with an `event` and `payload` property. Some
// events also include a queued_at value, but this is being removed shortly. // events also include a queued_at value, but this is being removed shortly.
/** @type {SubscriptionListener} */
const listener = message => { const listener = message => {
const { event, payload } = message; const { event, payload } = message;
@ -825,7 +832,7 @@ const startServer = async () => {
subscribe(`${redisPrefix}${id}`, listener); subscribe(`${redisPrefix}${id}`, listener);
}); });
if (attachCloseHandler) { if (typeof attachCloseHandler === 'function') {
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener); attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
} }
@ -862,12 +869,13 @@ const startServer = async () => {
/** /**
* @param {any} req * @param {any} req
* @param {function(): void} [closeHandler] * @param {function(): void} [closeHandler]
* @returns {function(string[]): void} * @returns {function(string[], SubscriptionListener): void}
*/ */
const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
req.on('close', () => { req.on('close', () => {
ids.forEach(id => { ids.forEach(id => {
unsubscribe(id); unsubscribe(id, listener);
}); });
if (closeHandler) { if (closeHandler) {
@ -1131,7 +1139,7 @@ const startServer = async () => {
* @typedef WebSocketSession * @typedef WebSocketSession
* @property {any} socket * @property {any} socket
* @property {any} request * @property {any} request
* @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
*/ */
/** /**