The Go Blog

Goの並行処理パターン: パイプラインとキャンセル

Sameer Ajmani
2014年3月13日

はじめに

Goの並行処理プリミティブを使用すると、I/Oと複数のCPUを効率的に使用できるストリーミングデータパイプラインを簡単に構築できます。この記事では、そのようなパイプラインの例を示し、操作が失敗したときに発生する微妙な問題を highlighted し、エラーをクリーンに処理するための手法を紹介します。

パイプラインとは何ですか?

Goにはパイプラインの正式な定義はありません。これは、多くの種類の並行プログラムの1つにすぎません。非公式には、パイプラインはチャネルによって接続された一連の_ステージ_であり、各ステージは同じ関数を実行するゴルーチンのグループです。各ステージで、ゴルーチンは

  • _上流_から_入力_チャネルを介して値を受信します
  • 通常、新しい値を生成するそのデータに対して何らかの関数を実行します
  • _出力_チャネルを介して_下流_に値を送信します

各ステージには、それぞれ出力チャネルまたは入力チャネルのみを持つ最初のステージと最後のステージを除いて、任意の数の入力チャネルと出力チャネルがあります。最初のステージは、_ソース_または_プロデューサー_と呼ばれることもあります。最後のステージは、_シンク_または_コンシューマー_です。

アイデアと手法を説明するために、簡単なパイプラインの例から始めます。後で、より現実的な例を紹介します。

数値の2乗

3つのステージを持つパイプラインを考えてみましょう。

最初のステージである `gen` は、整数のリストを、リスト内の整数を発行するチャネルに変換する関数です。 `gen` 関数は、チャネルで整数を送信し、すべての値が送信されたらチャネルを閉じるゴルーチンを開始します

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

2番目のステージである `sq` は、チャネルから整数を受信し、受信した各整数の2乗を発行するチャネルを返します。入力チャネルが閉じられ、このステージがすべての値を下流に送信した後、出力チャネルを閉じます

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

`main` 関数はパイプラインを設定し、最後のステージを実行します。2番目のステージから値を受信し、チャネルが閉じられるまでそれぞれを出力します

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

`sq` は、入力チャネルと出力チャネルのタイプが同じであるため、何度でも作成できます。また、他のステージと同様に、`main` を範囲ループとして書き直すこともできます

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

ファンアウト、ファンイン

複数の関数は、そのチャネルが閉じられるまで同じチャネルから読み取ることができます。これは_ファンアウト_と呼ばれます。これにより、CPU使用率とI/Oを並列化するために、ワーカーのグループ間で作業を分散する方法が提供されます。

関数は複数の入力から読み取り、すべての入力が閉じられたときに閉じられる単一のチャネルに入力チャネルを多重化することにより、すべてが閉じられるまで続行できます。これは_ファンイン_と呼ばれます。

パイプラインを変更して、`sq` の2つのインスタンスを実行し、それぞれが同じ入力チャネルから読み取るようにできます。結果をファンインするために、新しい関数_マージ_を導入します

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

`merge` 関数は、各入力チャネルのゴルーチンを開始することにより、チャネルのリストを単一のチャネルに変換します。このゴルーチンは、値を唯一の出力チャネルにコピーします。すべての `output` ゴルーチンが開始されると、`merge` は、そのチャネルのすべての送信が完了した後に出力チャネルを閉じるために、さらに1つのゴルーチンを開始します。

閉じたチャネルへの送信はパニックを引き起こすため、closeを呼び出す前にすべての送信が完了していることを確認することが重要です。 `sync.WaitGroup` タイプは、この同期を調整する簡単な方法を提供します

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

途中で停止する

パイプライン関数にはパターンがあります

  • ステージは、すべての送信操作が完了すると、出力チャネルを閉じます。
  • ステージは、それらのチャネルが閉じられるまで、入力チャネルから値を受信し続けます。

このパターンにより、各受信ステージを `range` ループとして記述でき、すべての値が正常に下流に送信されると、すべてのゴルーチンが終了することが保証されます。

しかし、実際のパイプラインでは、ステージがすべての入力値を受信するとは限りません。これは設計による場合もあります。レシーバーは、進捗するために値のサブセットのみを必要とする場合があります。多くの場合、入力値が前のステージのエラーを表しているため、ステージは早期に終了します。いずれの場合も、レシーバーは残りの値が到着するのを待つ必要はなく、後のステージが必要としない値を前のステージが生成しないようにする必要があります。

パイプラインの例では、ステージが入力値をすべて消費できない場合、それらの値を送信しようとするゴルーチンは無期限にブロックされます

    // Consume the first value from the output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
}

これはリソースリークです。ゴルーチンはメモリとランタイムリソースを消費し、ゴルーチンスタックのヒープ参照はデータがガベージコレクションされるのを防ぎます。ゴルーチンはガベージコレクションされません。彼らは自分で終了しなければなりません。

下流のステージがすべての入力値の受信に失敗した場合でも、パイプラインの上流のステージが終了するように調整する必要があります。これを行う1つの方法は、出力チャネルに変更してバッファを持たせることです。バッファは固定数の値を保持できます。バッファに空きがあれば、送信操作はすぐに完了します

c := make(chan int, 2) // buffer size 2
c <- 1  // succeeds immediately
c <- 2  // succeeds immediately
c <- 3  // blocks until another goroutine does <-c and receives 1

送信される値の数がチャネル作成時にわかっている場合、バッファはコードを簡略化できます。たとえば、`gen` を書き直して、整数のリストをバッファリングされたチャネルにコピーし、新しいゴルーチンを作成しないようにすることができます

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

パイプラインでブロックされたゴルーチンに戻ると、`merge` によって返される出力チャネルにバッファを追加することを検討できます

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... the rest is unchanged ...

これはこのプログラムのブロックされたゴルーチンを修正しますが、これは悪いコードです。ここでバッファサイズを1に選択することは、`merge` が受信する値の数と下流のステージが消費する値の数を知っていることに依存しています。これは脆弱です。`gen` に追加の値を渡したり、下流のステージが値を少なく読み取ると、ゴルーチンが再びブロックされます。

代わりに、下流のステージが送信者にこれ以上入力を受け入れないことを示す方法を提供する必要があります。

明示的なキャンセル

`main` が `out` からすべての値を受信せずに終了することを決定した場合、上流のステージのゴルーチンに送信しようとしている値を放棄するように指示する必要があります。これは、`done` と呼ばれるチャネルで値を送信することによって行います。潜在的に2つのブロックされた送信者があるため、2つの値を送信します

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

送信ゴルーチンは、送信操作を、`out` での送信が発生したとき、または `done` から値を受信したときのいずれかで続行する `select` ステートメントに置き換えます。 `done` の値の型は空の構造体です。値は問題ではないためです。 `out` での送信を放棄する必要があることを示すのは受信イベントです。 `output` ゴルーチンは入力チャネル `c` でループし続けるため、上流のステージはブロックされません。(このループを早期に戻す方法については、後で説明します。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

このアプローチには問題があります。_各_下流レシーバーは、潜在的にブロックされた上流送信者の数を知っており、早期リターン時にそれらの送信者にシグナルを送信するように調整する必要があります。これらの数を追跡することは退屈でエラーが発生しやすいです。

未知の数のゴルーチンに、値を下流に送信しないように指示する方法が必要です。Goでは、チャネルを閉じることでこれを行うことができます。 閉じたチャネルでの受信操作は常にすぐに続行でき、要素型のゼロ値が生成されるためです。

これは、`main` が `done` チャネルを閉じるだけで、すべての送信者のブロックを解除できることを意味します。このcloseは、送信者へのブロードキャスト信号として効果的に機能します。_各_パイプライン関数を拡張して、`done` をパラメーターとして受け入れ、`defer` ステートメントを介してcloseが発生するように調整します。そのため、`main` からのすべてのリターンパスは、パイプラインステージに終了するようにシグナルを送信します。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)          

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.      
}

これで、各パイプラインステージは、`done` が閉じられるとすぐに自由に返せるようになりました。 `merge` の `output` ルーチンは、上流の送信者である `sq` が `done` が閉じられたときに送信の試行を停止することを知っているため、入力チャネルをドレインせずに返すことができます。 `output` は、`defer` ステートメントを介してすべてのリターンパスで `wg.Done` が呼び出されることを保証します

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

同様に、`sq` は `done` が閉じられるとすぐに返すことができます。 `sq` は、`defer` ステートメントを介してすべてのリターンパスで出力チャネル `out` が閉じられることを保証します

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

パイプライン構築のガイドラインは次のとおりです

  • ステージは、すべての送信操作が完了すると、出力チャネルを閉じます。
  • ステージは、それらのチャネルが閉じられるか、送信者のブロックが解除されるまで、入力チャネルから値を受信し続けます。

パイプラインは、送信されるすべての値に十分なバッファがあることを確認するか、レシーバーがチャネルを放棄する可能性があるときに送信者に明示的にシグナルを送信することにより、送信者のブロックを解除します。

ツリーのダイジェスト

より現実的なパイプラインを考えてみましょう。

MD5は、ファイルチェックサムとして役立つメッセージダイジェストアルゴリズムです。コマンドラインユーティリティ `md5sum` は、ファイルのリストのダイジェスト値を出力します。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

このサンプルプログラムは `md5sum` のようなものですが、引数として単一のディレクトリを取り、そのディレクトリの下にある各通常ファイルのダイジェスト値をパス名でソートして出力します。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

プログラムの `main` 関数は、パス名からダイジェスト値へのマップを返すヘルパー関数 `MD5All` を呼び出し、結果をソートして出力します

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

`MD5All` 関数は、この議論の中心です。 serial.go では、実装は並行処理を使用せず、ツリーをウォークしながら各ファイルを読み取って合計します。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

並列ダイジェスト

parallel.go では、`MD5All` を2段階のパイプラインに分割します。最初のステージである `sumFiles` は、ツリーをウォークし、新しいゴルーチンで各ファイルをダイジェストし、値の型が `result` のチャネルで結果を送信します

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

`sumFiles` は2つのチャネルを返します。1つは `results` 用、もう1つは `filepath.Walk` によって返されるエラー用です。 walk関数は、各通常ファイルを処理するために新しいゴルーチンを開始し、次に `done` をチェックします。 `done` が閉じている場合、walkはすぐに停止します

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

`MD5All` は `c` からダイジェスト値を受信します。 `MD5All` はエラーが発生すると早期に戻り、`defer` を介して `done` を閉じます

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)          

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

制限付き並列処理

parallel.go にある MD5All の実装では、ファイルごとに新しいゴルーチンが起動されます。多数の大きなファイルを含むディレクトリでは、マシンで使用可能なメモリよりも多くのメモリが割り当てられる可能性があります。

並列に読み取るファイル数を制限することで、これらのメモリ割り当てを制限できます。 bounded.go では、ファイルを読み取るためのゴルーチンを固定数作成することでこれを実現しています。パイプラインには、ツリーの走査、ファイルの読み取りとダイジェスト、ダイジェストの収集の 3 つのステージがあります。

最初のステージである walkFiles は、ツリー内の通常ファイルのパスを出力します。

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中間ステージでは、固定数の digester ゴルーチンが起動されます。これらのゴルーチンは、paths からファイル名を受け取り、チャネル cresults を送信します。

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

前の例とは異なり、digester は出力チャネルを閉じません。これは、複数のゴルーチンが共有チャネルに送信しているためです。代わりに、MD5All のコードは、すべての digester が完了したときにチャネルが閉じられるようにします。

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

代わりに、各 digester が独自の出力チャネルを作成して返すこともできますが、その場合は結果をファンインするための追加のゴルーチンが必要になります。

最後のステージは、c からすべての results を受信し、errc からのエラーをチェックします。このチェックは、これより前の時点では実行できません。なぜなら、この時点より前では、walkFiles が下流に値を送信しようとしてブロックする可能性があるためです。

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

結論

この記事では、Go でストリーミングデータパイプラインを構築するためのテクニックを紹介しました。このようなパイプラインでの障害の処理は注意が必要です。パイプラインの各ステージは、下流に値を送信しようとしてブロックする可能性があり、下流のステージは受信データに関心がなくなっている可能性があるためです。チャネルを閉じることで、パイプラインによって開始されたすべてのゴルーチンに「完了」シグナルをブロードキャストする方法と、パイプラインを正しく構築するためのガイドラインを定義しました。

参考文献

  • Go Concurrency Patternsビデオ)では、Go の並行処理プリミティブの基本と、それらを適用するいくつかの方法を紹介しています。
  • Advanced Go Concurrency Patternsビデオ)では、Go のプリミティブ、特に select のより複雑な使用方法について説明しています。
  • Douglas McIlroy の論文 Squinting at Power Series は、Go のような並行処理が複雑な計算をどのようにエレガントにサポートするかを示しています。

次の記事:Go Gopher
前の記事:FOSDEM 2014 での Go の講演
ブログインデックス