Go学习笔记(14)-goroutine 和 channel

Go学习笔记(14)

goroutine 和 channel

goroutine-看一个需求

需求:要求统计 1-9000000000 的数字中,哪些是素数?

分析思路:

  1. 传统的方法,就是使用一个循环,循环的判断各个数是不是素数。[很慢]
  2. 使用并发或者并行的方式,将统计素数的任务分配给多个 goroutine 去完成,这时就会使用到goroutine.【速度提高 4 倍】

goroutine-基本介绍

进程和线程介绍

程序、进程和线程的关系示意图

并发和并行

并发和并行

  1. 多线程程序在单核上运行,就是并发
  2. 多线程程序在多核上运行,就是并行
  3. 示意图:

小结

Go 协程和 Go 主线程

​ Go 主线程(有程序员直接称为线程/也可以理解成进程): 一个 Go 线程上,可以起多个协程,你可以这样理解,协程是轻量级的线程[编译器做优化]。

Go 协程的特点

  1. 有独立的栈空间
  2. 共享程序堆空间
  3. 调度由用户控制
  4. 协程是轻量级的线程

一个示意图

goroutine-快速入门

案例说明

请编写一个程序,完成如下功能:

  1. 在主线程(可以理解成进程)中,开启一个 goroutine, 该协程每隔 1 秒输出 "hello,world"
  2. 在主线程中也每隔一秒输出"hello,golang", 输出 10 次后,退出程序
  3. 要求主线程和 goroutine 同时执行.
  4. 画出主线程和协程执行流程图

代码实现

输出的效果说明, main 这个主线程和 test 协程同时执行

主线程和协程执行流程图

快速入门小结

  1. 主线程是一个物理线程,直接作用在 cpu 上的。是重量级的,非常耗费 cpu 资源。
  2. 协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对小。
  3. Golang 的协程机制是重要的特点,可以轻松的开启上万个协程。其它编程语言的并发机制是一般基于线程的,开启过多的线程,资源耗费大,这里就突显 Golang 在并发上的优势了

goroutine 的调度模型

MPG 模式基本介绍

MPG 模式运行的状态 1

MPG 模式运行的状态 2

设置 Golang 运行的 cpu 数

介绍:为了充分了利用多 cpu 的优势,在 Golang 程序中,设置运行的 cpu 数目

channel(管道)-看个需求

需求:现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到 map 中。最后显示出来。要求使用 goroutine 完成

分析思路:

  1. 使用 goroutine 来完成,效率高,但是会出现并发/并行安全问题.
  2. 这里就提出了不同 goroutine 如何通信的问题

代码实现

  1. 使用 goroutine 来完成(看看使用 gorotine 并发完成会出现什么问题? 然后我们会去解决)

  2. 在运行某个程序时,如何知道是否存在资源竞争问题。 方法很简单,在编译该程序时,增加一个参数 -race 即可 [示意图]

  3. 代码实现:

package main

import (
	"fmt"
	"time"
)
// 需求:现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到 map 中。
// 最后显示出来。要求使用 goroutine 完成
// 思路
// 1. 编写一个函数,来计算各个数的阶乘,并放入到 map 中.
// 2. 我们启动的协程多个,统计的将结果放入到 map 中
// 3. map 应该做出一个全局的.
var myMap = make(map[int]int, 10)

// test 函数就是计算 n!, 让将这个结果放入到 myMap
func test(n int)  {
	
	res := 1
	for i := 1; i <= n; i++ {
		res *= 1
	}
    //这里我们将 res 放入到 myMap
	myMap[n] = res 
}

func main() {
    // 我们这里开启多个协程完成这个任务[200 个]
	for i := 1; i <= 200; i++ {
		go test(i)
	}

    //休眠 10 秒钟【第二个问题 】
	time.Sleep(time.Second * 10)

    //这里我们输出结果,变量这个结果
	for i,v := range myMap {
		fmt.Printf("map[%d]=%d",i,v)
	}
}
  1. 示意图

不同 goroutine 之间如何通讯

  1. 全局变量的互斥锁

  2. 使用管道 channel 来解决

使用全局变量加锁同步改进程序

因为没有对全局变量 m 加锁,因此会出现资源争夺问题,代码会出现错误,提示 concurrent map writes

解决方案:加入互斥锁

我们的数的阶乘很大,结果会越界,可以将求阶乘改成 sum += uint64(i)

代码改进

为什么需要 channel

  1. 前面使用全局变量加锁同步来解决 goroutine 的通讯,但不完美
  2. 主线程在等待所有 goroutine 全部完成的时间很难确定,我们这里设置 10 秒,仅仅是估算。
  3. 如果主线程休眠时间长了,会加长等待时间,如果等待时间短了,可能还有 goroutine 处于工作状态,这时也会随主线程的退出而销毁
  4. 通过全局变量加锁同步来实现通讯,也并不利用多个协程对全局变量的读写操作。
  5. 上面种种分析都在呼唤一个新的通讯机制-channel

channel 的基本介绍

  1. channle 本质就是一个数据结构-队列【示意图】
  2. 数据是先进先出【FIFO : first in first out】
  3. 线程安全,多 goroutine 访问时,不需要加锁,就是说 channel 本身就是线程安全的
  4. channel 有类型的,一个 string 的 channel 只能存放 string 类型数据。
  5. 示意图:

定义/声明 channel

var 变量名 chan 数据类型

举例: var intChan chan int (intChan 用于存放 int 数据)

var mapChan chan map[int]string (mapChan 用于存放 map[int]string 类型)

var perChan chan Person

var perChan2 chan *Person ...

说明

channel 是引用类型

channel 必须初始化才能写入数据, 即 make 后才能使用

管道是有类型的,intChan 只能写入 整数 int

管道的初始化,写入数据到管道,从管道读取数据及基本的注意事项

package main

import (
	"fmt"
)

func main() {
    
    //演示一下管道的使用
    //1. 创建一个可以存放 3 个 int 类型的管道
    var intChan chan int
    intChan = make(chan int, 3)
    
    //2. 看看 intChan 是什么
    fmt.Printf("intChan 的值=%v intChan 本身的地址=%p\n", intChan, &intChan)
    
    //3. 向管道写入数据
    intChan<- 10
    num := 211
    intChan<- num
    intChan<- 50
    // intChan<- 98//注意点, 当我们给管写入数据时,不能超过其容量
    
    //4. 看看管道的长度和 cap(容量)
    fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan)) // 3, 3
    
    //5. 从管道中读取数据
    var num2 int
    num2 = <-intChan
    fmt.Println("num2=", num2)
    fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan)) // 2, 3
    
    //6. 在没有使用协程的情况下,如果我们的管道数据已经全部取出,再取就会报告 deadlock
    num3 := <-intChan
    num4 := <-intChan
    num5 := <-intChan
    fmt.Println("num3=", num3, "num4=", num4, "num5=", num5)
}

channel 使用的注意事项

  1. channel 中只能存放指定的数据类型
  2. channle 的数据放满后,就不能再放入了
  3. 如果从 channel 取出数据后,可以继续放入
  4. 在没有使用协程的情况下,如果 channel 数据取完了,再取,就会报 dead lock

读写 channel 案例演示

管道的课后练习题

package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"
)

type Person struct {
	Name string
	Age int
	Address string
}


func main() {

	channel := make(chan interface{}, 10)
	rand.Seed(time.Now().UnixNano())

	for i := 0; i < 10; i++ {
		num := rand.Intn(100)
		strnum := strconv.Itoa(num)
		person := Person{
			Name: "cbx"+strnum,
			Age: num,
			Address: "sz"+strnum,
		}
		channel <- person
	}

	for i := 0; i < cap(channel); i++ {
		fmt.Println(<-channel)
	}	
	
}

channel 的遍历和关闭

channel 的关闭

​ 使用内置函数 close 可以关闭 channel, 当 channel 关闭后,就不能再向 channel 写数据了,但是仍然可以从该 channel 读取数据

案例演示:

channel 的遍历

channel 支持 for--range 的方式进行遍历,请注意两个细节

  1. 在遍历时,如果 channel 没有关闭,则回出现 deadlock 的错误
  2. 在遍历时,如果 channel 已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。

channel 遍历和关闭的案例演示

看代码演示:

应用实例 1

思路分析:

代码的实现:

package main
import (
    "fmt"
    _ "time"
)

//write Data
func writeData(intChan chan int) {
    for i := 1; i <= 50; i++ {
        //放入数据
        intChan<- i
        fmt.Println("writeData ", i)
        //time.Sleep(time.Second)
	}
	close(intChan) //关闭
}


//read data
func readData(intChan chan int, exitChan chan bool) {
	for {
        v, ok := <-intChan
        if !ok {
        	break
        }
        //time.Sleep(time.Second)
        fmt.Printf("readData 读到数据=%v\n", v)
	}
    //readData 读取完数据后,即任务完成
    exitChan<- true
    close(exitChan)
}

func main() {
	//创建两个管道
	intChan := make(chan int, 50)
    exitChan := make(chan bool, 1)
	go writeData(intChan)
	go readData(intChan, exitChan)
	//time.Sleep(time.Second * 10)
	for {
        _, ok := <-exitChan
        if !ok {
        	break
        }
	}
}

应用实例 2-阻塞

应用实例 3

需求: 要求统计 1-200000 的数字中,哪些是素数?这个问题在本章开篇就提出了,现在我们有 goroutine和 channel 的知识后,就可以完成了 [测试数据: 80000]

分析思路:

传统的方法,就是使用一个循环,循环的判断各个数是不是素数【ok】。

使用并发/并行的方式,将统计素数的任务分配给多个(4 个)goroutine 去完成,完成任务时间短。

画出分析思路

代码实现

package main

import (
    "fmt"
    "time"
)

//向 intChan 放入 1-8000 个数
func putNum(intChan chan int) {
    for i := 1; i <= 8000; i++ {
    	intChan<- i
    }
    //关闭 intChan
    close(intChan)
}

// 从 intChan 取出数据,并判断是否为素数,如果是,就
// //放入到 primeChan
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
    //使用 for 循环
    // var num int
	var flag bool //
    for {
        time.Sleep(time.Millisecond * 10)
        num, ok := <-intChan
        
        if !ok { //intChan 取不到..
        	break
        }
        
        flag = true //假设是素数
        //判断 num 是不是素数
        for i := 2; i < num; i++ {
            if num % i == 0 {//说明该 num 不是素数
            	flag = false
            	break
            }
        }
        if flag {
            //将这个数就放入到 primeChan
            primeChan<- num
        }
    }
    
    fmt.Println("有一个 primeNum 协程因为取不到数据,退出")
    //这里我们还不能关闭 primeChan
    //向 exitChan 写入 true
    exitChan<- true
}

func main() {
    intChan := make(chan int , 1000)
    primeChan := make(chan int, 2000)//放入结果
    //标识退出的管道
    exitChan := make(chan bool, 4) // 4 个
    //开启一个协程,向 intChan 放入 1-8000 个数
    go putNum(intChan)
    //开启 4 个协程,从 intChan 取出数据,并判断是否为素数,如果是,就
    //放入到 primeChan
    for i := 0; i < 4; i++ {
    	go primeNum(intChan, primeChan, exitChan)
    }
    
    //这里我们主线程,进行处理
    //直接
    go func(){
        for i := 0; i < 4; i++ {
            <-exitChan
         }
         //当我们从 exitChan 取出了 4 个结果,就可以放心的关闭 prprimeChan
         close(primeChan)
     }()
        //遍历我们的 primeChan ,把结果取出
        for {
            res, ok := <-primeChan
            if !ok{
                break
            }
            //将结果输出
            fmt.Printf("素数=%d\n", res)
    	}	
    fmt.Println("main 线程退出")
}

结论:使用 go 协程后,执行的速度,比普通方法提高至少 4 倍

channel 使用细节和注意事项

  1. channel 可以声明为只读,或者只写性质 【案例演示】
  1. channel 只读和只写的最佳实践案例

  1. 使用 select 可以解决从管道取数据的阻塞问题
package main

import (
    "fmt"
    "time"
)

func main() {
    //使用 select 可以解决从管道取数据的阻塞问题
    //1.定义一个管道 10 个数据 int
    intChan := make(chan int, 10)
    
    for i := 0; i < 10; i++ {
    	intChan<- i
    }
    //2.定义一个管道 5 个数据 string
    stringChan := make(chan string, 5)
    
    for i := 0; i < 5; i++ {
    	stringChan <- "hello" + fmt.Sprintf("%d", i)
    }
    
    //传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock
    //问题,在实际开发中,可能我们不好确定什么关闭该管道.
    //可以使用 select 方式可以解决
    //label:
    for {
        select {
            //注意: 这里,如果 intChan 一直没有关闭,不会一直阻塞而 deadlock
            //,会自动到下一个 case 匹配
            case v := <-intChan :
                fmt.Printf("从 intChan 读取的数据%d\n", v)
                time.Sleep(time.Second)
            case v := <-stringChan :
                fmt.Printf("从 stringChan 读取的数据%s\n", v)
                time.Sleep(time.Second)
            default :
                fmt.Printf("都取不到了,不玩了, 程序员可以加入逻辑\n")
                time.Sleep(time.Second)
                return
                //break label
        }
    }
}
  1. goroutine 中使用 recover,解决协程中出现 panic,导致程序崩溃问题

代码实现:

package main
import (
    "fmt"
    "time"
)

//函数
func sayHello() {
    for i := 0; i < 10; i++ {
        time.Sleep(time.Second)
        fmt.Println("hello,world")
    }
}

//函数
func test() {
    //这里我们可以使用 defer + recover
    defer func() {
    	//捕获 test 抛出的 panic
        if err := recover(); err != nil {
			fmt.Println("test() 发生错误", err)
		}
	}()
    //定义了一个 map
    var myMap map[int]string
    myMap[0] = "golang" //error
}

func main() {
    
    go sayHello()
    go test()
    
    for i := 0; i < 10; i++ {
        fmt.Println("main() ok=", i)
        time.Sleep(time.Second)
    }
}

sync.WaitGroup的用法

介绍

经常会看到以下了代码:

package main

import (
    "fmt"
    "time"
)

func main(){
    for i := 0; i < 100 ; i++{
        go fmt.Println(i)
    }
    time.Sleep(time.Second)
}

主线程为了等待goroutine都运行完毕,不得不在程序的末尾使用time.Sleep() 来睡眠一段时间,等待其他线程充分运行。对于简单的代码,100个for循环可以在1秒之内运行完毕,time.Sleep() 也可以达到想要的效果。

但是对于实际生活的大多数场景来说,1秒是不够的,并且大部分时候我们都无法预知for循环内代码运行时间的长短。这时候就不能使用time.Sleep() 来完成等待操作了。

可以考虑使用管道来完成上述操作:

func main() {
    c := make(chan bool, 100)
    for i := 0; i < 100; i++ {
        go func(i int) {
            fmt.Println(i)
            c <- true
        }(i)
    }

    for i := 0; i < 100; i++ {
        <-c
    }
}

首先可以肯定的是使用管道是能达到我们的目的的,而且不但能达到目的,还能十分完美的达到目的。

但是管道在这里显得有些大材小用,因为它被设计出来不仅仅只是在这里用作简单的同步处理,在这里使用管道实际上是不合适的。而且假设我们有一万、十万甚至更多的for循环,也要申请同样数量大小的管道出来,对内存也是不小的开销。

对于这种情况,go语言中有一个其他的工具sync.WaitGroup 能更加方便的帮助我们达到这个目的。

WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。Add(n) 把计数器设置为nDone() 每次把计数器-1wait() 会阻塞代码的运行,直到计数器地值减为0

使用WaitGroup 将上述代码可以修改为:

func main() {
    wg := sync.WaitGroup{}
    wg.Add(100)
    for i := 0; i < 100; i++ {
        go func(i int) {
            fmt.Println(i)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

这里首先把wg 计数设置为100, 每个for循环运行完毕都把计数器减一,主函数中使用Wait() 一直阻塞,直到wg为零——也就是所有的100for循环都运行完毕。相对于使用管道来说,WaitGroup 轻巧了许多。

注意事项

  1. 计数器不能为负值

add不是设置,是加上;是计数器不能为负值,不是add不能为负值,add可以为负的

  1. WaitGroup对象不是一个引用类型

WaitGroup对象不是一个引用类型,在通过函数传值的时候需要使用地址:

func main() {
    wg := sync.WaitGroup{}
    wg.Add(100)
    for i := 0; i < 100; i++ {
        go f(i, &wg)
    }
    wg.Wait()
}

// 一定要通过指针传值,不然进程会进入死锁状态
func f(i int, wg *sync.WaitGroup) { 
    fmt.Println(i)
    wg.Done()
}

作业

package main

import (
	"fmt"
)

type Result struct {
	N int
	Sum int
}


func MakeNumChan(ch chan int)  {
	for i := 1; i <= 2000; i++ {
		ch <- i
	}

	close(ch)
}


func GetSum(n int,ch chan Result)  {
	sum := 0
	for i := 1; i <= n; i++ {
		sum += i
	}

	res := Result{
		N: n,
		Sum: sum,
	}
	ch <- res
}

func GetSumRange(numCh chan int,start int,end int,ch chan Result,exitCh chan int)  {
	for i := start; i < end; i++ {
		n := <- numCh
		GetSum(n,ch)
	}
	exitCh <- 1
}

func main() {
	numCh := make(chan int, 0)

	// 启动协程输入1-2000的数
	go MakeNumChan(numCh)

	resCh := make(chan Result, 2000)

	exitCh := make(chan int,8)

	// 分8个协程计算和,并存入到resCh
	oneBatch := 2000 / 8
	for i := 0; i < 8; i++ {
		go GetSumRange(numCh,i * oneBatch,(i+1) * oneBatch,resCh,exitCh)
	}

	for {
		if len(exitCh) == 8 {
			close(exitCh)
			close(resCh)
			break
		}
	}

	for v := range resCh {
		fmt.Printf("res[%v]=%v\n",v.N,v.Sum)
	}
}
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"math/rand"
	"os"
	"sort"
	"strconv"
	"strings"
	"time"
)

func WriteDataToFile(fileIndex int,writeOkCh chan int)  {
	fileName := fmt.Sprintf("F:/files/%v.txt",fileIndex)
	f,err := os.OpenFile(fileName,os.O_CREATE|os.O_WRONLY,0666)

	if err != nil {
		log.Fatalf("打开文件失败,%v!!\n",err)
	}

	defer f.Close()

	rand.Seed(int64(time.Now().Nanosecond()))

	for i := 0; i < 1000; i++ {
		ele := rand.Intn(1000)
		s := fmt.Sprintf("%v\n",ele)
		f.WriteString(s)
	}
	writeOkCh <- 1
}

func Sort(fileIndex int,sortOkCh chan int)  {
	fileName := fmt.Sprintf("F:/files/%v.txt",fileIndex)
	f, err := os.Open(fileName)
	if err != nil {
		log.Fatalf("打开文件失败,%v!!\n",err)
	}

	defer f.Close()

	reader := bufio.NewReader(f)

	intArr := make([]int,0)
	for {
		str, err := reader.ReadString('\n')
		if err == io.EOF {
			break
		}
		str = strings.ReplaceAll(str,"\n","")
		ele,err := strconv.Atoi(str)
		if err != nil{
			log.Fatalf("转换数字失败,%v!!\n",err)
		}
		intArr = append(intArr,ele)
	}

	fmt.Println("len of intArr: ",len(intArr))

	sort.Ints(intArr)

	wFileName := fmt.Sprintf("F:/files/2%v.txt",fileIndex)

	wf, err := os.OpenFile(wFileName, os.O_CREATE|os.O_WRONLY, 0666)

	if err != nil {
		log.Fatalf("打开文件失败,%v!!\n",err)
	}

	defer wf.Close()

	for _, val := range intArr {
		s := fmt.Sprintf("%v\n",val)
		wf.WriteString(s)
	}

	sortOkCh <- 1

}


func main() {
	writeOkCh := make(chan int,10)
	for i := 1; i <= 10; i++ {
		go WriteDataToFile(i,writeOkCh)
	}

	for i := 0; i < 10; i++ {
		<- writeOkCh
	}

	sortOkCh := make(chan int,10)

	for i := 1; i <= 10; i++ {
		go Sort(i,sortOkCh)
	}

	for i := 0; i < 10; i++ {
		<- sortOkCh
	}
}

end
  • 作者:AWhiteElephant(联系作者)
  • 发表时间:2022-05-30 16:54
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 转载声明:如果是转载栈主转载的文章,请附上原文链接
  • 评论