Ractorに入門してみた - Consumers/Producer はこんな感じ?編 -
以下での背景と失敗を踏まえて、元々やりたいことを書いてみた、という内容の記事になります。
やりたい事
- ある処理をするためのデータが多数ある
- それを並列に処理したい。
- 並列数は固定でOK。
- 結果は随時処理したいが、処理つの都合で1カ所でやりたい
みたいな感じです。
の Worker pool
のサンプルだと
(1..N).each{|i| pipe << i }
で、全てのデータを渡してから、最後に結果をごそっと受け取るという感じで微妙にニーズと一致しませんでした。(結果を随時受け取っていきたい)
実装
そこで、実際のデータとか実処理の内容はのぞいて、並列処理部分はこんな感じ?というのを書いてみました。
def main() # 同時実行数 c = 2 producer = Ractor.new Ractor.current, c do |parent, c| puts "start producer" get_data.each do |d| Ractor.yield Ractor.make_shareable(d, copy: true) # sleep 1 end # consumer に終了通知 c.times do Ractor.yield :term end parent.send :producer_finished end consumers = (1..c).map do |i| Ractor.new producer, i do |producer, i| puts "start consumer #{i}" loop do d = producer.take break if d == :term puts "consumer_#{i}: #{d['id']}" Ractor.yield d['value'] end # main Ractor への終了報告 Ractor.yield :consumer_finished end end # consumer からの結果受け取り&終了待ち until consumers.empty? r, obj = Ractor.select(*consumers) if obj == :consumer_finished consumers.delete r next end puts "message: #{obj}" end puts "wait producer" Ractor.receive end def get_data [ { 'id' => 1, 'value' => 'aaa'}, { 'id' => 2, 'value' => 'bbb'}, { 'id' => 3, 'value' => 'ccc'}, { 'id' => 4, 'value' => 'ddd'}, ] end main
一応、これで期待通りっぽく動いているのだけど、果たして。。。
詳しい人に教えて欲しい〜。