Finish update of node-redis (#17183)

* fix streaming redis client

* use console.error instead of console.log

* follow node-redis migration guide

https://github.com/redis/node-redis/blob/master/docs/v3-to-v4.md

* fix config options for node-redis

* keep indentation

* Update streaming/index.js

Co-authored-by: Yamagishi Kazutoshi <ykzts@desire.sh>

Co-authored-by: Yamagishi Kazutoshi <ykzts@desire.sh>
rebase/4.0.0rc2
Lerk 2021-12-25 21:55:06 +00:00 committed by GitHub
parent fad37dd1bc
commit 4d1eaf3e6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 61 additions and 73 deletions

View File

@ -63,20 +63,29 @@ const dbUrlToConfig = (dbUrl) => {
* @param {Object.<string, any>} defaultConfig * @param {Object.<string, any>} defaultConfig
* @param {string} redisUrl * @param {string} redisUrl
*/ */
const redisUrlToClient = (defaultConfig, redisUrl) => { const redisUrlToClient = async (defaultConfig, redisUrl) => {
const config = defaultConfig; const config = defaultConfig;
let client;
if (!redisUrl) { if (!redisUrl) {
return redis.createClient(config); client = redis.createClient(config);
} else if (redisUrl.startsWith('unix://')) {
client = redis.createClient(Object.assign(config, {
socket: {
path: redisUrl.slice(7),
},
}));
} else {
client = redis.createClient(Object.assign(config, {
url: redisUrl,
}));
} }
if (redisUrl.startsWith('unix://')) { client.on('error', (err) => log.error('Redis Client Error!', err));
return redis.createClient(redisUrl.slice(7), config); await client.connect();
}
return redis.createClient(Object.assign(config, { return client;
url: redisUrl,
}));
}; };
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
@ -102,7 +111,7 @@ const startMaster = () => {
log.warn(`Starting streaming API server master with ${numWorkers} workers`); log.warn(`Starting streaming API server master with ${numWorkers} workers`);
}; };
const startWorker = (workerId) => { const startWorker = async (workerId) => {
log.warn(`Starting worker ${workerId}`); log.warn(`Starting worker ${workerId}`);
const pgConfigs = { const pgConfigs = {
@ -127,7 +136,7 @@ const startWorker = (workerId) => {
if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') { if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
pgConfigs.development.ssl = true; pgConfigs.development.ssl = true;
pgConfigs.production.ssl = true; pgConfigs.production.ssl = true;
} }
const app = express(); const app = express();
@ -139,9 +148,11 @@ const startWorker = (workerId) => {
const redisNamespace = process.env.REDIS_NAMESPACE || null; const redisNamespace = process.env.REDIS_NAMESPACE || null;
const redisParams = { const redisParams = {
host: process.env.REDIS_HOST || '127.0.0.1', socket: {
port: process.env.REDIS_PORT || 6379, host: process.env.REDIS_HOST || '127.0.0.1',
db: process.env.REDIS_DB || 0, port: process.env.REDIS_PORT || 6379,
},
database: process.env.REDIS_DB || 0,
password: process.env.REDIS_PASSWORD || undefined, password: process.env.REDIS_PASSWORD || undefined,
}; };
@ -151,25 +162,8 @@ const startWorker = (workerId) => {
const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL); const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
/**
* @type {Object.<string, Array.<function(string): void>>}
*/
const subs = {};
redisSubscribeClient.on('message', (channel, message) => {
const callbacks = subs[channel];
log.silly(`New message on channel ${channel}`);
if (!callbacks) {
return;
}
callbacks.forEach(callback => callback(message));
});
/** /**
* @param {string[]} channels * @param {string[]} channels
@ -197,34 +191,16 @@ const startWorker = (workerId) => {
*/ */
const subscribe = (channel, callback) => { const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`); log.silly(`Adding listener for ${channel}`);
subs[channel] = subs[channel] || [];
if (subs[channel].length === 0) { redisSubscribeClient.subscribe(channel, callback);
log.verbose(`Subscribe ${channel}`);
redisSubscribeClient.subscribe(channel);
}
subs[channel].push(callback);
}; };
/** /**
* @param {string} channel * @param {string} channel
* @param {function(string): void} callback
*/ */
const unsubscribe = (channel, callback) => { const unsubscribe = (channel) => {
log.silly(`Removing listener for ${channel}`);
if (!subs[channel]) { redisSubscribeClient.unsubscribe(channel);
return;
}
subs[channel] = subs[channel].filter(item => item !== callback);
if (subs[channel].length === 0) {
log.verbose(`Unsubscribe ${channel}`);
redisSubscribeClient.unsubscribe(channel);
delete subs[channel];
}
}; };
const FALSE_VALUES = [ const FALSE_VALUES = [
@ -365,7 +341,7 @@ const startWorker = (workerId) => {
const { path, query } = req; const { path, query } = req;
const onlyMedia = isTruthy(query.only_media); const onlyMedia = isTruthy(query.only_media);
switch(path) { switch (path) {
case '/api/v1/streaming/user': case '/api/v1/streaming/user':
return 'user'; return 'user';
case '/api/v1/streaming/user/notification': case '/api/v1/streaming/user/notification':
@ -496,7 +472,7 @@ const startWorker = (workerId) => {
const listener = createSystemMessageListener(req, { const listener = createSystemMessageListener(req, {
onKill () { onKill() {
res.end(); res.end();
}, },
@ -548,7 +524,7 @@ const startWorker = (workerId) => {
}; };
/** /**
* @param {array} * @param {array} arr
* @param {number=} shift * @param {number=} shift
* @return {string} * @return {string}
*/ */
@ -590,7 +566,7 @@ const startWorker = (workerId) => {
* @return {function(string): void} * @return {function(string): void}
*/ */
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
const accountId = req.accountId || req.remoteAddress; const accountId = req.accountId || req.remoteAddress;
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
@ -602,8 +578,8 @@ const startWorker = (workerId) => {
const { event, payload, queued_at } = json; const { event, payload, queued_at } = json;
const transmit = () => { const transmit = () => {
const now = new Date().getTime(); const now = new Date().getTime();
const delta = now - queued_at; const delta = now - queued_at;
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`); log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
@ -617,9 +593,9 @@ const startWorker = (workerId) => {
return; return;
} }
const unpackedPayload = payload; const unpackedPayload = payload;
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)); const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
const accountDomain = unpackedPayload.account.acct.split('@')[1]; const accountDomain = unpackedPayload.account.acct.split('@')[1];
if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) { if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`); log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
@ -639,7 +615,15 @@ const startWorker = (workerId) => {
} }
const queries = [ const queries = [
client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), client.query(`SELECT 1
FROM blocks
WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
OR (account_id = $2 AND target_account_id = $1)
UNION
SELECT 1
FROM mutes
WHERE account_id = $1
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
]; ];
if (accountDomain) { if (accountDomain) {
@ -702,12 +686,12 @@ const startWorker = (workerId) => {
/** /**
* @param {any} req * @param {any} req
* @param {function(): void} [closeHandler] * @param {function(): void} [closeHandler]
* @return {function(string[], function(string): void)} * @return {function(string[]): void}
*/ */
const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
req.on('close', () => { req.on('close', () => {
ids.forEach(id => { ids.forEach(id => {
unsubscribe(id, listener); unsubscribe(id);
}); });
if (closeHandler) { if (closeHandler) {
@ -754,7 +738,7 @@ const startWorker = (workerId) => {
app.get('/api/v1/streaming/*', (req, res) => { app.get('/api/v1/streaming/*', (req, res) => {
channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => { channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
const onSend = streamToHttp(req, res); const onSend = streamToHttp(req, res);
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering); streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
}).catch(err => { }).catch(err => {
@ -797,7 +781,7 @@ const startWorker = (workerId) => {
* @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>} * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
*/ */
const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
switch(name) { switch (name) {
case 'user': case 'user':
resolve({ resolve({
channelIds: channelsForUserStream(req), channelIds: channelsForUserStream(req),
@ -927,14 +911,17 @@ const startWorker = (workerId) => {
* @param {StreamParams} params * @param {StreamParams} params
*/ */
const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options }) => { checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({
channelIds,
options,
}) => {
if (subscriptions[channelIds.join(';')]) { if (subscriptions[channelIds.join(';')]) {
return; return;
} }
const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params)); const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
const stopHeartbeat = subscriptionHeartbeat(channelIds); const stopHeartbeat = subscriptionHeartbeat(channelIds);
const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering); const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
subscriptions[channelIds.join(';')] = { subscriptions[channelIds.join(';')] = {
listener, listener,
@ -982,7 +969,7 @@ const startWorker = (workerId) => {
const listener = createSystemMessageListener(request, { const listener = createSystemMessageListener(request, {
onKill () { onKill() {
socket.close(); socket.close();
}, },
@ -992,7 +979,8 @@ const startWorker = (workerId) => {
subscriptions[systemChannelId] = { subscriptions[systemChannelId] = {
listener, listener,
stopHeartbeat: () => {}, stopHeartbeat: () => {
},
}; };
}; };
@ -1011,7 +999,7 @@ const startWorker = (workerId) => {
wss.on('connection', (ws, req) => { wss.on('connection', (ws, req) => {
const location = url.parse(req.url, true); const location = url.parse(req.url, true);
req.requestId = uuid.v4(); req.requestId = uuid.v4();
req.remoteAddress = ws._socket.remoteAddress; req.remoteAddress = ws._socket.remoteAddress;
ws.isAlive = true; ws.isAlive = true;