AWS IoT Android应用程序通过MQTT连接时抛出MqttException(0)- java.io.IOException:已连接。

8

我正在尝试在我的Android应用程序中使用“使用Cognito用户池进行Cognito-Identity身份验证”。 当我单独运行Cognito用户池身份验证时,它能很好地运行,并且我也看到了JWTToken。 当我使用未经身份验证的角色运行“PubSub”示例应用程序时,它按预期工作。 当我将这两个功能集成到一个应用程序中时,该应用程序抛出以下错误。

W/System.err: MqttException (0) - java.io.IOException: Already connected
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:664)
W/System.err:     at java.lang.Thread.run(Thread.java:761)
W/System.err: Caused by: java.io.IOException: Already connected
W/System.err:     at java.io.PipedOutputStream.connect(PipedOutputStream.java:100)
W/System.err:     at java.io.PipedInputStream.connect(PipedInputStream.java:195)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketReceiver.<init>(WebSocketReceiver.java:42)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketSecureNetworkModule.start(WebSocketSecureNetworkModule.java:78)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:650)
W/System.err:   ... 1 more

自上周四开始,我一直在尝试解决这个问题,但仍然卡在同一个位置。真的不知道应该去哪里查找!

我正在添加我的认证(Cognito用户池认证)活动和连接活动。

AmazonCognitoIdentityProviderClient identityProviderClient = new 
AmazonCognitoIdentityProviderClient(new AnonymousAWSCredentials(), new ClientConfiguration());
identityProviderClient.setRegion(Region.getRegion(Regions.US_WEST_2));
CognitoUserPool userPool = new CognitoUserPool(getApplicationContext(), "us-west-2_ghtcc6ho9", "4t0mk45hNso69dp2j4jvel5ghm", "1jmq0lhhq721oif9k6nug31c29i760vihua8hvrgu5umfr2a1vd7", identityProviderClient);
cogUser = userPool.getUser();
authenticationHandler = new AuthenticationHandler() {



        @Override
        public void onSuccess(CognitoUserSession userSession, CognitoDevice newDevice) {
            String ids = userSession.getIdToken().getJWTToken();
            Log.d("MyToken","session id___"+userSession.getIdToken().getExpiration()+"___"+userSession.getIdToken().getIssuedAt());
            Intent pubSub = new Intent(MainActivity.this, PubSubActivity.class);
            pubSub.putExtra("token",""+ids);
            startActivity(pubSub);
            //MainActivity.this.finish();

        }

        @Override
        public void getAuthenticationDetails(AuthenticationContinuation authenticationContinuation, String userId) {
            Log.d("MyToken","getAuthenticationDetails");
            AuthenticationDetails authenticationDetails = new AuthenticationDetails("shone", "172737", null);
            authenticationContinuation.setAuthenticationDetails(authenticationDetails);
            // Allow the sign-in to continue
            authenticationContinuation.continueTask();
        }

        @Override
        public void getMFACode(MultiFactorAuthenticationContinuation multiFactorAuthenticationContinuation) {
            Log.d("MyToken","getMFACode");
            multiFactorAuthenticationContinuation.continueTask();
        }

        @Override
        public void authenticationChallenge(ChallengeContinuation continuation) {
            Log.d("MyToken","authenticationChallenge"+continuation.getChallengeName());
            newPasswordContinuation.continueTask();
        }

        @Override
        public void onFailure(Exception exception) {
            exception.printStackTrace();
            Log.d("MyToken","onFailure");
        }
    };
     cogUser.getSessionInBackground(authenticationHandler); 

当到达“OnSuccess”时,我会启动连接活动并通过Intent传递我的会话令牌。移动到下一个活动。

private static final String COGNITO_POOL_ID = "us-west-2:a153a090-508c-44c0-a9dd-efd450298c4b";
private static final Regions MY_REGION = Regions.US_WEST_2;
AWSIotMqttManager mqttManager;
String clientId;
AWSCredentials awsCredentials;
CognitoCachingCredentialsProvider credentialsProvider;
 @Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    Intent intent = getIntent();
if(null == intent){
    Toast.makeText(getApplicationContext(), "Token is null", Toast.LENGTH_SHORT).show();
}else {
    token = intent.getStringExtra("token");
}
 clientId = UUID.randomUUID().toString();
    credentialsProvider = new CognitoCachingCredentialsProvider(
            getApplicationContext(), 
            COGNITO_POOL_ID, 
            MY_REGION 
    );
mqttManager = new AWSIotMqttManager(clientId, CUSTOMER_SPECIFIC_ENDPOINT);
 Map loginsMap = new HashMap();
    loginsMap.put("cognito-idp.us-west-2.amazonaws.com/us-west-2_ghtcc6ho9", token);
    credentialsProvider.setLogins(loginsMap);
    Log.d("SESSION_ID", ""+token);
    new Thread(new Runnable() {
        @Override
        public void run() {
            credentialsProvider.refresh();
            awsCredentials = credentialsProvider.getCredentials();
            Log.d("SESSION_ID B: ", ""+awsCredentials.getAWSAccessKeyId());
            Log.d("SESSION_ID C: ", ""+awsCredentials.getAWSSecretKey());
        }
    }).start();
}

 View.OnClickListener connectClick = new View.OnClickListener() {
    @Override
    public void onClick(View v) {

        Log.d(LOG_TAG, "clientId = " + clientId);

        try {
            mqttManager.connect(credentialsProvider, new AWSIotMqttClientStatusCallback() {
                @Override
                public void onStatusChanged(final AWSIotMqttClientStatus status,
                        final Throwable throwable) {
                    Log.d(LOG_TAG, "Status = " + String.valueOf(status)+"______"+((null !=throwable)?throwable.getMessage():""));

                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            if (status == AWSIotMqttClientStatus.Connecting) {
                                tvStatus.setText("Connecting...");

                            } else if (status == AWSIotMqttClientStatus.Connected) {
                                tvStatus.setText("Connected");

                            } else if (status == AWSIotMqttClientStatus.Reconnecting) {
                                if (throwable != null) {
                                    Log.e(LOG_TAG, "Connection error.", throwable);
                                }
                                tvStatus.setText("Reconnecting");
                            } else if (status == AWSIotMqttClientStatus.ConnectionLost) {
                                if (throwable != null) {
                                    Log.e(LOG_TAG, "Connection error.", throwable);
                                    throwable.printStackTrace();
                                }
                                tvStatus.setText("Disconnected");
                            } else {
                                tvStatus.setText("Disconnected");

                            }
                        }
                    });
                }
            });
        } catch (final Exception e) {
            Log.e(LOG_TAG, "Connection error.", e);
        }
    }
};

我的代码有什么问题?当调用MQTT连接时,为什么会抛出异常?任何帮助都将不胜感激。

身份验证后,应将策略附加到身份。该文档中缺少此信息。我通过使用“attachPolicy()”方法解决了这个问题。 - Shibin Francis
3个回答

8

我花了将近一周的时间来解决这个问题。

完整的操作流程如下 -> 成功登录后,您将获得一个JWT令牌。

String idToken = cognitoUserSession.getIdToken().getJWTToken();

将其放入一个映射中。
Map<String, String> logins = new HashMap<String, String>(); 
//fill it with Cognito User token
logins.put("cognito-idp.<REGION>.amazonaws.com/<COGNITO_USER_POOL_ID>", idToken);

然后将其用于两个位置(未在任何文档中说明!)
CognitoCachingCredentialsProvider credentialsProvider = new 
CognitoCachingCredentialsProvider(context, IDENTITY_POOL_ID, REGION);
credentialsProvider.setLogins(logins);

并且

AmazonCognitoIdentity cognitoIdentity = new AmazonCognitoIdentityClient(credentialsProvider);
GetIdRequest getIdReq = new GetIdRequest();
getIdReq.setLogins(logins); //or if you have already set provider logins just use credentialsProvider.getLogins()
getIdReq.setIdentityPoolId(COGNITO_POOL_ID);
GetIdResult getIdRes = cognitoIdentity.getId(getIdReq);

之后,您仍需要进行一些调用。

AttachPrincipalPolicyRequest attachPolicyReq = new AttachPrincipalPolicyRequest(); //in docs it called AttachPolicyRequest but it`s wrong
attachPolicyReq.setPolicyName("allAllowed"); //name of your IOTAWS policy
attachPolicyReq.setPrincipal(getIdRes.getIdentityId());
new AWSIotClient(credentialsProvider).attachPrincipalPolicy(attachPolicyReq);

只有在此之后,您才能启用连接按钮并继续操作。

mqttManager.connect(credentialsProvider, new AWSIotMqttClientStatusCallback() {

为了这段简短的代码,我花费了很多时间...


我花了很多时间在这上面,但是在他们的文档中找不到任何有用的信息。之后,我购买了付费开发者支持会员,并向他们询问解决方案。他们花了一周时间才找到了解决方案,即“将策略附加到身份”。然后我要求他们展示在文档中哪里提到了这个解决方案,但是之后我没有收到任何回复。 :D - Shibin Francis

4

我也遇到了同样的错误 -

Feb 27, 2019 10:23:09 AM com.amazonaws.services.iot.client.mqtt.AwsIotMqttConnectionListener onFailure
WARNING: Connect request failure
MqttException (0) - java.io.IOException: Already connected
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:664)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Already connected
    at java.io.PipedOutputStream.connect(PipedOutputStream.java:100)

但问题是不同的。
首先,您不需要从代码中调用attachPrincipalPolicy。您也可以使用命令行。您可以像这样做- aws iot attach-principal-policy --principal us-east-1:1c973d17-98e6-4df6-86bf-d5cedc1fbc0d --policy-name "thingpolicy" --region us-east-1 --profile osfg 您将从身份池的身份浏览器中获取主体ID。现在让我们来看看错误-
要成功连接到具有经过身份验证的Cognito凭据的mqtt,您需要两个正确的策略-
1.与您的身份池对应的已验证角色应允许所有mqtt操作。
2.AWS IoT策略应允许相同的操作,并且您需要将cognito身份与此策略关联起来。我们使用attachPrincipalPolicy来这样做。
如果有任何步骤遗漏,我们会收到上述错误。我同意这个错误是误导的-对于这个错误,Already connected对我来说毫无意义。我通常认为它与clientId有关,它应该是唯一的。但是希望AWS的人们在某些时候能够改进它。
对于我的特定情况,问题在于第1点。尽管我的IoT策略具有所需的所有权限,但与身份池对应的auth角色没有。因此,请确保您这样做。
我创建了一个YouTube视频来展示这一点:https://www.youtube.com/watch?v=j2KJVHGHaFc

如何在命令行中获取令牌,我正在使用带有AWSIotMqttManager的Android SDK。 - Sweety Bertilla
@SweetyBertilla 你有解决办法吗? - undefined

0

当客户端连接到代理时,它具有唯一的客户端ID。如果客户端尝试使用相同的客户端ID连接,则会出现此错误。请使用不同的客户端ID,例如foo1、foo2、foo3等。


我不是很清楚,请您能否再解释得详细一些? - Shibin Francis
兄弟,你是在谈论第二个连接还是第一个成功连接之后的情况。我正在尝试首先建立第一个连接。 - Shibin Francis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接