主协程依赖多数据源数据
func TestTimeout(t *testing.T) {
// 主协程总超时时间
mainTimeout := 3 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), mainTimeout)
defer cancel()
// 数据源配置:数据源名称和各自的超时时间
dataSources := map[string]time.Duration{
"DB1": 1 * time.Second,
"API1": 2 * time.Second,
"Cache1": 500 * time.Millisecond,
"FileIO1": 4 * time.Second, // 这个会超过主协程超时
}
var wg sync.WaitGroup
results := make(chan string, len(dataSources))
// 为每个数据源启动协程
for name, timeout := range dataSources {
wg.Add(1)
go func(sourceName string, sourceTimeout time.Duration) {
defer wg.Done()
// 为每个数据源创建带超时的context
sourceCtx, cancel := context.WithTimeout(ctx, sourceTimeout)
defer cancel()
// 使用缓冲通道接收结果
resultCh := make(chan string, 1)
go fetchData(sourceCtx, sourceName, resultCh)
select {
case res := <-resultCh:
results <- fmt.Sprintf("%s: %s", sourceName, res)
case <-sourceCtx.Done():
results <- fmt.Sprintf("%s: error - %v", sourceName, sourceCtx.Err())
}
}(name, timeout)
}
// 等待所有协程完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Println(result)
}
fmt.Println("主协程结束")
}
func fetchData(ctx context.Context, name string, resultCh chan<- string) {
// 模拟不同的处理时间
delay := time.Duration(rand.Intn(3000)) * time.Millisecond
select {
case <-time.After(delay):
resultCh <- fmt.Sprintf("成功获取数据 (耗时 %v)", delay)
case <-ctx.Done():
return
}
}