等待多个流程完成后再进行下一步。

3
我需要将图片上传到服务器,但不能使用其他库,我需要将它(base64编码)拆分成块并上传它们全部。
我正在使用Kotlin协程Flow来实现此操作,我目前所做的是进行第一次调用(返回Flow)以获取图像ID,我需要在所有上传请求中添加该ID。
以下是我用于上传图像的2个函数。
fun submitImage(payload: Payload): Flow<String> {
    val request = requestBuilder.buildUploadImageRequest(payload)
    return client.execute(request)
        .serviceFlow({ response ->
            val imageId = response.body.id
            uploadImage(payload.imageBase64, imageId)
            imageId
        }, { response ->
            throw MyServerError("Error ${response.error}")
        })
}

private fun uploadImage(imageBase64: String, imageId: String) {
    val chunks = divideEncodedImageInChunksOfSize(imageBase64)
    var v = 1
    for (chunk in chunks) {
        val payload = generatePayload(imageId, v, chunk, false)
        submitImageChunk(payload)
        v++
    }
    val payload = generatePayload(imageId, v, "", true)
    submitImageChunk(payload)
}

private fun submitImageChunk(payload: JSONObject): Flow<Unit> {
    val request = requestBuilder.buildUploadImageChunkRequest(payload)
    return client.execute(request)
        .serviceFlow({ }, { response ->
            throw MyHttpError(response)
        })
}

我使用以下实用函数:

// Extension function to handle Flows and their activation
internal fun MyHttpClient.execute(request: MyHttpRequest): Flow<MyHttpResponse> {
    return flow {
        val deferred = CompletableDeferred<MyHttpResponse>()
        executeHttp(request, object : MyHttpListener {
            override fun onSuccess(response: MyHttpResponse) {
                deferred.complete(response)
            }

            override fun onFailure(response: MyHttpResponse) {
                deferred.completeExceptionally(MyHttpError(response))
            }
        })
        emit(deferred.await())
    }
}

// Extension function to catch exceptions AND to check if the response body is null
internal fun <T> Flow<MyHttpResponse>.serviceFlow(
    onSuccess: (response: MyHttpResponse) -> T,
    onError: (response: MyHttpResponse) -> Unit
) = flatMapConcat { response ->
    flowOf(response)
        .map { res ->
            res.body?.let { it ->
                onSuccess(res)
            } ?: throw MyParseError("MyHttpResponse has a null body")
        }
        .catchException<JSONException, T> { e ->
            throw MyParseError("Parsing exception $e")
        }
}.catchException<MyHttpError, T> { e ->
    onError(e.response)
}

// Function leveraging OkHttpClient to make a HTTPRequest
internal fun executeHttp { ... }

我认为问题出在函数submitImage启动所有子流程上传图像后就返回了,但它没有等待所有子流程完成。我不确定Kotlin协程在这种情况下有什么构造,请问有人能帮忙吗?

Flow.zip() 可能是一个候选项。 - CommonsWare
zip 不是将两个流合并,而是在第一个流结束时立即终止。此外,图像中的块很容易累加到十几个,因此需要一种递归压缩函数。 - kioli
2个回答

1

感谢musafee给了我正确的方向。

最终的答案是我在uploadImage函数中创建了这些流程,但我从未实际调用collect,因此它们始终未启动。

我选择的解决方案是将在那里创建的流程列表返回到调用函数,并从那里将submitImage函数的返回类型从Flow<String>更改为Flow<List<Flow<Unit>>>,然后从上层触发它们。


0

我认为你应该使用WorkManager并考虑链式工作者特性

有了Flow特性,可以尝试这样做:

private suspend fun uploadImage(imageBase64: String, imageId: String) {
withContext(Dispatchers.IO){
  val chunks = divideEncodedImageInChunksOfSize(imageBase64)
  var v = 1
  for (chunk in chunks) {
     val payload = generatePayload(imageId, v, chunk, false)
     submitImageChunk(payload)
     v++
   }
  val payload = generatePayload(imageId, v, "", true)
  submitImageChunk(payload).await();
 }

private suspend fun submitImageChunk(payload: JSONObject): Deferred<Unit> {
 val request = requestBuilder.buildUploadImageChunkRequest(payload)
 return client.execute(request);
}

很遗憾,我目前无法添加其他依赖项。我希望能够利用 Kotlin 协程库中的结构和特性来解决问题。 - kioli
uploadImage不是一个暂停函数。因此,在图像成功上传之前,submitImage函数应该返回imageId。 - user12281411
这是正确的@musafee,但即使让uploadImage返回一个flow,它仍然无法工作(也许这是正确的方向)。 - kioli

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接