如何在微服务/事件驱动体系结构中处理HTTP请求?

82

背景:

我正在构建一个应用程序,拟议的架构是基于微服务的事件/消息驱动。

做事的传统方式是,我有一个用户/HTTP请求,它执行一些命令,具有直接的同步响应。因此,对同一用户/HTTP请求进行响应是“无需麻烦”的。

enter image description here

问题:

用户向UI服务(有多个UI服务)发送HTTP请求,该请求触发将某些事件发送到队列(Kafka / RabbitMQ / 任何其他)。 N个服务接收该事件/消息,在途中进行某些操作,然后在某个时刻,同一UI服务应该接收该响应并将其返回给发起HTTP请求的用户。这样,处理请求就是异步的,但根据您典型的HTTP交互,用户/HTTP请求->响应同步的。

问题: 在这个通用的/事件驱动的世界中,如何将响应发送到发起操作的同一UI服务(与用户通过HTTP交互的服务)?

我迄今为止的研究工作 我一直在寻找解决此问题的人,似乎有些人正在使用WebSockets解决该问题。

但是,复杂性的层面是需要有一些表来映射(RequestId-> Websocket(客户端 - 服务器)),用于“发现”网关中哪个节点具有某个特定响应的websocket连接。但即使我理解了问题和复杂性,我仍然无法找到任何文章,可以让我获得有关如何在实现层面解决此问题的信息。并且,由于第三方集成(例如期望请求/响应的付款提供者(WorldPay)),这仍然不是可行的选项-特别是在3DS验证中。

因此,我有些不情愿认为WebSockets是一个选项。但即使WebSockets适用于Web面向应用程序,在连接到外部系统的API方面也不是很好的架构。

** ** ** 更新: ** ** **

即使长轮询对于具有202 AcceptedLocation头重试后头的WebService API是可能的解决方案,但对于高并发和高能力的网站来说性能不够好。想象一下大量人试图在他们进行的每个请求上获取交易状态更新,并且您必须无效化CDN缓存(现在去解决那个问题吧!哈)。

但最重要的是与我的情况相关的是,我有第三方API,例如付款系统,其中3DS系统具有由付款提供者系统处理的自动重定向,它们期望典型的请求/响应流,因此,此模型将不适用于我,也不适用于套接字模型。

由于这种用例,HTTP请求/响应应该以典型方式处理,在这种方式中

 POST /user
    Payload {userdata}
    RETURNs: 
        HTTP/1.1 202 Accepted
        Content-Type: application/json; charset=utf-8
        Date: Mon, 27 Nov 2018 17:25:55 GMT
        Location: https://mydomain/user/transaction/status/:transaction_id
        Retry-After: 10

GET 
   https://mydomain/user/transaction/status/:transaction_id

在此输入图片描述


8
如果您不想与客户端进行双向通信,可以返回“202 Accepted”并带有一个“Location header”,告诉客户端他们可以轮询的位置,以便在处理完成时获取结果。这是处理长时间运行的HTTP请求的常见模式,因为您无法立即响应请求。 - jonrsharpe
2
在阅读了Confluent博客上有关Kafka的文章(https://www.confluent.io/blog/build-services-backbone-events/)后,我也在考虑并寻找这样的解决方案。 - Cory Robinson
乔纳森:你发现了什么? - Magne
7个回答

22

正如我所预料的那样 - 即使不适合,人们也会试图将一切都纳入一个概念之中。这不是批评,这是我经验和阅读你的问题以及其他答案后的观察。

没错,你说得对,微服务架构基于异步消息传递模式。但是,当我们谈论UI时,我脑海中有两种可能的情况:

  1. UI需要立即响应(例如读取操作或用户期望立即获得答案的命令)。 这些不必是异步的。如果响应需要立即显示在屏幕上,为什么要增加消息传递和异步的开销呢?这没有意义。微服务架构旨在解决问题,而不是通过增加开销来创造新问题。

  2. UI可以重构以容忍延迟响应(例如,UI可以仅提交命令,接收确认,并在响应准备好时让用户做其他事情)。在这种情况下,您可以引入异步性。与UI直接交互的网关服务可以编排异步处理(等待完成事件等),并在准备就绪时向UI通信。我见过UI在这种情况下使用SignalR,而网关服务是接受套接字连接的API。如果浏览器不支持套接字,则最好回退到轮询。无论如何,重要的是,这只能在以下情况下起作用:UI可以容忍延迟响应

  3. 如果微服务确实与您的情况相关(案例2),则相应地结构化UI流程,后端的微服务中不应该有挑战。在这种情况下,您的问题归结为将事件驱动架构应用于服务集合(边缘是连接事件驱动和UI交互的网关微服务)。这个问题(事件驱动服务)是可以解决的,您知道这一点。您只需要决定是否可以重新思考UI的工作方式。


4

从更一般的角度来看 - 在接收请求时,您可以在当前请求的上下文中(即当请求对象在范围内时)在队列上注册一个订阅者,该订阅者在负责服务完成其工作时(如维护总操作数量的状态机)收到确认。当达到终止状态时,它返回响应并删除侦听器。我认为这将适用于任何发布/订阅样式的消息队列。以下是我建议的过度简化演示。

// a stub for any message queue using the pub sub pattern
let Q = {
  pub: (event, data) => {},
  sub: (event, handler) => {}
}
// typical express request handler
let controller = async (req, res) => {
  // initiate saga
  let sagaId = uuid()
  Q.pub("saga:register-user", {
    username: req.body.username,
    password: req.body.password,
    promoCode: req.body.promoCode,
    sagaId: sagaId
  })
  // wait for user to be added 
  let p1 = new Promise((resolve, reject) => {
    Q.sub("user-added", ack => {
      resolve(ack)
    })
  })
  // wait for promo code to be applied
  let p2 = new Promise((resolve, reject) => {
    Q.sub("promo-applied", ack => {
      resolve(ack)
    })
  })

  // wait for both promises to finish successfully
  try {
    
    var sagaComplete = await Promise.all([p1, p2])
    // respond with some transformation of data
    res.json({success: true, data: sagaComplete})

  } catch (e) {
    logger.error('saga failed due to reasons')
    // rollback asynchronously
    Q.pub('rollback:user-added', {sagaId: sagaId})
    Q.pub('rollback:promo-applied', {sagaId: sagaId})
    // respond with appropriate status 
    res.status(500).json({message: 'could not complete saga. Rolling back side effects'})
  }
  
}

作为您可能已经注意到的,这似乎是一个通用模式,可以抽象成框架,以减少代码重复并管理横切关注点。这就是 saga pattern 的基本内容。客户端只会等待所需操作完成的时间(即使全部同步执行也是如此),再加上由于服务间通信而产生的额外延迟。如果您正在使用基于事件循环的系统(如NodeJS或Python Tornado),请确保不要阻塞线程。
仅仅使用基于Websocket的推送机制并不能真正提高系统的效率或性能。然而,建议您使用Socket连接向客户端推送消息,因为它使您的架构更加通用(即使您的客户端表现得像您的服务一样),一致,并允许更好地分离关注点。这还将使您能够独立扩展推送服务,而无需担心业务逻辑。Saga模式可以扩展以在部分故障或超时的情况下启用回滚,并使您的系统更易于管理。

3
以下是一个非常简单的示例,演示如何实现 UI 服务 以便它可以与正常的 HTTP 请求/响应流配合使用。 它使用 node.js 的 events.EventEmitter 类来“路由”响应到正确的 HTTP 处理程序。
实现概述:
1. 连接生产者/消费者客户端到 Kafka: - 使用生产者将请求数据发送到内部微服务。 - 使用消费者监听来自微服务的数据,表示请求已被处理,并假设那些 Kafka 项还包含应返回给 HTTP 客户端的数据。
2. 创建一个全局的 事件分派器 ,从 EventEmitter 类中创建。
3. 注册 HTTP 请求处理程序: - 创建一个 UUID 并将其包含在推送到 Kafka 的有效负载中。 - 在我们的事件分派器中注册一个事件侦听器,其中 UUID 用作其侦听的事件名称。
4. 开始消耗 Kafka 主题并检索 HTTP 请求处理程序正在等待的 UUID,并为其发出事件。 在示例代码中,我没有在发出的事件中包含任何有效负载,但通常您会想要将某些来自 Kafka 数据的数据包含为参数,以便 HTTP 处理程序可以将其返回给 HTTP 客户端。
请注意,我尝试使代码尽可能小,略去了错误和超时处理等!
还要注意,kafkaProduceTopickafkaConsumTopic 是相同的主题,以简化测试,不需要另一个服务/函数来生产到 UI 服务 消费主题。代码假设已经使用 npm 安装了 kafka-nodeuuid 包,并且 Kafka 在 localhost:9092 上可访问。
const http = require('http');
const EventEmitter = require('events');
const kafka = require('kafka-node');
const uuidv4 = require('uuid/v4');

const kafkaProduceTopic = "req-res-topic";
const kafkaConsumeTopic = "req-res-topic";

class ResponseEventEmitter extends EventEmitter {}

const responseEventEmitter = new ResponseEventEmitter();

var HighLevelProducer = kafka.HighLevelProducer,
    client = new kafka.Client(),
    producer = new HighLevelProducer(client);

var HighLevelConsumer = kafka.HighLevelConsumer,
    client = new kafka.Client(),
    consumer = new HighLevelConsumer(
        client,
        [
            { topic: kafkaConsumeTopic }
        ],
        {
            groupId: 'my-group'
        }
    );

var s = http.createServer(function (req, res) {
    // Generate a random UUID to be used as the request id that
    // that is used to correlated request/response requests.
    // The internal micro-services need to include this id in
    // the "final" message that is pushed to Kafka and consumed
    // by the ui service
    var id = uuidv4();

    // Send the request data to the internal back-end through Kafka
    // In real code the Kafka message would be a JSON/protobuf/... 
    // message, but it needs to include the UUID generated by this 
    // function
    payloads = [
        { topic: kafkaProduceTopic, messages: id},
    ];
    producer.send(payloads, function (err, data) {
        if(err != null) {
            console.log("Error: ", err);
            return;
        }
    });

    responseEventEmitter.once(id, () => {
        console.log("Got the response event for ", id);
        res.write("Order " + id + " has been processed\n");
        res.end();
    })
});

s.timeout = 10000;
s.listen(8080); 

// Listen to the Kafka topic that streams messages
// indicating that the request has been processed and
// emit an event to the request handler so it can finish.
// In this example the consumed Kafka message is simply
// the UUID of the request that has been processed (which
// is also the event name that the response handler is
// listening to).
//
// In real code the Kafka message would be a JSON/protobuf/... message
// which needs to contain the UUID the request handler generated.
// This Kafka consumer would then have to deserialize the incoming
// message and get the UUID from it. 
consumer.on('message', function (message) {
    responseEventEmitter.emit(message.value);
});

非常感谢您。这里有很多事情被省略了,其中之一是 responseEventEmitter 没有从 kafka 监听到数据。想法是 HTTP REQ 入口通过 "给定的 ID" 发送和消费事件管理器中的内容,并在 on event-chain-finish (它可以是 doneerror) 时进行处理。此示例将事件和一些数据推送到 kafka,但它没有监听 (或过滤) 相同事件数据通过 ID 和状态更改 (一个给定状态,表示它应该响应用户)。这种体系结构的关键问题是从分布式系统中按 ID 和 EVENT_TYPE 将处理后的数据返回给用户。 - Jonathan Thurft
@JonathanThurft 代码的最后几行将Kafka消费者连接到responseEventEmitter。为了保持代码简单,我假设消费的数据只是请求处理程序生成并发送到内部后端作为协调令牌的UUID。但实际上,这将是一些更复杂的数据,您需要从中提取令牌。内部服务必须确保在所有内部请求中包含此UUID/令牌,以便它可以包含在ui服务从中消耗的Kafka主题发送的最终消息中。 - johlo

3
问题:在这个不分语言和事件的世界中,我如何向发起操作的同一UI服务(与用户通过HTTP交互的服务)发送响应?所以我正在寻找一个解决方案,在外部我有一个典型的请求-响应(同步),而系统的异步复杂性是在内部处理的。我认为需要注意的重要事情是以下内容。简而言之:
您的系统边缘可以是同步的,同时它在内部以异步事件驱动的方式处理事务。请参见:illustrationbacking
UI服务(Web服务器或API网关)可以仅发送一个异步/等待函数(使用Node.js编写),然后继续处理其他请求。然后,当UI服务从后端另一个微服务(例如通过监听Kafka日志)接收到“OrderConfirmed”事件时,它将再次接收用户请求(回调),并向客户端发送指定的响应。
您可以尝试使用Reactive Interaction Gateway (RIG)来处理此过程。
客户端可以同步服务,而后端处理其内部事件异步。客户端不知道也不需要知道。但是客户端必须等待。因此要注意长时间的处理时间。因为客户端的HTTP请求可能会超时(通常为30秒至2分钟的HTTP请求。或者如果UI服务在Cloud Function中运行,它也可能会超时;默认为1分钟,但可延长至9分钟)。但如果请求一直是同步的,则超时实际上也是一个问题。异步事件驱动架构并不改变这一点,只是让人们更加关注这个问题。
由于客户端发送了同步请求,所以它是阻塞的(用户必须在UI中等待)。但是,这并不意味着UI服务(Web服务器或API网关)必须是阻塞的。它只需启动一个异步/等待函数(使用Node.js编写),然后继续处理其他请求。然后,当UI服务收到“OrderConfirmed”事件时,它将重新开始执行该函数(回调),并向客户端发送指定的响应。
与第三方系统交互的任何(微)服务都可以通过典型的HTTP请求/响应流进行同步操作(尽管通常您也希望在此处使用异步/等待,以在第三方正在处理时释放自己的微服务资源)。当它同步收到来自第三方的响应时,它可以向后端的其余部分发送异步事件(例如,“StockResupplied”事件)。如果UI服务被设计为在给出响应之前等待此类事件,则可以将其传播回客户端。
灵感来源:Brad Irby在相关问题的回答与上述方案相反的是: 设计客户端的用户界面,使用户不必阻塞/等待(也许已经收到了乐观的成功响应,符合“乐观UI”原则),但稍后异步接收服务器发送事件(SSE)
  • “服务器发送事件(SSE)是一种技术,使浏览器(客户端)通过HTTP连接从服务器接收文本事件数据等自动更新。”

  • SSE“在后台使用一个长时间的HTTP连接”。

您可能还想知道:
  • “很好知道SSE受到最大打开连接数的限制,当打开多个标签时可能会特别痛苦,因为每个浏览器的限制是六个。”

  • “与SSE相比,WebSockets更加复杂和任务要求高,需要更多设置。”

来源: https://www.telerik.com/blogs/websockets-vs-server-sent-events

此外:

使用带有服务工作者的Web Push API也是SSE的另一种选择,但它更加复杂,会给用户带来烦人的弹窗,并且更多地设计用于在浏览器外部发送通知和/或在Web应用程序关闭时发送通知。

你也可以看看:

反应式交互网关(RIG),可以提供一些不错的架构图,以及自述文件,其中说明了本答案的主要观点。RIG是一个可扩展的免费开源微服务API网关,处理SSE、WebSockets、长轮询等。反应式交互网关(RIG)“订阅Kafka主题,同时持有所有活动前端的连接,将事件转发给它们所寻址的用户,所有这些都是可扩展的。除此之外,它还处理授权,因此您的服务也不必担心这个问题。”


1
很遗憾,我认为您可能需要使用长轮询或WebSockets来完成这样的任务。您需要向用户“推送”某些内容,或者保持HTTP请求开放直到有返回结果。
要将数据发送回实际用户,可以使用socket.io之类的工具。当用户连接时,socket.io会创建一个ID。每次用户连接时,都会将userid映射到socket.io给您的ID上。
一旦每个请求都附带了userid,就可以将结果发送回正确的客户端。流程大致如下:
Web请求订单(POST带有数据和userId)
UI服务将订单放入队列(此订单应该有userId)
x个服务处理订单(每次传递userId)
UI服务从主题中获取。在某个时间点,数据出现在主题上。它所消费的数据具有userId,UI服务查找映射以确定要发射到哪个socket。
无论您的UI上运行的是什么代码,都需要是事件驱动的,以便处理推送数据而不涉及原始请求的上下文。您可以使用类似redux的东西来实现这一点。本质上,您将使服务器在客户端上创建redux操作,它的效果非常好!希望这可以帮助到您。

0

使用Promises怎么样?如果你想要实时性,Socket.io也是一个解决方案。

同时看看CQRS。这种架构模式适用于事件驱动模型和微服务架构。

更好的方法是阅读this


-6

好问题。我的答案是,在系统中引入同步流程。

我正在使用rabbitMq,所以我不知道kafka,但你应该搜索一下kafka的同步流程。

WebSockets似乎有些过度了。

希望这可以帮到你。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接