Java Eclipse Paho实现 - 自动重连

15

我正在尝试在我的项目中实现 eclipse.paho,以连接Mqtt Broker(用于订阅和发布)。问题是,当我使用订阅功能(实现MqttCallback接口)时,我无法弄清楚如何在连接丢失后重新连接。MqttCallback接口有一个connectionLost方法,但它只有在调试连接丢失原因时有用。我搜索了一下,但找不到建立自动重连的方法。你能否提供有关此问题的建议或文档?

3个回答

12
我正在使用paho客户端1.2.0。 通过MqttClient.setAutomaticReconnect(true)和接口MqttCallbackExtended API,并且多亏https://github.com/eclipse/paho.mqtt.java/issues/493,当连接到代理失效时,我可以自动重新连接。 请参见下面的代码。
//Use the MqttCallbackExtended to (re-)subscribe when method connectComplete is invoked
public class MyMqttClient implements MqttCallbackExtended {
    private static final Logger logger = LoggerFactory.getLogger(MqttClientTerni.class);
    private final int qos = 0;
    private String topic = "mytopic";
    private MqttClient client;

    public MyMqttClient() throws MqttException {
        String host = "tcp://localhost:1883";
        String clientId = "MQTT-Client";

        MqttConnectOptions conOpt = new MqttConnectOptions();
        conOpt.setCleanSession(true);
        //Pay attention here to automatic reconnect
    conOpt.setAutomaticReconnect(true);
        this.client = new org.eclipse.paho.client.mqttv3.MqttClient(host, clientId);
        this.client.setCallback(this);
        this.client.connect(conOpt);
    }

    /**
     * @see MqttCallback#connectionLost(Throwable)
     */
    public void connectionLost(Throwable cause) {
        logger.error("Connection lost because: " + cause);


    /**
     * @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
     */
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /**
     * @see MqttCallback#messageArrived(String, MqttMessage)
     */
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        logger.info(String.format("[%s] %s", topic, new String(message.getPayload())));
    }

    public static void main(String[] args) throws MqttException, URISyntaxException {
        MyMqttClient s = new MyMqttClient();
    }

    @Override
    public void connectComplete(boolean arg0, String arg1) {
        try {
      //Very important to resubcribe to the topic after the connection was (re-)estabslished. 
      //Otherwise you are reconnected but you don't get any message
        this.client.subscribe(this.topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }

    }
}

10

最好的方法是将连接逻辑结构化,使其成为一个独立的方法,这样可以从MqttCallback实例中的connectionLost回调中再次调用它。

connectionLost方法会传入一个Throwable对象,该对象将是触发断开连接的异常,因此您可以决定根本原因以及这可能会影响何时/如何重新连接。

连接方法应连接并订阅您需要的主题。

像这样:

public class PubSub {

  MqttClient client;
  String topics[] = ["foo/#", "bar"];
  MqttCallback callback = new MqttCallback() {
    public void connectionLost(Throwable t) {
      this.connect();
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
      System.out.println("topic - " + topic + ": " + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
    }
  };

  public static void main(String args[]) {
    PubSub foo = new PubSub();
  }

  public PubSub(){
    this.connect();
  }

  public void connect(){
    client = new MqttClient("mqtt://localhost", "pubsub-1");
    client.setCallback(callback);
    client.connect();
    client.subscribe(topics);
  }

}

针对新问题提出新的问题。 - hardillb
我可能来晚了……一旦调用MqttCallback :: connectionLost,它将尝试重新连接。如果网络仍然不稳定,那么连接尝试将失败。因此,在那个时候,Mqtt部分将停止尝试(我假设)。当网络恢复后调用连接的下一步应该怎么做?(BroadcastReceiver onReceive?) - brechmos
1
这个问题和答案与Android无关,请提出一个新的问题。 - hardillb

4

要使用自动重连功能,只需在MqttConnectOptions对象上设置setAutomaticReconnect(true)即可。

MqttAndroidClient mqttClient = new MqttAndroidClient(context, mqttUrl, clientId);

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);

mqttClient.connect(mqttConnectOptions, null, mqttActionListener());

我尝试了自动重新连接的解决方案,但仍然遇到连接错误。我使用mosquitto docker代理,因此停止和启动代理非常容易。还有其他缺少的配置可以使其正常工作吗? - Gadi
1
这个东西根本不起作用。回调被调用一次,抛出一个错误,然后paho似乎就挂掉了。 - spyro
@Gadi - 请确保您的clientId是唯一的。我现在正在尝试在末尾放置一个截断的时间戳,并在断开连接时清除clientId。 - BLH

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接