先回顾下,消息路由器v1.0 的主要特征如下:
- 每个消息在生成时指定消息类型
- 不同类型的消息分发到不同的消息队列
- 不同类型的消息由不同的消息处理器消费
- 消息消费完成后可以执行回调逻辑
第二版主要新增了一个功能:
- 支持多优先级消息队列
在某些业务系统中,接收到消息后,为了确保系统宕机后,之前未处理成功的消息也能恢复出来,一般是先把消息写入一个日志队列,成功后,然后再把消息写入业务逻辑队列。
来看下具体的代码改动:
IQueue
接口增加返回优先级方法
1 | type IQueue interface { |
规定:GetLevel()
返回的数值越小,优先级越高。
消息路由器生成新消息时,可以指定该消息路由的顺序:从优先级高的队列链式执行到优先级低的队列,处理完成后执行回调函数 cb。
这里,msgTypes
必须保持增序,比如,[]{1,2,3}
, 同优先级的可以并列出现,比如,[]{1,2,2,3}
1 | func (s *QueueSwitch) NewMessage(data interface{}, cb MessageCB, msgTypes ...MessageType) (*Message, error) |
看下具体的代码细节:
1 | func (s *QueueSwitch) NewMessage(data interface{}, cb MessageCB, msgTypes ...MessageType) (*Message, error) { |
下面我们来看一下2.0版本的测试示例:
测试
1 | package queue |
测试结果:
1 | recv log msg: &{tx-data 0x10f0e50 0} |
可以看到,该消息会先发送到 A 类型对应的消息队列 logq 中,然后再发送到 B 类型对应的消息队列 matchq 中。
具体细节可参考源码 github.com/hxzqlh/queue:v2.0