并发-学习Go语言

阅读笔记

goroutines.go

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

笔记

Go 程(goroutine)是由 Go 运行时管理的轻量级线程。

go f(x, y, z)
会启动一个新的 Go 程并执行

f(x, y, z)
f, x, y 和 z 的求值发生在当前的 Go 程中,而 f 的执行发生在新的 Go 程中。

Go 程在相同的地址空间中运行,因此在访问共享的内存时必须进行同步。sync 包提供了这种能力,不过在 Go 中并不经常用到,因为还有其它的办法(见下一页)。

思考
1. 多次执行的顺序都是一致的。所以说和线程基本上不会乱序执行的?
2. 这个问题很奇怪

验证

为了方便验证我的想法,重新改写了代码如下(时间单元为纳秒):

//例子1
package main

import (
    "fmt"
    "time"
)

func say1(s string) {
    for i := 0; i < 5; i++ {
        fmt.Println("In say1()-",i,time.Now().UnixNano())
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
        fmt.Println("end say1()",i,time.Now().UnixNano())
    }
}

func say2(s string) {
    for i := 0; i < 5; i++ {
        fmt.Println("In say2()-",i,time.Now().UnixNano())
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
        fmt.Println("end say2()",i,time.Now().UnixNano())
    }
}

func main() {
    fmt.Println("In main()---",time.Now().UnixNano())
    go say1("world")
    say2("hello")
}

输出
In main()--- 1540706604305141891
In say2()- 0 1540706604305181351
In say1()- 0 1540706604305219272
hello
world
end say1() 0 1540706604405319232
In say1()- 1 1540706604405324817
end say2() 0 1540706604405313310
In say2()- 1 1540706604405345566
hello
end say2() 1 1540706604505434102
In say2()- 2 1540706604505443829
world
end say1() 1 1540706604505498411
In say1()- 2 1540706604505508293
hello
end say2() 2 1540706604605564777
In say2()- 3 1540706604605575973
world
end say1() 2 1540706604605609988
In say1()- 3 1540706604605615117
world
end say1() 3 1540706604705700643
In say1()- 4 1540706604705708692
hello
end say2() 3 1540706604705738341
In say2()- 4 1540706604705742821
world
end say1() 4 1540706604805845281
hello
end say2() 4 1540706604805858926

//例子2
package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("In main()")
    go longWait()
    go shortWait()
    fmt.Println("About to sleep in main()")
    // sleep works with a Duration in nanoseconds (ns) !
    time.Sleep(10 * 1e9)
    fmt.Println("At the end of main()")
}

func longWait() {
    fmt.Println("Beginning longWait()")
    time.Sleep(5 * 1e9) // sleep for 5 seconds
    fmt.Println("End of longWait()")
}

func shortWait() {
    fmt.Println("Beginning shortWait()")
    time.Sleep(2 * 1e9) // sleep for 2 seconds
    fmt.Println("End of shortWait()")
}

验证的思考

整理下上面这例子的执行过程:
1. main函数执行两个say函数,分别在不同的Go 协程执行。
2. say1 执行时,进入等待,这时会被go挂起,进入到say2,进入等待,被系统挂起,这时say1的等待时间到了,会先输出world。
3. 每次执行结果不一样,为什么?难道是因为部分CPU乱序执行,其他CPU顺序执行?

channels.go

package main

import "fmt"

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // 将和送入 c
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c // 从 c 中接收

    fmt.Println(x, y, x+y)
}

笔记

信道是带有类型的管道,你可以通过它用信道操作符 <- 来发送或者接收值。

ch <- v    // 将 v 发送至信道 ch。
v := <-ch  // 从 ch 接收值并赋予 v。
(“箭头”就是数据流的方向。)

和映射与切片一样,信道在使用前必须创建:

ch := make(chan int)
默认情况下,发送和接收操作在另一端准备好之前都会阻塞。这使得 Go 程可以在没有显式的锁或竞态变量的情况下进行同步。

以下示例对切片中的数进行求和,将任务分配给两个 Go 程。一旦两个 Go 程完成了它们的计算,它就能算出最终的结果。

总结

  1. 申明一个sum函数,第一个参数为要计算的数组,第二个参数为协程,函数循环处理数组元素,将结果发送到channel中。
  2. main函数的主要作用,将数组切分2半,分别调用不同的go协程,将计算完的数据获取,统一计算结果。

buffered-channels.go

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

笔记

信道可以是 带缓冲的。将缓冲长度作为第二个参数提供给 make 来初始化一个带缓冲的信道:

ch := make(chan int, 100)
仅当信道的缓冲区填满后,向其发送数据时才会阻塞。当缓冲区为空时,接受方会阻塞。

修改示例填满缓冲区,然后看看会发生什么。
  1. 那默认的buffer是多大的呢?
  2. 例子中,只有发送2个数据,如果再多发送一个呢?实际上会挂掉。

range-and-close.go

package main

import (
    "fmt"
)

func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    }
    close(c)
}

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    for i := range c {
        fmt.Println(i)
    }
}

笔记

发送者可通过 close 关闭一个信道来表示没有需要发送的值了。接收者可以通过为接收表达式分配第二个参数来测试信道是否被关闭:若没有值可以接收且信道已被关闭,那么在执行完

v, ok := <-ch
之后 ok 会被设置为 false。

循环 for i := range c 会不断从信道接收值,直到它被关闭。

*注意:* 只有发送者才能关闭信道,而接收者不能。向一个已经关闭的信道发送数据会引发程序恐慌(panic)。

*还要注意:* 信道与文件不同,通常情况下无需关闭它们。只有在必须告诉接收者不再有值需要发送的时候才有必要关闭,例如终止一个 range 循环。
  1. 例子中斐波那契函数,计算指定数以内的队列,计算完成后,会关闭信道。
  2. main函数中,创建一个缓冲为10的信道,然后执行一个协程,不断从信道获取计算的数字
  3. 我理解的信道类似于一个全局的栈,存放可访问的变量。

select.go

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

笔记

select 语句使一个 Go 程可以等待多个通信操作。

select 会阻塞到某个分支可以继续执行为止,这时就会执行该分支。当多个分支都准备好时会随机选择一个执行。
  1. 这,这咋理解呢?从不同的信道选择发送数据的那个信道。
  2. 在select中,理论上c <- x又是简写吧。<- c and c <- x 选择c,并把x的值发送到信道c
  3. main函数中,创建2个信道,执行一个匿名函数,获取从c信道接受到的数字。
  4. 如果信道没有发送新的数据过来,是否会一直阻塞?

default-selection.go

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)
    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}

笔记

当 select 中的其它分支都没有准备好时,default 分支就会执行。

为了在尝试发送或者接收时不发生阻塞,可使用 default 分支:

select {
case i := <-c:
    // 使用 i
default:
    // 从 c 中接收会阻塞时执行
}
  1. 如何定义分支是否准备好?是否有数据发送过来就算准备好?
  2. 默认的信道是哪里来的?或许根本就默认信道这一说,基本上就是执行默认的逻辑。
  3. main函数,每50ms执行输出一个点号,等到100ms的时候就接受到了tick信道的数据,输出tick,等到500ms的接受到boom信道的数据,输出boom。

mutex-counter.go

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeCounter 的并发使用是安全的。
type SafeCounter struct {
    v   map[string]int
    mux sync.Mutex
}

// Inc 增加给定 key 的计数器的值。
func (c *SafeCounter) Inc(key string) {
    c.mux.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    c.v[key]++
    c.mux.Unlock()
}

// Value 返回给定 key 的计数器的当前值。
func (c *SafeCounter) Value(key string) int {
    c.mux.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    defer c.mux.Unlock()
    return c.v[key]
}

func main() {
    c := SafeCounter{v: make(map[string]int)}
    for i := 0; i < 1000; i++ {
        go c.Inc("somekey")
    }

    time.Sleep(time.Second)
    fmt.Println(c.Value("somekey"))
}

 笔记

我们已经看到信道非常适合在各个 Go 程间进行通信。

但是如果我们并不需要通信呢?比如说,若我们只是想保证每次只有一个 Go 程能够访问一个共享的变量,从而避免冲突?

这里涉及的概念叫做 _互斥(mutual_exclusion)_ ,我们通常使用 _互斥锁(Mutex)_ 这一数据结构来提供这种机制。

Go 标准库中提供了 sync.Mutex 互斥锁类型及其两个方法:

Lock
Unlock
我们可以通过在代码前调用 Lock 方法,在代码后调用 Unlock 方法来保证一段代码的互斥执行。参见 Inc 方法。

我们也可以用 defer 语句来保证互斥锁一定会被解锁。参见 Value 方法。
  1. 这个例子比较复杂。
  2. 首先定一个一个SafeCounter的结构体,有两个字段,一个是映射类型,key为string类型,值为int类型。另外一个字段是互斥锁类型。
  3. 给结构体增加Inc的方法,指定key的值+1
  4. 增加Value方法,获取时也加锁。
  5. main函数,创建一个映射,初始化SafeCounter的v值,循环1000次执行+1的操作,每次都是新的协程运行。
  6. 如果获取时不加锁呢?

参考链接

  1. Go 程
  2. 信道
  3. 带缓冲的信道
  4. range 和 close
  5. select 语句
  6. 默认选择
  7. 并发、并行和协程
  8. 英文地址 Part 22: Channels Part 23: Buffered Channels and Worker Pools 22.信道 23.缓冲信道和工作池
  9. Golang tutorial series

发表评论

电子邮件地址不会被公开。 必填项已用*标注