我希望使用Akka HTTP构建一个REST服务,该服务连接到一个现有的Sink(使用Kafka reactive stream),但我不知道如何将HTTP流链接到Akka流的sink中...
我应该选择使用Flow的低级别Akka HTTP API吗?
我的要求是:
- 整个流都具有反压功能 - 所有事件都被kafka sink确认时返回200响应代码 - 当反压太大时返回500?这可能吗?
以下是我的当前代码:
我应该选择使用Flow的低级别Akka HTTP API吗?
我的要求是:
- 整个流都具有反压功能 - 所有事件都被kafka sink确认时返回200响应代码 - 当反压太大时返回500?这可能吗?
以下是我的当前代码:
// flow to split group of lines into lines
val splitLines = Flow[String].mapConcat(_.split("\n").toList)
// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String](topic, _))
.toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)
val routes = {
path("ingest") {
post {
logger.info("starting ingestion")
entity(as[GenericEvent]) { eventIngest =>
????
}~
entity(as[GenericEventList]) { eventIngestList =>
????
}
}
}
}
Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)