// doWork prints every value received on c as a character, tagged with the
// worker id, and then reports it by sending true on done. It returns when c
// is closed and drained.
//
// The send on done happens inline, so with an unbuffered done channel the
// worker cannot pick up its next value until some goroutine has read done.
// A caller that sends a second value before reading done will block against
// the worker.
func doWork(id int, c <-chan int, done chan<- bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n", id, n)
		done <- true
	}
}
funccreateWorker(id int) worker { w := worker{ c: make(chanint), done: make(chanbool), } go doWork(id, w.c, w.done) return w }
// n is the number of workers that main creates and sends values to.
const n = 5
funcmain() { var workers [n]worker
for i := 0; i < n; i++ { workers[i] = createWorker(i) }
for i := 0; i < n; i++ { workers[i].c <- 'a' + i }
for i := 0; i < n; i++ { workers[i].c <- 'A' + i }
for i := 0; i < n; i++ { <-workers[i].done <-workers[i].done }
time.Sleep(time.Second) }
Worker 0 received a
Worker 2 received c
Worker 1 received b
Worker 3 received d
Worker 4 received e
fatal error: all goroutines are asleep - deadlock!
…
发生阻塞!
solution 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// original version
//
// doWork prints every value received on c as a character, tagged with the
// worker id, and then sends true on done inline. With an unbuffered done
// channel the worker stays blocked on that send until done is read, so it
// cannot receive another value in the meantime.
func doWork(id int, c <-chan int, done chan<- bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n", id, n)
		done <- true
	}
}
// modified version
//
// doWork prints every value received on c as a character, tagged with the
// worker id. The done signal is sent from a separate goroutine, so the worker
// loop goes straight back to receiving from c even if nobody has read done
// yet.
func doWork(id int, c <-chan int, done chan<- bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n", id, n)
		// Signal asynchronously so a slow reader of done cannot stall the loop.
		go func() {
			done <- true
		}()
	}
}
有 close(c) 时:通道被关闭后,如果 worker goroutine 仍在 for 循环中用不带 ok 检查的 n := <-c 接收数据,由于通道已关闭且没有更多数据,<-c 不会阻塞,而是立即返回元素类型的零值(对于 int 是 0),导致不停地打印 Worker 0 received。
配合 close(),其他写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// range
//
// doWork prints every value received on c as a character, tagged with the
// worker id, and returns once c has been closed and drained.
func doWork(id int, c chan int) {
	for n := range c { // When c is closed and has no data, range automatically exits.
		fmt.Printf("Worker %d received %c\n", id, n)
	}
}
// if-ok
//
// doWork prints every value received on c as a character, tagged with the
// worker id, and returns once a receive reports that c is closed.
func doWork(id int, c chan int) {
	for {
		n, ok := <-c // Explicitly check if the channel is closed
		if !ok {
			// If c is closed and has no data, exit the loop
			break
		}
		fmt.Printf("Worker %d received %c\n", id, n)
	}
}
除此之外,还有:
sync.Mutex
sync.Cond
8.2 并发模式
生产者:
1 2 3 4 5 6 7 8 9 10 11 12
// msgGen returns an unbuffered channel and starts a goroutine that sends
// messages of the form "service <name>: message <i>" on it forever, with i
// counting up from 0. Before each send the goroutine sleeps for a random
// duration below two seconds to simulate work.
//
// The goroutine never exits and the channel is never closed.
func msgGen(name string) chan string {
	c := make(chan string)
	go func() {
		i := 0
		for {
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // execution of mock business
			c <- fmt.Sprintf("service %s: message %d", name, i)
			i++
		}
	}()
	return c
}
消费者:
单个消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13
// fanInByLoop merges any number of string channels into one. It starts one
// forwarding goroutine per input channel; each goroutine repeatedly receives
// from its input and sends the value on the returned unbuffered channel.
//
// The forwarding goroutines never exit and the returned channel is never
// closed.
func fanInByLoop(chs ...chan string) chan string {
	m := make(chan string)
	for _, cs := range chs {
		// cs is passed as an argument so each goroutine reads its own channel.
		go func(cs chan string) {
			var s string
			for {
				s = <-cs
				m <- s
			}
		}(cs)
	}
	return m
}
多个消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// fanInBySelect merges exactly three string channels into one using a single
// goroutine. The goroutine selects over m1, m2 and m3 and forwards whichever
// value arrives onto the returned unbuffered channel.
//
// The goroutine never exits and the returned channel is never closed.
func fanInBySelect(m1, m2, m3 chan string) chan string {
	m := make(chan string)
	go func() {
		var s string
		for {
			select {
			case s = <-m1:
				m <- s
			case s = <-m2:
				m <- s
			case s = <-m3:
				m <- s
			}
		}
	}()
	return m
}
// fanInByLoop merges any number of string channels into one. It starts one
// forwarding goroutine per input channel; each goroutine repeatedly receives
// from its input and sends the value on the returned unbuffered channel.
//
// In this variant the received value is declared fresh on every loop pass
// with :=. The forwarding goroutines never exit and the returned channel is
// never closed.
func fanInByLoop(chs ...chan string) chan string {
	m := make(chan string)
	for _, cs := range chs {
		go func(cs chan string) {
			for {
				s := <-cs
				m <- s
			}
		}(cs)
	}
	return m
}
// fanInByLoop merges any number of string channels into one. It starts one
// forwarding goroutine per input channel; each goroutine repeatedly receives
// from its input and sends the value on the returned unbuffered channel.
//
// In this variant s is declared in the body of the range loop, outside the
// goroutine, and captured by the closure. The forwarding goroutines never
// exit and the returned channel is never closed.
func fanInByLoop(chs ...chan string) chan string {
	m := make(chan string)
	for _, cs := range chs {
		// Declared once per range iteration, so each goroutine captures its own s.
		var s string
		go func(cs chan string) {
			for {
				s = <-cs
				m <- s
			}
		}(cs)
	}
	return m
}
// fanInByLoop merges any number of string channels into one. It starts one
// forwarding goroutine per input channel; each goroutine repeatedly receives
// from its input and sends the value on the returned unbuffered channel.
//
// In this variant s is declared once inside each goroutine and reused across
// loop passes. The forwarding goroutines never exit and the returned channel
// is never closed.
func fanInByLoop(chs ...chan string) chan string {
	m := make(chan string)
	for _, cs := range chs {
		go func(cs chan string) {
			var s string
			for {
				s = <-cs
				m <- s
			}
		}(cs)
	}
	return m
}
// msgGen returns an unbuffered channel and starts a goroutine that sends
// messages of the form "service <name>: message <i>" on it forever, with i
// counting up from 0. Before each send the goroutine sleeps for a random
// duration below two seconds to simulate work.
//
// The goroutine never exits and the channel is never closed.
func msgGen(name string) chan string {
	c := make(chan string)
	go func() {
		i := 0
		for {
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // execution of mock business
			c <- fmt.Sprintf("service %s: message %d", name, i)
			i++
		}
	}()
	return c
}
// nonBlockWait tries to receive from c without blocking. If a value is
// immediately available it returns that value and true; otherwise it returns
// the empty string and false.
func nonBlockWait(c chan string) (string, bool) {
	select {
	case s := <-c:
		return s, true
	default:
		// Nothing ready on c right now; report that instead of waiting.
		return "", false
	}
}
funcmain() { m1 := msgGen("service1") m2 := msgGen("service2") for { fmt.Println(<-m1) if s, ok := nonBlockWait(m2); ok { fmt.Println(s) } else { fmt.Println("no message received from m2") } } }
service service1: message 0
service service2: message 0
service service1: message 1
no message received from m2
service service1: message 2
service service2: message 1
service service1: message 3
no message received from m2
service service1: message 4
service service2: message 2
service service1: message 5
no message received from m2
service service1: message 6
service service2: message 3
service service1: message 7
no message received from m2
service service1: message 8
…
8.3.2 超时机制
1
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // execution of mock business
// msgGen returns an unbuffered channel and starts a goroutine that sends
// messages of the form "service <name>: message <i>" on it forever, with i
// counting up from 0. Before each send the goroutine sleeps for a random
// duration below two seconds to simulate work.
//
// The goroutine never exits and the channel is never closed.
func msgGen(name string) chan string {
	c := make(chan string)
	go func() {
		i := 0
		for {
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // execution of mock business
			c <- fmt.Sprintf("service %s: message %d", name, i)
			i++
		}
	}()
	return c
}
// timeoutWait waits for a value on c for at most timeout. If a value arrives
// first it returns that value and true; if the timeout elapses first it
// returns the empty string and false.
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
	select {
	case s := <-c:
		return s, true
	case <-time.After(timeout):
		return "", false
	}
}
funcmain() { m := msgGen("service") for { if s, ok := timeoutWait(m, 1*time.Second); ok { fmt.Println(s) } else { fmt.Println("timeout") } } }