如何使用Kotlin协程处理回调

10

以下代码片段在顺序代码流中返回结果为“null”。我了解协程可能是处理回调异步的可行解决方案。


    fun getUserProperty(path: String): String? {
        var result: String? = null
        database.child(KEY_USERS).child(getUid()).child(path)
            .addListenerForSingleValueEvent(object : ValueEventListener {
                override fun onCancelled(error: DatabaseError) {
                    Log.e(TAG, "error: $error")
                }

                override fun onDataChange(snapshot: DataSnapshot) {
                    Log.w(TAG, "value: ${snapshot.value}")
                    result = snapshot.value.toString()
                }
            })
        return result
    }

在这种情况下,协程能否帮助等待回调(onDataChange() / onCancelled())的结果?

3个回答

10

由于 Firebase 实时数据库 SDK 没有提供任何挂起函数,协程在处理其 API 时没有什么用处。您需要将回调转换为挂起函数,以便能够在协程中等待结果。

下面是一个挂起扩展函数,它可以实现这一点(我通过进行谷歌搜索发现了一个解决方案):

suspend fun DatabaseReference.getValue(): DataSnapshot {
    return async(CommonPool) {
        suspendCoroutine<DataSnapshot> { continuation ->
            addListenerForSingleValueEvent(FValueEventListener(
                    onDataChange = { continuation.resume(it) },
                    onError = { continuation.resumeWithException(it.toException()) }
            ))
        }
    }.await()
}

class FValueEventListener(val onDataChange: (DataSnapshot) -> Unit, val onError: (DatabaseError) -> Unit) : ValueEventListener {
    override fun onDataChange(data: DataSnapshot) = onDataChange.invoke(data)
    override fun onCancelled(error: DatabaseError) = onError.invoke(error)
}

有了这个,现在你可以在协程中等待 DatabaseReference 上的 getValue() 方法。


Doug,感谢您的输入。是的,我遇到了一些类似的方法。不知道使用方式(指在数据库中引用节点、接收值)是如何工作的?https://gist.github.com/beyondeye/f69ba427938f79801c291d18131ff1d9 - Faisal
我不确定你在问什么。 - Doug Stevenson
以上代码片段不确定如何调用并从getValue()返回值。正在寻找深入的文章以更好地理解。 - Faisal
1
有没有任何原因使得Kotlin无法使用async?我只能找到GlobalScope.async - Faisal

5

如果您想保持列表,可以使用协程流,以下是单一值事件的@Doug示例:

@ExperimentalCoroutinesApi
inline fun <reified T> DatabaseReference.listen(): Flow<DataResult<T?>> =
  callbackFlow {
    val valueListener = object : ValueEventListener {
      override fun onCancelled(databaseError: DatabaseError) {
        close(databaseError.toException())
      }

      override fun onDataChange(dataSnapshot: DataSnapshot) {
        try {
          val value = dataSnapshot.getValue(T::class.java)
          offer(DataResult.Success(value))
        } catch (exp: Exception) {
          Timber.e(exp)
          if (!isClosedForSend) offer(DataResult.Error(exp))
        }
      }
    }
    addValueEventListener(valueListener)

    awaitClose { removeEventListener(valueListener) }
  }

我们需要 awaitClose 吗? - IgorGanapolsky
@IgorGanapolsky 是的 - Andrew

4

如果有人仍在使用原始答案的代码,但需要将其更新以匹配非实验版本的协程,这是我如何更改它的方式:

import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

suspend fun DatabaseReference.getSnapshotValue(): DataSnapshot {
    return withContext(Dispatchers.IO) {
        suspendCoroutine<DataSnapshot> { continuation ->
            addListenerForSingleValueEvent(FValueEventListener(
                onDataChange = { continuation.resume(it) },
                onError = { continuation.resumeWithException(it.toException()) }
            ))
        }
    }
}

class FValueEventListener(val onDataChange: (DataSnapshot) -> Unit, val onError: (DatabaseError) -> Unit) : ValueEventListener {
    override fun onDataChange(data: DataSnapshot) = onDataChange.invoke(data)
    override fun onCancelled(error: DatabaseError) = onError.invoke(error)
}

那么使用它就像这样简单:val snapshot = ref.getSnapshotValue()

更新

我还需要观察一个节点并用Omar的答案来做到这一点。如果有人需要如何使用它的示例,在这里:

@ExperimentalCoroutinesApi
inline fun <reified T> DatabaseReference.listen(): Flow<T?>? =
callbackFlow {
    val valueListener = object : ValueEventListener {
        override fun onCancelled(databaseError: DatabaseError) {
            close()
        }

        override fun onDataChange(dataSnapshot: DataSnapshot) {
            try {
                val value = dataSnapshot.getValue(T::class.java)
                offer(value)
            } catch (exp: Exception) {
                if (!isClosedForSend) offer(null)
            }
        }
    }
    addValueEventListener(valueListener)
    awaitClose { removeEventListener(valueListener) }
}

如果要在Activity或Fragment中调用它,您可以像下面这样创建监听器:

var listener =  FirebaseUtils.databaseReference
   .child(AppConstants.FIREBASE_PATH_EMPLOYEES)
   .child(AuthUtils.retrieveUID()!!).listen<User>()

然后在你的函数内调用它:

CoroutineScope(IO).launch {
    withContext(IO) {
        listener?.collect{
            print(it)
        }
    }
}

然后在onStop()内进行处理:

override fun onStop(){
    listener = null
    super.onStop()
}

为什么你需要同时使用 CoroutineScope(IO)withContext(IO) - IgorGanapolsky
我不得不这样做,因为我没有使用ViewModel库。我有另一个协程需要处理,所以你可以删除withContext。 - kobowo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接