Go并发

1.等待组

概括与使用

等待组(sync.WaitGroup)用于等待一组 goroutine 完成任务,保证在并发环境中完成指定数量的任务

WaitGroup 提供了一种简单的机制来跟踪一组 goroutine 的状态。它主要包含以下三个方法:

  • Add(delta int):向 WaitGroup 添加指定数量的等待任务,delta 可以是正数也可以是负数。当 delta 的值为正数时,表示新增等待任务;当 delta 的值为负数时,表示完成一部分等待任务。
  • Done():表示一个等待任务已经完成,相当于调用 Add(-1)。
  • Wait():等待所有的等待任务都完成。

类比如一个工厂可以并行生产一批零件,但是只有全部零件生产完成才能组装。

package main

import (
   "fmt"
   "sync"
)

func main() {
   var wg sync.WaitGroup
   wg.Add(2)
   go func() {
      defer wg.Done()
      // 模拟任务1
      for i := 0; i < 5; i++ {
         fmt.Println("Task 1:", i)
      }
   }()
   go func() {
      defer wg.Done()
      // 模拟任务2
      for i := 0; i < 5; i++ {
         fmt.Println("Task 2:", i)
      }
   }()
   wg.Wait()
   fmt.Println("All tasks completed")
}

输出如下

注意:

协程间传递时需要以指计的方式或闭包的方式引用 WaitGroup 对象。否则会产生操控了不同对象而产生死锁

上面使用闭包方法,下面是指针方法

wg := &sync.WaitGroup{}
wg.Add(2)
go func(w *sync.WaitGroup) {
    defer w.Done()
    // 模拟任务1
    for i := 0; i < 5; i++ {
        fmt.Println("Task 1:", i)
    }
}(wg)

如果注掉一个协程,wg次数不为零,就会产生死锁

输出如下:死锁

分工示例:

可以看到节省了好多时间

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   var a, b = 1000, 10000
   //1.
   //计时开始
   start := time.Now()
   for i := 0; i < 10000000000; i++ {
      mul(a, b)
   }
   //计时结束
   t := time.Since(start)
   fmt.Println(t)
   //2.
   var wg sync.WaitGroup
   //计时开始
   start = time.Now()
   for i := 0; i < 20; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         for j := 0; j < 500000000; j++ {
            mul(a, b)
         }
      }()
   }
   wg.Wait()
   //计时结束
   t = time.Since(start)
   fmt.Println(t)
   fmt.Println("All tasks completed")
}
func mul(a, b int) int {
   return a * b
}

 

原理

sync.WaitGroup 的原理是基于计数器的机制。它内部维护了一个整数计数器,初始值为0。

  • 当调用 Add(delta int) 方法时,计数器的值会增加或减少指定的数量。
  • 当调用 Done() 方法时,相当于调用 Add(-1),计数器的值减少1,表示一个等待任务已经完成。当计数器的值变为0时,即所有等待任务都已完成,Wait() 方法会解除阻塞,程序继续执行。
  • 在调用 Wait() 方法时,如果计数器的值不为0,则会阻塞当前 goroutine,直到计数器的值变为0。这样可以确保在等待任务完成之前,主 goroutine 或其他 goroutine 不会提前退出。

 

2.互斥锁

互斥锁(sync.Mutex)是一种常用的同步机制,用于保护共享资源在并发环境下的安全访问。

举例:

在 Go 中,map 不是线程安全的,如果多个 goroutine 并发地读写同一个 map,可能会导致竞态条件和数据不一致的问题。

package main

import (
   "fmt"
   "sync"
)

func main() {
   mp := make(map[string]int, 0)
   list := []string{"A", "B", "C", "D"}
   wg := sync.WaitGroup{}
   for i := 0; i < 20; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         for _, item := range list {
            _, ok := mp[item]
            if !ok {
               mp[item] = 0
            }
            mp[item] += 1
         }
      }()
   }
   wg.Wait()
   fmt.Println(mp)
}

输出如下

一个解决方法是使用互斥锁(Mutex)(该例子不考虑效率)

package main

import (
   "fmt"
   "sync"
)

func main() {
   mp := make(map[string]int, 0)
   list := []string{"A", "B", "C", "D"}
   wg := sync.WaitGroup{}
   mu := sync.Mutex{}
   for i := 0; i < 20; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         mu.Lock()
         defer mu.Unlock()
         for _, item := range list {
            _, ok := mp[item]
            if !ok {
               mp[item] = 0
            }
            mp[item] += 1
         }
      }()
   }
   wg.Wait()
   fmt.Println(mp)
}

输出如下:

 

3.读写锁

读写锁(sync.RWMutex)是一种常用的线程同步机制,用于在多个读操作和写操作同时存在的情况下,保证数据的一致性和线程安全性。读写锁允许多个 goroutine 并发地读取共享数据,但只允许一个 goroutine 写入共享数据。

读写锁有两种状态:读状态和写状态。

  • 在读状态下,多个 goroutine 可以并发地读取共享数据,但不能进行写操作。
  • 在写状态下,只允许一个 goroutine 对共享数据进行写操作,其他 goroutine 都不能进行读或写操作。

在 Go 中,读写锁由 sync 包提供。sync.RWMutex 类型表示一个读写锁

常用方法:

  • Lock 和 Unlock 用于写操作,
  • Rlock 和 RUnlock 用于读操作。

读写锁(sync.RWMutex)允许多个协程同时获取读锁进行读操作,但只有一个协程能够获取写锁进行写操作。

package main

import (
   "fmt"
   "sync"
)

func main() {
   var wg sync.WaitGroup
   // 定义一个包含锁和数据的结构体
   type SafeMap struct {
      sync.RWMutex
      data map[string]int
   }

   // 初始化 SafeMap
   sm := &SafeMap{
      data: make(map[string]int),
   }

   // 启动多个协程进行读操作
   for i := 0; i < 5; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         sm.RLock()
         defer sm.RUnlock()
         // 读取数据
         fmt.Println(sm.data)
      }()
   }
   // 启动多个协程进行写操作
   for i := 0; i < 3; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         sm.Lock()
         defer sm.Unlock()
         // 写入数据
         sm.data["key"] = i
      }()
   }
   wg.Wait()
}

输出如下:

 

4.sync.Map

sync.Map 是 Go 语言标准库中提供的一种并发安全的 Map 类型。与普通的 map 不同,sync.Map 在并发访问时无需额外的锁机制,而是通过内部的算法实现了高效的并发安全。

sync.Map 常用的方法:

  • Store(key, value any):存储键值对到 sync.Map 中,如果键已经存在,则更新其对应的值。
  • Load(key any) (value any, ok bool):获取指定键的值,如果键不存在,则返回默认值和 false。
  • LoadOrStore(key, value any) (actual any, loaded bool):获取指定键的值,如果键不存在,则存储指定的键值对,返回新存储的值和 false;如果键已经存在,则返回现有的值和 true。
  • Delete(key any):删除指定键的键值对。
  • LoadAndDelete(key any) (value any, loaded bool):接受一个键作为参数,并返回对应的值和一个布尔值,表示是否成功加载并删除键值对。如果键不存在,返回默认值和 false。
  • Range(f func(key, value any) bool):遍历 sync.Map 中的所有键值对,并对每个键值对执行指定的函数。如果该函数返回 false,则停止遍历。
package main

import (
   "fmt"
   "sync"
)

func main() {
   var m sync.Map
   // 存储键值对
   m.Store("key1", "value1")
   m.Store("key2", "value2")
   // 启动多个协程并发读取数据
   var wg sync.WaitGroup
   for i := 0; i < 3; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         if value, ok := m.Load("key2"); ok {
            fmt.Println("Value found:", value)
         }
      }()
   }
   // 启动多个协程并发写入数据
   for i := 0; i < 3; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         m.Store(fmt.Sprintf("key%d", i+3), fmt.Sprintf("value%d", i+3))
      }()
   }
   wg.Wait()
   // 遍历所有键值对
   m.Range(func(key, value interface{}) bool {
      fmt.Println("Key:", key, "Value:", value)
      //如果返回false则不会继续遍历
      return true
   })
}

输出如下

注意:

sync.Map 并不保证遍历时键值对的顺序,因为其内部数据结构不维护插入顺序。

sync.Map的方法保证了并发安全,但是多条方法之间不保证。

 

5.连接池

连接池(Connection Pool)是一种常见的技术,用于管理和重复使用与外部资源的连接,比如数据库连接、网络连接等。连接池可以提高应用程序的性能和效率,避免频繁地创建和销毁连接对象。

在 Go 语言中,你可以使用 sync.Pool 类型来实现连接池。sync.Pool 是Go标准库提供的一个并发安全的对象池。

sync.Pool 类型提供了以下常用方法:

  • Get():从连接池中获取一个对象。如果连接池为空,则会调用 New 函数来创建新的对象。
  • Put(x any):将一个对象放回连接池中,以便后续使用。被放回的对象可以被其他协程重复使用。
  • New:一个函数类型,当连接池为空时,会调用该函数来创建新的对象。
package main

import (
   "fmt"
   "math/rand"
   "sync"
)

type Connection struct {
   ID int64
}

type ConPool struct {
   sync.Pool
}

func main() {
   pool := GetPool()
   var wg sync.WaitGroup
   numWorkers := 5
   wg.Add(numWorkers)
   //如果事先存在id1
   conn := &Connection{
      ID: 1,
   }
   pool.Put(conn)
   for i := 0; i < numWorkers; i++ {
      go func() {
         defer wg.Done()
         //可能获取id1,但是也可能不够创建一条新的
         conn = pool.Get()
         // 使用连接对象进行工作
         fmt.Println("Working with connection ID:", conn.ID)
         pool.Put(conn)
      }()
   }
   wg.Wait()

   fmt.Println("看看线程池里有多少线程")
   for i := 0; i < 5; i++ {
      //只Get不Put就不会放回去连接池
      //但是可能已经回收了,会创建新的,所以这里简单看看可不可以获取重复协程
      conn = pool.Get()
      // 使用连接对象进行工作
      fmt.Println("Look connection ID:", conn.ID)
   }
}

func (c *ConPool) Put(conn *Connection) {
   //判断,处理,同下
   c.Pool.Put(conn)

}

func (c *ConPool) Get() *Connection {
   conn := c.Pool.Get().(*Connection)
   //判断,处理,需要判断对象是否已经回收,否则拿到了该协程,但是该协程对应的对象已经回收了
   //......判断逻辑

   return conn

}

func GetPool() *ConPool {
   pool := &ConPool{
      Pool: sync.Pool{
         New: func() any {
            fmt.Println("Creating new connection")
            return &Connection{
               ID: rand.Int63(),
            }
         },
      },
   }
   return pool
}
输出如下:

 

 

6.sync.Once

sync.Once 类型有一个 Do 方法,Do 方法接收一个函数作为参数,并且仅在第一次调用时执行该函数,之后的调用都会被忽略。在多个协程同时调用 Do 方法时,只有一个协程会执行函数,其他协程会等待该函数执行完成。

package main
import (
 "fmt"
 "sync"
 "time"
)

func main() {
    var once sync.Once
    for i := 0; i < 5; i++ {
         go func() {
              once.Do(func() {
                   //对某初始化
                   fmt.Println("Initialize")
                 })
              fmt.Println("Do something")
            }()
       }
    time.Sleep(3 * time.Second)
}

注意:

  • sync.Once 保证函数只会被执行一次,因此会输出 “Initialize”。而后续的协程在调用 Do 时,由于函数已经执行过了,所以只会输出 “Do something”。
  • sync.Once 并不保证函数的执行顺序,它只保证函数只会被执行一次。因此,在上述示例中,输出的顺序可能会有所不同。
  • sync.Once保证函数只会被执行一次,所以也是并发安全的,在do中是并发安全的

 

7.sync.cond

sync.Cond 是 Go 语言标准库中的条件变量类型,用于在多个 goroutine 之间进行同步和通信。

条件变量是一种线程间同步机制,它允许一个或多个 goroutine 等待另一个 goroutine 发出特定通知时被唤醒。条件变量通常和锁一起使用,以避免竞态条件(race condition)。

sync.Cond 的主要方法包括:

  • Wait():阻塞当前 goroutine,并且释放与条件变量相关联的锁。当其他 goroutine 调用 Signal() 或者 Broadcast() 时,该 goroutine 会重新获得锁并继续执行。
  • Signal():唤醒至少一个正在等待的 goroutine,如果没有等待的 goroutine,则不会做任何事情。
  • Broadcast():唤醒所有正在等待的 goroutine。

通常使用 sync.NewCond(&sync.Mutex{}) 创建一个与互斥锁关联的条件变量。这样可以确保在调用 Wait() 方法时,能够正确地获取和释放锁,避免竞态条件的发生。

cond.L.Lock() 是 sync.Cond 类型的方法,用于获取条件变量的互斥锁,主要用于在等待条件时需要获取互斥锁进行保护。

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   lock := sync.Mutex{}
   cond := sync.NewCond(&lock)
   done := false
   go func() {
      cond.L.Lock()
      for !done {
         cond.Wait()
      }
      cond.L.Unlock()
      fmt.Println("Task1 completed")
   }()
   go func() {
      cond.L.Lock()
      for !done {
         cond.Wait()
      }
      cond.L.Unlock()
      fmt.Println("Task2 completed")
   }()
   time.Sleep(time.Second)
   fmt.Println("Signal task to complete")
   done = true
   //随机唤醒一个线程
   //cond.Signal()
   //唤醒所有线程
   cond.Broadcast()
   time.Sleep(time.Second)
}

 

在函数中传递要传递指针

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   list := make([]int, 0)
   cond := sync.NewCond(&sync.Mutex{})
   go read(&list, cond)
   go read(&list, cond)
   go read(&list, cond)
   go read(&list, cond)
   time.Sleep(time.Second)
   ListInit(&list, cond)
   time.Sleep(5 * time.Second)
}
func read(list *[]int, cond *sync.Cond) {
   cond.L.Lock()
   defer cond.L.Unlock()
   for len(*list) == 0 {
      fmt.Println("read wait")
      cond.Wait()
   }
   fmt.Println("list = ", *list)
}

func ListInit(list *[]int, cond *sync.Cond) {
   cond.L.Lock()
   defer cond.L.Unlock()
   for i := 0; i < 10; i++ {
      *list = append(*list, i)
   }
   //唤醒所有线程
   cond.Broadcast()
}

 

8.atomic 包

atomic包提供底层的原子级内存操作,用于实现同步算法

atomic 包是 Go 语言标准库中的一个包,用于提供原子操作。该包中的函数可以保证对于共享变量的并发访问是安全的,并且能够保证操作的原子性。

atomic 包中提供了一些常见的原子操作函数,比如增减、交换、比较等操作。这些函数能够保证在多个 goroutine(Go 语言中的轻量级线程)并发访问共享变量时,不会发生数据竞争和冲突。

以下是 atomic 包中一些常用的函数:

 

  • atomic.StoreInt64() 函数将一个 int64 值存储到指定的内存地址中,
  • atomic.LoadInt64() 函数则从指定的内存地址中加载并返回 int64 值。
  • AddInt32、AddInt64:原子地将指定值加到某个变量的值上。
  • SwapInt32、SwapInt64:原子地交换某个变量的值。
  • CompareAndSwapInt32、CompareAndSwapInt64:比较并交换某个变量的值,如果当前值与旧值相等,则将新值赋给变量。

使用 atomic 包可以确保对共享变量的并发访问的正确性和可靠性,避免了传统的互斥锁带来的性能开销和复杂性。

package main

import (
   "fmt"
   "sync/atomic"
)

func main() {
   var counter int64
   //atomic.StoreInt64() 函数将一个 int64 值存储到指定的内存地址中
   atomic.StoreInt64(&counter, 10)
   //atomic.LoadInt64() 函数则从指定的内存地址中加载并返回 int64 值。
   fmt.Println(atomic.LoadInt64(&counter))
}

输出如下

比如:以下不是一个原子操作

counter = counter + 1

可以使用以下代替

//counter = counter + 1
atomic.AddInt64(&counter, 1)
//输出
fmt.Println(counter)

交換变量的值:并返回原有的值

//交換变量的值:并返回原有的值
old := atomic.SwapInt64(&counter, 30)
fmt.Println(old)
fmt.Println(counter)

比较并替换原有的值,并返回是否替换成功,如果原有的值不同,替换失败

swapped := atomic.CompareAndSwapInt64(&counter, 30, 2)
fmt.Println(swapped)
fmt.Println(counter)

swapInt64 := atomic.CompareAndSwapInt64(&counter, 30, 40)
fmt.Println(swapInt64)
fmt.Println(counter)

 

计数器

完成一个计数器

package main

import (
   "fmt"
   "sync"
   "sync/atomic"
)

type atomicCounter struct {
   count int64
}

func (c *atomicCounter) increase() {
   atomic.AddInt64(&c.count, 1)
}

func (c *atomicCounter) Load() int64 {
   return atomic.LoadInt64(&c.count)
}

func main() {
   var count = atomicCounter{}
   wg := sync.WaitGroup{}
   for i := 0; i < 200; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         count.increase()
      }()
   }
   wg.Wait()
   fmt.Println(count.Load())
}

 

原子值

atomic.Value 是 Go 语言标准库中的一个类型,用于原子地读取和修改某个值。

atomic.Value 类型有两个方法:

  • func (v *Value) Load() (x interface{}):以原子方式加载当前存储的值,并返回该值。
  • func (v *Value) Store(x interface{}):以原子方式存储一个新的值。
package main

import (
   "fmt"
   "sync"
   "sync/atomic"
)

func main() {
   list := []string{"A", "B", "C", "D"}
   //原子值
   var atomicMp atomic.Value
   mp := map[string]int{}
   atomicMp.Store(&mp)
   wg := sync.WaitGroup{}
   for i := 0; i < 100; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
      atomicLaber:
         m := atomicMp.Load().(*map[string]int)
         newm := map[string]int{}
         for k, v := range *m {
            newm[k] = v
         }
         for _, item := range list {
            _, ok := newm[item]
            if !ok {
               newm[item] = 0
            }
            newm[item] += 1
         }
         swap := atomicMp.CompareAndSwap(m, &newm)
         if !swap {
            goto atomicLaber
         }
      }()
   }
   wg.Wait()
   fmt.Println(atomicMp.Load())
}

注意:

atomic.Value 可以存储任意类型的值,但是在加载时,需要使用类型断言将其转换回原始类型。如果类型不匹配,将导致运行时错误。

 

9.context包

在 Go 语言中,context 包提供了一种在多个 Goroutine 之间传递上下文信息、控制 Goroutine 生命周期的机制。通过 context 包,可以对 Goroutine 进行取消操作、设置超时、传递请求相关的值等。

以下是 context 包中常用的方法和函数:

  • func Background() context.Context:创建一个空的根上下文(Background),它没有任何值和超时限制。通常用于最顶层的 Goroutine。
  • func TODO() context.Context:创建一个空的上下文(TODO),它没有任何值或超时限制,但表示代码尚未实现完整。
  • func WithCancel(parent Context) (ctx Context, cancelFunc CancelFunc):返回一个继承自父上下文(parent)的新上下文(ctx),并创建一个可以用来取消上下文的函数(cancelFunc)。当调用 cancelFunc 时,所有从该上下文派生的子上下文都会被取消。
  • func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc):返回一个继承自父上下文(parent)的新上下文,它会在指定的截止时间(deadline)之后自动取消。
  • func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc):返回一个继承自父上下文(parent)的新上下文,它会在指定的超时时间(timeout)之后自动取消。
  • func WithValue(parent Context, key interface{}, val interface{}) Context:返回一个继承自父上下文(parent)的新上下文,并关联指定的键值对(key 和 val)。这个键值对可以通过 Value 方法在上下文中进行检索。
package main

import (
   "context"
   "fmt"
   "time"
)

func main() {
   // 创建一个上下文
   ctx := context.Background()
   // 启动一个 Goroutine
   go func() {
      //存储值
      ctx = context.WithValue(ctx, "lang", "golang")
   }()
   time.Sleep(3 * time.Second)
   // 从上下文中获取值
   if value, ok := ctx.Value("lang").(string); ok {
      fmt.Printf("Language is %s\n", value)
   } else {
      fmt.Println("Language not found")
   }

   // 尝试获取不存在的键
   if value, ok := ctx.Value("version").(string); ok {
      fmt.Printf("Version is %s\n", value)
   } else {
      fmt.Println("Version not found")
   }
}

输出如下:

ctx.Done() 是一个通道(channel),用于接收上下文取消的信号。当上下文被取消时,这个通道会收到一个值。

package main

import (
   "context"
   "fmt"
   "time"
)

func worker(ctx context.Context, name string) {
   for {
      select {
      case <-ctx.Done():
         fmt.Printf("[%s] Context canceled\n", name)
         return
      default:
         fmt.Printf("[%s] Working...\n", name)
         time.Sleep(1 * time.Second)
      }
   }
}

func main() {
   // 创建一个带有取消功能的上下文
   ctx, cancel := context.WithCancel(context.Background())
   // 启动两个 Goroutine,并传递上下文
   go worker(ctx, "Worker 1")
   go worker(ctx, "Worker 2")
   // 让程序运行一段时间
   time.Sleep(3 * time.Second)
   // 取消上下文
   cancel()
   // 等待一段时间,观察输出
   time.Sleep(3 * time.Second)
}

 

参考

atomic package – sync/atomic – Go Packages

sync package – sync – Go Packages

Documentation – The Go Programming Language (studygolang.com)

暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇