diff --git a/streaming/errors.js b/streaming/errors.js new file mode 100644 index 00000000000..9a641180ba4 --- /dev/null +++ b/streaming/errors.js @@ -0,0 +1,51 @@ +// @ts-check + +/** + * Typed as a string because otherwise it's a const string, which means we can't + * override it in let statements. + * @type {string} + */ +const UNEXPECTED_ERROR_MESSAGE = 'An unexpected error occurred'; +exports.UNKNOWN_ERROR_MESSAGE = UNEXPECTED_ERROR_MESSAGE; + +/** + * Extracts the status and message properties from the error object, if + * available for public use. The `unknown` is for catch statements + * @param {Error | AuthenticationError | RequestError | unknown} err + */ +exports.extractStatusAndMessage = function(err) { + let statusCode = 500; + let errorMessage = UNEXPECTED_ERROR_MESSAGE; + if (err instanceof AuthenticationError || err instanceof RequestError) { + statusCode = err.status; + errorMessage = err.message; + } + + return { statusCode, errorMessage }; +}; + +class RequestError extends Error { + /** + * @param {string} message + */ + constructor(message) { + super(message); + this.name = "RequestError"; + this.status = 400; + } +} + +exports.RequestError = RequestError; + +class AuthenticationError extends Error { + /** + * @param {string} message + */ + constructor(message) { + super(message); + this.name = "AuthenticationError"; + this.status = 401; + } +} + +exports.AuthenticationError = AuthenticationError; diff --git a/streaming/index.js b/streaming/index.js index 6945a9ae7da..1c312ebd70b 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -14,6 +14,8 @@ const pg = require('pg'); const dbUrlToConfig = require('pg-connection-string').parse; const WebSocket = require('ws'); +const errors = require('./errors'); +const { AuthenticationError, RequestError } = require('./errors'); const { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } = require('./logging'); const { setupMetrics } = require('./metrics'); const { isTruthy, normalizeHashtag, firstParam } = require("./utils"); @@ -324,7 +326,7 @@ const startServer = async () => { // Unfortunately for using the on('upgrade') setup, we need to manually // write a HTTP Response to the Socket to close the connection upgrade // attempt, so the following code is to handle all of that. - const statusCode = err.status ?? 401; + const {statusCode, errorMessage } = errors.extractStatusAndMessage(err); /** @type {Record} */ const headers = { @@ -332,7 +334,7 @@ const startServer = async () => { 'Content-Type': 'text/plain', 'Content-Length': 0, 'X-Request-Id': request.id, - 'X-Error-Message': err.status ? err.toString() : 'An unexpected error occurred' + 'X-Error-Message': errorMessage }; // Ensure the socket is closed once we've finished writing to it: @@ -350,7 +352,7 @@ const startServer = async () => { statusCode, headers } - }, err.toString()); + }, errorMessage); return; } @@ -535,11 +537,7 @@ const startServer = async () => { } if (result.rows.length === 0) { - err = new Error('Invalid access token'); - // @ts-ignore - err.status = 401; - - reject(err); + reject(new AuthenticationError('Invalid access token')); return; } @@ -570,11 +568,7 @@ const startServer = async () => { const accessToken = location.query.access_token || req.headers['sec-websocket-protocol']; if (!authorization && !accessToken) { - const err = new Error('Missing access token'); - // @ts-ignore - err.status = 401; - - reject(err); + reject(new AuthenticationError('Missing access token')); return; } @@ -651,11 +645,7 @@ const startServer = async () => { return; } - const err = new Error('Access token does not cover required scopes'); - // @ts-ignore - err.status = 401; - - reject(err); + reject(new AuthenticationError('Access token does not have the required scopes')); }); /** @@ -731,11 +721,7 @@ const startServer = async () => { // If no channelName can be found for the request, then we should terminate // the connection, as there's nothing to stream back if (!channelName) { - const err = new Error('Unknown channel requested'); - // @ts-ignore - err.status = 400; - - next(err); + next(new RequestError('Unknown channel requested')); return; } @@ -762,10 +748,7 @@ const startServer = async () => { return; } - const hasStatusCode = Object.hasOwnProperty.call(err, 'status'); - // @ts-ignore - const statusCode = hasStatusCode ? err.status : 500; - const errorMessage = hasStatusCode ? err.toString() : 'An unexpected error occurred'; + const {statusCode, errorMessage } = errors.extractStatusAndMessage(err); res.writeHead(statusCode, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: errorMessage })); @@ -1140,7 +1123,7 @@ const startServer = async () => { }; /** - * @param {any} res + * @param {http.ServerResponse} res */ const httpNotFound = res => { res.writeHead(404, { 'Content-Type': 'application/json' }); @@ -1155,16 +1138,29 @@ const startServer = async () => { api.use(errorMiddleware); api.get('/api/v1/streaming/*', (req, res) => { - // @ts-ignore - channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => { + const channelName = channelNameFromPath(req); + + // FIXME: In theory we'd never actually reach here due to + // authenticationMiddleware catching this case, however, we need to refactor + // how those middlewares work, so I'm adding the extra check in here. + if (!channelName) { + httpNotFound(res); + return; + } + + channelNameToIds(req, channelName, req.query).then(({ channelIds, options }) => { const onSend = streamToHttp(req, res); const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); // @ts-ignore streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering); }).catch(err => { - res.log.info({ err }, 'Subscription error:', err.toString()); - httpNotFound(res); + const {statusCode, errorMessage } = errors.extractStatusAndMessage(err); + + res.log.info({ err }, 'Eventsource subscription error'); + + res.writeHead(statusCode, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: errorMessage })); }); }); @@ -1265,8 +1261,8 @@ const startServer = async () => { break; case 'hashtag': - if (!params.tag || params.tag.length === 0) { - reject('No tag for stream provided'); + if (!params.tag) { + reject(new RequestError('Missing tag name parameter')); } else { resolve({ channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`], @@ -1276,8 +1272,8 @@ const startServer = async () => { break; case 'hashtag:local': - if (!params.tag || params.tag.length === 0) { - reject('No tag for stream provided'); + if (!params.tag) { + reject(new RequestError('Missing tag name parameter')); } else { resolve({ channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`], @@ -1287,19 +1283,23 @@ const startServer = async () => { break; case 'list': - // @ts-ignore + if (!params.list) { + reject(new RequestError('Missing list name parameter')); + return; + } + authorizeListAccess(params.list, req).then(() => { resolve({ channelIds: [`timeline:list:${params.list}`], options: { needsFiltering: false }, }); }).catch(() => { - reject('Not authorized to stream this list'); + reject(new AuthenticationError('Not authorized to stream this list')); }); break; default: - reject('Unknown stream type'); + reject(new RequestError('Unknown stream type')); } }); @@ -1353,8 +1353,17 @@ const startServer = async () => { stopHeartbeat, }; }).catch(err => { - logger.error({ err }, 'Subscription error'); - websocket.send(JSON.stringify({ error: err.toString() })); + const {statusCode, errorMessage } = errors.extractStatusAndMessage(err); + + logger.error({ err }, 'Websocket subscription error'); + + // If we have a socket that is alive and open still, send the error back to the client: + if (websocket.isAlive && websocket.readyState === websocket.OPEN) { + websocket.send(JSON.stringify({ + error: errorMessage, + status: statusCode + })); + } }); }; @@ -1393,10 +1402,11 @@ const startServer = async () => { channelNameToIds(request, channelName, params).then(({ channelIds }) => { removeSubscription(session, channelIds); }).catch(err => { - logger.error({err}, 'Unsubscribe error'); + logger.error({err}, 'Websocket unsubscribe error'); // If we have a socket that is alive and open still, send the error back to the client: if (websocket.isAlive && websocket.readyState === websocket.OPEN) { + // TODO: Use a better error response here websocket.send(JSON.stringify({ error: "Error unsubscribing from channel" })); } });