Add additional metrics for streaming (#26945)
parent
fd4f319fd2
commit
c1b1cbb58a
|
@ -152,6 +152,28 @@ const redisConfigFromEnv = (env) => {
|
|||
};
|
||||
};
|
||||
|
||||
const PUBLIC_CHANNELS = [
|
||||
'public',
|
||||
'public:media',
|
||||
'public:local',
|
||||
'public:local:media',
|
||||
'public:remote',
|
||||
'public:remote:media',
|
||||
'hashtag',
|
||||
'hashtag:local',
|
||||
];
|
||||
|
||||
// Used for priming the counters/gauges for the various metrics that are
|
||||
// per-channel
|
||||
const CHANNEL_NAMES = [
|
||||
'system',
|
||||
'user',
|
||||
'user:notification',
|
||||
'list',
|
||||
'direct',
|
||||
...PUBLIC_CHANNELS
|
||||
];
|
||||
|
||||
const startServer = async () => {
|
||||
const app = express();
|
||||
|
||||
|
@ -203,9 +225,6 @@ const startServer = async () => {
|
|||
labelNames: ['type'],
|
||||
});
|
||||
|
||||
connectedClients.set({ type: 'websocket' }, 0);
|
||||
connectedClients.set({ type: 'eventsource' }, 0);
|
||||
|
||||
const connectedChannels = new metrics.Gauge({
|
||||
name: 'connected_channels',
|
||||
help: 'The number of channels the streaming server is streaming to',
|
||||
|
@ -217,6 +236,35 @@ const startServer = async () => {
|
|||
help: 'The number of Redis channels the streaming server is subscribed to',
|
||||
});
|
||||
|
||||
const redisMessagesReceived = new metrics.Counter({
|
||||
name: 'redis_messages_received_total',
|
||||
help: 'The total number of messages the streaming server has received from redis subscriptions'
|
||||
});
|
||||
|
||||
const messagesSent = new metrics.Counter({
|
||||
name: 'messages_sent_total',
|
||||
help: 'The total number of messages the streaming server sent to clients per connection type',
|
||||
labelNames: [ 'type' ]
|
||||
});
|
||||
|
||||
// Prime the gauges so we don't loose metrics between restarts:
|
||||
redisSubscriptions.set(0);
|
||||
connectedClients.set({ type: 'websocket' }, 0);
|
||||
connectedClients.set({ type: 'eventsource' }, 0);
|
||||
|
||||
// For each channel, initialize the gauges at zero; There's only a finite set of channels available
|
||||
CHANNEL_NAMES.forEach(( channel ) => {
|
||||
connectedChannels.set({ type: 'websocket', channel }, 0);
|
||||
connectedChannels.set({ type: 'eventsource', channel }, 0);
|
||||
})
|
||||
|
||||
// Prime the counters so that we don't loose metrics between restarts.
|
||||
// Unfortunately counters don't support the set() API, so instead I'm using
|
||||
// inc(0) to achieve the same result.
|
||||
redisMessagesReceived.inc(0);
|
||||
messagesSent.inc({ type: 'websocket' }, 0);
|
||||
messagesSent.inc({ type: 'eventsource' }, 0);
|
||||
|
||||
// When checking metrics in the browser, the favicon is requested this
|
||||
// prevents the request from falling through to the API Router, which would
|
||||
// error for this endpoint:
|
||||
|
@ -262,6 +310,8 @@ const startServer = async () => {
|
|||
* @param {string} message
|
||||
*/
|
||||
const onRedisMessage = (channel, message) => {
|
||||
redisMessagesReceived.inc();
|
||||
|
||||
const callbacks = subs[channel];
|
||||
|
||||
log.silly(`New message on channel ${redisPrefix}${channel}`);
|
||||
|
@ -490,17 +540,6 @@ const startServer = async () => {
|
|||
}
|
||||
};
|
||||
|
||||
const PUBLIC_CHANNELS = [
|
||||
'public',
|
||||
'public:media',
|
||||
'public:local',
|
||||
'public:local:media',
|
||||
'public:remote',
|
||||
'public:remote:media',
|
||||
'hashtag',
|
||||
'hashtag:local',
|
||||
];
|
||||
|
||||
/**
|
||||
* @param {any} req
|
||||
* @param {string|undefined} channelName
|
||||
|
@ -705,10 +744,11 @@ const startServer = async () => {
|
|||
* @param {any} req
|
||||
* @param {function(string, string): void} output
|
||||
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
|
||||
* @param {'websocket' | 'eventsource'} destinationType
|
||||
* @param {boolean=} needsFiltering
|
||||
* @returns {SubscriptionListener}
|
||||
*/
|
||||
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
|
||||
const streamFrom = (ids, req, output, attachCloseHandler, destinationType, needsFiltering = false) => {
|
||||
const accountId = req.accountId || req.remoteAddress;
|
||||
|
||||
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
|
||||
|
@ -717,6 +757,8 @@ const startServer = async () => {
|
|||
// TODO: Replace "string"-based delete payloads with object payloads:
|
||||
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
|
||||
|
||||
messagesSent.labels({ type: destinationType }).inc(1);
|
||||
|
||||
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
|
||||
output(event, encodedPayload);
|
||||
};
|
||||
|
@ -1031,7 +1073,7 @@ const startServer = async () => {
|
|||
const onSend = streamToHttp(req, res);
|
||||
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
|
||||
|
||||
streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
|
||||
streamFrom(channelIds, req, onSend, onEnd, 'eventsource', options.needsFiltering);
|
||||
}).catch(err => {
|
||||
log.verbose(req.requestId, 'Subscription error:', err.toString());
|
||||
httpNotFound(res);
|
||||
|
@ -1241,7 +1283,7 @@ const startServer = async () => {
|
|||
|
||||
const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
|
||||
const stopHeartbeat = subscriptionHeartbeat(channelIds);
|
||||
const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
|
||||
const listener = streamFrom(channelIds, request, onSend, undefined, 'websocket', options.needsFiltering);
|
||||
|
||||
connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();
|
||||
|
||||
|
|
Loading…
Reference in New Issue