转载自:https://zhuanlan.zhihu.com/p/489740094
使用
spring-boot-starter-integration
方式实现的 mqtt,报错详情
logDispatcher has no subscribers for channel 'mqttOutboundChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage []
报错代码
/**
* @author aqin1012 AQin.
* @date 2022/2/23 9:16 AM
* @Version 1.0
*/
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
@Autowired
private MqttProperties prop;
@Autowired
MqttEncryptHelper mqttEncryptHelper;
@Bean
public MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setServerURIs(new String[]{prop.getHostUrl()});
mqttConnectOptions.setUserName(prop.getUsername());
mqttConnectOptions.setPassword(prop.getPassword().toCharArray());
// 客户端断线时暂时不清除,直到超时注销
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setKeepAliveInterval(prop.getKeepAliveInterval());
mqttConnectOptions.setAutomaticReconnect(true);
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(prop.getClientId() + "-pub-" + Instant.now().toEpochMilli(), mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultQos(0);
messageHandler.setDefaultTopic(prop.getSenderDispatchTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {return new DirectChannel();
}
}
报错分析
由报错信息可以看出来是由于频道没有订阅者,由于我的 MARKDOWN_HASH17fdb716b8346c721481335713824d0aMARKDOWNHASH
()
和 mqttOutbound()
是绑定的,_于是我就试着把给频道手动添加订阅者:
mqttOutboundChannel_()_
做如下修改:
@Bean
public MessageChannel mqttOutboundChannel() {DirectChannel dc = new DirectChannel();
dc.subscribe(mqttOutbound());
return dc;
}
再次运行服务
搞定 ~ 撒花!!!
很多博文给出的解决方案是添加注解 @EnableIntegration
但是我这边并未起作用(有知道原因的大佬麻烦不吝指导 ~~ 谢谢呢 )