一、概述

核心方法:

golang 官方版本(golang.org/x/sync/errgroup)的介绍

  1. errgroup.WithContext:初始化 Group 变量,并基于传入的 ctx 生成一个带有 cancel 方法的新 context
// WithContext returns a new Group together with a Context derived from ctx.
// The Group stores the derived Context's cancel function; Go calls it on the
// first non-nil error and Wait calls it after all goroutines have returned.
func WithContext(ctx context.Context) (*Group, context.Context) {
	derived, cancelFn := context.WithCancel(ctx)
	g := &Group{cancel: cancelFn}
	return g, derived
}
  2. Group.Go:启动协程执行 f;若 f 返回 error,则(仅第一次)记录该 error 并调用 cancel 方法
// Go runs f in a new goroutine. When a limit is in place (g.sem != nil) it
// first blocks until a token fits into the semaphore channel. The first
// non-nil error returned by any f is recorded for Wait and, if the group has
// a cancel function, triggers it.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		err := f()
		if err == nil {
			return
		}
		g.errOnce.Do(func() {
			g.err = err
			if g.cancel != nil {
				g.cancel()
			}
		})
	}()
}
  3. Group.Wait:等待所有协程执行完毕,随后调用 cancel 并返回记录到的第一个 error
// Wait blocks until every goroutine started by Go has returned, then calls
// the group's cancel function (if any) and returns the first recorded error.
func (g *Group) Wait() error {
	g.wg.Wait()
	if cancel := g.cancel; cancel != nil {
		cancel()
	}
	return g.err
}

用法示例:

// main demonstrates typical errgroup usage: create a group bound to a
// context, start goroutines with Go, then collect the first error with Wait.
func main() {
	ctx := context.Background()
	// 1. init group; with the official errgroup, newCtx is cancelled when a
	//    goroutine returns an error or when Wait returns.
	group, newCtx := errgroup.WithContext(ctx)

	// 2. initiate goroutines; every function must return an error (nil on success)
	group.Go(func() error {
		// Stop early if another goroutine has already failed.
		if err := newCtx.Err(); err != nil {
			return err
		}
		// do something 1, passing newCtx to calls that honor cancellation
		return nil
	})
	group.Go(func() error {
		if err := newCtx.Err(); err != nil {
			return err
		}
		// do something 2, passing newCtx to calls that honor cancellation
		return nil
	})

	// 3. wait for all goroutines to return
	if err := group.Wait(); err != nil {
		// handle error
	}
}

对比

相较于golang官方版本

优点:

  1. 支持捕获panic并报错返回
  2. 解除对context类cancel方法的依赖

缺点:

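  • 协程数量加倍(每个任务额外多占用一个协程),所以一般适用于协程数量不高的场景。注意:Go 并没有对 goroutine 数量设置默认上限,默认的 1 万个限制针对的是 OS 线程数(见 runtime/debug.SetMaxThreads);goroutine 的主要成本是栈内存(初始约 2KB/个)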

为什么“解除对context类cancel方法的依赖”是优点,如何体现?
errgroup有一个「快速响应」的特性:发起10个协程进行rpc调用,如果其中1个协程的rpc调用失败并返回错误,另外9个中尚未执行完毕的协程也会立刻返回(前提是它们监听了 context 的 cancel 事件,见下文),从而达成「快速响应」效果

想要让自己的程序具备「快速响应」的特性,前提是实现对 context cancel 事件的监听。一些 golang 框架(比如 RPC 框架 kitex、ORM 框架 gorm)在传入 ctx 时恰好实现了这套监听逻辑,所以才能触发「快速响应」;反之,如果我们只是将自定义的计算逻辑放入其中,同样需要自己实现对 context cancel 事件的监听,才能达到「快速响应」的效果

所以,改进版直接解除了对 context cancel 事件的依赖:内部维护一个新的 done channel,并在 Go 中主动为用户监听它的关闭(即“cancel”)事件,一旦关闭,负责等待的协程立即返回,Wait 也就能提前返回。需要注意的是,用户函数 f 本身并不会被中断,仍会在后台运行直至结束,其结果会被丢弃

实现

// Package errgroup (chitian/errgroup) is an extension of golang.org/x/sync/errgroup, which has this header comment:
//
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package errgroup

import (
"context"
"log"
"runtime"
"sync"
)
// Error is a string-backed error type; it allows sentinel errors such as
// PanicError to be declared as constants.
type Error string

// Error implements the error interface by returning the underlying string.
func (msg Error) Error() string { return string(msg) }

// PanicError is reported for a function started by Go that panicked; Wait
// returns it if that panic was the group's first failure.
const PanicError = Error("errgroup: goroutine panic")
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
	// Bookkeeping for the goroutines started by Go, plus the first error any
	// of them reported (returned by Wait).
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error

	// done is closed, at most once (guarded by doneOnce), when the group is
	// cancelled. It is nil for the zero Group, which therefore never cancels.
	done     chan struct{}
	doneOnce sync.Once

	// ch is the concurrency semaphore created by SetLimit: Go puts a token in
	// before starting a goroutine and done_ takes it out. nil means no limit.
	ch chan struct{}
}

// WithCancel returns a Group that cancels itself (closes its done channel)
// as soon as a function passed to Go returns an error or panics, and at the
// latest when Wait returns.
func WithCancel() *Group {
	g := new(Group)
	g.done = make(chan struct{})
	return g
}

// WithContext returns a cancelling Group (like WithCancel) together with a
// Context derived from ctx. It exists for API compatibility with
// golang.org/x/sync/errgroup; WithCancel is preferred.
//
// The derived Context is cancelled shortly after the group is cancelled
// (first error or panic, or Wait returning), so code that watches ctx.Done()
// observes the same cancellation the group does. It is also done when the
// parent ctx is done.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	g := &Group{done: make(chan struct{})}
	// Bridge the group's done channel to the derived context. The goroutine
	// exits as soon as either side is done, so it does not outlive Wait or
	// the parent context.
	go func() {
		select {
		case <-g.done:
		case <-ctx.Done():
		}
		cancel()
	}()
	return g, ctx
}

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative n removes the limit. With n == 0, every subsequent Go call blocks
// until the group is cancelled (forever for a zero Group, which never cancels).
//
// SetLimit should be called before Go: a goroutine started under a different
// limit would release its slot on the wrong channel. It panics if goroutines
// holding slots of the current limit are still active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		// make would panic on a negative size; treat it as "no limit",
		// matching golang.org/x/sync/errgroup.
		g.ch = nil
		return
	}
	if len(g.ch) != 0 {
		panic("errgroup: modify limit while goroutines in the group are still active")
	}
	g.ch = make(chan struct{}, n)
}

// cancel closes the done channel exactly once, which wakes every goroutine
// selecting on it. For a Group without a done channel (the zero Group) it
// does nothing, so such a group never cancels.
func (g *Group) cancel() {
	if g.done != nil {
		g.doneOnce.Do(func() { close(g.done) })
	}
}

// done_ marks one goroutine started by Go as finished. The concurrency slot
// (if SetLimit was used) is released before the WaitGroup is decremented, so
// that by the time Wait returns every slot has been given back.
func (g *Group) done_() {
	if g.ch != nil {
		<-g.ch
	}
	g.wg.Done()
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them. It also cancels the
// group, i.e. closes its done channel if it has one.
//
// A function whose tracking goroutine was released early by cancellation may
// still be running in the background when Wait returns.
func (g *Group) Wait() error {
	g.wg.Wait()
	g.cancel()
	// The semaphore channel is deliberately left open: closing it made any
	// later (or concurrent) Go call on a limited group panic with
	// "send on closed channel".
	return g.err
}

// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait. A panic in f is recovered, logged together with its stack
// trace, and reported as PanicError.
//
// When a limit has been set with SetLimit, Go blocks until a slot is free. If
// the group is cancelled before a slot is acquired (or already was), f is not
// started at all.
//
// Cancellation does not interrupt f: the tracking goroutine stops waiting and
// returns, so Wait can return early, but f keeps running in the background
// until it finishes, and its result is then discarded.
func (g *Group) Go(f func() error) {
	if g.ch != nil {
		// Check for cancellation first: when both cases of the select below
		// are ready, select chooses at random and could start f after the
		// group was already cancelled.
		select {
		case <-g.done:
			return
		default:
		}
		select {
		case <-g.done: // nil for a zero Group, so this case never fires there
			return
		case g.ch <- struct{}{}:
		}
	}

	g.wg.Add(1)
	go func() {
		defer g.done_()

		// Capacity 1 is enough: the worker sends at most one value before
		// closing the channel, and the buffer lets it finish even after this
		// goroutine has stopped listening because the group was cancelled.
		localDone := make(chan error, 1)
		go func() {
			defer func() {
				if r := recover(); r != nil {
					const size = 64 << 10
					buf := make([]byte, size)
					buf = buf[:runtime.Stack(buf, false)]
					// %v rather than %s: the panic value need not be a
					// string or an error.
					log.Printf("panic: err=%v\n%s", r, buf)
					localDone <- PanicError
				}
				close(localDone)
			}()
			if err := f(); err != nil {
				localDone <- err
			}
		}()

		select {
		case err := <-localDone:
			// err is nil when f succeeded (channel closed without a value).
			if err != nil {
				g.errOnce.Do(func() {
					g.err = err
				})
				g.cancel()
			}
		case <-g.done: // blocks forever when g.done is nil (zero Group)
		}
	}()
}