自研 Pulsar Starter:Winfun-Pulsar-Spring-Boot-Starter
里程碑 版本 功能点 作者 完成 1.0.0 支持PulsarTemplate发送消息&支持自定义注解实例化Consumer监听消息 hoinfun ✅ 1.1.0 支持动态开启/关闭Consumer消费线程池、支持自定义配置Consuemr消费线程池参数 hoinfun ✅ 1.2.0 支持Spring容器停止时,释放Pulsar所有相关资源 hoinfun TODO 1.3.0 支持多Pulsar数据源 hoinfun TODO
一、背景
Pulsar 作为新生代云原生消息队列,越来越受到开发者的热爱;而我们现在基本上的项目都是基于 SpringBoot 上开发的,我们可以发现,至今都没有比较大众和成熟的关于 Pulsar 的 Starter,所以我们需要自己整一个,从而避免常规使用 Pulsar API 时产生大量的重复代码。
二、设计思路
由于是第一版的设计,所以我们是从简单开始,不会一开始就设计得很复杂,尽量保留 Pulsar API 原生的功能。
2.1、PulsarClient
我们都知道,不管是 Producer 还是 Consumer,都是由 PulsarClient 创建的。
了,PulsarClient 可以根据业务需要自定义很多参数,第一版的设计只会支持比较常用的参数。
我们这个组件支持下面功能点
支持 PulsarClient 参数配置外部化,参数可配置在 applicatin.properties 中。 支持 applicatin.properties 提供配置提示信息。 读取外部配置文件,根据参数实例化 PulsarClient,并注入到 IOC 容器中。 2.2、Producer
Producer是发送消息的组件。
这里我们提供一个模版类,可以根据需求创建对应的 Producer 实例。 支持将 TopicProducer 关系缓存起来,避免重复创建 Producer 实例。 支持同步/异步发送消息。 2.3、Consumer
Consumer是消费消息的组件。
这里我们提供一个抽象类,开发者只需要集成此实现类并实现 doReceive 方法即可,即消费消息的逻辑方法。 接着还提供一个自定义注解,自定义注解支持自定义 Consmuer 配置,例如Topic、Tenant、Namespace等。 实现类加入上述自定义注解后,组件将会自动识别并且生成对应的 Consumer 实例。 支持同步/线程池异步消费。 三、使用例子 3.1、引入依赖 io.github.hoinfun infun-pulsar-spring-boot-starter 1.1.0 3.2、加入配置 pulsar.service-url=pulsar://127.0.0.1:6650 pulsar.tenant=infun pulsar.namespace=study pulsar.operation-timeout=30 pulsar.io-threads=10 pulsar.listener-threads=10 3.3、发送消息 / 发送消息 @author: infun / @RestController @RequestMapping("msg") public class MessageController { @Autoired private PulsarTemplate pulsarTemplate; @Autoired private PulsarProperties pulsarProperties; / 往指定ic发送消息 @author infun @param ic ic @param msg msg @return {@link String } / @GetMapping("/{ic}/{msg}") public String send(@PathVariable("ic") String ic,@PathVariable("msg") String msg) thros Exception { this.pulsarTemplate.createBuilder().persistent(Boolean.TRUE) .tenant(this.pulsarProperties.getTenant()) .namespace(this.pulsarProperties.getNamespace()) .ic(ic) .send(msg); return "suess"; } } 3.4、消费消息 / @author: infun @date: 2021/8/20 8:13 下午 / @Slf4j @PulsarListener(ics = {"test-ic2"}, threadPool = @ThreadPool( coreThreads = 2, maxCoreThreads = 3, threadPoolName = "test-thread-pool")) public class ConsumerListener extends BaseMessageListener { / 消费消息 @param consumer 消费者 @param msg 消息 / @Override protected void doReceived(Consumer consumer, Message msg) { log.info("成功消费消息{}",msg.getValue()); try { consumer.acknoledge(msg); } catch (PulsarClientException e) { e.printStackTrace(); } } / 是否开启异步消费 @return {@link Boolean } / @Override public Boolean enableAsync() { return Boolean.TRUE; } } 四、源码
源码就不放在这里分析了,大家可到Github上看看,如果有什么代码上面的建议或意见,欢迎大家提MR。
人工智能培训
- 真正能和人交流的机器人什么时候实现
- 国产机器人成功完成首例远程冠脉介入手术
- 人工智能与第四次工业革命
- 未来30年的AI和物联网
- 新三板创新层公司东方水利新增专利授权:“一
- 发展人工智能是让人和机器更好地合作
- 新春贺喜! 经开区持续推进工业互联网平台建设
- 以工业机器人为桥 传统企业如何趟过智造这条河
- 山立滤芯SAGL-1HH SAGL-2HH
- 2015国际智能星创师大赛火热报名中!
- 未来机器人会咋看人类?递归神经网络之父-像蚂
- 成都新川人工智能创新中心二期主体结构封顶
- 斯坦德机器人完成数亿元人民币C轮融资,小米产
- 到2020年,智能手机将拥有十项AI功能,有些可能
- 寻找AI机器人的增长“跳板”:老龄化为支点的产
- 力升高科耐高温消防机器人参加某支队性能测试