作者:j7988l28 | 来源:互联网 | 2023-09-16 10:43
最近在学习android推送的实现,Client用的paho,broker用的Apollo,在写测试demo时出现了以下情况:在app中subscribe主题topic1
最近在学习android推送的实现,Client用的paho,broker用的Apollo,在写测试demo时出现了以下情况:
- 在app中subscribe主题topic1,broker中没有topic1,则后台publish消息到topic1后,app的connectionLost方法被回调,而messageArrived并没有被回调;
- 在app中subscribe主题topic1,broker中存在topic1,则app的MqttCallback中的messageArrived方法被回调。如果此时在后台再次publish消息到topic1,则app的connectionLost被回调。
可能我的表达不够清晰,请看程序代码,请问我的代码有什么问题。
Android代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class MainActivity extends Activity {
private TextView tv;
private Handler mhandler = new Handler() {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
String strCOntent= tv.getText().toString();
strContent += "\n" + msg.getData().getString("content");
//每接收一次消息,将消息内容追加到textview中显示
tv.setText(strContent);
}
};
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
tv = (TextView) findViewById(R.id.tv);
try {
Subscribe.doTest(mhandler);
} catch (Exception e) {
e.printStackTrace();
}
}
} |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class Subscribe {
private final static String userName = "admin";
private final static String passWord = "password";
private final static String HOST = "tcp://10.0.2.2:61613";
private final static String TOPIC = "t1";
private static MqttClient client;
public static void doTest(final Handler mhandler) throws Exception {
client = new MqttClient(HOST, "java_client", new MemoryPersistence());
MqttConnectOptions optiOns= new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
client.setCallback(new MqttCallback() {
@Override
public void messageArrived(String arg0, MqttMessage arg1)
throws Exception {
Message msg = Message.obtain();
Bundle bundle = new Bundle();
bundle.putString("content", arg1.toString());
msg.setData(bundle);
mhandler.sendMessage(msg);
System.out.println("Client messageArrived");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
System.out.println("Client deliveryComplete");
}
@Override
public void connectionLost(Throwable arg0) {
System.out.println("Client connectionLost");
if (!client.isConnected()) {
try {
client.connect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
client.connect(options);
client.subscribe(TOPIC);
}
} |
后台代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class PubTest {
private static String userName = "admin";
private static String passWord = "password";
private static String HOST = "tcp://127.0.0.1:61613";
private static MqttClient client;
public static void main(String[] args) {
try {
client = new MqttClient(HOST, "java_client",
new MemoryPersistence());
MqttConnectOptions optiOns= new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
MqttTopic topic = client.getTopic("t1");
MqttMessage message = new MqttMessage("m1".getBytes());
message.setQos(1);
message.setRetained(true);
client.connect(options);
MqttDeliveryToken token = topic.publish(message);
while (!token.isComplete()) {
token.waitForCompletion();
}
client.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
} |