我正在尝试在我的项目中实现 eclipse.paho
,以连接Mqtt Broker(用于订阅和发布)。问题是,当我使用订阅功能(实现MqttCallback
接口)时,我无法弄清楚如何在连接丢失后重新连接。MqttCallback接口有一个connectionLost方法,但它只有在调试连接丢失原因时有用。我搜索了一下,但找不到建立自动重连的方法。你能否提供有关此问题的建议或文档?
//Use the MqttCallbackExtended to (re-)subscribe when method connectComplete is invoked
public class MyMqttClient implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(MqttClientTerni.class);
private final int qos = 0;
private String topic = "mytopic";
private MqttClient client;
public MyMqttClient() throws MqttException {
String host = "tcp://localhost:1883";
String clientId = "MQTT-Client";
MqttConnectOptions conOpt = new MqttConnectOptions();
conOpt.setCleanSession(true);
//Pay attention here to automatic reconnect
conOpt.setAutomaticReconnect(true);
this.client = new org.eclipse.paho.client.mqttv3.MqttClient(host, clientId);
this.client.setCallback(this);
this.client.connect(conOpt);
}
/**
* @see MqttCallback#connectionLost(Throwable)
*/
public void connectionLost(Throwable cause) {
logger.error("Connection lost because: " + cause);
/**
* @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
*/
public void deliveryComplete(IMqttDeliveryToken token) {
}
/**
* @see MqttCallback#messageArrived(String, MqttMessage)
*/
public void messageArrived(String topic, MqttMessage message) throws MqttException {
logger.info(String.format("[%s] %s", topic, new String(message.getPayload())));
}
public static void main(String[] args) throws MqttException, URISyntaxException {
MyMqttClient s = new MyMqttClient();
}
@Override
public void connectComplete(boolean arg0, String arg1) {
try {
//Very important to resubcribe to the topic after the connection was (re-)estabslished.
//Otherwise you are reconnected but you don't get any message
this.client.subscribe(this.topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
最好的方法是将连接逻辑结构化,使其成为一个独立的方法,这样可以从MqttCallback
实例中的connectionLost
回调中再次调用它。
connectionLost
方法会传入一个Throwable对象,该对象将是触发断开连接的异常,因此您可以决定根本原因以及这可能会影响何时/如何重新连接。
连接方法应连接并订阅您需要的主题。
像这样:
public class PubSub {
MqttClient client;
String topics[] = ["foo/#", "bar"];
MqttCallback callback = new MqttCallback() {
public void connectionLost(Throwable t) {
this.connect();
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("topic - " + topic + ": " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
};
public static void main(String args[]) {
PubSub foo = new PubSub();
}
public PubSub(){
this.connect();
}
public void connect(){
client = new MqttClient("mqtt://localhost", "pubsub-1");
client.setCallback(callback);
client.connect();
client.subscribe(topics);
}
}
要使用自动重连功能,只需在MqttConnectOptions
对象上设置setAutomaticReconnect(true)
即可。
MqttAndroidClient mqttClient = new MqttAndroidClient(context, mqttUrl, clientId);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);
mqttClient.connect(mqttConnectOptions, null, mqttActionListener());