Android WorkManager中的异步工作者

57

最近,Google 宣布了新的WorkManager 架构组件。它通过在 Worker 类中实现 doWork() 方法,使同步工作调度变得容易,但是如果我想在后台执行一些异步工作怎么办?例如,我想使用 Retrofit 进行网络服务调用。我知道可以进行同步网络请求,但这会阻塞线程,感觉不太对。是否有解决方案或当前不支持呢?


你是指主线程被阻塞还是当前线程被阻塞? - Sagar
1
工作线程将被阻塞。 - Anton Tananaev
你可以同时将两个工作线程加入队列,而不是生成一个新的线程? - Sagar
请仔细阅读问题。我没有生成任何新线程。 - Anton Tananaev
我的意思是,如果你想要做一些异步的事情,你需要生成一个线程,这样它就不会在同一个线程上执行。我正在尝试理解你的用例。 - Sagar
嗯,在内部,Retrofit可能有一个单独的线程。 - Anton Tananaev
9个回答

36

我使用了CountDownLatch并等待它达到0,这只会在异步回调更新它后发生。请查看此代码:

public WorkerResult doWork() {

        final WorkerResult[] result = {WorkerResult.RETRY};
        CountDownLatch countDownLatch = new CountDownLatch(1);
        FirebaseFirestore db = FirebaseFirestore.getInstance();

        db.collection("collection").whereEqualTo("this","that").get().addOnCompleteListener(task -> {
            if(task.isSuccessful()) {
                task.getResult().getDocuments().get(0).getReference().update("field", "value")
                        .addOnCompleteListener(task2 -> {
                            if (task2.isSuccessful()) {
                                result[0] = WorkerResult.SUCCESS;
                            } else {
                                result[0] = WorkerResult.RETRY;
                            }
                            countDownLatch.countDown();
                        });
            } else {
                result[0] = WorkerResult.RETRY;
                countDownLatch.countDown();
            }
        });

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return result[0];

    }

3
当约束条件无法满足时会发生什么。这意味着当理想状态的约束条件得到满足后,工作管理器将被触发。但是经过一段时间后,手机不再处于理想状态。 - Nitish

33

顺便提一下,现在有ListenableWorker,它被设计成异步执行。

编辑:这里是一些示例用法的片段。我删去了我认为不具有说明性的大段代码,因此可能存在一些小错误。

这是针对一个任务的,它需要一个 String photoKey,从服务器检索元数据,进行一些压缩工作,然后上传已压缩的照片。这发生在主线程之外。以下是我们如何发送工作请求:

private void compressAndUploadFile(final String photoKey) {
    Data inputData = new Data.Builder()
            .putString(UploadWorker.ARG_PHOTO_KEY, photoKey)
            .build();
    Constraints constraints = new Constraints.Builder()
            .setRequiredNetworkType(NetworkType.CONNECTED)
            .build();
    OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(UploadWorker.class)
            .setInputData(inputData)
            .setConstraints(constraints)
            .build();
    WorkManager.getInstance().enqueue(request);
}

并在UploadWorker中:

public class UploadWorker extends ListenableWorker {
    private static final String TAG = "UploadWorker";
    public static final String ARG_PHOTO_KEY = "photo-key";

    private String mPhotoKey;

    /**
     * @param appContext   The application {@link Context}
     * @param workerParams Parameters to setup the internal state of this worker
     */
    public UploadWorker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) {
        super(appContext, workerParams);
        mPhotoKey = workerParams.getInputData().getString(ARG_PHOTO_KEY);
    }

    @NonNull
    @Override
    public ListenableFuture<Payload> onStartWork() {
        SettableFuture<Payload> future = SettableFuture.create();
        Photo photo = getPhotoMetadataFromServer(mPhotoKey).addOnCompleteListener(task -> {
            if (!task.isSuccessful()) {
                Log.e(TAG, "Failed to retrieve photo metadata", task.getException());
                future.setException(task.getException());
                return;
            }
            MyPhotoType photo = task.getResult();
            File file = photo.getFile();
            Log.d(TAG, "Compressing " + photo);
            MyImageUtil.compressImage(file, MyConstants.photoUploadConfig).addOnCompleteListener(compressionTask -> {
                if (!compressionTask.isSuccessful()) {
                    Log.e(TAG, "Could not parse " + photo + " as an image.", compressionTask.getException());
                    future.set(new Payload(Result.FAILURE));
                    return;
                }
                byte[] imageData = compressionTask.getResult();
                Log.d(TAG, "Done compressing " + photo);
                UploadUtil.uploadToServer(photo, imageData);
                future.set(new Payload(Result.SUCCESS));
            });
        });
        return future;
    }
}

编辑

根据您在应用程序中使用的工具,您也可以扩展RxWorker(如果您正在使用RxJava)或CoroutineWorker(如果您正在使用Coroutines)。它们都继承自ListenableWorker


2
请问您能否添加一个使用该类的示例? - idish
7
我无法在alpha-13中使用SettableFuture.create(),该类仅限于同一库组。 - David Vávra
实际上,SettableFuture.create(); 模块仅限于 WorkManager 库组的私有使用,无法使用。 - idish
1
它实际上并不是私有的。 https://dev59.com/8VcP5IYBdhLWcg3w5d_Q 只需抑制警告"@SuppressLint("RestrictedApi")" - A.Sanchez.SD
2
任务在主线程上执行。据说“startWork()”方法在主线程上调用。此外,我无法在类中看到任何“onStartWork”的内容。你能解释一下吗? - Abhay Pai
显示剩余2条评论

23

根据WorkManager文档

默认情况下,WorkManager在后台线程上运行其操作。如果您已经在后台线程上运行,并需要同步(阻塞)调用WorkManager,则使用synchronous()访问此类方法。

因此,如果您不使用synchronous(),则可以安全地从doWork()执行同步网络调用。从设计角度来看,这也是更好的方法,因为回调很混乱。

话虽如此,如果您真的想要从doWork()触发异步作业,则需要暂停执行线程,并在异步作业完成后恢复它,使用wait/notify机制(或其他线程管理机制,例如Semaphore)。这在大多数情况下都不是我推荐的做法。

另外,值得一提的是,WorkManager目前处于非常早期的alpha版本。


7
如果您谈论异步作业,可以将工作移动到RxJava Observables / Singles中。有一组操作符,如.blockingGet().blockingFirst(),可将Observable<T>转换为阻塞的TWorker在后台线程上执行,因此不必担心NetworkOnMainThreadException

你能回答这个问题吗:https://dev59.com/cKvka4cB1Zd3GeqPx76p - Usman Rana
在后台线程运行时,使用同步 API 覆盖异步 API 并不总是足够好。例如,某些异步 API 有一些 onProgress 回调将在主线程上调用,请注意。 - idish
FFmpeg命令可以使用RxJava执行吗?因为它已经是具有回调的异步方法。 - Usman Rana

7

使用协程的能力,您可以像这样“同步”doWork()

获取位置的暂停方法(异步):

private suspend fun getLocation(): Location = suspendCoroutine { continuation ->
    val mFusedLocationClient = LocationServices.getFusedLocationProviderClient(appContext)
    mFusedLocationClient.lastLocation.addOnSuccessListener {
        continuation.resume(it)
    }.addOnFailureListener {
        continuation.resumeWithException(it)
    }
}

doWork()中的调用示例:

override fun doWork(): Result {
    val loc = runBlocking {
        getLocation()
    }
    val latitude = loc.latitude
}

2021年更新: 现在可以使用CoroutineWorker,它有一个暂停的doWork()方法。

class MySuspendWorker(private val appContext: Context, workerParams: WorkerParameters) : CoroutineWorker(appContext, workerParams) {
    override suspend fun doWork(): Result {
        //do your async work
    }
}

非常优雅的解决方案 - Maher Abuthraa
1
谢谢@MaherAbuthraa,我也更新了答案,使用内置的挂起工作器。 - Webfreak

6

我使用了BlockingQueue,它简化了线程同步和在线程之间传递结果的过程,你只需要一个对象就行了。

private var disposable = Disposables.disposed()

private val completable = Completable.fromAction { 
        //do some heavy computation
    }.subscribeOn(Schedulers.computation()) // you will do the work on background thread

override fun doWork(): Result {
    val result = LinkedBlockingQueue<Result>()

    disposable = completable.subscribe(
            { result.put(Result.SUCCESS) },
            { result.put(Result.RETRY) }
    )

    return try {
        result.take() //need to block this thread untill completable has finished
    } catch (e: InterruptedException) {
        Result.RETRY
    }
}

同样不要忘记在您的 Worker 停止后释放资源,这是比使用 .blockingGet() 更大的优势,因为现在您可以正确地取消 Rx 任务。
override fun onStopped(cancelled: Boolean) {
    disposable.dispose()
}

请问你能否添加更多相同的代码。这有点抽象。 - Vinayak

1

虽然有点晚了,但这可能会帮助其他人,

您可以使用 CoroutineWorker,并在其中使用称为 suspendCancellableCoroutine 的东西,在内部进行 doWork(),它专门设计用于此目的。

以下是代码片段:

class FileDownloader(private val appContext: Context, params: WorkerParameters) :
CoroutineWorker(appContext, params) {

   override suspend fun doWork(): Result {

       try {

          suspendCancellableCoroutine<Int> { cancellableContinuation ->

              // Here you can call your asynchronous callback based network

                override fun onComplete() {
                        cancellableContinuation.resumeWith(
                            kotlin.Result.success(100))
                }

                override fun onError(error: Error?) {

                        cancellableContinuation.resumeWithException(
                            error?.connectionException ?: Throwable()
                        )
                   
               }
               
     }

     }catch (e: Exception) {
           return Result.failure()
      }

  return Result.success()
}
}

在这里,协程将会被暂停,直到你调用cancellableContinuation.resumeWith。


0

这个示例对于寻找 Firebase 和 Work Manager 的人可能会有用。它使用了 androidx.concurrent,因此您需要在 Android 项目中 [安装][1] 它。

import android.content.Context;
import androidx.annotation.NonNull;
import androidx.work.ListenableWorker;
import androidx.work.WorkerParameters;
import androidx.concurrent.futures.CallbackToFutureAdapter;

import com.google.android.gms.tasks.OnFailureListener;
import com.google.android.gms.tasks.OnSuccessListener;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.firebase.firestore.FirebaseFirestore;

public class MessageWorker extends ListenableWorker
{
    // Define the parameter keys:
    public static final String MESSAGE_ID = "messageId";
    public static final String MESSAGE_STATUS = "messageStatus";
    public MessageWorker(@NonNull Context context, @NonNull WorkerParameters 
      workerParams) {
        super(context, workerParams);
         }

    @NonNull
    @Override
    public ListenableFuture<Result> startWork() {
        return CallbackToFutureAdapter.getFuture(completer -> {
            String messageId = getInputData().getString(MESSAGE_ID);
            String messageStatus = getInputData().getString(MESSAGE_STATUS);
            FirebaseFirestore.getInstance()
                    .collection("messages")
                    .document(messageId)
                    .update("status", messageStatus)
                    .addOnSuccessListener(new OnSuccessListener<Void>() {
                        @Override
                        public void onSuccess(Void unused) {
                            completer.set(Result.success());

                        }
                    })
                    .addOnFailureListener(new OnFailureListener() {
                        @Override
                        public void onFailure(@NonNull Exception e) {
                            completer.set(Result.retry());
                        }
                    });

            // This value is used only for debug purposes: it will be used
            // in toString() of returned future or error cases.
            return "startSomeAsyncStuff";
        });

    }


}


  [1]: https://developer.android.com/jetpack/androidx/releases/concurrent#1.0.0

0
我也比较喜欢@TomH推荐的方法。虽然我曾在Firebase Storage上使用过它。将WorkManager与CountDownLatch一起使用对我很有帮助。这里是代码片段。使用Timber记录日志。
在任务完成之后但工作程序返回成功之前,它将Firebase的downloadUrl作为字符串返回。
@NonNull
@Override
public Result doWork() {
    mFirebaseStorage = mFirebaseStorage.getInstance();
    mTriviaImageStorageReference = mFirebaseStorage.getReference().child("images");

    CountDownLatch countDown = new CountDownLatch(2);
    Uri imageUri = Uri.parse(getInputData().getString(KEY_IMAGE_URI));

    try {

    // get the image reference
    final StorageReference imageRef = mTriviaImageStorageReference.child(imageUri.getLastPathSegment());

    // upload the image to Firebase
    imageRef.putFile(imageUri).continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
        @Override
        public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
            if (!task.isSuccessful()) {
                throw task.getException();
            }
            countDown.countDown();
            return imageRef.getDownloadUrl();
        }
    }).addOnCompleteListener(new OnCompleteListener<Uri>() {
        @Override
        public void onComplete(@NonNull Task<Uri> task) {
            if (task.isSuccessful()) {
                Timber.d("Image was successfully uploaded to Firebase");
                Uri downloadUri = task.getResult();
                String imageUrl = downloadUri.toString();

                Timber.d(("URl of the image is: " + imageUrl));

                mOutputData = new Data.Builder()
                        .putString(KEY_FIREBASE_IMAGE_URL, imageUrl)
                        .build();
                countDown.countDown();
            } else {
                Toast.makeText(getApplicationContext(), "upload failed", Toast.LENGTH_SHORT).show();
                countDown.countDown();
            }
        }
    });
    countDown.await();
    return Result.success(mOutputData);

    } catch (Throwable throwable) {
        Timber.e(throwable, "Error uploading image");
        return Result.failure();
    }
}

我尝试了你的方法,但是我的doWork被调用了多次。 - gr_aman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接