主协程依赖多数据源数据

func TestTimeout(t *testing.T) {
    // 主协程总超时时间
    mainTimeout := 3 * time.Second
    ctx, cancel := context.WithTimeout(context.Background(), mainTimeout)
    defer cancel()

    // 数据源配置:数据源名称和各自的超时时间
    dataSources := map[string]time.Duration{
        "DB1":     1 * time.Second,
        "API1":    2 * time.Second,
        "Cache1":  500 * time.Millisecond,
        "FileIO1": 4 * time.Second, // 这个会超过主协程超时
    }

    var wg sync.WaitGroup
    results := make(chan string, len(dataSources))

    // 为每个数据源启动协程
    for name, timeout := range dataSources {
        wg.Add(1)
        go func(sourceName string, sourceTimeout time.Duration) {
            defer wg.Done()

            // 为每个数据源创建带超时的context
            sourceCtx, cancel := context.WithTimeout(ctx, sourceTimeout)
            defer cancel()

            // 使用缓冲通道接收结果
            resultCh := make(chan string, 1)
            go fetchData(sourceCtx, sourceName, resultCh)

            select {
            case res := <-resultCh:
                results <- fmt.Sprintf("%s: %s", sourceName, res)
            case <-sourceCtx.Done():
                results <- fmt.Sprintf("%s: error - %v", sourceName, sourceCtx.Err())
            }
        }(name, timeout)
    }

    // 等待所有协程完成
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    for result := range results {
        fmt.Println(result)
    }
    fmt.Println("主协程结束")
}

func fetchData(ctx context.Context, name string, resultCh chan<- string) {
    // 模拟不同的处理时间
    delay := time.Duration(rand.Intn(3000)) * time.Millisecond
    select {
    case <-time.After(delay):
        resultCh <- fmt.Sprintf("成功获取数据 (耗时 %v)", delay)
    case <-ctx.Done():
        return
    }
}

results matching ""

    No results matching ""