将Akka HTTP连接到Akka Stream

4
我希望使用Akka HTTP构建一个REST服务,该服务连接到一个现有的Sink(使用Kafka reactive stream),但我不知道如何将HTTP流链接到Akka流的sink中...
我应该选择使用Flow的低级别Akka HTTP API吗?
我的要求是:
- 整个流都具有反压功能 - 所有事件都被kafka sink确认时返回200响应代码 - 当反压太大时返回500?这可能吗?
以下是我的当前代码:
// flow to split group of lines into lines
  val splitLines = Flow[String].mapConcat(_.split("\n").toList)

// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
    .map(new ProducerRecord[Array[Byte], String](topic, _))
    .toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)

val routes = {
    path("ingest") {
      post {
        logger.info("starting ingestion")
        entity(as[GenericEvent]) { eventIngest =>
          ????      
        }~
        entity(as[GenericEventList]) { eventIngestList =>
          ????
        }
      }
    }
  }

Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)
1个回答

2

有几种方法可以解决这个问题。一种建议是将数据直接从请求实体流式传输到kafka sink。使用extractDataBytes指令可以帮助您做到这一点(更多信息请参见这里)。

尝试类似下面的示例。我添加了一个???流,以便允许您针对特定情况正确拆分/转换请求实体字节。您可以使用Framing.delimiter之类的方法来分割实体字节流(更多信息请参见这里)。

  (extractDataBytes & extractMaterializer) { (byteSrc, mat) =>
    val f = byteSrc.via(???).runWith(kafkaSink)(mat)
    onComplete(f){
      case Success(value) => complete(s"OK")
      case Failure(ex)    => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
    }
  }

或者,如果你想将实体反序列化为某个领域对象,你可以这样做

  (entity(as[Event]) & extractMaterializer) { (event, mat) =>
    val f = Source.single(event).via(???).runWith(kafkaSink)(mat)
    onComplete(f){
      case Success(value) => complete(s"OK")
      case Failure(ex)    => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
    }
  }

回答你的最后一个问题,如果Kafka出现背压,你的流永远不会完成。你应该期望在配置的请求超时时间后(参考下面的文档),服务器会给你返回500错误:

默认的请求超时时间适用于所有路由,并且可以使用akka.http.server.request-timeout设置进行配置(默认为20秒)。


有没有办法使用实体(如[event])对流进行取消编组? - vgkowski
是的,您可以始终将未解组的事件运行到Kafka接收器中。我在答案中放了另一个例子。 - Stefano Bonetti
这是Akka HTTP流中的新流程,它不会引入一些无用的开销吗? - vgkowski
如果您想继续使用高级路由DSL,目前这是唯一可能的方式。请参见相关问题:https://dev59.com/oFoV5IYBdhLWcg3wZOGe。如果您非常关注材料化开销,您应该考虑转向低级DSL(http://doc.akka.io/docs/akka-http/10.0.0/scala/http/introduction.html#low-level-http-server-apis)。 - Stefano Bonetti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接