1.等待组
概括与使用
等待组(sync.WaitGroup)用于等待一组 goroutine 完成任务,保证在并发环境中完成指定数量的任务
WaitGroup 提供了一种简单的机制来跟踪一组 goroutine 的状态。它主要包含以下三个方法:
- Add(delta int):向 WaitGroup 添加指定数量的等待任务,delta 可以是正数也可以是负数。当 delta 的值为正数时,表示新增等待任务;当 delta 的值为负数时,表示完成一部分等待任务。
- Done():表示一个等待任务已经完成,相当于调用 Add(-1)。
- Wait():等待所有的等待任务都完成。
类比如一个工厂可以并行生产一批零件,但是只有全部零件生产完成才能组装。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
// 模拟任务1
for i := 0; i < 5; i++ {
fmt.Println("Task 1:", i)
}
}()
go func() {
defer wg.Done()
// 模拟任务2
for i := 0; i < 5; i++ {
fmt.Println("Task 2:", i)
}
}()
wg.Wait()
fmt.Println("All tasks completed")
}输出如下

注意:
协程间传递时需要以指计的方式或闭包的方式引用 WaitGroup 对象。否则会产生操控了不同对象而产生死锁
上面使用闭包方法,下面是指针方法
wg := &sync.WaitGroup{}
wg.Add(2)
go func(w *sync.WaitGroup) {
defer w.Done()
// 模拟任务1
for i := 0; i < 5; i++ {
fmt.Println("Task 1:", i)
}
}(wg)如果注掉一个协程,wg次数不为零,就会产生死锁

输出如下:死锁

分工示例:
可以看到节省了好多时间
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var a, b = 1000, 10000
//1.
//计时开始
start := time.Now()
for i := 0; i < 10000000000; i++ {
mul(a, b)
}
//计时结束
t := time.Since(start)
fmt.Println(t)
//2.
var wg sync.WaitGroup
//计时开始
start = time.Now()
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 500000000; j++ {
mul(a, b)
}
}()
}
wg.Wait()
//计时结束
t = time.Since(start)
fmt.Println(t)
fmt.Println("All tasks completed")
}
func mul(a, b int) int {
return a * b
}
原理
sync.WaitGroup 的原理是基于计数器的机制。它内部维护了一个整数计数器,初始值为0。
- 当调用 Add(delta int) 方法时,计数器的值会增加或减少指定的数量。
- 当调用 Done() 方法时,相当于调用 Add(-1),计数器的值减少1,表示一个等待任务已经完成。当计数器的值变为0时,即所有等待任务都已完成,Wait() 方法会解除阻塞,程序继续执行。
- 在调用 Wait() 方法时,如果计数器的值不为0,则会阻塞当前 goroutine,直到计数器的值变为0。这样可以确保在等待任务完成之前,主 goroutine 或其他 goroutine 不会提前退出。
2.互斥锁
互斥锁(sync.Mutex)是一种常用的同步机制,用于保护共享资源在并发环境下的安全访问。
举例:
在 Go 中,map 不是线程安全的,如果多个 goroutine 并发地读写同一个 map,可能会导致竞态条件和数据不一致的问题。
package main
import (
"fmt"
"sync"
)
func main() {
mp := make(map[string]int, 0)
list := []string{"A", "B", "C", "D"}
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for _, item := range list {
_, ok := mp[item]
if !ok {
mp[item] = 0
}
mp[item] += 1
}
}()
}
wg.Wait()
fmt.Println(mp)
}输出如下

一个解决方法是使用互斥锁(Mutex)(该例子不考虑效率)
package main
import (
"fmt"
"sync"
)
func main() {
mp := make(map[string]int, 0)
list := []string{"A", "B", "C", "D"}
wg := sync.WaitGroup{}
mu := sync.Mutex{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
for _, item := range list {
_, ok := mp[item]
if !ok {
mp[item] = 0
}
mp[item] += 1
}
}()
}
wg.Wait()
fmt.Println(mp)
}输出如下:

3.读写锁
读写锁(sync.RWMutex)是一种常用的线程同步机制,用于在多个读操作和写操作同时存在的情况下,保证数据的一致性和线程安全性。读写锁允许多个 goroutine 并发地读取共享数据,但只允许一个 goroutine 写入共享数据。
读写锁有两种状态:读状态和写状态。
- 在读状态下,多个 goroutine 可以并发地读取共享数据,但不能进行写操作。
- 在写状态下,只允许一个 goroutine 对共享数据进行写操作,其他 goroutine 都不能进行读或写操作。
在 Go 中,读写锁由 sync 包提供。sync.RWMutex 类型表示一个读写锁
常用方法:
- Lock 和 Unlock 用于写操作,
- Rlock 和 RUnlock 用于读操作。
读写锁(sync.RWMutex)允许多个协程同时获取读锁进行读操作,但只有一个协程能够获取写锁进行写操作。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
// 定义一个包含锁和数据的结构体
type SafeMap struct {
sync.RWMutex
data map[string]int
}
// 初始化 SafeMap
sm := &SafeMap{
data: make(map[string]int),
}
// 启动多个协程进行读操作
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sm.RLock()
defer sm.RUnlock()
// 读取数据
fmt.Println(sm.data)
}()
}
// 启动多个协程进行写操作
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sm.Lock()
defer sm.Unlock()
// 写入数据
sm.data["key"] = i
}()
}
wg.Wait()
}输出如下:

4.sync.Map
sync.Map 是 Go 语言标准库中提供的一种并发安全的 Map 类型。与普通的 map 不同,sync.Map 在并发访问时无需额外的锁机制,而是通过内部的算法实现了高效的并发安全。
sync.Map 常用的方法:
- Store(key, value any):存储键值对到 sync.Map 中,如果键已经存在,则更新其对应的值。
- Load(key any) (value any, ok bool):获取指定键的值,如果键不存在,则返回默认值和 false。
- LoadOrStore(key, value any) (actual any, loaded bool):获取指定键的值,如果键不存在,则存储指定的键值对,返回新存储的值和 false;如果键已经存在,则返回现有的值和 true。
- Delete(key any):删除指定键的键值对。
- LoadAndDelete(key any) (value any, loaded bool):接受一个键作为参数,并返回对应的值和一个布尔值,表示是否成功加载并删除键值对。如果键不存在,返回默认值和 false。
- Range(f func(key, value any) bool):遍历 sync.Map 中的所有键值对,并对每个键值对执行指定的函数。如果该函数返回 false,则停止遍历。
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// 存储键值对
m.Store("key1", "value1")
m.Store("key2", "value2")
// 启动多个协程并发读取数据
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if value, ok := m.Load("key2"); ok {
fmt.Println("Value found:", value)
}
}()
}
// 启动多个协程并发写入数据
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m.Store(fmt.Sprintf("key%d", i+3), fmt.Sprintf("value%d", i+3))
}()
}
wg.Wait()
// 遍历所有键值对
m.Range(func(key, value interface{}) bool {
fmt.Println("Key:", key, "Value:", value)
//如果返回false则不会继续遍历
return true
})
}输出如下

注意:
sync.Map 并不保证遍历时键值对的顺序,因为其内部数据结构不维护插入顺序。
sync.Map的方法保证了并发安全,但是多条方法之间不保证。
5.连接池
连接池(Connection Pool)是一种常见的技术,用于管理和重复使用与外部资源的连接,比如数据库连接、网络连接等。连接池可以提高应用程序的性能和效率,避免频繁地创建和销毁连接对象。
在 Go 语言中,你可以使用 sync.Pool 类型来实现连接池。sync.Pool 是Go标准库提供的一个并发安全的对象池。
sync.Pool 类型提供了以下常用方法:
- Get():从连接池中获取一个对象。如果连接池为空,则会调用 New 函数来创建新的对象。
- Put(x any):将一个对象放回连接池中,以便后续使用。被放回的对象可以被其他协程重复使用。
- New:一个函数类型,当连接池为空时,会调用该函数来创建新的对象。
package main
import (
"fmt"
"math/rand"
"sync"
)
type Connection struct {
ID int64
}
type ConPool struct {
sync.Pool
}
func main() {
pool := GetPool()
var wg sync.WaitGroup
numWorkers := 5
wg.Add(numWorkers)
//如果事先存在id1
conn := &Connection{
ID: 1,
}
pool.Put(conn)
for i := 0; i < numWorkers; i++ {
go func() {
defer wg.Done()
//可能获取id1,但是也可能不够创建一条新的
conn = pool.Get()
// 使用连接对象进行工作
fmt.Println("Working with connection ID:", conn.ID)
pool.Put(conn)
}()
}
wg.Wait()
fmt.Println("看看线程池里有多少线程")
for i := 0; i < 5; i++ {
//只Get不Put就不会放回去连接池
//但是可能已经回收了,会创建新的,所以这里简单看看可不可以获取重复协程
conn = pool.Get()
// 使用连接对象进行工作
fmt.Println("Look connection ID:", conn.ID)
}
}
func (c *ConPool) Put(conn *Connection) {
//判断,处理,同下
c.Pool.Put(conn)
}
func (c *ConPool) Get() *Connection {
conn := c.Pool.Get().(*Connection)
//判断,处理,需要判断对象是否已经回收,否则拿到了该协程,但是该协程对应的对象已经回收了
//......判断逻辑
return conn
}
func GetPool() *ConPool {
pool := &ConPool{
Pool: sync.Pool{
New: func() any {
fmt.Println("Creating new connection")
return &Connection{
ID: rand.Int63(),
}
},
},
}
return pool
}
6.sync.Once
sync.Once 类型有一个 Do 方法,Do 方法接收一个函数作为参数,并且仅在第一次调用时执行该函数,之后的调用都会被忽略。在多个协程同时调用 Do 方法时,只有一个协程会执行函数,其他协程会等待该函数执行完成。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var once sync.Once
for i := 0; i < 5; i++ {
go func() {
once.Do(func() {
//对某初始化
fmt.Println("Initialize")
})
fmt.Println("Do something")
}()
}
time.Sleep(3 * time.Second)
}
注意:
- sync.Once 保证函数只会被执行一次,因此会输出 “Initialize”。而后续的协程在调用 Do 时,由于函数已经执行过了,所以只会输出 “Do something”。
- sync.Once 并不保证函数的执行顺序,它只保证函数只会被执行一次。因此,在上述示例中,输出的顺序可能会有所不同。
- sync.Once保证函数只会被执行一次,所以也是并发安全的,在do中是并发安全的
7.sync.cond
sync.Cond 是 Go 语言标准库中的条件变量类型,用于在多个 goroutine 之间进行同步和通信。
条件变量是一种线程间同步机制,它允许一个或多个 goroutine 等待另一个 goroutine 发出特定通知时被唤醒。条件变量通常和锁一起使用,以避免竞态条件(race condition)。
sync.Cond 的主要方法包括:
- Wait():阻塞当前 goroutine,并且释放与条件变量相关联的锁。当其他 goroutine 调用 Signal() 或者 Broadcast() 时,该 goroutine 会重新获得锁并继续执行。
- Signal():唤醒至少一个正在等待的 goroutine,如果没有等待的 goroutine,则不会做任何事情。
- Broadcast():唤醒所有正在等待的 goroutine。
通常使用 sync.NewCond(&sync.Mutex{}) 创建一个与互斥锁关联的条件变量。这样可以确保在调用 Wait() 方法时,能够正确地获取和释放锁,避免竞态条件的发生。
cond.L.Lock() 是 sync.Cond 类型的方法,用于获取条件变量的互斥锁,主要用于在等待条件时需要获取互斥锁进行保护。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
lock := sync.Mutex{}
cond := sync.NewCond(&lock)
done := false
go func() {
cond.L.Lock()
for !done {
cond.Wait()
}
cond.L.Unlock()
fmt.Println("Task1 completed")
}()
go func() {
cond.L.Lock()
for !done {
cond.Wait()
}
cond.L.Unlock()
fmt.Println("Task2 completed")
}()
time.Sleep(time.Second)
fmt.Println("Signal task to complete")
done = true
//随机唤醒一个线程
//cond.Signal()
//唤醒所有线程
cond.Broadcast()
time.Sleep(time.Second)
}
在函数中传递要传递指针
package main
import (
"fmt"
"sync"
"time"
)
func main() {
list := make([]int, 0)
cond := sync.NewCond(&sync.Mutex{})
go read(&list, cond)
go read(&list, cond)
go read(&list, cond)
go read(&list, cond)
time.Sleep(time.Second)
ListInit(&list, cond)
time.Sleep(5 * time.Second)
}
func read(list *[]int, cond *sync.Cond) {
cond.L.Lock()
defer cond.L.Unlock()
for len(*list) == 0 {
fmt.Println("read wait")
cond.Wait()
}
fmt.Println("list = ", *list)
}
func ListInit(list *[]int, cond *sync.Cond) {
cond.L.Lock()
defer cond.L.Unlock()
for i := 0; i < 10; i++ {
*list = append(*list, i)
}
//唤醒所有线程
cond.Broadcast()
}
8.atomic 包
atomic包提供底层的原子级内存操作,用于实现同步算法
atomic 包是 Go 语言标准库中的一个包,用于提供原子操作。该包中的函数可以保证对于共享变量的并发访问是安全的,并且能够保证操作的原子性。
atomic 包中提供了一些常见的原子操作函数,比如增减、交换、比较等操作。这些函数能够保证在多个 goroutine(Go 语言中的轻量级线程)并发访问共享变量时,不会发生数据竞争和冲突。
以下是 atomic 包中一些常用的函数:

- atomic.StoreInt64() 函数将一个 int64 值存储到指定的内存地址中,
- atomic.LoadInt64() 函数则从指定的内存地址中加载并返回 int64 值。
- AddInt32、AddInt64:原子地将指定值加到某个变量的值上。
- SwapInt32、SwapInt64:原子地交换某个变量的值。
- CompareAndSwapInt32、CompareAndSwapInt64:比较并交换某个变量的值,如果当前值与旧值相等,则将新值赋给变量。
使用 atomic 包可以确保对共享变量的并发访问的正确性和可靠性,避免了传统的互斥锁带来的性能开销和复杂性。
package main
import (
"fmt"
"sync/atomic"
)
func main() {
var counter int64
//atomic.StoreInt64() 函数将一个 int64 值存储到指定的内存地址中
atomic.StoreInt64(&counter, 10)
//atomic.LoadInt64() 函数则从指定的内存地址中加载并返回 int64 值。
fmt.Println(atomic.LoadInt64(&counter))
}输出如下

比如:以下不是一个原子操作
counter = counter + 1
可以使用以下代替
//counter = counter + 1 atomic.AddInt64(&counter, 1) //输出 fmt.Println(counter)

交換变量的值:并返回原有的值
//交換变量的值:并返回原有的值 old := atomic.SwapInt64(&counter, 30) fmt.Println(old) fmt.Println(counter)

比较并替换原有的值,并返回是否替换成功,如果原有的值不同,替换失败
swapped := atomic.CompareAndSwapInt64(&counter, 30, 2) fmt.Println(swapped) fmt.Println(counter) swapInt64 := atomic.CompareAndSwapInt64(&counter, 30, 40) fmt.Println(swapInt64) fmt.Println(counter)

计数器
完成一个计数器
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type atomicCounter struct {
count int64
}
func (c *atomicCounter) increase() {
atomic.AddInt64(&c.count, 1)
}
func (c *atomicCounter) Load() int64 {
return atomic.LoadInt64(&c.count)
}
func main() {
var count = atomicCounter{}
wg := sync.WaitGroup{}
for i := 0; i < 200; i++ {
wg.Add(1)
go func() {
defer wg.Done()
count.increase()
}()
}
wg.Wait()
fmt.Println(count.Load())
}
原子值
atomic.Value 是 Go 语言标准库中的一个类型,用于原子地读取和修改某个值。
atomic.Value 类型有两个方法:
- func (v *Value) Load() (x interface{}):以原子方式加载当前存储的值,并返回该值。
- func (v *Value) Store(x interface{}):以原子方式存储一个新的值。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
list := []string{"A", "B", "C", "D"}
//原子值
var atomicMp atomic.Value
mp := map[string]int{}
atomicMp.Store(&mp)
wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomicLaber:
m := atomicMp.Load().(*map[string]int)
newm := map[string]int{}
for k, v := range *m {
newm[k] = v
}
for _, item := range list {
_, ok := newm[item]
if !ok {
newm[item] = 0
}
newm[item] += 1
}
swap := atomicMp.CompareAndSwap(m, &newm)
if !swap {
goto atomicLaber
}
}()
}
wg.Wait()
fmt.Println(atomicMp.Load())
}
注意:
atomic.Value 可以存储任意类型的值,但是在加载时,需要使用类型断言将其转换回原始类型。如果类型不匹配,将导致运行时错误。
9.context包
在 Go 语言中,context 包提供了一种在多个 Goroutine 之间传递上下文信息、控制 Goroutine 生命周期的机制。通过 context 包,可以对 Goroutine 进行取消操作、设置超时、传递请求相关的值等。
以下是 context 包中常用的方法和函数:
- func Background() context.Context:创建一个空的根上下文(Background),它没有任何值和超时限制。通常用于最顶层的 Goroutine。
- func TODO() context.Context:创建一个空的上下文(TODO),它没有任何值或超时限制,但表示代码尚未实现完整。
- func WithCancel(parent Context) (ctx Context, cancelFunc CancelFunc):返回一个继承自父上下文(parent)的新上下文(ctx),并创建一个可以用来取消上下文的函数(cancelFunc)。当调用 cancelFunc 时,所有从该上下文派生的子上下文都会被取消。
- func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc):返回一个继承自父上下文(parent)的新上下文,它会在指定的截止时间(deadline)之后自动取消。
- func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc):返回一个继承自父上下文(parent)的新上下文,它会在指定的超时时间(timeout)之后自动取消。
- func WithValue(parent Context, key interface{}, val interface{}) Context:返回一个继承自父上下文(parent)的新上下文,并关联指定的键值对(key 和 val)。这个键值对可以通过 Value 方法在上下文中进行检索。
package main
import (
"context"
"fmt"
"time"
)
func main() {
// 创建一个上下文
ctx := context.Background()
// 启动一个 Goroutine
go func() {
//存储值
ctx = context.WithValue(ctx, "lang", "golang")
}()
time.Sleep(3 * time.Second)
// 从上下文中获取值
if value, ok := ctx.Value("lang").(string); ok {
fmt.Printf("Language is %s\n", value)
} else {
fmt.Println("Language not found")
}
// 尝试获取不存在的键
if value, ok := ctx.Value("version").(string); ok {
fmt.Printf("Version is %s\n", value)
} else {
fmt.Println("Version not found")
}
}输出如下:

ctx.Done() 是一个通道(channel),用于接收上下文取消的信号。当上下文被取消时,这个通道会收到一个值。
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Printf("[%s] Context canceled\n", name)
return
default:
fmt.Printf("[%s] Working...\n", name)
time.Sleep(1 * time.Second)
}
}
}
func main() {
// 创建一个带有取消功能的上下文
ctx, cancel := context.WithCancel(context.Background())
// 启动两个 Goroutine,并传递上下文
go worker(ctx, "Worker 1")
go worker(ctx, "Worker 2")
// 让程序运行一段时间
time.Sleep(3 * time.Second)
// 取消上下文
cancel()
// 等待一段时间,观察输出
time.Sleep(3 * time.Second)
}
参考
atomic package – sync/atomic – Go Packages
sync package – sync – Go Packages
Documentation – The Go Programming Language (studygolang.com)


