kikumotoのメモ帳

インフラ・ミドル周りを中心に、興味をもったことを適当な感じで。twitter : @takakiku

Ractorに入門してみた - Consumers/Producer はこんな感じ?編 -

以下での背景と失敗を踏まえて、元々やりたいことを書いてみた、という内容の記事になります。

kikumoto.hatenablog.com

やりたい事

  • ある処理をするためのデータが多数ある
  • それを並列に処理したい。
  • 並列数は固定でOK。
  • 結果は随時処理したいが、処理つの都合で1カ所でやりたい

みたいな感じです。

github.com

Worker pool のサンプルだと

(1..N).each{|i|
  pipe << i
}

で、全てのデータを渡してから、最後に結果をごそっと受け取るという感じで微妙にニーズと一致しませんでした。(結果を随時受け取っていきたい)

実装

そこで、実際のデータとか実処理の内容はのぞいて、並列処理部分はこんな感じ?というのを書いてみました。

def main()
  # 同時実行数
  c = 2

  producer = Ractor.new Ractor.current, c do |parent, c|
    puts "start producer"

    get_data.each do |d|
      Ractor.yield Ractor.make_shareable(d, copy: true)
      # sleep 1
    end

    # consumer に終了通知
    c.times do
      Ractor.yield :term
    end

    parent.send :producer_finished
  end

  consumers = (1..c).map do |i|
    Ractor.new producer, i do |producer, i|
      puts "start consumer #{i}"

      loop do
        d = producer.take
        break if d == :term

        puts "consumer_#{i}: #{d['id']}"
        Ractor.yield d['value']
      end

      # main Ractor への終了報告
      Ractor.yield :consumer_finished
    end
  end

  # consumer からの結果受け取り&終了待ち
  until consumers.empty?
    r, obj = Ractor.select(*consumers)
    if obj == :consumer_finished
      consumers.delete r
      next
    end
    puts "message: #{obj}"
  end

  puts "wait producer"
  Ractor.receive
end

def get_data
  [
    { 'id' => 1, 'value' => 'aaa'},
    { 'id' => 2, 'value' => 'bbb'},
    { 'id' => 3, 'value' => 'ccc'},
    { 'id' => 4, 'value' => 'ddd'},
  ]
end

main

一応、これで期待通りっぽく動いているのだけど、果たして。。。
詳しい人に教えて欲しい〜。