如何记录Akka HTTP客户端请求

35

我需要记录akka http客户端的请求以及它们的响应。虽然似乎有一些API可以记录这些请求,但没有清晰的文档说明应该如何实现。我的方法是创建一个已记录的请求,它透明地包装了Http().singleRequest(req),如下所示:

def loggedRequest(req: HttpRequest)
                  (implicit system: ActorSystem, ctx: ExecutionContext, m: Materializer): Future[HttpResponse] = {

  Http().singleRequest(req).map { resp ⇒
    Unmarshal(resp.entity).to[String].foreach{s ⇒
      system.log.info(req.toString)
      system.log.info(resp.toString + "\n" + s)
    }
    resp
  }
}

不幸的是,我必须通过解封装或仅通过请求resp.entity.dataBytes来获取响应正文以抓取未来。 我理解日志,但承诺已完成,我无法再将实体解封装为实际数据。 一个有效的解决方案将记录请求和响应,并通过不抛出“Promise already completed”引发IllegalStateException的测试用例:

<code>describe("Logged rest requests") {

  it("deliver typed responses") {
    val foo = Rest.loggedRequest(Get(s"http://127.0.0.1:9000/some/path"))
    val resp = foo.futureValue(patience)
    resp.status shouldBe StatusCodes.OK
    val res = Unmarshal(resp.entity).to[MyClass].futureValue
  }
}
</code>

欢迎提出想法。


2
我正在尝试做同样的事情。你找到解决办法了吗? - Abhijit Sarkar
4个回答

27
我找到的解决方案之一是使用:

import akka.http.scaladsl.server.directives.DebuggingDirectives

val clientRouteLogged = DebuggingDirectives.logRequestResult("Client ReST", Logging.InfoLevel)(clientRoute)
Http().bindAndHandle(clientRouteLogged, interface, port)

可以轻松记录请求和结果的原始(字节)格式。问题在于这些日志完全无法阅读。这就是变得复杂的地方。

这是我的示例,它对请求/响应的实体进行编码并将其写入记录器。

您可以将函数传递给:

DebuggingDirectives.logRequestResult

def logRequestResult(magnet: LoggingMagnet[HttpRequest ⇒ RouteResult ⇒ Unit])

这是使用磁铁模式编写的函数:

LoggingMagnet[HttpRequest ⇒ RouteResult ⇒ Unit]

在哪里:

LoggingMagnet[T](f: LoggingAdapter ⇒ T)

由于我们可以访问所有需要记录请求和结果的部分,因此感谢这一点。我们有LoggingAdapter、HttpRequest和RouteResult

在我的情况下,我创建了一个内部函数。我不想再次传递所有参数。

def logRequestResult(level: LogLevel, route: Route)
                      (implicit m: Materializer, ex: ExecutionContext) = {
  def myLoggingFunction(logger: LoggingAdapter)(req: HttpRequest)(res: Any): Unit = {
    val entry = res match {
      case Complete(resp) =>
        entityAsString(resp.entity).map(data ⇒ LogEntry(s"${req.method} ${req.uri}: ${resp.status} \n entity: $data", level))
      case other =>
        Future.successful(LogEntry(s"$other", level))
    }
    entry.map(_.logTo(logger))
  }
  DebuggingDirectives.logRequestResult(LoggingMagnet(log => myLoggingFunction(log)))(route)
}

最重要的部分是最后一行,我将myLoggingFunction放入logRequestResult中进行日志记录。

名为myLoggingFunction的函数,简单地匹配服务器计算结果并基于此创建LogEntry。

最后一件事是一种允许从流中解码结果实体的方法。

def entityAsString(entity: HttpEntity)
                   (implicit m: Materializer, ex: ExecutionContext): Future[String] = {
entity.dataBytes
  .map(_.decodeString(entity.contentType().charset().value))
  .runWith(Sink.head)
}

这个方法可以轻松地添加到任何 akka-http 路由中。

val myLoggedRoute = logRequestResult(Logging.InfoLevel, clientRoute)
Http().bindAndHandle(myLoggedRoute, interface, port)

1
请纠正拼写错误。请纠正缩进。也许可以添加更多关于代码正在做什么的信息。 - Martin Senne
9
我理解这个问题是关于在客户端记录请求和响应,而这个回答是关于在服务器记录请求和响应,对吗? - Arnout Engelen
1
这么多工作只是为了处理像日志记录这样的横切关注点,这太麻烦了。应该简单些。此外,我同意Arnout的看法,这并没有提供有关记录客户端请求的解决方案。 - chauhraj
1
@trudolf 确实,问题描述有些含糊不清,但是描述海报目前所采用的方法的代码显示了 Akka HTTP 客户端 API。 - Arnout Engelen
1
这个问题是关于HTTP客户端请求的,所使用的代码示例是针对HTTP客户端的。我也在寻找同样的内容,但这个答案是错误的。 - relgames
显示剩余3条评论

7
为了另一种解决方案,此代码记录请求IP并将随机数与每个请求和响应相关联,以便可以在日志中关联它们。 它还记录响应时间。
由于请求可能需要一段时间来处理,并且可能会失败,因此我希望立即看到请求,并在返回时查看响应。 RequestFields只是我关心的请求数据。 默认情况下会有很多噪音。
val logRequestResponse: Directive0 =
  extractRequestContext flatMap { ctx =>
    extractClientIP flatMap { ip =>
      val id = scala.math.abs(rand.nextLong).toString
      onSuccess(RequestFields.fromIdIpAndRequest(id, ip, ctx.request)) flatMap { req =>
        logger.info("request", req.asJson)
        val i = Instant.now()
        mapRouteResultWith { result => 
          Result.fromIdStartTimeAndRouteResult(id, i, result) map { res =>
            logger.info("response", res.asJson)
            result
        }
      }
    }
  }
}

1
谢谢Sean - 是否可以发布完整的代码,包括导入?不清楚哪些是您自己的定制代码,哪些是内置的Akka代码。 - Doug Donohoe

3
我的完整解决方案,受到@seanmcl的启发。最初的回答。
trait TraceDirectives extends LazyLogging {

  private val counter: AtomicLong = new AtomicLong(0)

  def log: Directive0 = count flatMap { requestId =>
    mapInnerRoute(addLoggingToRoute(requestId, _))
  }

  private def count: Directive1[Long] = Directive { innerRouteSupplier =>
    ctx =>
      innerRouteSupplier(Tuple1(counter.incrementAndGet()))(ctx)
  }

  private def addLoggingToRoute(requestId: Long, innerRoute: Route): Route = {
    ctx => {
      val requestStopwatch = Stopwatch.createStarted()
      extractClientIP { ip =>
        logger.info("Http request, id: {}, uri: {}, forwarded ip: {}", requestId, ctx.request.uri, ip)
        mapResponse(httpResponse => {
          logger.info("Http response, id: {}, code: {}, time: {}", requestId, httpResponse.status.intValue(), requestStopwatch.toString)
          httpResponse
        })(innerRoute)
      }(ctx)
    }
  }
}

object TraceDirectives extends TraceDirectives

我该如何在我的路由中实现i? - SimbaPK
我已将日志设置为公共。像其他指令一样使用它: post { IctTraceDirectives.log { complete(null) } } - G.Domozhirov
你应该展示以下导入: import java.util.concurrent.atomic.AtomicLong import akka.http.scaladsl.server.{Directive, Directive0, Directive1, Route} import akka.http.scaladsl.server.Directives._ import com.google.common.base.Stopwatch import com.typesafe.scalalogging.LazyLogging ... 并且提到依赖项:"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2" - Mike Slinn

1

我通过将配置值akka.http.client.log-unencrypted-network-bytes设置为整数值(以字节为单位的块大小)来从Akka HTTP客户端获取线级日志,具体描述在这里

P.S. 我知道这是一个非常旧的问题,但这是我在搜索“akka http client wire logging”后找到的地方,所以我为其他可能也会来到这里的人贡献了解决方案。


谢谢您的更新。您是否有一个示例,可以从日志中提取请求以便记录特定细节? - David Weber

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接