咸鱼

咸鱼是以盐腌渍后,晒干的鱼

0%

MQTT-Client库-org-fusesource-mqtt-client

由于不太想用Eclipse的库,寻求其他库来试试。

org.fusesource.mqtt-client 也是MQTT推荐 client 端的库,github的start也有500+吧,fork也有200+

引用库

1
compile 'org.fusesource.mqtt-client:mqtt-client:1.12'

作者给出了三种使用方法:1.阻塞;2.非阻塞(推荐);3.Future(没理解?)

Github上的 非阻塞 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {

public void onDisconnected() {
}
public void onConnected() {
}

public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
// You can now process a received message from a topic.
// Once process execute the ack runnable.
ack.run();
}
public void onFailure(Throwable value) {
connection.close(null); // a connection failure occured.
}
})
connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
result.failure(value); // If we could not connect to the server.
}

// Once we connect..
public void onSuccess(Void v) {

// Subscribe to a topic
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
// The result of the subcribe request.
}
public void onFailure(Throwable value) {
connection.close(null); // subscribe failed.
}
});

// Send a message to a topic
connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
public void onSuccess(Void v) {
// the pubish operation completed successfully.
}
public void onFailure(Throwable value) {
connection.close(null); // publish failed.
}
});

// To disconnect..
connection.disconnect(new Callback<Void>() {
public void onSuccess(Void v) {
// called once the connection is disconnected.
}
public void onFailure(Throwable value) {
// Disconnects never fail.
}
});
}
});

上面代码片段在Android和纯Java程序都可以执行,需要注意的是,纯Java要用一个阻塞事件(如输入)来阻止main函数结束运行。

Android:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.support.annotation.Nullable;
import android.util.Log;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.TextView;

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

import butterknife.BindString;
import butterknife.BindView;
import butterknife.ButterKnife;
import butterknife.Unbinder;
import cn.content.DefaultValue;


public class FragmentMqtt extends FragmentBase {

private final String TAG = getClass().getName();
@BindView(R.id.tv_hello)
TextView tvHello;
private Unbinder unbinder;

private ActContainer context;
@BindString(R.string.title_name_mqtt)
String title;

private String devTopic;
CallbackConnection connection;
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
}

@Override
public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container,
@Nullable Bundle savedInstanceState) {
View rootView = inflater.inflate(R.layout.fragment_mqtt, null);
unbinder = ButterKnife.bind(this, rootView);
return rootView;
}

@Override
public void onViewCreated(View rootView, @Nullable Bundle savedInstanceState) {
super.onViewCreated(rootView, savedInstanceState);

context = (ActContainer) getActivity();
context.setTitleName(title);

devTopic = "hello";
try {
MQTT mqtt = new MQTT();
mqtt.setHost("192.168.1.150", 1883);
connection = mqtt.callbackConnection();
connection.listener(new Listener() {

public void onDisconnected() {
Log.i(TAG, "onDisconnected.");
sendMessageUI(0,"onDisconnected");
}

public void onConnected() {
Log.i(TAG, "onConnected.");
sendMessageUI(0,"onConnected");
}

public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
// You can now process a received message from a topic.
// Once process execute the ack runnable.
ack.run();

Log.i(TAG, "onPublish: " + payload.toString());
//不能操作UI
//tvHello.setText(payload.toString());
sendMessageUI(1,payload.toString());
}

public void onFailure(Throwable value) {
//connection.close(null); // a connection failure occured.
connection.disconnect(null);
}
});
connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
//result.failure(value); // If we could not connect to the server.
Log.e(TAG, "connect failure.");
}

// Once we connect..
public void onSuccess(Void v) {

// Subscribe to a topic
Topic[] topics = {new Topic(devTopic, QoS.AT_LEAST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
// The result of the subcribe request.
Log.i(TAG, "subscribe onSuccess.topic:" + devTopic);
//不能操作UI
//tvHello.setText("subscribe onSuccess.");
sendMessageUI(0,"subscribe onSuccess.");
}

public void onFailure(Throwable value) {
//connection.close(null); // subscribe failed.
Log.e(TAG, "subscribe failed.");
sendMessageUI(0,"subscribe failed.");
}
});
}
});
} catch (Exception e) {
e.printStackTrace();
}
}

public void sendMessageUI(int what, String obj) {

Message message = Message.obtain();
message.what = what;
message.obj = obj;
handler.sendMessage(message);
}

private Handler handler = new Handler() {
@Override
public void handleMessage(Message msg) {
if(tvHello!=null){
switch (msg.what) {
case 1:
//数据
tvHello.setText(msg.obj.toString());
break;
default:
//事件
tvHello.setText(msg.obj.toString());
break;
}
}
super.handleMessage(msg);
}
};


@Override
public void onDestroyView() {
super.onDestroyView();
unbinder.unbind();
connection.disconnect(null);
}

}

Java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

package com.demo;

import java.util.Scanner;

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;

public class Test {

private static final String Topic = "hello";

public static void main(String[] args) {
try{
MQTT mqtt = new MQTT();
mqtt.setHost("192.168.1.150", 1883);

mqtt.setTracer(new Tracer(){
@Override
public void onReceive(MQTTFrame frame) {
System.out.println("recv: "+frame);
}

@Override
public void onSend(MQTTFrame frame) {
System.out.println("send: "+frame);
}

@Override
public void debug(String message, Object... args) {
System.out.println(String.format("debug: "+message, args));
}
});

final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {

public void onDisconnected() {
System.out.println( "onDisconnected.");
}
public void onConnected() {
System.out.println("onConnected.");
}
public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
// You can now process a received message from a topic.
// Once process execute the ack runnable.
ack.run();
System.out.println( "onPublish: " + payload.toString());
}
public void onFailure(Throwable value) {
//connection.close(null); // a connection failure occured.
connection.disconnect(null);
}
});
System.out.println("callback...");

connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
//result.failure(value); // If we could not connect to the server.
System.out.println( "connect failure.");
}
// Once we connect..
public void onSuccess(Void v) {

System.out.println( "connect onSuccess.");
// Subscribe to a topic
Topic[] topics = {new Topic(Topic, QoS.AT_LEAST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
// The result of the subcribe request.
System.out.println( "subscribe onSuccess.");
connection.publish(Topic, "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, null);

}
public void onFailure(Throwable value) {
//connection.close(null); // subscribe failed.
System.out.println( "subscribe failed.");
}
});
}
});


}catch (Exception e){
e.printStackTrace();
}
//输入阻塞
Scanner sc = new Scanner(System.in);
sc.nextLine();
}
}

各种参数

这里有一篇文章写的很详细,我就懒的写了 传送门

在Linux下无法连接

写了个web程序在Linux下部署发现一个问题,无法连接上服务器,没有任何报错的信息。(在Windows下是正常连接的
网上找一下,原来这个库会获取Host,失败就卡住了,我的Linux机器修改过HostName,所以就出问题了。其实这个问题在spring-boot-starter-data-redis也存在,但是spring-boot-starter-data-redis有报错信息。
解决办法也比较简单,把当前的HostName加入到/etc/hosts文件就可以了

1
2
3
4
5
6
root@bogon:~# hostname
bogon
root@bogon:~# vim /etc/hosts

#加入
127.0.1.1 bogon