非常重要的是,为了接收订阅主题的消息,您需要调用client.setCallback并实现MqttCallbackHandler。以下是可以发布和订阅等操作的代码示例。
以下代码最初发布mqtt主题和载荷:
- 主题:AndroidPhone
- 载荷:Hello, I am an Android Mqtt Client.
该代码订阅了主题“tester”。如果它收到一个主题为“tester”,载荷为“Alarm Activated”的消息,则通过上述回调发布以下主题和载荷:
- 主题:Fitlet
- 载荷:Hello, the Mosquitto Broker got your message saying that the Alarm is Activated.
如果您正在使用Mosquitto,则在终端中运行以下命令将触发此消息。
mosquitto_pub -h 192.168.9.100 -t tester -m "Alarm Activated" -u fred -P 1234
我的Mosquitto用户名为fred,密码为1234
代码:
package colin.android.mqtt;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.widget.Toast;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import java.io.UnsupportedEncodingException;
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
String clientId = MqttClient.generateClientId();
final MqttAndroidClient client = new MqttAndroidClient(this.getApplicationContext(), "tcp://192.168.9.100:1883", clientId);
client.setCallback(new MqttCallbackHandler(client));
MqttConnectOptions options = new MqttConnectOptions();
try {
options.setUserName("fred");
options.setPassword("1234".toCharArray());
IMqttToken token = client.connect(options);
token.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d("mqtt", "onSuccess");
MqttMessage message = new MqttMessage("Hello, I am an Android Mqtt Client.".getBytes());
message.setQos(2);
message.setRetained(false);
String topic = "AndroidPhone";
try {
client.publish(topic, message);
Log.i("mqtt", "Message published");
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
String subtopic = "tester";
int qos = 1;
try {
IMqttToken subToken = client.subscribe(subtopic, qos);
subToken.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i("mqtt", "subscription success");
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
Log.i("mqtt", "subscription failed");
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.d("mqtt", "onFailure");
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
}
class MqttCallbackHandler implements MqttCallbackExtended {
private final MqttAndroidClient client;
public MqttCallbackHandler (MqttAndroidClient client)
{
this.client=client;
}
@Override
public void connectComplete(boolean b, String s) {
Log.w("mqtt", s);
}
@Override
public void connectionLost(Throwable throwable) {
}
public void AlarmActivatedMessageReceived()
{
MqttMessage msg= new MqttMessage("Hello, the Mosquitto Broker got your message saying that the Alarm is Activated.".getBytes());
try {
this.client.publish("Fitlet", msg);
Log.i("mqtt", "Message published");
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Log.w("mqtt", mqttMessage.toString());
if (mqttMessage.toString().contains("Alarm Activated"))
{
AlarmActivatedMessageReceived();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}