Go学习笔记(14)
goroutine 和 channel
goroutine-看一个需求
需求:要求统计 1-9000000000 的数字中,哪些是素数?
分析思路:
- 传统的方法,就是使用一个循环,循环的判断各个数是不是素数。[很慢]
- 使用并发或者并行的方式,将统计素数的任务分配给多个 goroutine 去完成,这时就会使用到goroutine.【速度提高 4 倍】
goroutine-基本介绍
进程和线程介绍
程序、进程和线程的关系示意图
并发和并行
并发和并行
- 多线程程序在单核上运行,就是并发
- 多线程程序在多核上运行,就是并行
- 示意图:
小结
Go 协程和 Go 主线程
Go 主线程(有程序员直接称为线程/也可以理解成进程): 一个 Go 线程上,可以起多个协程,你可以这样理解,协程是轻量级的线程[编译器做优化]。
Go 协程的特点
- 有独立的栈空间
- 共享程序堆空间
- 调度由用户控制
- 协程是轻量级的线程
一个示意图
goroutine-快速入门
案例说明
请编写一个程序,完成如下功能:
- 在主线程(可以理解成进程)中,开启一个 goroutine, 该协程每隔 1 秒输出 "hello,world"
- 在主线程中也每隔一秒输出"hello,golang", 输出 10 次后,退出程序
- 要求主线程和 goroutine 同时执行.
- 画出主线程和协程执行流程图
代码实现
输出的效果说明, main 这个主线程和 test 协程同时执行
主线程和协程执行流程图
快速入门小结
- 主线程是一个物理线程,直接作用在 cpu 上的。是重量级的,非常耗费 cpu 资源。
- 协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对小。
- Golang 的协程机制是重要的特点,可以轻松的开启上万个协程。其它编程语言的并发机制是一般基于线程的,开启过多的线程,资源耗费大,这里就突显 Golang 在并发上的优势了
goroutine 的调度模型
MPG 模式基本介绍
MPG 模式运行的状态 1
MPG 模式运行的状态 2
设置 Golang 运行的 cpu 数
介绍:为了充分了利用多 cpu 的优势,在 Golang 程序中,设置运行的 cpu 数目
channel(管道)-看个需求
需求:现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到 map 中。最后显示出来。要求使用 goroutine 完成
分析思路:
- 使用 goroutine 来完成,效率高,但是会出现并发/并行安全问题.
- 这里就提出了不同 goroutine 如何通信的问题
代码实现
-
使用 goroutine 来完成(看看使用 gorotine 并发完成会出现什么问题? 然后我们会去解决)
-
在运行某个程序时,如何知道是否存在资源竞争问题。 方法很简单,在编译该程序时,增加一个参数 -race 即可 [示意图]
-
代码实现:
package main
import (
"fmt"
"time"
)
// 需求:现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到 map 中。
// 最后显示出来。要求使用 goroutine 完成
// 思路
// 1. 编写一个函数,来计算各个数的阶乘,并放入到 map 中.
// 2. 我们启动的协程多个,统计的将结果放入到 map 中
// 3. map 应该做出一个全局的.
var myMap = make(map[int]int, 10)
// test 函数就是计算 n!, 让将这个结果放入到 myMap
func test(n int) {
res := 1
for i := 1; i <= n; i++ {
res *= 1
}
//这里我们将 res 放入到 myMap
myMap[n] = res
}
func main() {
// 我们这里开启多个协程完成这个任务[200 个]
for i := 1; i <= 200; i++ {
go test(i)
}
//休眠 10 秒钟【第二个问题 】
time.Sleep(time.Second * 10)
//这里我们输出结果,变量这个结果
for i,v := range myMap {
fmt.Printf("map[%d]=%d",i,v)
}
}
- 示意图
不同 goroutine 之间如何通讯
-
全局变量的互斥锁
-
使用管道 channel 来解决
使用全局变量加锁同步改进程序
因为没有对全局变量 m 加锁,因此会出现资源争夺问题,代码会出现错误,提示 concurrent map writes
解决方案:加入互斥锁
我们的数的阶乘很大,结果会越界,可以将求阶乘改成 sum += uint64(i)
代码改进
为什么需要 channel
- 前面使用全局变量加锁同步来解决 goroutine 的通讯,但不完美
- 主线程在等待所有 goroutine 全部完成的时间很难确定,我们这里设置 10 秒,仅仅是估算。
- 如果主线程休眠时间长了,会加长等待时间,如果等待时间短了,可能还有 goroutine 处于工作状态,这时也会随主线程的退出而销毁
- 通过全局变量加锁同步来实现通讯,也并不利用多个协程对全局变量的读写操作。
- 上面种种分析都在呼唤一个新的通讯机制-channel
channel 的基本介绍
- channle 本质就是一个数据结构-队列【示意图】
- 数据是先进先出【FIFO : first in first out】
- 线程安全,多 goroutine 访问时,不需要加锁,就是说 channel 本身就是线程安全的
- channel 有类型的,一个 string 的 channel 只能存放 string 类型数据。
- 示意图:
定义/声明 channel
var 变量名 chan 数据类型
举例: var intChan chan int (intChan 用于存放 int 数据)
var mapChan chan map[int]string (mapChan 用于存放 map[int]string 类型)
var perChan chan Person
var perChan2 chan *Person ...
说明
channel 是引用类型
channel 必须初始化才能写入数据, 即 make 后才能使用
管道是有类型的,intChan 只能写入 整数 int
管道的初始化,写入数据到管道,从管道读取数据及基本的注意事项
package main
import (
"fmt"
)
func main() {
//演示一下管道的使用
//1. 创建一个可以存放 3 个 int 类型的管道
var intChan chan int
intChan = make(chan int, 3)
//2. 看看 intChan 是什么
fmt.Printf("intChan 的值=%v intChan 本身的地址=%p\n", intChan, &intChan)
//3. 向管道写入数据
intChan<- 10
num := 211
intChan<- num
intChan<- 50
// intChan<- 98//注意点, 当我们给管写入数据时,不能超过其容量
//4. 看看管道的长度和 cap(容量)
fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan)) // 3, 3
//5. 从管道中读取数据
var num2 int
num2 = <-intChan
fmt.Println("num2=", num2)
fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan)) // 2, 3
//6. 在没有使用协程的情况下,如果我们的管道数据已经全部取出,再取就会报告 deadlock
num3 := <-intChan
num4 := <-intChan
num5 := <-intChan
fmt.Println("num3=", num3, "num4=", num4, "num5=", num5)
}
channel 使用的注意事项
- channel 中只能存放指定的数据类型
- channle 的数据放满后,就不能再放入了
- 如果从 channel 取出数据后,可以继续放入
- 在没有使用协程的情况下,如果 channel 数据取完了,再取,就会报 dead lock
读写 channel 案例演示
管道的课后练习题
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
)
type Person struct {
Name string
Age int
Address string
}
func main() {
channel := make(chan interface{}, 10)
rand.Seed(time.Now().UnixNano())
for i := 0; i < 10; i++ {
num := rand.Intn(100)
strnum := strconv.Itoa(num)
person := Person{
Name: "cbx"+strnum,
Age: num,
Address: "sz"+strnum,
}
channel <- person
}
for i := 0; i < cap(channel); i++ {
fmt.Println(<-channel)
}
}
channel 的遍历和关闭
channel 的关闭
使用内置函数 close 可以关闭 channel, 当 channel 关闭后,就不能再向 channel 写数据了,但是仍然可以从该 channel 读取数据
案例演示:
channel 的遍历
channel 支持 for--range 的方式进行遍历,请注意两个细节
- 在遍历时,如果 channel 没有关闭,则回出现 deadlock 的错误
- 在遍历时,如果 channel 已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。
channel 遍历和关闭的案例演示
看代码演示:
应用实例 1
思路分析:
代码的实现:
package main
import (
"fmt"
_ "time"
)
//write Data
func writeData(intChan chan int) {
for i := 1; i <= 50; i++ {
//放入数据
intChan<- i
fmt.Println("writeData ", i)
//time.Sleep(time.Second)
}
close(intChan) //关闭
}
//read data
func readData(intChan chan int, exitChan chan bool) {
for {
v, ok := <-intChan
if !ok {
break
}
//time.Sleep(time.Second)
fmt.Printf("readData 读到数据=%v\n", v)
}
//readData 读取完数据后,即任务完成
exitChan<- true
close(exitChan)
}
func main() {
//创建两个管道
intChan := make(chan int, 50)
exitChan := make(chan bool, 1)
go writeData(intChan)
go readData(intChan, exitChan)
//time.Sleep(time.Second * 10)
for {
_, ok := <-exitChan
if !ok {
break
}
}
}
应用实例 2-阻塞
应用实例 3
需求: 要求统计 1-200000 的数字中,哪些是素数?这个问题在本章开篇就提出了,现在我们有 goroutine和 channel 的知识后,就可以完成了 [测试数据: 80000]
分析思路:
传统的方法,就是使用一个循环,循环的判断各个数是不是素数【ok】。
使用并发/并行的方式,将统计素数的任务分配给多个(4 个)goroutine 去完成,完成任务时间短。
画出分析思路
代码实现
package main
import (
"fmt"
"time"
)
//向 intChan 放入 1-8000 个数
func putNum(intChan chan int) {
for i := 1; i <= 8000; i++ {
intChan<- i
}
//关闭 intChan
close(intChan)
}
// 从 intChan 取出数据,并判断是否为素数,如果是,就
// //放入到 primeChan
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
//使用 for 循环
// var num int
var flag bool //
for {
time.Sleep(time.Millisecond * 10)
num, ok := <-intChan
if !ok { //intChan 取不到..
break
}
flag = true //假设是素数
//判断 num 是不是素数
for i := 2; i < num; i++ {
if num % i == 0 {//说明该 num 不是素数
flag = false
break
}
}
if flag {
//将这个数就放入到 primeChan
primeChan<- num
}
}
fmt.Println("有一个 primeNum 协程因为取不到数据,退出")
//这里我们还不能关闭 primeChan
//向 exitChan 写入 true
exitChan<- true
}
func main() {
intChan := make(chan int , 1000)
primeChan := make(chan int, 2000)//放入结果
//标识退出的管道
exitChan := make(chan bool, 4) // 4 个
//开启一个协程,向 intChan 放入 1-8000 个数
go putNum(intChan)
//开启 4 个协程,从 intChan 取出数据,并判断是否为素数,如果是,就
//放入到 primeChan
for i := 0; i < 4; i++ {
go primeNum(intChan, primeChan, exitChan)
}
//这里我们主线程,进行处理
//直接
go func(){
for i := 0; i < 4; i++ {
<-exitChan
}
//当我们从 exitChan 取出了 4 个结果,就可以放心的关闭 prprimeChan
close(primeChan)
}()
//遍历我们的 primeChan ,把结果取出
for {
res, ok := <-primeChan
if !ok{
break
}
//将结果输出
fmt.Printf("素数=%d\n", res)
}
fmt.Println("main 线程退出")
}
结论:使用 go 协程后,执行的速度,比普通方法提高至少 4 倍
channel 使用细节和注意事项
- channel 可以声明为只读,或者只写性质 【案例演示】
- channel 只读和只写的最佳实践案例
- 使用 select 可以解决从管道取数据的阻塞问题
package main
import (
"fmt"
"time"
)
func main() {
//使用 select 可以解决从管道取数据的阻塞问题
//1.定义一个管道 10 个数据 int
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan<- i
}
//2.定义一个管道 5 个数据 string
stringChan := make(chan string, 5)
for i := 0; i < 5; i++ {
stringChan <- "hello" + fmt.Sprintf("%d", i)
}
//传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock
//问题,在实际开发中,可能我们不好确定什么关闭该管道.
//可以使用 select 方式可以解决
//label:
for {
select {
//注意: 这里,如果 intChan 一直没有关闭,不会一直阻塞而 deadlock
//,会自动到下一个 case 匹配
case v := <-intChan :
fmt.Printf("从 intChan 读取的数据%d\n", v)
time.Sleep(time.Second)
case v := <-stringChan :
fmt.Printf("从 stringChan 读取的数据%s\n", v)
time.Sleep(time.Second)
default :
fmt.Printf("都取不到了,不玩了, 程序员可以加入逻辑\n")
time.Sleep(time.Second)
return
//break label
}
}
}
- goroutine 中使用 recover,解决协程中出现 panic,导致程序崩溃问题
代码实现:
package main
import (
"fmt"
"time"
)
//函数
func sayHello() {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
fmt.Println("hello,world")
}
}
//函数
func test() {
//这里我们可以使用 defer + recover
defer func() {
//捕获 test 抛出的 panic
if err := recover(); err != nil {
fmt.Println("test() 发生错误", err)
}
}()
//定义了一个 map
var myMap map[int]string
myMap[0] = "golang" //error
}
func main() {
go sayHello()
go test()
for i := 0; i < 10; i++ {
fmt.Println("main() ok=", i)
time.Sleep(time.Second)
}
}
sync.WaitGroup的用法
介绍
经常会看到以下了代码:
package main
import (
"fmt"
"time"
)
func main(){
for i := 0; i < 100 ; i++{
go fmt.Println(i)
}
time.Sleep(time.Second)
}
主线程为了等待goroutine都运行完毕,不得不在程序的末尾使用time.Sleep()
来睡眠一段时间,等待其他线程充分运行。对于简单的代码,100个for循环可以在1秒之内运行完毕,time.Sleep()
也可以达到想要的效果。
但是对于实际生活的大多数场景来说,1秒是不够的,并且大部分时候我们都无法预知for循环内代码运行时间的长短。这时候就不能使用time.Sleep()
来完成等待操作了。
可以考虑使用管道来完成上述操作:
func main() {
c := make(chan bool, 100)
for i := 0; i < 100; i++ {
go func(i int) {
fmt.Println(i)
c <- true
}(i)
}
for i := 0; i < 100; i++ {
<-c
}
}
首先可以肯定的是使用管道是能达到我们的目的的,而且不但能达到目的,还能十分完美的达到目的。
但是管道在这里显得有些大材小用,因为它被设计出来不仅仅只是在这里用作简单的同步处理,在这里使用管道实际上是不合适的。而且假设我们有一万、十万甚至更多的for
循环,也要申请同样数量大小的管道出来,对内存也是不小的开销。
对于这种情况,go语言中有一个其他的工具sync.WaitGroup
能更加方便的帮助我们达到这个目的。
WaitGroup
对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait()
用来控制计数器的数量。Add(n)
把计数器设置为n
,Done()
每次把计数器-1
,wait()
会阻塞代码的运行,直到计数器地值减为0
。
使用WaitGroup
将上述代码可以修改为:
func main() {
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
fmt.Println(i)
wg.Done()
}(i)
}
wg.Wait()
}
这里首先把wg
计数设置为100
, 每个for
循环运行完毕都把计数器减一,主函数中使用Wait()
一直阻塞,直到wg
为零——也就是所有的100
个for
循环都运行完毕。相对于使用管道来说,WaitGroup
轻巧了许多。
注意事项
- 计数器不能为负值
add不是设置,是加上;是计数器不能为负值,不是add不能为负值,add可以为负的
- WaitGroup对象不是一个引用类型
WaitGroup
对象不是一个引用类型,在通过函数传值的时候需要使用地址:
func main() {
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go f(i, &wg)
}
wg.Wait()
}
// 一定要通过指针传值,不然进程会进入死锁状态
func f(i int, wg *sync.WaitGroup) {
fmt.Println(i)
wg.Done()
}
作业
package main
import (
"fmt"
)
type Result struct {
N int
Sum int
}
func MakeNumChan(ch chan int) {
for i := 1; i <= 2000; i++ {
ch <- i
}
close(ch)
}
func GetSum(n int,ch chan Result) {
sum := 0
for i := 1; i <= n; i++ {
sum += i
}
res := Result{
N: n,
Sum: sum,
}
ch <- res
}
func GetSumRange(numCh chan int,start int,end int,ch chan Result,exitCh chan int) {
for i := start; i < end; i++ {
n := <- numCh
GetSum(n,ch)
}
exitCh <- 1
}
func main() {
numCh := make(chan int, 0)
// 启动协程输入1-2000的数
go MakeNumChan(numCh)
resCh := make(chan Result, 2000)
exitCh := make(chan int,8)
// 分8个协程计算和,并存入到resCh
oneBatch := 2000 / 8
for i := 0; i < 8; i++ {
go GetSumRange(numCh,i * oneBatch,(i+1) * oneBatch,resCh,exitCh)
}
for {
if len(exitCh) == 8 {
close(exitCh)
close(resCh)
break
}
}
for v := range resCh {
fmt.Printf("res[%v]=%v\n",v.N,v.Sum)
}
}
package main
import (
"bufio"
"fmt"
"io"
"log"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"time"
)
func WriteDataToFile(fileIndex int,writeOkCh chan int) {
fileName := fmt.Sprintf("F:/files/%v.txt",fileIndex)
f,err := os.OpenFile(fileName,os.O_CREATE|os.O_WRONLY,0666)
if err != nil {
log.Fatalf("打开文件失败,%v!!\n",err)
}
defer f.Close()
rand.Seed(int64(time.Now().Nanosecond()))
for i := 0; i < 1000; i++ {
ele := rand.Intn(1000)
s := fmt.Sprintf("%v\n",ele)
f.WriteString(s)
}
writeOkCh <- 1
}
func Sort(fileIndex int,sortOkCh chan int) {
fileName := fmt.Sprintf("F:/files/%v.txt",fileIndex)
f, err := os.Open(fileName)
if err != nil {
log.Fatalf("打开文件失败,%v!!\n",err)
}
defer f.Close()
reader := bufio.NewReader(f)
intArr := make([]int,0)
for {
str, err := reader.ReadString('\n')
if err == io.EOF {
break
}
str = strings.ReplaceAll(str,"\n","")
ele,err := strconv.Atoi(str)
if err != nil{
log.Fatalf("转换数字失败,%v!!\n",err)
}
intArr = append(intArr,ele)
}
fmt.Println("len of intArr: ",len(intArr))
sort.Ints(intArr)
wFileName := fmt.Sprintf("F:/files/2%v.txt",fileIndex)
wf, err := os.OpenFile(wFileName, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
log.Fatalf("打开文件失败,%v!!\n",err)
}
defer wf.Close()
for _, val := range intArr {
s := fmt.Sprintf("%v\n",val)
wf.WriteString(s)
}
sortOkCh <- 1
}
func main() {
writeOkCh := make(chan int,10)
for i := 1; i <= 10; i++ {
go WriteDataToFile(i,writeOkCh)
}
for i := 0; i < 10; i++ {
<- writeOkCh
}
sortOkCh := make(chan int,10)
for i := 1; i <= 10; i++ {
go Sort(i,sortOkCh)
}
for i := 0; i < 10; i++ {
<- sortOkCh
}
}
评论