The Go Blog

Go並行処理パターン:パイプラインとキャンセル

Sameer Ajmani
2014年3月13日

はじめに

Goの並行処理プリミティブを使うと、I/Oと複数のCPUを効率的に利用するストリーミングデータパイプラインを簡単に構築できます。この記事では、そのようなパイプラインの例を示し、操作が失敗したときに発生する微妙な問題に焦点を当て、失敗をきれいに処理するためのテクニックを紹介します。

パイプラインとは?

Goにはパイプラインの正式な定義はありません。これは多くの種類の並行プログラムの一つに過ぎません。非公式には、パイプラインとは、チャネルで接続された一連のステージであり、各ステージは同じ関数を実行するゴルーチンのグループです。各ステージでは、ゴルーチンは

  • インバウンドチャネルを介してアップストリームから値を受信する
  • そのデータに対して何らかの関数を実行し、通常は新しい値を生成する
  • アウトバウンドチャネルを介してダウンストリームに値を送信する

各ステージには任意の数のインバウンドチャネルとアウトバウンドチャネルがありますが、最初と最後のステージはそれぞれアウトバウンドチャネルまたはインバウンドチャネルのみを持ちます。最初のステージは時にソースまたはプロデューサーと呼ばれ、最後のステージはシンクまたはコンシューマーと呼ばれます。

アイデアとテクニックを説明するために、簡単なパイプラインの例から始めます。後で、より現実的な例を紹介します。

数の二乗

3つのステージを持つパイプラインを考えてみましょう。

最初のステージであるgenは、整数のリストを、リスト内の整数を排出するチャネルに変換する関数です。gen関数は、チャネルに整数を送信し、すべての値が送信されたらチャネルを閉じるゴルーチンを開始します

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

2番目のステージであるsqは、チャネルから整数を受信し、受信した各整数の二乗を排出するチャネルを返します。インバウンドチャネルが閉じられ、このステージがすべての値をダウンストリームに送信した後、アウトバウンドチャネルを閉じます

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main関数はパイプラインを設定し、最終ステージを実行します。チャネルが閉じられるまで、2番目のステージから値を受信し、それぞれをプリントします

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

sqはインバウンドチャネルとアウトバウンドチャネルに同じ型を持つため、任意の回数合成できます。また、mainを他のステージと同様に範囲ループとして書き直すこともできます

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

ファンアウト、ファンイン

複数の関数が同じチャネルから、そのチャネルが閉じられるまで読み取ることができます。これをファンアウトと呼びます。これは、CPUの使用とI/Oを並列化するために、ワーカーのグループ間で作業を分散する方法を提供します。

関数は複数の入力から読み取り、すべての入力が閉じられたときに閉じられる単一のチャネルに入力チャネルを多重化することで、すべての入力が閉じられるまで処理を続行できます。これをファンインと呼びます。

パイプラインを変更して、sqの2つのインスタンスを実行し、それぞれが同じ入力チャネルから読み取るようにできます。結果をファンインするために、新しい関数mergeを導入します

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge関数は、各インバウンドチャネルに対してゴルーチンを開始し、値を唯一のアウトバウンドチャネルにコピーすることで、チャネルのリストを単一のチャネルに変換します。すべてのoutputゴルーチンが開始されたら、mergeは、そのチャネルでのすべての送信が完了した後、アウトバウンドチャネルを閉じるための別のゴルーチンを開始します。

閉じられたチャネルへの送信はパニックを引き起こすため、closeを呼び出す前にすべての送信が完了していることを確認することが重要です。 sync.WaitGroup型は、この同期を調整する簡単な方法を提供します

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

途中で停止する

パイプライン関数にはパターンがあります

  • すべての送信操作が完了すると、ステージはアウトバウンドチャネルを閉じます。
  • ステージは、インバウンドチャネルが閉じられるまで、インバウンドチャネルから値を受信し続けます。

このパターンにより、各受信ステージをrangeループとして記述でき、すべての値がダウンストリームに正常に送信された後、すべてのゴルーチンが終了することを保証します。

しかし、実際のパイプラインでは、ステージがすべてのインバウンド値を受信するとは限りません。これは意図的な場合もあります。受信側は、処理を進めるために値のサブセットしか必要としないかもしれません。多くの場合、インバウンド値が以前のステージでのエラーを表すため、ステージが早期に終了します。いずれの場合も、受信側は残りの値が到着するのを待つ必要がなく、後のステージが必要としない値を以前のステージが生成するのを停止させたいのです。

例のパイプラインでは、ステージがすべてのインバウンド値を消費できなかった場合、それらの値を送信しようとするゴルーチンは無期限にブロックされます

    // Consume the first value from the output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
}

これはリソースリークです。ゴルーチンはメモリとランタイムリソースを消費し、ゴルーチンのスタック内のヒープ参照はデータのガーベージコレクションを妨げます。ゴルーチンはガーベージコレクションされません。それらは自分自身で終了しなければなりません。

ダウンストリームステージがすべてのインバウンド値を受信できなかった場合でも、パイプラインのアップストリームステージが終了するように手配する必要があります。これを行う一つの方法は、アウトバウンドチャネルをバッファ付きに変更することです。バッファは固定数の値を保持できます。バッファに空きがある場合、送信操作はすぐに完了します

c := make(chan int, 2) // buffer size 2
c <- 1  // succeeds immediately
c <- 2  // succeeds immediately
c <- 3  // blocks until another goroutine does <-c and receives 1

チャネル作成時に送信する値の数がわかっている場合、バッファはコードを簡素化できます。たとえば、genを書き直して、整数のリストをバッファ付きチャネルにコピーし、新しいゴルーチンを作成しないようにできます

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

パイプライン内のブロックされたゴルーチンに戻ると、mergeが返すアウトバウンドチャネルにバッファを追加することを検討するかもしれません

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... the rest is unchanged ...

これはこのプログラムのブロックされたゴルーチンを修正しますが、これは悪いコードです。ここでのバッファサイズ1の選択は、mergeが受信する値の数と、ダウンストリームステージが消費する値の数を知っていることに依存します。これは壊れやすいです。genに追加の値を渡したり、ダウンストリームステージが読み取る値が少なくなったりすると、再びゴルーチンがブロックされます。

代わりに、ダウンストリームステージが入力の受け入れを停止することを送信側に示す方法を提供する必要があります。

明示的なキャンセル

mainoutからすべての値を受信せずに終了することを決定した場合、アップストリームステージのゴルーチンに、送信しようとしている値を放棄するように指示する必要があります。これは、doneというチャネルに値を送信することによって行われます。潜在的に2つのブロックされた送信側があるため、2つの値を送信します

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

送信ゴルーチンは、送信操作をselectステートメントに置き換えます。このステートメントは、outへの送信が発生するか、doneから値を受信したときに進行します。doneの値型は空の構造体です。なぜなら、値は重要ではなく、outへの送信を放棄すべきであることを示すのは受信イベントだからです。outputゴルーチンはインバウンドチャネルcでループを続行するため、アップストリームステージはブロックされません。(このループを早期に返す方法については後ほど説明します。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

このアプローチには問題があります。ダウンストリーム受信側は、潜在的にブロックされているアップストリーム送信側の数を知り、早期に返す際にそれらの送信側にシグナルを送るように手配する必要があります。これらの数を追跡するのは面倒でエラーが発生しやすいです。

未知の、無制限の数のゴルーチンに、ダウンストリームに値を送信するのを停止するように指示する方法が必要です。Goでは、チャネルを閉じることによってこれを行うことができます。なぜなら、閉じられたチャネルでの受信操作は常にすぐに進行し、要素型のゼロ値を生成できるからです。

これは、maindoneチャネルを閉じるだけで、すべての送信側をブロック解除できることを意味します。このクローズは、送信側への事実上のブロードキャストシグナルです。パイプライン関数のそれぞれを拡張してdoneをパラメータとして受け入れ、deferステートメントを介してクローズが行われるように手配します。これにより、mainからのすべての戻りパスがパイプラインステージに終了を通知します。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)          

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.      
}

これで、パイプラインの各ステージは、doneが閉じられるとすぐに戻ることができます。mergeoutputルーチンは、アップストリーム送信側であるsqdoneが閉じられたときに送信を試みるのを停止することを知っているため、インバウンドチャネルをドレインすることなく戻ることができます。outputは、deferステートメントを介してすべての戻りパスでwg.Doneが呼び出されることを保証します

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

同様に、sqdoneが閉じられるとすぐに戻ることができます。sqは、deferステートメントを介してすべての戻りパスでそのoutチャネルが閉じられることを保証します

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

パイプライン構築のガイドラインは次のとおりです

  • すべての送信操作が完了すると、ステージはアウトバウンドチャネルを閉じます。
  • ステージは、インバウンドチャネルが閉じられるか、送信側がブロック解除されるまで、インバウンドチャネルから値を受信し続けます。

パイプラインは、送信されるすべての値に十分なバッファがあることを保証するか、受信側がチャネルを放棄する可能性があるときに送信側に明示的にシグナルを送ることで、送信側をブロック解除します。

ツリーのダイジェスト

より現実的なパイプラインを考えてみましょう。

MD5は、ファイルのチェックサムとして有用なメッセージダイジェストアルゴリズムです。コマンドラインユーティリティmd5sumは、ファイルのリストのダイジェスト値を出力します。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

例のプログラムはmd5sumに似ていますが、引数として単一のディレクトリを受け取り、そのディレクトリの下にある各通常ファイルのダイジェスト値をパス名でソートして出力します。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

プログラムのmain関数は、ヘルパー関数MD5Allを呼び出します。これは、パス名からダイジェスト値へのマップを返し、その後結果をソートして出力します

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All関数が議論の焦点です。 serial.goでは、実装は並行処理を使用せず、ツリーをウォークしながら各ファイルを読み取り、合計するだけです。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

並列ダイジェスト

parallel.goでは、MD5Allを2段階のパイプラインに分割します。最初のステージであるsumFilesは、ツリーをウォークし、新しいゴルーチンで各ファイルをダイジェストし、result型の値を持つチャネルに結果を送信します

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFilesは2つのチャネルを返します。1つはresults用、もう1つはfilepath.Walkによって返されるエラー用です。ウォーク関数は、各通常ファイルを処理するために新しいゴルーチンを開始し、その後doneをチェックします。doneが閉じられている場合、ウォークはすぐに停止します

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

MD5Allcからダイジェスト値を受信します。MD5Allはエラーが発生すると早期にリターンし、deferを介してdoneを閉じます

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)          

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

制限された並列処理

parallel.goMD5Allの実装は、各ファイルに対して新しいゴルーチンを開始します。多くの大きなファイルがあるディレクトリでは、これはマシン上で利用可能なメモリよりも多くのメモリを割り当てる可能性があります。

ファイルの読み取りを並列で実行する数を制限することで、これらの割り当てを制限できます。 bounded.goでは、固定数のファイルを読み取るゴルーチンを作成することでこれを行います。パイプラインには3つのステージがあります。ツリーをウォークし、ファイルを読み取り、ダイジェストし、ダイジェストを収集します。

最初のステージであるwalkFilesは、ツリー内の通常ファイルのパスを出力します

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中間ステージは、pathsからファイル名を受信し、チャネルcresultsを送信する固定数のdigesterゴルーチンを開始します

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

以前の例とは異なり、digesterは出力チャネルを閉じません。複数のゴルーチンが共有チャネルに送信しているからです。代わりに、MD5Allのコードは、すべてのdigestersが完了したときにチャネルが閉じられるように手配します

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

代わりに、各ダイジェスターが独自の出力チャネルを作成して返すこともできますが、その場合、結果をファンインするためにより多くのゴルーチンが必要になります。

最終ステージは、cからすべてのresultsを受信し、その後errcからのエラーをチェックします。この時点より前に、walkFilesがダウンストリームに値を送信するのをブロックする可能性があるため、このチェックはこれより早く行うことはできません

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

まとめ

この記事では、Goでストリーミングデータパイプラインを構築するためのテクニックを紹介しました。そのようなパイプラインでの障害への対処は、パイプラインの各ステージがダウンストリームに値を送信しようとしてブロックされる可能性があり、ダウンストリームステージが受信データに関心を持たなくなる可能性があるため、困難です。チャネルを閉じることで、パイプラインによって開始されたすべてのゴルーチンに「完了」シグナルをブロードキャストする方法を示し、パイプラインを正しく構築するためのガイドラインを定義しました。

さらに読む

  • Go並行処理パターン (動画) では、Goの並行処理プリミティブの基本と、それらを適用するいくつかの方法を紹介しています。
  • 高度なGo並行処理パターン (動画) では、Goのプリミティブ、特にselectのより複雑な使用法について説明しています。
  • Douglas McIlroyの論文Squinting at Power Seriesは、Goのような並行処理が複雑な計算をいかにエレガントにサポートするかを示しています。

次の記事:Go Gopher
前の記事:FOSDEM 2014でのGoトーク
ブログインデックス