SYM's Tech Knowledge Index & Creation Records

「INPUT:OUTPUT=1:1以上」を掲げ構築する Tech Knowledge Stack and Index. by SYM@設計者足るため孤軍奮闘する IT Engineer.

Golang 並列処理(Gorutine/Channel/WaitGroup/ErrorGroup)

Golang 並列処理(Gorutine/Channel/WaitGroup/ErrorGroup)

Go の魅力

  • 他の言語は後から並列処理の機構を組み込むと大手術になることがあるが、Go は容易
  • 高水準のパフォーマンスが出るコードを少ない手間で実現できるところ
  • I/O コストが高い領域は Go との相性が良い

Go の業務アプリケーションで並列処理の適用を検討すべき場面は、1 リクエスト/バッチタスクの内部を高速化したい時。(例:1リクエスト中で複数データストアから情報取得し、結果を複数箇所に格納が必要な時)

Gorutine

  • 個々の goroutine は識別不可
  • 優先度や親子関係はない
  • 外部から終了させられない
  • 終了検知には別の仕組みが必要(channel?)
  • かなり少ない量のメモリしか要求せず、起動は高速
    • 起動コストはゼロではない

goroutine の乱用は避ける

  • 並列処理は複雑さと高める
  • goroutine を駆使したコードは意図が伝わりにくい
  • 基本的には標準/準標準パッケージ機能の利用を検討
func main() {
    go output("goroutine")

    go func(msg string) {
        fmt.Println(msg)
    }("immediate execution")
}

Channel

  • 同時実行する goroutine を接続するパイプ(複数の goroutine から送受信しても安全が保障されたキュー)
  • Channel は同期の手段
    • Channel は goroutine をブロックする
    • 送信 goroutine と受信 goroutine が揃うまでブロック(バッファなしの場合)
    • 送信側のバッファ一杯になると受信側が取りに来るまで or バッファが空ならブロック(バッファありの場合)

※ブロック=待ち受け

func main() {
    msgs := make(chan string, 3)
    msgs <- "main"
    go func() {
        msgs <- "func1"
    }()
    go func() {
        msgs <- "func2"
    }()

    msg1 := <-msgs  // channelから値を読み込むまでメインgoroutineストップ
    msg2 := <-msgs
    msg3 := <-msgs
    fmt.Println(msg1, msg2, msg3)  // main func2 func1
}
  • 一方向チャネル型
    • チャネルの向きを指定できる。向き:送信/受信
func send(recvCh chan<- string, msg string) {
    recvCh <- msg
}

func receive(sendCh <-chan string, recvCh chan<- string) {
    msg := <-sendCh
    recvCh <- msg
}

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    // send(ch1, "sending")  deadlock..
    // receive(ch1, ch2)
    go send(ch1, "sending")
    go receive(ch1, ch2)
    fmt.Println(<-ch2)
}
  • キャパシティが一杯のチャネルに書き込もうとするゴルーチンは、チャネルの空きが出るまで待機する
  • 空のチャネルから読み込もうとするゴルーチンは、チャネルに要素が入ってくるまで待機する
func main() {
    // send slow
    ch1 := make(chan string, 2)
    go func() {
        for i := 0; i < 6; i++ {
            send := "send" + strconv.Itoa(i)
            ch1 <- send
            time.Sleep(1 * time.Second)
        }
    }()
    go func() {
        for j := 0; j < 3; j++ {
            fmt.Println("sub:", j, <-ch1)
        }
    }()
    for j := 0; j < 3; j++ {
        fmt.Println("main:", j, <-ch1)
    }
    time.Sleep(3 * time.Second)
}
//main: 0 send0
//sub: 0 send1
//main: 1 send2
//sub: 1 send3
//main: 2 send4
//sub: 2 send5

Select

複数チャネルの待ち受けかつチャネル毎の制御ができる。

  • 複数 goroutine の待ち受け可
  • 先に終わったものから捌く
func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    // ch3 := make(chan string)
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("received1", msg1)
        case msg2 := <-ch2:
            fmt.Println("received2", msg2)
        // default:
        // fmt.Println("none")
        // default句を入れると待ち受けが起こらず全てnoneが出力
        // default句は何も送受信がなかった時の処理
        // goroutine起動のタイムラグによるすり抜け
        }
    }
}
// received1 one
// received2 two

timeout

func main() {

    ch1 := make(chan string, 1)
    go func() {
        time.Sleep(2 * time.Second)
        ch1 <- "sending 1"
    }()

    select {
    case res := <-ch1:
        fmt.Println(res)
    case <-time.After(1 * time.Second):
        fmt.Println("timeout 1")
    } // timeout

    ch2 := make(chan string, 1)
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "sending 2"
    }()
    select {
    case res := <-ch2:
        fmt.Println(res)
    case <-time.After(3 * time.Second):
        fmt.Println("timeout 2")
    } // success
}

Non-Blocking Channel Operation

  • バッファなし channel のため最初からブロック
  • <-readCh : Channel から値読み込みを永遠に待ち続ける = 値書き込みがなければ永続待機 = deadlock 発生
func main() {
    // var writeCh chan<- string
    var readCh <-chan string
    ch := make(chan string)
    readCh = ch

    go func() {
        // writeCh <- "Writing..."
    }()

    fmt.Println(<-readCh)
}
// fatal error: all goroutines are asleep - deadlock!

上記ケースを制御できるのが default

func main() {
    messages := make(chan string)
    signals := make(chan bool)

    select {
    case msg := <-messages:
        fmt.Println("received message", msg)
    default:
        fmt.Println("no message received")
    }

    msg := "hi"
    select {
    case messages <- msg:
        fmt.Println("sent message", msg)
    default:
        fmt.Println("no message sent")
    }

    select {
    case msg := <-messages:
        fmt.Println("received message", msg)
    case sig := <-signals:
        fmt.Println("received signal", sig)
    default:
        fmt.Println("no activity")
    }
}

close

(チャネル受信待ちなど) ブロック中の goroutine を解放

func main() {
    jobs := make(chan int, 5)
    done := make(chan bool)

    go func() {
        for {
            j, more := <-jobs
            if more {
                fmt.Println("received job", j)
            } else {
                fmt.Println("received all jobs")
                done <- true
                return
            }
        }
    }()

    for j := 1; j <= 3; j++ {
        jobs <- j
        fmt.Println("sent job", j)
    }
    close(jobs) // ループも解除
    time.Sleep(time.Second)
    fmt.Println("sent all jobs")

    <-done
}

クローズすることで、range を用いて channel から取り出すことができる

※クローズしなければ受信待ちによるブロックで deadlock

func main() {
    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue) // 削除するとdeadlock

    for elem := range queue {
        fmt.Println(elem)
    }
}

worker

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {
    const jobNum = 5
    jobs := make(chan int, numJobs)  // ★バッファ有無で挙動が変わる
    results := make(chan int, numJobs)  // バッファ指定なくすとdeadlock

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }
    // go func() {
    for j := 1; j <= jobNum; j++ {
        jobs <- j
    }
    // }
    fmt.Println("sleep...")
    time.Sleep(2 * time.Second)
    fmt.Println("sleep end")
    close(jobs)

    for a := 1; a <= jobNum; a++ {
        fmt.Println(<-results)
    }
}
// ★ バッファ指定なしの場合
// worker 3 started  job 1
// worker 1 started  job 2
// worker 2 started  job 3
// worker 1 finished job 2
// worker 1 started  job 4
// worker 2 finished job 3
// worker 2 started  job 5
// worker 3 finished job 1
// sleep...
// worker 1 finished job 4
// worker 2 finished job 5
// sleep end
// 4
// 6
// 2
// 8
// 10

// ★ バッファなしでも jobs <- j のループを別goroutineにすると以下
// sleep...
// worker 1 started  job 1
// worker 2 started  job 2
// worker 3 started  job 3
// worker 3 finished job 3
// worker 3 started  job 4
// worker 2 finished job 2
// worker 2 started  job 5
// worker 1 finished job 1
// sleep end
// 6
// 4
// 2
// worker 3 finished job 4
// 8
// worker 2 finished job 5
// 10

// ★ バッファ指定ありの場合
// sleep...
// worker 1 started  job 1
// worker 2 started  job 2
// worker 3 started  job 3
// worker 3 finished job 3
// worker 3 started  job 4
// worker 1 finished job 1
// worker 1 started  job 5
// worker 2 finished job 2
// worker 1 finished job 5
// worker 3 finished job 4
// sleep end
// 6
// 2
// 4
// worker 1 finished job 5
// 10
// worker 2 finished job 4
// 8
  • バッファ指定ありの場合は、待ち受け? バッファが空だからブロック、送信されたものを worker が受け取るまでのタイムラグ?の間に main の sleep 実行か?
  • バッファ指定なしの場合は、即実行? 送信側(main)と受信側の goroutine 揃うためブロックされず、worker が受け取ったら即実行
    • 送信側も goroutine にすると、起動のタイムラグで先に main の sleep 実行
  • results をバッファ指定なしにした場合は、3 回送信時点で受信側がそれ以上取り出そうとし deadlock

fan-out/fan-in

  • ファンアウト:並列処理の起点となる1つのロジックから分岐。この分岐
  • ファンイン:分岐の待ち合わせ

sync.WaitGroup

  • ファンアウト、ファンインの仕組みを提供
  • 複数の goroutine を管理

メソッド:

  • Add:タスク数登録
  • Done:タスク完了
  • Wait:タスク完了の待機
func worker(id int) {
    fmt.Printf("Worker %d start\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d end\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        i := i
        go func() {
            defer wg.Done()
            worker(i)
        }()
    }
    wg.Wait()
}

Channel で同様にデータの受け渡しが可能

func responseSize(wg *sync.WaitGroup, url string, nums chan int) {
    defer wg.Done()
    response, err := http.Get(url)
    if err != nil {
        log.Fatal(err)
    }
    defer response.Body.Close()
    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        log.Fatal(err)
    }
    nums <- len(body)
}

func main() {
    wg := new(sync.WaitGroup)
    nums := make(chan int)
    wg.Add(1)
    go responseSize(wg, "https://www.example.com", nums)
    fmt.Println(<-nums)
    wg.Wait()
    close(nums)
}

errorgroup.Group

  • 基本形
func main() {
    var eg errgroup.Group
    for i := 1; i <= 5; i++ {
        id := i
        eg.Go(func() error {
            worker(id)
            return nil
        })
    }

    if err := eg.Wait(); err != nil {
        fmt.Println("error: ", err)
    }
}
  • エラーが発生したときに後続の Goroutine をキャンセルする(context)
func main() {
    eg, ctx := errgroup.WithContext(context.Background())

    for i := 0; i < 100; i++ {
        i := i
        eg.Go(func() error {
            time.Sleep(2 * time.Second) // 長い処理

            select {
            case <-ctx.Done():
                fmt.Println("Canceled:", i)
                return nil
            default:
                if i > 90 {
                    fmt.Println("Error:", i)
                    return fmt.Errorf("Error: %d", i)
                }
                fmt.Println("End:", i)
                return nil
            }
        })
    }
    if err := eg.Wait(); err != nil {
        log.Fatal(err)
    }
}

refs

良き Example

Groutine/Channel

WaitGroup/ErrGroup

golang 並列処理のはまりどころ

詳細な解説

Concurrency Patterns