RxJava在Android中等待回调完成后再返回数据

5

我是一个RxJava的新手。我的Android应用程序正在使用AWS Cognito SDK进行身份验证。我有一个AwsAuthClient类来处理调用SDK并返回结果。我有一个片段调用AwsAuthClient中的SignUp方法。我需要将注册结果返回给片段,以便它可以做出适当的反应。

RegisterFragment类:

public class RegisterFragment{
    AwsAuthClient authClient;

    public void onCreateAccountClick() {
        Subscription createSubscription = authClient.SignUp(params)
            .compose(Transformers.applyIoToMainSchedulers())
            .subscribe((CognitoUser currentUser) -> {
                transitionToVerificationScreen();
             }, (Throwable throwable) -> {
                 // Report the error.
             });
    }
}

这是 AwsAuthClient:

public class AwsAuthClient {

    public void SignUp(CreateParams createParams){

        // Create a CognitoUserAttributes object and add user attributes
        CognitoUserAttributes userAttributes = new CognitoUserAttributes();

        // Add the user attributes. Attributes are added as key-value pairs
        // Adding user's given name.
        // Note that the key is "given_name" which is the OIDC claim for given name
        userAttributes.addAttribute("given_name", createParams.getFirstname() + " " + createParams.getLastname());

        // Adding user's phone number
        userAttributes.addAttribute("phone_number", createParams.getPhone());

        // Adding user's email address
        userAttributes.addAttribute("email", createParams.getPhone());

        SignUpHandler signupCallback = new SignUpHandler() {

            @Override
            public void onSuccess(CognitoUser cognitoUser, boolean userConfirmed, CognitoUserCodeDeliveryDetails cognitoUserCodeDeliveryDetails) {
                // Sign-up was successful

                currentUser = cognitoUser;

                // Check if this user (cognitoUser) needs to be confirmed
                if(!userConfirmed) {
                    // This user must be confirmed and a confirmation code was sent to the user
                    // cognitoUserCodeDeliveryDetails will indicate where the confirmation code was sent
                    // Get the confirmation code from user
                    Timber.d("Sent confirmation code");
                }
                else {
                    // The user has already been confirmed
                    Timber.d("User has already been confirmed.");
                }
            }

            @Override
            public void onFailure(Exception exception) {
                // Sign-up failed, check exception for the cause
            }
        };

        userPool.signUpInBackground(userId, password, userAttributes, null, signupCallback);
    }
}

我该如何将onSuccess或OnFailure的结果返回到RegisterFragment类中?

RxJava1还是RxJava2?编辑:算了吧...Subscription告诉我这是RxJava1。 - Jon
2个回答

2
似乎Cognito SDK已经提供了异步获取信息的方法。为了将其包装成rx流,您应该考虑使用一个"Subject"。
"Subject"既可以作为能够发出数据的"Observable",也可以作为能够接收数据的"Observer"。"Subject"可以等待接收回调数据,获取数据,然后将其发射到流上。
public Observable<CognitoUser> SignUp(CreateParams createParams){
    BehaviorSubject<CognitoUser> subject = BehaviorSubject.create();

    // ...

    SignUpHandler signupCallback = new SignUpHandler() {

        @Override
        public void onSuccess(CognitoUser cognitoUser, boolean userConfirmed, CognitoUserCodeDeliveryDetails cognitoUserCodeDeliveryDetails) {
            // Sign-up was successful

            // Check if this user (cognitoUser) needs to be confirmed
            if(!userConfirmed) {
                // This user must be confirmed and a confirmation code was sent to the user
                // cognitoUserCodeDeliveryDetails will indicate where the confirmation code was sent
                // Get the confirmation code from user
                Timber.d("Sent confirmation code");
            }
            else {
                // The user has already been confirmed
                Timber.d("User has already been confirmed.");
            }

            subject.onNext(cognitoUser);
            subject.onComplete();
        }

        @Override
        public void onFailure(Exception exception) {
            subject.onError(exception);
        }
    };

    userPool.signUpInBackground(userId, password, userAttributes, null, signupCallback);
    return subject;
}

这绝对是一个“热”可观察对象。如何处理可观察对象并没有对错之分,因为这取决于开发人员的决定(热 vs 冷)。之所以在这里建议使用BehaviorSubject,正是因为这是一个“热”可观察对象。 :) - Jon
你的解决方案场景是这样的:我调用了SignUp->它调用signUpInBackground->一旦我订阅它->我得到了重放值。 正常情况应该是这样的:我调用SignUp->我得到一个Observable->我订阅它->它调用signUpInBackground->我得到了SignUpValue。 请注意,你的重放值并不完全等同于你所调用的值,它可能是你调用了这个方法两次,并从它的重放值中得到了错误的值。 - Phoenix Wang
想象一下,你调用了signUp方法一次,在订阅之前就获得了OK的结果。由于某种原因,你第二次调用它时失去了互联网连接。你只能为两个调用都获得失败的结果。 - Phoenix Wang
@PhoenixWang,你能提供一个 RxJava1 的替代方案吗? - stackunderflows
@stackunderflows 我编辑了我的答案。你可以直接使用 Observable.create(Action1>,BackPressureMode),它基本上是 RxJava 2 的相同的 create 操作符。 - Phoenix Wang
显示剩余2条评论

1
如果您正在使用RxJava2,您可以使用create()运算符来创建自己的异步调用:
public class AwsAuthClient {

    public Observable<CognitoUser> SignUp(CreateParams createParams){
        return Observable.create(emitter -> {
            SignUpHandler signupCallback = new SignUpHandler() {

                @Override
                public void onSuccess(CognitoUser cognitoUser, boolean userConfirmed, CognitoUserCodeDeliveryDetails cognitoUserCodeDeliveryDetails) {
                // Sign-up was successful
                emitter.onNext(cognitoUser);
                // Check if this user (cognitoUser) needs to be confirmed
                if(!userConfirmed) {
                // This user must be confirmed and a confirmation code was sent to the user
                // cognitoUserCodeDeliveryDetails will indicate where the confirmation code was sent
                // Get the confirmation code from user
                Timber.d("Sent confirmation code");
                }
                else {
                    // The user has already been confirmed
                    Timber.d("User has already been confirmed.");
                }
                emitter.onComplete();
            }

            @Override
            public void onFailure(Exception exception) {
                // Sign-up failed, check exception for the cause
                emitter.onError(exception);
            }
        };
        //cancel the call
        Observable.setCancellable(//your cancel code)
    })
}

编辑:如果您正在使用RxJava1(最新版本1.3.2),您可以只使用Observable.create(Action1>,BackPressureMode),而无需使用create,这是安全的

        Observable.create(new Action1<Emitter<CognitoUser extends Object>>() {
        @Override
        public void call(Emitter<CognitoUser> emitter) {
            SignUpHandler signupCallback = new SignUpHandler() {
                @Override
                public void onSuccess(CognitoUser cognitoUser, boolean userConfirmed, CognitoUserCodeDeliveryDetails cognitoUserCodeDeliveryDetails) {
                    if (!userConfirmed) {
                        Timber.d("Sent confirmation code");
                    } else {
                        Timber.d("User has already been confirmed.");
                    }
                    emitter.onNext(cognitoUser);

                    emitter.onComplete();
                }

                @Override
                public void onFailure(Exception exception) {
                    emitter.onError(exception);
                }
            };
            emitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    //Your Cancellation
                }
            });
            signUpInBackground(userId, password, userAttributes, null, signupCallback);
        }

        //Because RxJava 1 doesn't have Flowable so you need add backpressure by default.
    }, Emitter.BackpressureMode.NONE );

在RxJava1中要避免使用create!那么,在RxJava1中有什么相当于create的方法呢? - Eugen Pechanec
@EugenPechanec 是的。如果您正在使用RxJava 1的更新版本,则有一些重载可以安全地为您创建类似于RxJava2的Observable,例如:Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) - Phoenix Wang
由于我使用的是1.X版本,另一个答案最终对我起作用了,但是对于2版本这是有用的知识。 - stackunderflows
@stackunderflows 在 RxJava 1 中有与 RxJava2 create 相当的运算符,但我真的记不清是什么了。fromEmitter/fromAsync 或类似的东西。而且主题也应该小心使用。 - Phoenix Wang
@stackunderflows,就这个主题的解决方案,存在一些问题。请查看该答案下的评论。 - Phoenix Wang
@PhoenixWang 我尝试使用你的答案,但它无法编译。当我悬停在“extends”上时,它会显示“unexpected bound”。 - stackunderflows

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接