最近在找工作,其中一家公司提出这道题目。

背景:FacesDetact() 只能串行调用,现在希望为程序添加批量处理功能,但不允许设置 delay 时间窗口,来一个请求,处理一个请求

分析:请求将会乱序到达,如果不设置数量或时间方面的窗口,其实程序需要处理的目标是积压态的网络请求(协程)

思路:如果需要对分散且独立到达的事件(参RxTS)做批量处理,其中必然需要有一个收束的阶段。如果能意识到这种现象的存在,最好是能意识到,可以针对该渠道做 batching 处理。在积压出现的时候,把被积压的请求进行批量取出,批量调用,然后返回

当时没有回答得很好,在处理请求装饰器的时候手忙脚乱,请求可以收拢,但生命周期如何管理呢?可能还是常见的编程范式用多了,哈哈,后来和 GPT 聊天,它虽然没有给出正确的答案,但它对 channel 的熟练运用到是给了我启发

Go 有「share memory by communicating」,我们可以控制权逆转,让我们的 handler 陷入 <-chan 的等待状态(当时是预期进行接口 HiJack)

后来想想,这不就是批量扇出扇入吗?查了下资料,的确是几乎完全一致的题目

下附草稿解答

  1package main
  2
  3import (
  4	"encoding/json"
  5	"fmt"
  6	"math"
  7	"net/http"
  8	"sync"
  9	"time"
 10)
 11
 12func main() {
 13	http.HandleFunc("/faces:detact", FacesDetactHandler)
 14	http.HandleFunc("/faces:detactv2", FacesDetactHandlerV2)
 15
 16	const listenAt = ":8080"
 17	fmt.Printf("serve at %s\n", listenAt)
 18	err := http.ListenAndServe(listenAt, http.DefaultServeMux)
 19
 20	if err != nil {
 21		fmt.Printf("http.ListenAndServe failed: %+v\n", err)
 22	}
 23}
 24
 25type Image []byte
 26
 27type Face struct{} /* position etc. */
 28
 29type DetactResult struct {
 30	Faces []Face `json:"faces"`
 31}
 32
 33func FacesDetactHandler(w http.ResponseWriter, r *http.Request) {
 34	type Req struct {
 35		Image Image `json:"image"`
 36	}
 37	var req Req
 38	var err error
 39	{
 40		err = json.NewDecoder(r.Body).Decode(&req)
 41		if err != nil {
 42			w.WriteHeader(http.StatusBadRequest)
 43			fmt.Fprintf(w, "error: %+v", err)
 44			return
 45		}
 46	}
 47
 48	result := FacesDetact([]Image{req.Image})
 49
 50	json.NewEncoder(w).Encode(result[0])
 51}
 52
 53type QueueItem struct {
 54	Image         Image
 55	ResultWriteTo chan<- DetactResult // FIXME 可以根据情况分析有没有必要配置 len=1 的 buffer
 56}
 57
 58var queue chan QueueItem
 59
 60var FacesDetactHandlerV2Initial = sync.OnceFunc(func() {
 61	const maxBatchSize = 128
 62	queue = make(chan QueueItem, maxBatchSize+1)
 63	go func() {
 64		for {
 65			for queueIten := range queue {
 66				var todo = []QueueItem{queueIten}
 67				if len(queue) > 0 {
 68					consumeQuota := maxBatchSize - len(todo)
 69					if consumeQuota > len(queue) {
 70						consumeQuota = len(queue)
 71					}
 72					for i := 0; i < consumeQuota; i++ {
 73						todo = append(todo, <-queue)
 74					}
 75				}
 76
 77				var args []Image = make([]Image, len(todo))
 78				{
 79					for index, req := range todo {
 80						args[index] = req.Image
 81					}
 82				}
 83
 84				// enable to check how many req has been batching
 85				// fmt.Printf("args.length = %d\n", len(args))
 86				results := FacesDetact(args)
 87
 88				for index, result := range results {
 89					todo[index].ResultWriteTo <- result
 90				}
 91			}
 92		}
 93	}()
 94})
 95
 96func FacesDetactHandlerV2(w http.ResponseWriter, r *http.Request) {
 97	type Req struct {
 98		Image Image `json:"image"`
 99	}
100	var req Req
101	var err error
102	{
103		err = json.NewDecoder(r.Body).Decode(&req)
104		if err != nil {
105			w.WriteHeader(http.StatusBadRequest)
106			fmt.Fprintf(w, "error: %+v", err)
107			return
108		}
109	}
110
111	FacesDetactHandlerV2Initial()
112
113	resultQueue := make(chan DetactResult, 1)
114
115	queueReq := QueueItem{
116		Image:         req.Image,
117		ResultWriteTo: resultQueue,
118	}
119
120	queue <- queueReq
121
122	result := <-resultQueue
123
124	json.NewEncoder(w).Encode(result)
125}
126
127var FacesDetactLocker sync.Mutex    // 以防外部函数错误并行调用
128
129// FacesDetact 执行图像序列处理,因 GPU 底层限制,本函数不支持并行调用,请注意
130// images 与 results 为一对一的关系。本函数有冷启动时间,处理时间随图像数量近双曲线
131func FacesDetact(images []Image) (results []DetactResult) {
132	if len(images) <= 0 {
133		return
134	}
135	FacesDetactLocker.Lock()
136	defer FacesDetactLocker.Unlock()
137
138	results = make([]DetactResult, len(images))
139
140	time.Sleep(time.Duration(math.Pow(float64(len(images)), 0.1)) * 100 * time.Millisecond)
141
142	return
143}

在面试结束后重写优化以及调试的时候,观察数据会很有趣,因为机械压测带来的流量(like wrk but wrk2)很有规律,联合起 poolSize、具体耗时曲线、请求到达频次、放行逻辑,它们组成了一个可供线性规划的区域。

此时此刻理解了 Java 仔,「这些都是可以调优的空间」。

队列中的压力、消费速度,其实都是函数曲线的一部分

自己测试中发现 Apifox 的压测启动速度真的超逊,远远不如 wrk。

BTW. 也许可以 vibe 一个 wrk2-go #TODO