一道腾讯云平台开发的 Golang 编程题

前两天面试腾讯云平台开发的时候,面到最后,终于到了人皆吐槽的「手撕代码」环节。

小哥扔给我一道 Golang 编程题:

  1. 有一系列任务需要处理,最多 N 个并发;
  2. 只要有任务处理遇到错误,主程序就立即返回,输出对应错误信息;
  3. 等待所有任务执行成功。

代码框架如下:

1
2
3
4
5
6
7
8
9
10
11
const N = 5
var APIList = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

func main() {
// impl your job scheduler here
}

func callAPI(a string) error {
// handler
return nil
}

分析

这道题目综合考察了面试者对于 Golang 中协程并发调度、上下文通信、错误处理的编码熟练程度,还考察面试者是否掌握一些常见的 并发控制 套路,也称模式。

理解题目要求后,简单做下分析:

  1. 并发度:最多 N 个并发,不能有多少个任务就无脑地启动多少个协程。
  2. 只要有任务处理遇到错误,主程序结束,输出对应错误信息。这里我们很自然地会想到用 context 来控制父子协程。
  3. 等待所有任务执行成功:这里也涉及到协程之间的调度,主程序可以用 WaitGroup 也可以用 channel 等待子协程返回任务的处理结果。

参考实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
"context"
"fmt"
"runtime"
"time"
)

const N = 5
var APIList = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

type JobRes struct {
job string
err error
}

func main() {
workerCh := make(chan struct{}, N)
for i := 0; i < N; i++ {
workerCh <- struct{}{}
}

respCh := make(chan JobRes, len(APIList))
waitCh := make(chan string)

ctx, cancel := context.WithCancel(context.Background())
go func() {
var num int
for {
select {
case <-ctx.Done():
fmt.Printf("resp goroutine canceled by main, reason:%s\n", ctx.Err())
return
case res := <-respCh:
num++
fmt.Printf("job:%v result:%v\n", res.job, res.err)
if num == len(APIList) || res.err != nil{
cancel()
if res.err != nil {
waitCh <- res.err.Error()
} else {
waitCh <- "success"
}
}
}
}
}()

for _, job := range APIList {
// wait worker available
<-workerCh
go dealJob(ctx, workerCh, respCh, job)
}

fmt.Println("wait all jobs finished or once error occurs...")
fmt.Println(<-waitCh)
}

func dealJob(ctx context.Context, workerCh chan struct{}, respCh chan JobRes, job string) {
// fmt.Println("goroutines:", runtime.NumGoroutine())

for {
select {
case <-ctx.Done():
fmt.Printf("job:%s canceled by main, reason:%s\n", job, ctx.Err())
return
default:
err := callApi(job)
respCh <- JobRes{job: job, err: err}

// reuse worker
workerCh <- struct{}{}
return
}
}
}

func callApi(a string) error {
// handler
time.Sleep(100 * time.Millisecond)
// imutate job failure
// if a == "e" {
// return errors.New("ops!")
// }
return nil
}

说明:

  1. workerCh 控制并发度:

    • 初始化带 N 个缓冲的工作池 workerCh
    • workerCh 拿一个(若阻塞,说明所有 worker 都启动了,已达到并发上限 N),启动协程去处理任务。
    • 任务完成后,重新唤醒 workerCh,实现重用。
  2. 使用 contextWithCancel 来实现主程序控制子协程

  3. go-for-select 经典的多路监听:任务处理结果输出到 respCh,失败则马上 cancel,取消所有子协程

  4. 主程序等待所有任务执行完成或遭遇错误: <-waitCh

成功结果

1
2
3
4
5
6
7
8
9
10
11
12
13
job:d result:<nil>
job:b result:<nil>
job:c result:<nil>
job:a result:<nil>
wait all jobs finished or once error occurs...
job:e result:<nil>
job:i result:<nil>
job:f result:<nil>
job:g result:<nil>
job:j result:<nil>
job:h result:<nil>
success
resp goroutine canceled by main, reason:context canceled

失败结果

1
2
3
4
5
6
7
8
wait all jobs finished or once error occurs...
job:d result:<nil>
job:c result:<nil>
job:a result:<nil>
job:e result:ops!
job:b result:<nil>
resp goroutine canceled by main, reason:context canceled
ops!

优化

上面的实现,虽然控制了程序中最多有 N 个并发,但它是每次都启动一个协程,只不过我们控制了不超过 N 个,任务处理完成后,这个协程就自动销毁了。

但频繁地创建/销毁协程也是一笔开销,可以考虑用 协程池:初始化 N 个协程,放入协程池,每个协程去拉取消费任务列表里面的任务。

扩展

  1. 题目中的任务列表是固定的,如果任务列表也在动态变化,该如何实现?如果让面试者也实现任务的生产过程呢?
  2. 题目中并没有对任务处理进行超时控制,如果每个任务限制最长处理时间1s,该如何实现?并保证所有任务都能得到处理?
彦祖老师 wechat