go的并发编程

如果了解了GMP模型之后,自然了解go的并发特点,协程之间都可能是多线程并发执行的,通过开协程就可以实现并发:

  • package main
  • import (
  •    "fmt"
  •    "strconv"
  •    "time"
  • )
  • func main() {
  •    go test("1")
  •    go test("2")
  •    go test("3")
  •    test("main")
  •    time.Sleep(time.Second*10)
  • }
  • func test(name string)  {
  •    for i:=0;i<10;i++ {
  •       time.Sleep(1)
  •       fmt.Println(name+":  "+strconv.Itoa(i))
  •    }
  • }

输出:

要注意的是,GMP模型下,协程一定是并发的,但不一定是并行的

看代码可以看到,我额外加了一个sleep,那是因为main协程如果结束运行了,子协程也会直接结束,sleep等待子协程执行一会儿,这样才能打印出数据

这个实现方案显然不太好,我们可以通过waitGroup实现协程等待

WaitGroup

  • package main
  • import (
  •    "fmt"
  •    "strconv"
  •    "sync"
  • )
  • var wg sync.WaitGroup
  • func main() {
  •    for i:=1;i<=3;i++ {
  •       wg.Add(1)
  •       go test(strconv.Itoa(i))
  •    }
  •    wg.Add(1)
  •    test("main")
  •    wg.Wait()
  • }
  • func test(name string)  {
  •    defer wg.Done()
  •    for i:=0;i<10;i++ {
  •       time.Sleep(1)
  •       fmt.Println(name+":  "+strconv.Itoa(i))
  •    }
  • }

回到之前的代码,可看到我在for循环中增加了一个sleep,sleep的意义是让出时间片,从而去执行其他的协程进行并发 (GMP模型,如果没有让出时间片,同时所有协程都在同一个线程下时,协程之间将顺序执行,例如协程1运行完才会运行协程2)

主要实现了一个协程切换调度的功能

我们也可通过runtime包去做协程调度

runtime

  • package main
  • import (
  •    "fmt"
  •    "runtime"
  •    "strconv"
  •    "sync"
  • )
  • var wg sync.WaitGroup
  • func main() {
  •    //runtime.Gosched() //当前协程让出
  •    //runtime.Goexit() //直接退出当前协程
  •    //runtime.GOMAXPROCS(1) //限制P队列数量,如果为1,则无法并行
  •    //runtime.NumGoroutine() //返回正在执行和排队的协程数
  •    for i:=1;i<=3;i++ {
  •       wg.Add(1)
  •       go test(strconv.Itoa(i))
  •    }
  •    wg.Add(1)
  •    test("main")
  •    wg.Wait()
  • }
  • func test(name string)  {
  •    defer wg.Done()
  •    for i:=0;i<10;i++ {
  •       runtime.Gosched()
  •       fmt.Printf("当前协程数:%d \\n",runtime.NumGoroutine())
  •       fmt.Println(name+":  "+strconv.Itoa(i))
  •    }
  • }

并发问题

多开协程自然会有并发问题,我们可以通过waitGroup去控制主协程在子协程执行完之后进行操作,可以通过runtime包进行做协程并发切换,但这2个都没有涉及到变量共享问题,如何实现go的变量协程安全呢?

首先我们要理解一句话:

goroutine 奉行通过通信来共享内存,而不是共享内存来通信。

channel

通过channel,进行安全的传输变量

  • package main
  • import (
  •    "fmt"
  •    "runtime"
  •    "strconv"
  •    "sync"
  • )
  • var wg sync.WaitGroup
  • func main() {
  •    runtime.GOMAXPROCS(8)
  •    var chann = make(chan int)
  •    go func() {
  •       //模拟100条数据需要处理
  •       for i:=0;i<100;i++ {
  •          chann<-i
  •       }
  •       close(chann)
  •    }()
  •    //开3个协程处理
  •    for j := 0; j < 3; j++ {
  •       wg.Add(1)
  •       go queueHandle(strconv.Itoa(j),chann)
  •    }
  •    wg.Wait()
  • }
  • func queueHandle(name string,chann chan int)  {
  •    defer wg.Done()
  •    for i := range chann {
  •       fmt.Println("协程"+name+"处理数据:",i)
  •    }
  • }
展开

可看到,3个协程通过channel,安全的获取到了需要处理的通道数据:

协程变量安全

  • package main
  • import (
  •    "fmt"
  •    "runtime"
  •    "sync"
  •    "time"
  • )
  • var a int = 0
  • var wg sync.WaitGroup
  • func main() {
  •    runtime.GOMAXPROCS(8)
  •    for i := 0; i < 10000; i++ {
  •       go add()
  •    }
  •    time.Sleep(time.Second * 1)
  •    wg.Wait()
  •    fmt.Println("i:", a)
  • }
  • func add() {
  •    defer wg.Done()
  •    wg.Add(1)
  •    a += 1
  • }

开启足够多的协程之后,协程变量出现了协程污染,导致最后a的值小于10000:

sync包

上面的waitGroup,其实就是sync包的一种类型,sync中还存在着其他的类型

sync.Mutex互斥锁

  • package main
  • import (
  •    "fmt"
  •    "runtime"
  •    "sync"
  •    "time"
  • )
  • var a int = 0
  • var wg sync.WaitGroup
  • var lock sync.Mutex
  • func main() {
  •    //var lock sync.Mutex
  •    //lock.Lock() //加锁,加锁后其他协程调用将阻塞直到解锁
  •    //lock.Unlock() //解锁
  •    runtime.GOMAXPROCS(8)
  •    for i := 0; i < 10000; i++ {
  •       go add()
  •    }
  •    time.Sleep(time.Second * 1)
  •    wg.Wait()
  •    fmt.Println("i:", a)
  • }
  • func add() {
  •    defer wg.Done()
  •    wg.Add(1)
  •    lock.Lock()
  •    defer lock.Unlock()
  •    a += 1
  • }

sync.RWMutex  读写锁

  • func (rw *RWMutex) Lock()
  • func (rw *RWMutex) Unlock()
  • func (rw *RWMutex) RLock()
  • func (rw *RWMutex) RUnlock()

rwmutex基于 mutex实现,多个协程可以重复获取读锁,如果获取写锁,其他协程读锁也将阻塞,这个读写锁太简单了,不说了

sync.Once 只执行一次

当在高并发情况下时,我们可能需要保证一个函数只执行一次,例如单例模式,加载配置文件,等等,我们可以通过sync.once实现

  • func (o *Once) Do(f func()) {}

单例模式实现

  • package main
  • import (
  •    "fmt"
  •    "sync"
  • )
  • type testStruct struct {}
  • var singleton *testStruct
  • var  once =  sync.Once{}
  • func GetInstance()*testStruct  {
  •    once.Do(func() {
  •       singleton = &testStruct{}
  •       fmt.Println("执行实例化")
  •    })
  •    return singleton
  • }

sync.once内部存在一个mutex锁和一个bool值,如果bool为false,则通过mutex加锁执行一次,然后bool为true直接忽略执行

协程安全类型

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全

协程安全的变量类型有sync.map,atomic包等

太简单了,不讲了

本文为仙士可原创文章,转载无需和我联系,但请注明来自仙士可博客www.php20.cn

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
<<上一篇
下一篇>>