GraphQL使用服务器推送事件和EventSource进行订阅

9
我正在研究使用服务器发送事件作为支持api来实现“订阅”类型。但我遇到的问题是界面,更确切地说,此类操作的http层。使用原生EventSource不支持:1.指定HTTP方法,默认使用“GET”;2.包含有效载荷(GraphQL查询)。虽然第一个无可辩驳,但第二个可以通过使用查询参数来规避。查询参数有一个约为2000个字符的限制(可以争论),这使得仅依赖于它们感觉太脆弱。我想到的解决方案是为每个可能的事件创建一个专用的端点。例如:表示各方完成的交易事件的URI:/graphql/transaction-status/$ID将在服务器上转换为此查询:
subscription TransactionStatusSubscription {
    status(id: $ID) {
        ready
    }
}

这种方法存在以下问题:
  1. 需要为每个URI到GraphQL的转换创建一个处理程序。
  2. 部署新版本的服务器。
  3. 丧失GraphQL提供的灵活性->客户端应该控制查询。
  4. 跟踪代码库中的所有端点(后端,前端,移动端)。

可能还有我没有注意到的其他问题。

也许你能想到更好的方法吗? 是否可以提供一种更好的方法来使用EventSource提供请求有效载荷?

4个回答

11

GraphQL中的订阅通常使用WebSockets实现,而不是SSE。Apollo和Relay都支持使用subscriptions-transport-ws客户端来监听事件。Apollo Server包含了使用WebSockets进行订阅的内置支持。如果您只是想实现订阅,则最好使用其中一个现有的解决方案。

话虽如此,还有一个库可以利用SSE进行订阅,在这里。它看起来已经不再维护了,但是如果您一定要尝试使用SSE,可以查看源代码获取一些想法。从源代码来看,作者通过使用返回订阅ID的POST请求初始化每个订阅来解决了上述限制。


我目前正在构建一个使用通知的应用程序。我已经设置了一个RESTful端点,并使用SSE,它运行良好。在应用程序中继续使用SSE通知是否有意义?我还使用GraphQL来处理所有其他资源,但我没有看到使用Websockets的理由。 - Amon

4

目前,您可以使用多个GraphQL订阅SSE包。

graphql-sse

提供用于使用GraphQL订阅SSE的客户端和服务器。该软件包具有专用的处理程序用于订阅。

以下是在express中使用的示例用法。

import express from 'express'; // yarn add express
import { createHandler } from 'graphql-sse';

// Create the GraphQL over SSE handler
const handler = createHandler({ schema });

// Create an express app serving all methods on `/graphql/stream`
const app = express();
app.use('/graphql/stream', handler);

app.listen(4000);
console.log('Listening to port 4000');

@graphql-sse/server

提供了一个适用于GraphQL订阅的服务器处理程序。但是,HTTP处理取决于您使用的框架。

免责声明:我是@graphql-sse包的作者。

下面是一个使用express的示例。

import express, { RequestHandler } from "express";
import {
  getGraphQLParameters,
  processSubscription,
} from "@graphql-sse/server";
import { schema } from "./schema";

const app = express();

app.use(express.json());

app.post(path, async (req, res, next) => {
    const request = {
        body: req.body,
        headers: req.headers,
        method: req.method,
        query: req.query,
    };

    const { operationName, query, variables } = getGraphQLParameters(request);
    if (!query) {
        return next();
    }
    const result = await processSubscription({
        operationName,
        query,
        variables,
        request: req,
        schema,
    });

    if (result.type === RESULT_TYPE.NOT_SUBSCRIPTION) {
        return next();
    } else if (result.type === RESULT_TYPE.ERROR) {
        result.headers.forEach(({ name, value }) => res.setHeader(name, value));
        res.status(result.status);
        res.json(result.payload);
    } else if (result.type === RESULT_TYPE.EVENT_STREAM) {
        res.writeHead(200, {
            'Content-Type': 'text/event-stream',
            Connection: 'keep-alive',
            'Cache-Control': 'no-cache',
        });

        result.subscribe((data) => {
            res.write(`data: ${JSON.stringify(data)}\n\n`);
        });

        req.on('close', () => {
            result.unsubscribe();
        });
    }
});

客户端

上述提到的两个包都有相应的客户端。由于 EventSource API 的限制,这两个包实现了一个自定义客户端,为发送 HTTP 头部、带有 post 载荷等提供选项,而这些是 EvenSource API 不支持的。 graphql-sse 附带了其客户端,而 @graphql-sse/server 则在一个单独的包中提供了其伴侣客户端。

graphql-sse 客户端示例

import { createClient } from 'graphql-sse';

const client = createClient({
  // singleConnection: true, use "single connection mode" instead of the default "distinct connection mode"
  url: 'http://localhost:4000/graphql/stream',
});

// query

  const result = await new Promise((resolve, reject) => {
    let result;
    client.subscribe(
      {
        query: '{ hello }',
      },
      {
        next: (data) => (result = data),
        error: reject,
        complete: () => resolve(result),
      },
    );
  });


 // subscription

  const onNext = () => {
    /* handle incoming values */
  };

  let unsubscribe = () => {
    /* complete the subscription */
  };

  await new Promise((resolve, reject) => {
    unsubscribe = client.subscribe(
      {
        query: 'subscription { greetings }',
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

;

@graphql-sse/client

这是@graphql-sse/server的一个附属组件。

示例:

import {
  SubscriptionClient,
  SubscriptionClientOptions,
} from '@graphql-sse/client';

const subscriptionClient = SubscriptionClient.create({
    graphQlSubscriptionUrl: 'http://some.host/graphl/subscriptions'
});

const subscription = subscriptionClient.subscribe(
    {
        query: 'subscription { greetings }',
    }
)

const onNext = () => {
    /* handle incoming values */
  };

const onError = () => {
    /* handle incoming errors */
  };

subscription.susbscribe(onNext, onError)

@graphql-sse/apollo-client

这是@graphql-sse/server包的配套包,专为Apollo Client设计。

import { split, HttpLink, ApolloClient, InMemoryCache } from '@apollo/client';
import { getMainDefinition } from '@apollo/client/utilities';
import { ServerSentEventsLink } from '@graphql-sse/apollo-client';

const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql',
});

const sseLink = new ServerSentEventsLink({
  graphQlSubscriptionUrl: 'http://localhost:4000/graphql',
});

const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  sseLink,
  httpLink
);

export const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});

3
如果您正在使用Apollo,则它们支持自动持久查询(在文档中缩写为APQ)。 如果您没有使用Apollo,则在任何语言中实现都不应太困难。 我建议遵循他们的惯例,以便您的客户端可以使用Apollo(如果他们想要的话)。
第一次任何客户端使用查询哈希进行EventSource请求时,它将失败,然后使用完整负载重试到常规GraphQL端点的请求。 如果服务器上启用了APQ,则所有带有查询参数的客户端的后续GET请求将按计划执行。
解决了这个问题之后,您只需为GraphQL创建一个服务器发送事件传输即可(考虑到subscribe函数只返回AsyncIterator,这应该很容易)。
我正在研究在我们公司做这件事,因为一些前端开发人员喜欢处理EventSource的简单性。

1
这里涉及到两个方面:SSE连接和GraphQL端点。端点必须遵循规范,因此仅从订阅请求返回SSE是不够的,需要进行GET请求。所以这两个必须分开。
对于让客户端打开一个通过/graphql-sse创建通道令牌的SSE通道,然后使用该令牌客户端可以请求订阅,并且事件将通过所选通道到达,您怎么看?
该令牌可以作为SSE通道上的第一个事件发送,并且为了将令牌传递给查询,客户端可以在cookie、请求头甚至未使用的查询变量中提供它。
或者,服务器可以将最后打开的通道存储在会话存储中(将客户端限制为单个通道)。
如果找不到通道,则查询失败。如果通道关闭,客户端可以再次打开它,并且可以通过在查询字符串/cookie/header中传递令牌或让会话存储处理它。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接