golang多线程与异步关注点

Overview

多线程

一个典型的多线程模型是生产者-消费者,多个生产者线程往一个queue/chan里面写数据,然后另一侧多个消费者线程从queue/chan里面读数据。

异步

生产者-消费者模型了,如何判断生产者成功写入一条数据?

  • 同步,record被完全确认(这里的确认可以是来自queue/chan侧,也可以是来自consumer侧)
  • 异步,生产者只管往queue/chan里面发,暂时不管后果
    • 如果完全不管后果,即不管queue/chan crash/full与否,都一直发
    • 根据统计来决定自己(生产者)是否要中断发送(send timeout stats)

code解析

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	//go f1()
	//go f2()
	//go f3()
	//go f4()
	//go f5()
	go f6()

	fmt.Println("let's go")
	select {}
}

// normal
func f1() {
	c := make(chan int, 4) // buffer channel
	go func(c chan int) {
		var i = -1
		for {
			i += 1
			ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) // if sending to channel will take too long, then timeout

			fmt.Printf("\ncurrent num is : %d\n", i)
			select {
			case c <- i:
				fmt.Printf("sent success : %d\n", i)
			case <-ctx.Done():
				fmt.Printf("sent failed : %d, since timeout\n", i)
			}
		}
	}(c)

	go func(c chan int) {
		for e := range c {
			fmt.Printf("receive : %d\n", e)
		}
	}(c)
}

// timeout since channel buffer full-blocking
func f2() {
	c := make(chan int, 4)
	go func(c chan int) {
		var i = -1
		for {
			i += 1
			ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))

			fmt.Printf("\ncurrent num is : %d\n", i)
			select {
			case c <- i:
				fmt.Printf("sent success : %d\n", i)
			case <-ctx.Done():
				fmt.Printf("sent failed : %d, since timeout\n", i)
			}
		}
	}(c)
}

// blocking but without timeout since go through default directly
// will lost data, if going to `default`
func f3() {
	c := make(chan int, 4)
	go func(c chan int) {
		var i = -1
		for {
			i += 1
			ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))

			fmt.Printf("\ncurrent num is : %d\n", i)
			select {
			case c <- i:
				fmt.Printf("sent success : %d\n", i)
			case <-ctx.Done():
				fmt.Printf("sent failed : %d, since timeout\n", i)
			default:
				fmt.Printf("sent unknown : %d, since default\n", i)
			}
		}
	}(c)

	go func(c chan int) {
		for {
			num := <-c
			fmt.Printf("receive : %d\n", num)
		}
	}(c)
}

// 有部分会跑到default去,导致没有sent,也就没有receive
func f4() {
	c := make(chan int, 7)
	go func(c chan int) {
		var i = -1
		for {
			i += 1
			ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))

			fmt.Printf("\ncurrent num is : %d\n", i)
			select {
			case c <- i:
				time.Sleep(1 * time.Second) //每秒产生一个
				fmt.Printf("sent success : %d\n", i)
			case <-ctx.Done():
				fmt.Printf("sent failed : %d, since timeout\n", i)
			default:
				time.Sleep(1 * time.Second) //每秒产生一个
				fmt.Printf("sent unknown : %d, since default\n", i)
			}
		}
	}(c)

	go func(c chan int) {
		for e := range c {
			time.Sleep(2 * time.Second) //每2秒才消费一个
			fmt.Printf("receive : %d\n", e)
		}
	}(c)
}

// 直接丢掉被buff的部分,直到consumer把buffer降低,才能再塞入一个
func f5() {
	c := make(chan int, 7)
	go func(c chan int) {
		var i = -1
		for {
			i += 1
			ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))

			select {
			case c <- i:
				fmt.Printf("sent success : %d\n", i)
			case <-ctx.Done():
				fmt.Printf("sent failed : %d, since timeout\n", i)
			default:
			}
		}
	}(c)

	go func(c chan int) {
		for e := range c {
			time.Sleep(2 * time.Second) //每2秒才消费一个
			fmt.Printf("receive : %d\n", e)
		}
	}(c)
}

// 不丢弃,blocking
func f6() {
	c := make(chan int, 7)
	go func(c chan int) {
		var i = -1
		for {
			i += 1
			ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))

			select {
			case c <- i:
				fmt.Printf("sent success : %d\n", i)
			case <-ctx.Done():
				fmt.Printf("sent failed : %d, since timeout\n", i)
				//default:
			}
		}
	}(c)

	go func(c chan int) {
		for e := range c {
			time.Sleep(2 * time.Second) //每2秒才消费一个
			fmt.Printf("receive : %d\n", e)
		}
	}(c)
}

上面的code,有几个key point,

  • buffer channel,如果queue/chan满了,会产生阻塞blocking,而此时如果生产者是同步的话,那么就一直hold在此。而如果生产者是异步的话,该thread/goroutine也会一直hold
  • timeout,为了避免阻塞而导致的thread/goroutine数量溢出,通常可以加上case <-ctx.Done()来控制该thread/goroutine的全部生命周期
  • default,加入default可以避免阻塞,在select的顺序里面,default是2nd,而1st是能够通过的case,如果多个case通过,就伪随机从这些通过的case里面选一个,而如果没有case通过,那么就走default
    • 所以通常1st-case里面queue/chan crash/full了,导致该case失败,所以才走的default[2]

illustration

image

from unh

image

from datastax

image

Reference

  1. 单向channel
  2. Priority of case versus default in golang select statements
  3. Asynchronous programming with Go
  4. Anatomy of Channels in Go - Concurrency in Go
    • Adding timeout
    • WaitGroup
    • Mutex
  5. Go 微服务,第11部分:Hystrix和Resilience
  6. Testing for asynchronous results without sleep in Go
  7. hystrix-go