AWS API网关的WebSocket不一致地接收消息

3

我有一个websocket,连接到一个类似这样的lambda函数:

const AWS = require('aws-sdk');
const amqp = require('amqplib');

const api = new AWS.ApiGatewayManagementApi({
    endpoint: 'MY_ENDPOINT',
});

async function sendMsgToApp(response, connectionId) {
    console.log('=========== posting reply');
    const params = {
        ConnectionId: connectionId,
        Data: Buffer.from(response),
    };
    return api.postToConnection(params).promise();
}

let rmqServerUrl =
    'MY_RMQ_SERVER_URL';
let rmqServerConn = null;

exports.handler = async event => {
    console.log('websocket event:', event);
    const { routeKey: route, connectionId } = event.requestContext;

    switch (route) {
        case '$connect':
            console.log('user connected');
            const creds = event.queryStringParameters.x;
            console.log('============ x.length:', creds.length);
            const decodedCreds = Buffer.from(creds, 'base64').toString('utf-8');
            try {
                const conn = await amqp.connect(
                    `amqps://${decodedCreds}@${rmqServerUrl}`
                );
                const channel = await conn.createChannel();
                console.log('============ created channel successfully:');
                rmqServerConn = conn;
                const [userId] = decodedCreds.split(':');
                const { queue } = await channel.assertQueue(userId, {
                    durable: true,
                    autoDelete: false,
                });
                console.log('============ userId:', userId, 'queue:', queue);
                channel.consume(queue, msg => {
                    console.log('========== msg:', msg);
                    const { content } = msg;
                    const msgString = content.toString('utf-8');
                    console.log('========== msgString:', msgString);
                    sendMsgToApp(msgString, connectionId)
                        .then(res => {
                            console.log(
                                '================= sent queued message to the app, will ack, outcome:',
                                res
                            );
                            try {
                                channel.ack(msg);
                            } catch (e) {
                                console.log(
                                    '================= error acking message:',
                                    e
                                );
                            }
                        })
                        .catch(e => {
                            console.log(
                                '================= error sending queued message to the app, will not ack, error:',
                                e
                            );
                        });
                });
            } catch (e) {
                console.log(
                    '=========== error initializing amqp connection',
                    e
                );
                if (rmqServerConn) {
                    await rmqServerConn.close();
                }
                const response = {
                    statusCode: 401,
                    body: JSON.stringify('failed auth!'),
                };
                return response;
            }
            break;
        case '$disconnect':
            console.log('user disconnected');
            if (rmqServerConn) {
                await rmqServerConn.close();
            }
            break;
        case 'message':
            console.log('message route');
            await sendMsgToApp('test', connectionId);
            break;
        default:
            console.log('unknown route', route);
            break;
    }
    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from websocket Lambda!'),
    };
    return response;
};

amqp连接是用于由AmazonMQ提供的RabbitMQ服务器。我遇到的问题是,发布到队列中的消息在.consume回调中要么根本不显示,要么只有在WebSocket断开并重新连接后才会显示。实际上,它们缺失直到之后某个点才突然出现。这发生在WebSocket中。即使它们出现了,它们也不会被发送到连接到WebSocket的客户端(在这种情况下是应用程序)。这里可能存在什么问题?


RabbitMQ交换机类型队列绑定到什么? - Le chat du rabbin
这是一个直接的交流,但问题在于我对API网关的Websockets工作方式有错误的想法。 - Uche Ozoemena
1个回答

2
这里的问题在于我对API Gateway的websocket工作原理有误解。API Gateway维护websocket连接,但不维护lambda本身。我把我的.consume订阅逻辑放在了lambda内部,这是错误的,因为lambda运行后即终止,无法保持活动状态。更好的方法是将队列作为lambda的事件源然而,这对我来说也不起作用,因为它需要在设置lambda时知道你的队列。我的队列是动态创建的,这就违反了要求。最终我在VPS上建立了一个rmq服务器。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接