一个简单的消息路由器v2.0

先回顾下,消息路由器v1.0 的主要特征如下:

  • 每个消息在生成时指定消息类型
  • 不同类型的消息分发到不同的消息队列
  • 不同类型的消息由不同的消息处理器消费
  • 消息消费完成后可以执行回调逻辑

第二版主要新增了一个功能:

  • 支持多优先级消息队列

在某些业务系统中,接收到消息后,为了确保系统宕机后,之前未处理成功的消息也能恢复出来,一般是先把消息写入一个日志队列,成功后,然后再把消息写入业务逻辑队列。

来看下具体的代码改动:

IQueue 接口增加返回优先级方法

1
2
3
4
type IQueue interface {
Send(msg *Message)
GetLevel() int
}

规定:GetLevel() 返回的数值越小,优先级越高。

消息路由器生成新消息时,可以指定该消息路由的顺序:从优先级高的队列链式执行到优先级低的队列,处理完成后执行回调函数 cb。

这里,msgTypes 必须保持增序,比如,[]{1,2,3}, 同优先级的可以并列出现,比如,[]{1,2,2,3}

1
func (s *QueueSwitch) NewMessage(data interface{}, cb MessageCB, msgTypes ...MessageType) (*Message, error)

看下具体的代码细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (s *QueueSwitch) NewMessage(data interface{}, cb MessageCB, msgTypes ...MessageType) (*Message, error) {
...
var fn func(*Message)
for i := len(msgTypes) - 1; i >= 0; i-- {
if i == len(msgTypes)-1 {
fn = s.createLastFn(msgTypes[i], cb)
} else {
level1 := s.queues[msgTypes[i]].GetLevel()
level2 := s.queues[msgTypes[i+1]].GetLevel()
if level1 > level2 {
return nil, ErrMessageLevel
}
fn = s.createFn(msgTypes[i], msgTypes[i+1], fn, cb)
}
}

var msg Message
msg.Type = msgTypes[0]
msg.Fn = fn
msg.Data = data
return &msg, nil
}

func (s *QueueSwitch) createFn(msgType, nextType MessageType, nextFn func(*Message), cb MessageCB) func(*Message) {
return func(msg *Message) {
//执行自己的handle
msg, err := s.handles[msgType](msg)
if err != nil {
if cb != nil {
cb(msg, err)
}
return
}
//把消息发送给下一个channel
msg.Type = nextType
msg.Fn = nextFn
s.Send(msg)
}
}

func (s *QueueSwitch) createLastFn(msgType MessageType, cb MessageCB) func(*Message) {
return func(msg *Message) {
// 执行自己的 handle
msg, err := s.handles[msgType](msg)
if cb != nil {
// 回调处理结果
cb(msg, err)
}
}
}

下面我们来看一下2.0版本的测试示例:

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package queue

import (
"fmt"
"testing"
)

var (
MessageType_MsgA MessageType = 0
MessageType_MsgB MessageType = 1
)

type TradeServer struct {
sw *QueueSwitch
logq *Queue
matchq *Queue
}

func NewTradeServer() *TradeServer {
s := &TradeServer{}
s.sw = NewQueueSwitch()
s.logq = NewQueue(1)
s.matchq = NewQueue(2)
s.RegistMsgA()
s.RegistMsgB()
return s
}

func (s *TradeServer) RegistMsgA() {
s.sw.SetRoute(MessageType_MsgA, s.logq)
s.sw.SetHandle(MessageType_MsgA, func(msg *Message) (*Message, error) {
fmt.Println("recv log msg:", msg)
return msg, nil
})
}

func (s *TradeServer) RegistMsgB() {
s.sw.SetRoute(MessageType_MsgB, s.matchq)
s.sw.SetHandle(MessageType_MsgB, func(msg *Message) (*Message, error) {
fmt.Println("recv match msg:", msg)
// some logic...
msg.Data = "tx-result"
return msg, nil
})
}

func TestQueueSwitch(t *testing.T) {
ch := make(chan interface{}, 1)
s := NewTradeServer()

var msg *Message
msg, _ = s.sw.NewMessage("tx-data", func(msg *Message, err error) {
fmt.Println("in callback")
ch <- msg.Data
}, MessageType_MsgA, MessageType_MsgB)
s.sw.Send(msg)
fmt.Println(<-ch)
}

测试结果:

1
2
3
4
5
6
recv log msg: &{tx-data 0x10f0e50 0}
recv match msg: &{tx-data 0x10f0f50 1}
in callback
tx-result
PASS
ok github.com/hxzqlh/queue 0.008s

可以看到,该消息会先发送到 A 类型对应的消息队列 logq 中,然后再发送到 B 类型对应的消息队列 matchq 中。

具体细节可参考源码 github.com/hxzqlh/queue:v2.0

彦祖老师 wechat