关于一道面试题的分析:Batching HTTP 请求
最近在找工作,其中一家公司提出这道题目。
背景:FacesDetact() 只能串行调用,现在希望为程序添加批量处理功能,但不允许设置 delay 时间窗口,来一个请求,处理一个请求
分析:请求将会乱序到达,如果不设置数量或时间方面的窗口,其实程序需要处理的目标是积压态的网络请求(协程)
思路:如果需要对分散且独立到达的事件(参RxTS)做批量处理,其中必然需要有一个收束的阶段。如果能意识到这种现象的存在,最好是能意识到,可以针对该渠道做 batching 处理。在积压出现的时候,把被积压的请求进行批量取出,批量调用,然后返回
当时没有回答得很好,在处理请求装饰器的时候手忙脚乱,请求可以收拢,但生命周期如何管理呢?可能还是常见的编程范式用多了,哈哈,后来和 GPT 聊天,它虽然没有给出正确的答案,但它对 channel 的熟练运用到是给了我启发
Go 有「share memory by communicating」,我们可以控制权逆转,让我们的 handler 陷入 <-chan 的等待状态(当时是预期进行接口 HiJack)
后来想想,这不就是批量扇出扇入吗?查了下资料,的确是几乎完全一致的题目
下附草稿解答
1package main
2
3import (
4 "encoding/json"
5 "fmt"
6 "math"
7 "net/http"
8 "sync"
9 "time"
10)
11
12func main() {
13 http.HandleFunc("/faces:detact", FacesDetactHandler)
14 http.HandleFunc("/faces:detactv2", FacesDetactHandlerV2)
15
16 const listenAt = ":8080"
17 fmt.Printf("serve at %s\n", listenAt)
18 err := http.ListenAndServe(listenAt, http.DefaultServeMux)
19
20 if err != nil {
21 fmt.Printf("http.ListenAndServe failed: %+v\n", err)
22 }
23}
24
25type Image []byte
26
27type Face struct{} /* position etc. */
28
29type DetactResult struct {
30 Faces []Face `json:"faces"`
31}
32
33func FacesDetactHandler(w http.ResponseWriter, r *http.Request) {
34 type Req struct {
35 Image Image `json:"image"`
36 }
37 var req Req
38 var err error
39 {
40 err = json.NewDecoder(r.Body).Decode(&req)
41 if err != nil {
42 w.WriteHeader(http.StatusBadRequest)
43 fmt.Fprintf(w, "error: %+v", err)
44 return
45 }
46 }
47
48 result := FacesDetact([]Image{req.Image})
49
50 json.NewEncoder(w).Encode(result[0])
51}
52
53type QueueItem struct {
54 Image Image
55 ResultWriteTo chan<- DetactResult // FIXME 可以根据情况分析有没有必要配置 len=1 的 buffer
56}
57
58var queue chan QueueItem
59
60var FacesDetactHandlerV2Initial = sync.OnceFunc(func() {
61 const maxBatchSize = 128
62 queue = make(chan QueueItem, maxBatchSize+1)
63 go func() {
64 for {
65 for queueIten := range queue {
66 var todo = []QueueItem{queueIten}
67 if len(queue) > 0 {
68 consumeQuota := maxBatchSize - len(todo)
69 if consumeQuota > len(queue) {
70 consumeQuota = len(queue)
71 }
72 for i := 0; i < consumeQuota; i++ {
73 todo = append(todo, <-queue)
74 }
75 }
76
77 var args []Image = make([]Image, len(todo))
78 {
79 for index, req := range todo {
80 args[index] = req.Image
81 }
82 }
83
84 // enable to check how many req has been batching
85 // fmt.Printf("args.length = %d\n", len(args))
86 results := FacesDetact(args)
87
88 for index, result := range results {
89 todo[index].ResultWriteTo <- result
90 }
91 }
92 }
93 }()
94})
95
96func FacesDetactHandlerV2(w http.ResponseWriter, r *http.Request) {
97 type Req struct {
98 Image Image `json:"image"`
99 }
100 var req Req
101 var err error
102 {
103 err = json.NewDecoder(r.Body).Decode(&req)
104 if err != nil {
105 w.WriteHeader(http.StatusBadRequest)
106 fmt.Fprintf(w, "error: %+v", err)
107 return
108 }
109 }
110
111 FacesDetactHandlerV2Initial()
112
113 resultQueue := make(chan DetactResult, 1)
114
115 queueReq := QueueItem{
116 Image: req.Image,
117 ResultWriteTo: resultQueue,
118 }
119
120 queue <- queueReq
121
122 result := <-resultQueue
123
124 json.NewEncoder(w).Encode(result)
125}
126
127var FacesDetactLocker sync.Mutex // 以防外部函数错误并行调用
128
129// FacesDetact 执行图像序列处理,因 GPU 底层限制,本函数不支持并行调用,请注意
130// images 与 results 为一对一的关系。本函数有冷启动时间,处理时间随图像数量近双曲线
131func FacesDetact(images []Image) (results []DetactResult) {
132 if len(images) <= 0 {
133 return
134 }
135 FacesDetactLocker.Lock()
136 defer FacesDetactLocker.Unlock()
137
138 results = make([]DetactResult, len(images))
139
140 time.Sleep(time.Duration(math.Pow(float64(len(images)), 0.1)) * 100 * time.Millisecond)
141
142 return
143}在面试结束后重写优化以及调试的时候,观察数据会很有趣,因为机械压测带来的流量(like wrk but wrk2)很有规律,联合起 poolSize、具体耗时曲线、请求到达频次、放行逻辑,它们组成了一个可供线性规划的区域。
此时此刻理解了 Java 仔,「这些都是可以调优的空间」。
队列中的压力、消费速度,其实都是函数曲线的一部分
自己测试中发现 Apifox 的压测启动速度真的超逊,远远不如 wrk。
BTW. 也许可以 vibe 一个 wrk2-go #TODO