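Several packages currently implement GraphQL subscriptions over SSE.
graphql-sse provides both a client and a server for GraphQL subscriptions over SSE. The package has a dedicated handler for subscriptions.
Here is an example of using it with express.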
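import express from 'express';
// Depending on the installed version, the Express adapter may need to be
// imported from 'graphql-sse/lib/use/express' instead.
import { createHandler } from 'graphql-sse';
import { schema } from './schema'; // your executable GraphQL schema

// Create the GraphQL over SSE handler
const handler = createHandler({ schema });

const app = express();
// Serve all methods on /graphql/stream
app.use('/graphql/stream', handler);
app.listen(4000);
console.log('Listening on port 4000');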
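@graphql-sse/server provides a server handler for GraphQL subscriptions. The HTTP handling, however, depends on the framework you use.
Disclaimer: I am the author of the @graphql-sse packages.
Below is an example using express.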
import express from "express";
import {
  getGraphQLParameters,
  processSubscription,
  RESULT_TYPE, // used below; assumed to be exported by @graphql-sse/server
} from "@graphql-sse/server";
import { schema } from "./schema";

// Example route (an assumption); use whatever path your client targets.
const path = "/graphql/subscriptions";

const app = express();
app.use(express.json());

app.post(path, async (req, res, next) => {
  // Framework-agnostic view of the incoming request
  const request = {
    body: req.body,
    headers: req.headers,
    method: req.method,
    query: req.query,
  };

  const { operationName, query, variables } = getGraphQLParameters(request);
  if (!query) {
    // Not a GraphQL request; let the next middleware handle it
    return next();
  }

  const result = await processSubscription({
    operationName,
    query,
    variables,
    request: req,
    schema,
  });

  if (result.type === RESULT_TYPE.NOT_SUBSCRIPTION) {
    // Queries and mutations are handled elsewhere
    return next();
  } else if (result.type === RESULT_TYPE.ERROR) {
    result.headers.forEach(({ name, value }) => res.setHeader(name, value));
    res.status(result.status);
    res.json(result.payload);
  } else if (result.type === RESULT_TYPE.EVENT_STREAM) {
    // Open the SSE stream and keep the connection alive
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      Connection: 'keep-alive',
      'Cache-Control': 'no-cache',
    });

    // Forward every subscription result as one SSE frame
    result.subscribe((data) => {
      res.write(`data: ${JSON.stringify(data)}\n\n`);
    });

    // Stop the subscription when the client disconnects
    req.on('close', () => {
      result.unsubscribe();
    });
  }
});
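The `res.write` call in the handler above emits each result as one SSE frame: a `data:` line followed by a blank line. As a rough sketch of that framing, the helper below builds such a frame and optionally adds the `event:` and `id:` fields defined by the SSE format. `formatSseEvent` and `SseEvent` are hypothetical names for illustration, not part of either package.

```typescript
// Hypothetical helper (not part of graphql-sse or @graphql-sse/server):
// serialises one payload into a single Server-Sent Events frame.
interface SseEvent {
  data: Record<string, unknown>; // JSON-serialisable payload
  event?: string;                // optional event name
  id?: string;                   // optional event id
}

function formatSseEvent({ data, event, id }: SseEvent): string {
  const lines: string[] = [];
  if (event !== undefined) lines.push(`event: ${event}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  // Every line of the payload gets its own "data:" prefix. Compact JSON has
  // no raw newlines, so this normally produces exactly one data line.
  for (const line of JSON.stringify(data).split('\n')) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the frame.
  return lines.join('\n') + '\n\n';
}
```

With such a helper, the write in the handler could read `res.write(formatSseEvent({ data }))`, which produces the same bytes as the template string used there.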
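Client
Both packages mentioned above have corresponding clients. Because of the limitations of the EventSource API, both packages implement a custom client that provides options for sending HTTP headers, sending a payload with POST, and so on, which the EventSource API does not support. graphql-sse ships with its client, while @graphql-sse/server provides its companion client in a separate package.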
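A custom client of this kind usually issues the request itself (for example with `fetch`, which allows POST bodies and arbitrary headers) and then has to decode the `text/event-stream` body by hand. The sketch below shows only that decoding step, under simplifying assumptions: it handles `\n` line endings only and ignores the `id` and `retry` fields. `createSseParser` and `ParsedEvent` are hypothetical names and do not reflect how either package is implemented.

```typescript
// Hypothetical incremental SSE parser, for illustration only.
interface ParsedEvent {
  event: string; // event name, "message" when the frame has no event field
  data: string;  // data lines joined with "\n"
}

function createSseParser(onEvent: (e: ParsedEvent) => void) {
  let buffer = ''; // holds text that does not yet form a complete frame

  return function feed(chunk: string): void {
    buffer += chunk;
    let boundary: number;
    // A blank line ("\n\n") marks the end of a frame.
    while ((boundary = buffer.indexOf('\n\n')) !== -1) {
      const frame = buffer.slice(0, boundary);
      buffer = buffer.slice(boundary + 2);

      let event = 'message';
      const data: string[] = [];
      for (const line of frame.split('\n')) {
        if (line.startsWith(':')) continue; // comment line, e.g. keep-alive
        const colon = line.indexOf(':');
        const field = colon === -1 ? line : line.slice(0, colon);
        let value = colon === -1 ? '' : line.slice(colon + 1);
        if (value.startsWith(' ')) value = value.slice(1); // strip one space
        if (field === 'event') event = value;
        else if (field === 'data') data.push(value);
      }
      // Frames without any data line are not dispatched.
      if (data.length > 0) onEvent({ event, data: data.join('\n') });
    }
  };
}
```

In a browser or Node client, the chunks passed to `feed` would typically come from reading the response body stream and decoding it with `TextDecoder`; each dispatched `data` string can then be passed to `JSON.parse`.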
graphql-sse client example
import { createClient } from 'graphql-sse';

const client = createClient({
  url: 'http://localhost:4000/graphql/stream',
});

// Query: collect the single result and resolve once the operation completes
const result = await new Promise((resolve, reject) => {
  let result;
  client.subscribe(
    {
      query: '{ hello }',
    },
    {
      next: (data) => (result = data),
      error: reject,
      complete: () => resolve(result),
    },
  );
});

// Subscription
const onNext = () => {
  /* handle incoming values */
};
let unsubscribe = () => {
  /* replaced below; call it to complete the subscription */
};
await new Promise((resolve, reject) => {
  unsubscribe = client.subscribe(
    {
      query: 'subscription { greetings }',
    },
    {
      next: onNext,
      error: reject,
      complete: resolve,
    },
  );
});
@graphql-sse/client is the companion client for @graphql-sse/server.
Example:
import {
  SubscriptionClient,
  SubscriptionClientOptions,
} from '@graphql-sse/client';

const subscriptionClient = SubscriptionClient.create({
  graphQlSubscriptionUrl: 'http://some.host/graphql/subscriptions',
});

const subscription = subscriptionClient.subscribe({
  query: 'subscription { greetings }',
});

const onNext = () => {
  /* handle incoming values */
};
const onError = () => {
  /* handle errors */
};

subscription.subscribe(onNext, onError);
@graphql-sse/apollo-client is a companion package to @graphql-sse/server, designed for Apollo Client.
import { split, HttpLink, ApolloClient, InMemoryCache } from '@apollo/client';
import { getMainDefinition } from '@apollo/client/utilities';
import { ServerSentEventsLink } from '@graphql-sse/apollo-client';

// Queries and mutations go over plain HTTP
const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql',
});

// Subscriptions go over SSE
const sseLink = new ServerSentEventsLink({
  graphQlSubscriptionUrl: 'http://localhost:4000/graphql',
});

// Route each operation to the matching link
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  sseLink,
  httpLink
);

export const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});