一个简单的消息路由器v1.0

最近在整理代码,看到不少在设计、实现上有意思的模块,这里总结记录下,将别人的东西消化成自己的,或许以后用得着呢。

这里介绍下一个简单的消息路由器,第一版很简单,主要特征如下:

  • 每个消息在生成时指定消息类型
  • 不同类型的消息分发到不同的消息队列
  • 不同类型的消息由不同的消息处理器消费
  • 消息消费完成后可以执行回调逻辑

使用说明

初始化流程:

  • qs := NewQueueSwitch()生成消息路由器实例
  • 调用 qs.SetRouteqs.SetHandle 对不同类型的消息设置消息路由和消息处理器

消息发送流程:

  • 构造消息:
1
2
3
4
ch := make(chan interface{}, 1) // 消息处理结果会写入该管道
msg, _ = qw.NewMessage("Hello World!", func(msg *Message, err error) {
ch <- msg.Data
}, MessageType_MsgA)
  • 发送消息到路由器:
1
qw.Send(msg)
  • 等待消息处理完成:
1
2
result := <-ch
fmt.Println(result)

下面我们来看一下该消息模型的测试示例:

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package queue

import (
"fmt"
"testing"
)

var (
MessageType_MsgA MessageType = 0
MessageType_MsgB MessageType = 1
)

type TradeServer struct {
sw *QueueSwitch
q *Queue
}

func NewTradeServer() *TradeServer {
s := &TradeServer{}
s.sw = NewQueueSwitch()
s.q = NewQueue()
s.RegistMsgA()
s.RegistMsgB()
return s
}

func (s *TradeServer) RegistMsgA() {
s.sw.SetRoute(MessageType_MsgA, s.q)
s.sw.SetHandle(MessageType_MsgA, func(msg *Message) (*Message, error) {
fmt.Println("A type msg:", msg)
// some logic...
msg.Data = "Hello, MsgA"
return msg, nil
})
}

func (s *TradeServer) RegistMsgB() {
s.sw.SetRoute(MessageType_MsgB, s.q)
s.sw.SetHandle(MessageType_MsgB, func(msg *Message) (*Message, error) {
fmt.Println("B type msg:", msg)
// some logic...
msg.Data = "Hello, MsgB"
return msg, nil
})
}

func TestQueue(t *testing.T) {
ch := make(chan interface{}, 1)
s := NewTradeServer()
var msg *Message

msg, _ = s.sw.NewMessage("test-A", func(msg *Message, err error) {
fmt.Println("in msg-A callback")
ch <- msg.Data
}, MessageType_MsgA)
s.sw.Send(msg)
fmt.Println(<-ch)

msg, _ = s.sw.NewMessage("test-B", func(msg *Message, err error) {
fmt.Println("in msg-B callback")
ch <- msg.Data
}, MessageType_MsgB)
s.sw.Send(msg)
fmt.Println(<-ch)
}

测试结果:

1
2
3
4
5
6
7
8
9
hxzdeMac-mini:~/workspace/golang/src/github.com/hxzqlh/queue (master) $ go test
A type msg: &{test-A 0x10f0bf0 0}
in msg-A callback
Hello, MsgA
B type msg: &{test-B 0x10f0bf0 1}
in msg-B callback
Hello, MsgB
PASS
ok github.com/hxzqlh/queue 0.005s

具体细节可参考源码 github.com/hxzqlh/queue

总结

从代码可以看出,消息路由器主动硬编码它所支持的消息类型和消息处理器,并将消息路由到不同队列,这种模式:

优点:

  • 消息分发、处理的逻辑清晰明朗

缺点:

  • 消息发送者必须知晓消息路由器支持的消息类型,否则无法构造消息,增加了消息发送者的负担。
彦祖老师 wechat