序
MQTT(Message Queuing Telemetry Transport)是一种基于二进制消息、采用发布/订阅模式的消息协议,非常适合对功耗要求低、网络带宽有限的IoT场景。这里简单介绍一下如何在Spring Boot中集成。
maven
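<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>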
配置client factory
@Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs("tcp://demo:1883");// factory.setUserName("guest");// factory.setPassword("guest"); return factory; }
配置consumer
/**
 * Inbound flow: takes each message produced by {@link #mqttInbound()}, appends
 * {@code ", received from MQTT"} to its payload and passes the result to the
 * logging handler built by {@link #logger()}.
 *
 * @return the assembled inbound integration flow
 */
@Bean
public IntegrationFlow mqttInFlow() {
    return IntegrationFlows
            .from(mqttInbound())
            .transform(payload -> payload + ", received from MQTT")
            .handle(logger())
            .get();
}

/**
 * Builds the handler that terminates the inbound flow.
 *
 * @return a logging handler created with level {@code "INFO"} and logger name {@code "siSample"}
 */
private LoggingHandler logger() {
    LoggingHandler handler = new LoggingHandler("INFO");
    handler.setLoggerName("siSample");
    return handler;
}

/**
 * Creates the message-driven adapter that feeds the inbound flow.
 *
 * @return an adapter with client id {@code "siSampleConsumer"} subscribed to
 *         {@code "siSampleTopic"}, using QoS 1, the default Paho message converter
 *         and a completion timeout of 5000
 */
@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(
            "siSampleConsumer", mqttClientFactory(), "siSampleTopic");
    inbound.setQos(1);
    inbound.setConverter(new DefaultPahoMessageConverter());
    inbound.setCompletionTimeout(5000);
    return inbound;
}
配置producer
@Bean public IntegrationFlow mqttOutFlow() { //console input// return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),// e -> e.poller(Pollers.fixedDelay(1000)))// .transform(p -> p + " sent to MQTT")// .handle(mqttOutbound())// .get(); return IntegrationFlows.from(outChannel()) .handle(mqttOutbound()) .get(); } @Bean public MessageChannel outChannel() { return new DirectChannel(); } @Bean public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("siSampleTopic"); return messageHandler; }
配置MessagingGateway
/**
 * Messaging gateway whose default request channel is the channel named {@code outChannel}.
 */
@MessagingGateway(defaultRequestChannel = "outChannel")
public interface MsgWriter {

    /**
     * Sends the given text through the gateway's default request channel.
     *
     * @param note the text to send
     */
    void write(String note);
}
这样就大功告成了。使用时注入 MsgWriter 并调用 write(...),消息会经由 outChannel 交给 mqttOutbound,发布到默认主题 siSampleTopic。