先看我們最后實(shí)現(xiàn)的一個(gè)效果
1.手機(jī)端向主題 topic111 發(fā)送消息,并接收。(手機(jī)測試工具名稱:MQTT調(diào)試器)
2.控制臺(tái)打印
MQTT基本簡介
MQTT 是用于物聯(lián)網(wǎng) (IoT) 的 OASIS 標(biāo)準(zhǔn)消息傳遞協(xié)議。它被設(shè)計(jì)為一種極其輕量級(jí)的發(fā)布/訂閱消息傳輸,非常適合連接具有小代碼足跡和最小網(wǎng)絡(luò)帶寬的遠(yuǎn)程設(shè)備。
MQTT協(xié)議簡介
MQTT 是客戶端服務(wù)器發(fā)布/訂閱消息傳輸協(xié)議。它重量輕、開放、簡單,并且易于實(shí)施。這些特性使其非常適合在許多情況下使用,包括受限制的環(huán)境,例如機(jī)器對(duì)機(jī)器 (M2M) 和物聯(lián)網(wǎng) (IoT) 環(huán)境中的通信,其中需要小代碼足跡和/或網(wǎng)絡(luò)帶寬非常寶貴。
該協(xié)議通過 TCP/IP 或其他提供有序、無損、雙向連接的網(wǎng)絡(luò)協(xié)議運(yùn)行。其特點(diǎn)包括:
· 使用發(fā)布/訂閱消息模式,提供一對(duì)多的消息分發(fā)和應(yīng)用程序的解耦。
· 與有效負(fù)載內(nèi)容無關(guān)的消息傳輸。
· 消息傳遞的三種服務(wù)質(zhì)量:
o “最多一次”,根據(jù)操作環(huán)境的最大努力傳遞消息??赡軙?huì)發(fā)生消息丟失。例如,此級(jí)別可用于環(huán)境傳感器數(shù)據(jù),其中單個(gè)讀數(shù)是否丟失并不重要,因?yàn)橄乱粋€(gè)讀數(shù)將很快發(fā)布。
o “至少一次”,保證消息到達(dá)但可能出現(xiàn)重復(fù)。
o “Exactly once”,保證消息只到達(dá)一次。例如,此級(jí)別可用于重復(fù)或丟失消息可能導(dǎo)致應(yīng)用不正確費(fèi)用的計(jì)費(fèi)系統(tǒng)。
· 最小化傳輸開銷和協(xié)議交換以減少網(wǎng)絡(luò)流量。
· 發(fā)生異常斷開時(shí)通知相關(guān)方的機(jī)制。
EMQX簡介
通過開放標(biāo)準(zhǔn)物聯(lián)網(wǎng)協(xié)議 MQTT、CoAP 和 LwM2M 連接任何設(shè)備。使用 EMQX Enterprise 集群輕松擴(kuò)展到數(shù)千萬并發(fā) MQTT 連接。
并且EMQX還是開源的,又支持集群,所以還是一個(gè)比較不錯(cuò)的選擇
EMQX集群搭建
前期準(zhǔn)備:
1.兩臺(tái)服務(wù)器:我的兩個(gè)服務(wù)器一臺(tái)是騰訊云、一臺(tái)是阿里云的(不要問為什么,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和
mqtt_service_txyun 吧。
2.一個(gè)域名: mqtt.zhouhong.icu
安裝開始
1.分別在兩臺(tái)服務(wù)器上執(zhí)行以下操作進(jìn)行安裝(如果是單機(jī):只需要進(jìn)行下面1、2操作就安裝完成了)
## 1.下載wget https://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm## 2.安裝sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm## 3.修改配置文件vim /etc/emqx/emqx.conf## 4.修改以下內(nèi)容## 注意node.name是當(dāng)前這臺(tái)服務(wù)器名稱node.name = [email protected] = [email protected],[email protected] = staticcluster.name = my-mqtt-cluster
2.分別啟動(dòng)兩臺(tái)服務(wù)器的EMQX
sudo emqx start
3.到瀏覽器輸入 http://xxx.xx.xxx.xxx:18083/ 查看(隨便一臺(tái)都可以,默認(rèn)賬號(hào)admin 密碼public),注意打開18083,1883 安全組
4.nginx負(fù)載均衡
nginx搭建很簡單略過,大家只需要修改以下nginx.conf里面的內(nèi)容即可
stream { upstream mqtt.zhouhong.icu { zone tcp_servers 64k; hash $remote_addr; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; } server { listen 8883 ssl; status_zone tcp_server; proxy_pass mqtt.zhouhong.icu; proxy_buffer_size 4k; ssl_handshake_timeout 15s; ssl_certificate /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem; ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key; }}
與SpringBoot集成并實(shí)現(xiàn)服務(wù)器端監(jiān)控對(duì)應(yīng)topic下的消息
1.項(xiàng)目搭建
- 引入MQTT相關(guān)jar包
org.springframework.integration spring-integration-stream org.springframework.integration spring-integration-mqtt
- yml配置文件 (如果大家沒搭建好的話,可以直接使用我搭建的這個(gè))
server: port: 8080mqtt: ## 單機(jī)版–只需要把域名改為ip既可 hostUrl: tcp://mqtt.zhouhong.icu:1883 username: admin password: public ## 服務(wù)端 clientId (發(fā)送端自己定義) clientId: service_client_id cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: topic111 qos: 0
- 屬性配置
/** * description: * date: 2022/6/16 15:51 * @author: zhouhong */@Component@ConfigurationProperties(“mqtt”)@Datapublic class MqttProperties { /** * 用戶名 */ private String username; /** * 密碼 */ private String password; /** * 連接地址 */ private String hostUrl; /** * 客戶端Id,同一臺(tái)服務(wù)器下,不允許出現(xiàn)重復(fù)的客戶端id */ private String clientId; /** * 默認(rèn)連接主題 */ private String topic; /** * 超時(shí)時(shí)間 */ private int timeout; /** * 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端 * 發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制 */ private int keepAlive; /** * 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連 * 接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接 */ private Boolean cleanSession; /** * 是否斷線重連 */ private Boolean reconnect; /** * 連接方式 */ private Integer qos;}
- 發(fā)送消息回調(diào)
/** * description: 發(fā)生消息成功后 的 回調(diào) * date: 2022/6/16 15:55 * * @author: zhouhong */@Component@Log4j2public class MqttSendCallBack implements MqttCallbackExtended { /** * 客戶端斷開后觸發(fā) * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info(“發(fā)送消息回調(diào): 連接斷開,可以做重連”); } /** * 客戶端收到消息觸發(fā) * * @param topic 主題 * @param mqttMessage 消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info(“發(fā)送消息回調(diào): 接收消息主題 : ” + topic); log.info(“發(fā)送消息回調(diào): 接收消息內(nèi)容 : ” + new String(mqttMessage.getPayload())); } /** * 發(fā)布消息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info(“發(fā)送消息回調(diào): 向主題:” + topic + “發(fā)送消息成功!”); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, “UTF-8”); log.info(“發(fā)送消息回調(diào): 消息的內(nèi)容是:” + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連接emq服務(wù)器后觸發(fā) * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info(“——————–ClientId:” + MqttAcceptClient.client.getClientId() + “客戶端連接成功!——————–“); }}
- 接收消息回調(diào)
/** * description: 接收消息后的回調(diào) * date: 2022/6/16 15:52 * * @author: zhouhong */@Component@Log4j2public class MqttAcceptCallback implements MqttCallbackExtended { @Resource private MqttAcceptClient mqttAcceptClient; /** * 客戶端斷開后觸發(fā) * * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info(“接收消息回調(diào): 連接斷開,可以做重連”); if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { log.info(“接收消息回調(diào): emqx重新連接…………………………………………….”); mqttAcceptClient.reconnection(); } } /** * 客戶端收到消息觸發(fā) * * @param topic 主題 * @param mqttMessage 消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info(“接收消息回調(diào): 接收消息主題 : ” + topic); log.info(“接收消息回調(diào): 接收消息內(nèi)容 : ” + new String(mqttMessage.getPayload())); } /** * 發(fā)布消息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info(“接收消息回調(diào): 向主題:” + topic + “發(fā)送消息成功!”); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, “UTF-8”); log.info(“接收消息回調(diào): 消息的內(nèi)容是:” + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連接emq服務(wù)器后觸發(fā) * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info(“——————–ClientId:” + MqttAcceptClient.client.getClientId() + “客戶端連接成功!——————–“); // 以/#結(jié)尾表示訂閱所有以test開頭的主題 // 訂閱所有機(jī)構(gòu)主題 mqttAcceptClient.subscribe(“topic111”, 0); }}
- 發(fā)消息
/** * description: 發(fā)送消息 * date: 2022/6/16 16:01 * * @author: zhouhong */@Componentpublic class MqttSendClient { @Autowired private MqttSendCallBack mqttSendCallBack; @Autowired private MqttProperties mqttProperties; public MqttClient connect() { MqttClient client = null; try { String uuid = UUID.randomUUID().toString().replaceAll(“-“,””); client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setCleanSession(true); options.setAutomaticReconnect(false); try { // 設(shè)置回調(diào) client.setCallback(mqttSendCallBack); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } return client; } /** * 發(fā)布消息 * 主題格式: server:report:$orgCode(參數(shù)實(shí)際使用機(jī)構(gòu)代碼) * * @param retained 是否保留 * @param pushMessage 消息體 */ public void publish(boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(mqttProperties.getQos()); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttClient mqttClient = connect(); try { mqttClient.publish(topic, message); } catch (MqttException e) { e.printStackTrace(); } finally { disconnect(mqttClient); close(mqttClient); } } /** * 關(guān)閉連接 * * @param mqttClient */ public static void disconnect(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.disconnect(); } } catch (MqttException e) { e.printStackTrace(); } } /** * 釋放資源 * * @param mqttClient */ public static void close(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.close(); } } catch (MqttException e) { e.printStackTrace(); } }}
- 接收消息
/** * description: 服務(wù)器段端連接訂閱消息、監(jiān)控topic * date: 2022/6/16 15:52 * * @author: zhouhong */@Component@Log4j2public class MqttAcceptClient { @Autowired @Lazy private MqttAcceptCallback mqttAcceptCallback; @Autowired private MqttProperties mqttProperties; public static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttAcceptClient.client = client; } /** * 客戶端連接 */ public void connect() { MqttClient client; try { // clientId 使用服務(wù)器 yml里面配置的 clientId client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setAutomaticReconnect(mqttProperties.getReconnect()); options.setCleanSession(mqttProperties.getCleanSession()); MqttAcceptClient.setClient(client); try { // 設(shè)置回調(diào) client.setCallback(mqttAcceptCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 重新連接 */ public void reconnection() { try { client.connect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱某個(gè)主題 * * @param topic 主題 * @param qos 連接方式 */ public void subscribe(String topic, int qos) { log.info(“==============開始訂閱主題==============” + topic); try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消訂閱某個(gè)主題 * * @param topic */ public void unsubscribe(String topic) { log.info(“==============開始取消訂閱主題==============” + topic); try { client.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } }}
- 服務(wù)端啟動(dòng)時(shí)連接訂閱主題并監(jiān)控
/** * description: 啟動(dòng)后連接 MQTT 服務(wù)器, 監(jiān)聽 mqtt/my_topic 這個(gè)topic發(fā)送的消息 * date: 2022/6/16 15:57 * @author: zhouhong */@Configurationpublic class MqttConfig { @Resource private MqttAcceptClient mqttAcceptClient; @Bean public MqttAcceptClient getMqttPushClient() { mqttAcceptClient.connect(); return mqttAcceptClient; }}
- 發(fā)消息控制類
/** * description: 發(fā)消息控制類 * date: 2022/6/16 15:58 * * @author: zhouhong */@RestControllerpublic class SendController { @Resource private MqttSendClient mqttSendClient; @PostMapping(“/mqtt/sendmessage”) public void sendMessage(@RequestBody SendParam sendParam) { mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent()); }}
2.測試
- postman調(diào)用發(fā)消息接口
- 控制臺(tái)日志
- 使用另外一個(gè)移動(dòng)端MQTT調(diào)試工具測試
2. 控制臺(tái)打印