Execute PushUpdateWorker only for accounts who uses StreamingAPI just now. (#3278)

* Add redis key "subscribed:timeline:#{account.id}" to indicate active streaming API listeners exists.

* Add endpoint for notification only stream.

* Run PushUpdateWorker only for users uses Streaming API now.

* Move close hander streamTo(Http/Ws) -> stream(Http/Ws)End (Deal with #3370)

* Add stream type for stream start log message.
rebase/4.0.0rc2
Clworld 2017-06-04 03:50:53 +09:00 committed by Eugen Rochko
parent 8a597f0138
commit dab8fc4584
2 changed files with 49 additions and 9 deletions

View File

@ -34,7 +34,7 @@ class FeedManager
trim(timeline_type, account.id) trim(timeline_type, account.id)
end end
PushUpdateWorker.perform_async(account.id, status.id) PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id)
end end
def trim(type, account_id) def trim(type, account_id)
@ -43,6 +43,10 @@ class FeedManager
redis.zremrangebyscore(key(type, account_id), '-inf', "(#{last.last}") redis.zremrangebyscore(key(type, account_id), '-inf', "(#{last.last}")
end end
def push_update_required?(timeline_type, account_id)
timeline_type != :home || redis.get("subscribed:timeline:#{account_id}").present?
end
def merge_into_timeline(from_account, into_account) def merge_into_timeline(from_account, into_account)
timeline_key = key(:home, into_account.id) timeline_key = key(:home, into_account.id)
query = from_account.statuses.limit(FeedManager::MAX_ITEMS / 4) query = from_account.statuses.limit(FeedManager::MAX_ITEMS / 4)

View File

@ -110,11 +110,12 @@ const startWorker = (workerId) => {
const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
const subs = {}; const subs = {};
redisClient.on('pmessage', (_, channel, message) => { redisSubscribeClient.on('pmessage', (_, channel, message) => {
const callbacks = subs[channel]; const callbacks = subs[channel];
log.silly(`New message on channel ${channel}`); log.silly(`New message on channel ${channel}`);
@ -126,7 +127,19 @@ const startWorker = (workerId) => {
callbacks.forEach(callback => callback(message)); callbacks.forEach(callback => callback(message));
}); });
redisClient.psubscribe(`${redisPrefix}timeline:*`); redisSubscribeClient.psubscribe(`${redisPrefix}timeline:*`);
const subscriptionHeartbeat = (channel) => {
const interval = 6*60;
const tellSubscribed = () => {
redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
};
tellSubscribed();
const heartbeat = setInterval(tellSubscribed, interval*1000);
return () => {
clearInterval(heartbeat);
};
};
const subscribe = (channel, callback) => { const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`); log.silly(`Adding listener for ${channel}`);
@ -231,8 +244,9 @@ const startWorker = (workerId) => {
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => { const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`); const streamType = notificationOnly ? ' (notification)' : '';
log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}${streamType}`);
const listener = message => { const listener = message => {
const { event, payload, queued_at } = JSON.parse(message); const { event, payload, queued_at } = JSON.parse(message);
@ -245,6 +259,10 @@ const startWorker = (workerId) => {
output(event, payload); output(event, payload);
}; };
if (notificationOnly && event !== 'notification') {
return;
}
// Only messages that may require filtering are statuses, since notifications // Only messages that may require filtering are statuses, since notifications
// are already personalized and deletes do not matter // are already personalized and deletes do not matter
if (needsFiltering && event === 'update') { if (needsFiltering && event === 'update') {
@ -313,9 +331,12 @@ const startWorker = (workerId) => {
}; };
// Setup stream end for HTTP // Setup stream end for HTTP
const streamHttpEnd = req => (id, listener) => { const streamHttpEnd = (req, closeHandler = false) => (id, listener) => {
req.on('close', () => { req.on('close', () => {
unsubscribe(id, listener); unsubscribe(id, listener);
if (closeHandler) {
closeHandler();
}
}); });
}; };
@ -330,15 +351,21 @@ const startWorker = (workerId) => {
}; };
// Setup stream end for WebSockets // Setup stream end for WebSockets
const streamWsEnd = (req, ws) => (id, listener) => { const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => {
ws.on('close', () => { ws.on('close', () => {
log.verbose(req.requestId, `Ending stream for ${req.accountId}`); log.verbose(req.requestId, `Ending stream for ${req.accountId}`);
unsubscribe(id, listener); unsubscribe(id, listener);
if (closeHandler) {
closeHandler();
}
}); });
ws.on('error', e => { ws.on('error', e => {
log.verbose(req.requestId, `Ending stream for ${req.accountId}`); log.verbose(req.requestId, `Ending stream for ${req.accountId}`);
unsubscribe(id, listener); unsubscribe(id, listener);
if (closeHandler) {
closeHandler();
}
}); });
}; };
@ -348,7 +375,12 @@ const startWorker = (workerId) => {
app.use(errorMiddleware); app.use(errorMiddleware);
app.get('/api/v1/streaming/user', (req, res) => { app.get('/api/v1/streaming/user', (req, res) => {
streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req)); const channel = `timeline:${req.accountId}`;
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
});
app.get('/api/v1/streaming/user/notification', (req, res) => {
streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true);
}); });
app.get('/api/v1/streaming/public', (req, res) => { app.get('/api/v1/streaming/public', (req, res) => {
@ -382,7 +414,11 @@ const startWorker = (workerId) => {
switch(location.query.stream) { switch(location.query.stream) {
case 'user': case 'user':
streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws)); const channel = `timeline:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
break;
case 'user:notification':
streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
break; break;
case 'public': case 'public':
streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true); streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);