MqttClient对象的同步和异步接口无法工作

16
我创建了一个 MqttClient 类型的 client,并且如下面的代码所示,我创建了一个客户端并设置了它的 Asynchronous callback。问题是:

1- 当我运行程序时,“System.out.println("Client is Connected");”出现了,但我没有从onSuccessonFailure中接收到任何响应,为什么?在代码中做错了什么。

2- 我实现了static IMqttAsyncClient asynchClientCB = new IMqttAsyncClient()接口,但由于我有一个类型为MqttClient 的客户端,因此无法使用此IMqttAsyncClient接口。我尝试使用mqttAsynchClien,但由于我是为Java而编程,而不是为Android,因此无法使用它。如何使用IMqttAsyncClient接口?

Update_1

在下面的代码"Updated_code_1"中,我略微修改了代码,但每次成功连接到代理时,我都希望打印同步回调中的消息onSuccess,并且在连接终止(例如当我故意断开网络时)的情况下,打印同步回调中的消息onFailure。但是,在运行时,当我连接到代理时,onSuccessonFailure都没有显示任何东西。那么,它们的设计是为什么?

*Update_2_17_Dec_2014

我有一个问题可能会引导我们找到解决方案,那就是,如果我通过有线/无线网络连接到代理是否重要?这会改变同步和异步侦听器的行为吗?

Updated_1_code

MqttConnectOptions opts = getClientOptions();
        client = MQTTClientFactory.newClient(broker, port, clientID);

        if (client != null) {
            System.out.println("Client is not Null");
            client.setCallback(AsynchCallBack);
            if (opts != null) {
                iMQTTToken = client.connectWithResult(opts);
                publishMSG(client, TOPIC,"010101".getBytes(), QoS, pub_isRetained);
                iMQTTToken.setActionCallback(synchCallBack);
                if (client.isConnected()) {
                    System.out.println("Client CONNECTED.");
                    publishMSG(client, TOPIC,"010101".getBytes(), QoS, pub_isRetained);
                }
            }
        }
    ....
    ....
    ....
    ....
IMqttToken iMQTTToken = new IMqttToken() {

    @Override
    public void waitForCompletion(long arg0) throws MqttException {
        // TODO Auto-generated method stub
        System.out.println("@waitForCompletion(): waiting " + (arg0 * 1000) + " seconds for connection to be established.");
    }

    @Override
    public void waitForCompletion() throws MqttException {
        // TODO Auto-generated method stub
        System.out.println("@waitForCompletion(): waiting for connection to be established.");
    }

    @Override
    public void setUserContext(Object arg0) {
        // TODO Auto-generated method stub

    }

    @Override
    public void setActionCallback(IMqttActionListener arg0) {
        // TODO Auto-generated method stub
        arg0.onSuccess(iMQTTToken);
        //System.out.println(" " + arg0.onSuccess());
        //System.out.println(" " + arg0.onSuccess(iMQTTToken));
        iMQTTToken.setActionCallback(synchCallBack);
    }

    @Override
    public boolean isComplete() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public Object getUserContext() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public String[] getTopics() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public boolean getSessionPresent() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public MqttWireMessage getResponse() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public int getMessageId() {
        // TODO Auto-generated method stub
        return 0;
    }

    @Override
    public int[] getGrantedQos() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public MqttException getException() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public IMqttAsyncClient getClient() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public IMqttActionListener getActionCallback() {
        // TODO Auto-generated method stub
        return null;
    }
};

IMqttActionListener synchCallBack = new IMqttActionListener() {

    @Override
    public void onSuccess(IMqttToken arg0) {
        // TODO Auto-generated method stub
        System.out.println("@onSuccess: Connection Successful.");
    }

    @Override
    public void onFailure(IMqttToken arg0, Throwable arg1) {
        // TODO Auto-generated method stub
        System.out.println("@onFailure: Connection Failed.");
        setViewEnableState(Bconnect, true);
    }
};

MqttCallback AsynchCallBack = new MqttCallback() {

    @Override
    public void messageArrived(String topic, MqttMessage msg) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("@messageArrived: Message Delivered.");
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO Auto-generated method stub
        System.out.println("@deliveryComplete: Delivery Completed.");
    }

    @Override
    public void connectionLost(Throwable thrw) {
        // TODO Auto-generated method stub
        System.out.println("@Connection Lost: Connection Lost.");
        setViewEnableState(Bconnect, true);
    }
};

新客户:

    MqttConnectOptions opts = new MqttConnectOptions();
    opts.setCleanSession(CS);
    opts.setKeepAliveInterval(KATimer);
    HashMap<Integer, WILL> LWTData = WILLFactory.newWILL("LWT", "LWT MS".getBytes(), 1, false);
    opts.setWill(LWTData.get(0).getWILLTopic(), 
            LWTData.get(0).getWILLPayLoad(), 
            LWTData.get(0).getWILLQoS(), 
            LWTData.get(0).isWILLRetained());

    client = MQTTClientFactory.newClient(IP, PORT, clientID);

    if (client != null) {
        System.out.println("client is not null");

        client.setCallback(AsynchCB);
        IMqttToken token = client.connectWithResult(opts);

        if (client.isConnected()) {
            System.out.println("Client is Connected");

            token.setActionCallback(new IMqttActionListener() {

                public void onSuccess(IMqttToken arg0) {
                    // TODO Auto-generated method stub
                    System.out.println("synchCB->@onSuccess(): Connection Successful");

                    try {
                        client.subscribe(TOPIC, QoS);
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    try {
                        client.disconnect();
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

                public void onFailure(IMqttToken arg0, Throwable arg1) {
                    // TODO Auto-generated method stub
                    System.out.println("synchCB->@onFailure(): Connection Failed");
                }
            });
        }else {
            System.out.println("client is not connected");
        }
    }else {
        System.out.println("client = null");
    }
异步回调:
/**
 * Asynchronous Callback to inform the user about events that might happens Asynchronously. If it is not used, any pending 
 * messages destined to the client would not be received.
 */
private static MqttCallback AsynchCB = new MqttCallback() {

    public void messageArrived(String topic, MqttMessage msg) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("AsynchCB->@messageArrived(): ");

        System.out.println("Topic: " + topic);
        System.out.println("MSG: " + msg.toString());

    }

    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // TODO Auto-generated method stub
        System.out.println("AsynchCB->@deliveryComplete(): ");
    }

    public void connectionLost(Throwable arg0) {
        // TODO Auto-generated method stub
        System.out.println("AsynchCB->@connectionLost(): ");
    }
};

如果您从onSuccess()中删除订阅并在检查连接成功后立即添加它,会发生什么?我的意思是,在检查isConnected()返回true之后立即放置此行client.subscribe(TOPIC, QoS)。我有点困惑为什么您希望在实际订阅或除设置回调侦听器外,还未执行任何与连接相关的操作之前调用onSuccess()。 - kha
@kha 谢谢您的评论。实际上,在阅读了您的评论后,我似乎误解了 onSuccess 和 onFailure 的作用。因为我认为 onSuccess() 和 onFailure 是同步回调函数,当连接成功时会调用 "onSuccess",或者失败时会调用 "onFailure",所以我在 onSuccess() 中订阅,认为当连接建立/成功时就进行订阅。我是对还是错?请指导。 - rmaik
@kha [您的连接已经成功建立。您已经在这行代码中正确地检查它:if (client.isConnected())] ===> 是的,好的,我正在正确地检查它,但是,如果网络连接意外断开并重新连接,我将不会收到任何来自"onSuccess"或"onFailure"的响应。据我所知,上述同步回调旨在报告此类操作"连接/断开连接"。请看上面的更新。 - rmaik
网络连接类型不重要,我已经在GridGain网络中工作过,所有节点都使用无线网络连接,因此无论进程是否异步,这都不应该成为问题。 - aurelius
@kha 和 @rmaik,请帮帮我。http://stackoverflow.com/questions/36056381/the-listener-of-publish-method-does-not-work-asynchronously-on-mqtt-in-eclipse-p - séan35
显示剩余2条评论
2个回答

2

您的机器上托管客户端的机器,该机器处理您的回调,可能会被机器防火墙阻止出站口。


我不这么认为,因为我可以正常发布。 - rmaik
如果您的客户使用特定端口进行出版,那么您是正确的;如果您不确定,请检查一下。 - aurelius

0

如果上一个周期中的连接没有成功断开,那么再次尝试连接将不会产生任何响应。 即,如果为已连接的客户端调用connect,则不会产生任何响应。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接