Ractor - Ruby's Actor-like concurrent abstraction

Ractor is designed to provide a parallel execution feature for Ruby without thread-safety concerns.

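Summary

Multiple Ractors in an interpreter process

You can create multiple Ractors, and they run in parallel.

Limited sharing between multiple Ractors

Unlike threads, Ractors do not share everything.

Two types of communication between Ractors

Ractors communicate with each other and synchronize their execution by exchanging messages. There are two message exchange protocols: push type (message passing) and pull type.

Copy and move semantics for sending messages

To send an unshareable object as a message, the object is either copied or moved.

Thread safety

Ractor helps you write thread-safe concurrent programs, but it is still possible to write thread-unsafe programs with Ractors.

Without Ractor, we need to trace every state mutation to debug thread-safety issues. With Ractor, you can concentrate on the suspicious code that is shared between Ractors.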

Creation and termination

Ractor.new

# Ractor.new with a block creates new Ractor
r = Ractor.new do
  # This block will be run in parallel with other ractors
end

# You can name a Ractor with `name:` argument.
r = Ractor.new name: 'test-name' do
end

# and Ractor#name returns its name.
r.name #=> 'test-name'

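Given block isolation

A Ractor executes the expr in the given block. The given block is isolated from the outer scope by the Proc#isolate method (not yet exposed to Ruby users). To prevent unshareable objects from being shared between Ractors, the block's outer variables, self, and other information are isolated.

Proc#isolate is called at Ractor creation time (when Ractor.new is called). If the given Proc object cannot be isolated, for example because it accesses outer variables, an error is raised.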

begin
  a = true
  r = Ractor.new do
    a #=> ArgumentError because this block accesses `a`.
  end
  r.take # see later
rescue ArgumentError
end
r = Ractor.new do
  p self.class #=> Ractor
  self.object_id
end
r.take == self.object_id #=> false
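The isolation rule means a value from the outer scope has to be passed in explicitly instead of captured. A minimal sketch follows. The names `main`, `reply_to`, and `doubled` are illustrative. Sending the result back to the creating Ractor with Ractor#send is a pattern chosen here for the example, not something this document prescribes.

```ruby
main = Ractor.current   # the Ractor running this script
a = 42

# Referencing the outer local `a` makes the block impossible to isolate,
# so Ractor.new itself raises ArgumentError before any Ractor starts.
error =
  begin
    Ractor.new { a }
    nil
  rescue ArgumentError => e
    e
  end

# Passing `a` (and a reply target) as arguments keeps the block isolated.
Ractor.new(main, a) do |reply_to, value|
  reply_to.send(value * 2)
end
doubled = Ractor.receive

p error.class  #=> ArgumentError
p doubled      #=> 84
```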

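Arguments passed to Ractor.new() become the block parameters of the given block. However, the interpreter does not pass references to the argument objects. Instead, it sends them as messages (see below for details).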

r = Ractor.new 'ok' do |msg|
  msg #=> 'ok'
end
r.take #=> 'ok'
# almost similar to the last example
r = Ractor.new do
  msg = Ractor.receive
  msg
end
r.send 'ok'
r.take #=> 'ok'
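Because arguments travel as messages, an unshareable argument arrives as a copy. This sketch shows that mutating it inside the Ractor leaves the caller's object unchanged. The variable names are illustrative, and the reply goes back through Ractor#send to the main Ractor.

```ruby
main = Ractor.current
original = 'hello'.dup   # unfrozen String, therefore unshareable

Ractor.new(main, original) do |reply_to, msg|
  msg << ' world'         # mutates the Ractor's private copy only
  reply_to.send(msg)      # the reply is copied back to the main Ractor
end

received = Ractor.receive
p received  #=> "hello world"
p original  #=> "hello"
```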

Execution result of the given block

The return value of the given block becomes an outgoing message (see below for details).

r = Ractor.new do
  'ok'
end
r.take #=> 'ok'
# almost similar to the last example
r = Ractor.new do
  Ractor.yield 'ok'
end
r.take #=> 'ok'

An error raised in the given block is propagated to the receiver of the outgoing message.

r = Ractor.new do
  raise 'ok' # exception will be transferred to the receiver
end

begin
  r.take
rescue Ractor::RemoteError => e
  e.cause.class   #=> RuntimeError
  e.cause.message #=> 'ok'
  e.ractor        #=> r
end

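Communication between Ractors

Communication between Ractors is achieved by sending and receiving messages. There are two ways to communicate with each other: (1) message sending/receiving and (2) using shareable container objects.

Users can control program execution timing with (1), but should not control it with (2) (use (2) only to manage critical sections).

For message sending and receiving, there are two types of APIs: push type (send/receive, where the sender knows the receiver) and pull type (yield/take, where the receiver knows the sender).

Sending/receiving ports

Each Ractor has an incoming port and an outgoing port. The incoming port is connected to an incoming queue of unlimited size.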

Ractor r
                 +-------------------------------------------+
                 | incoming                         outgoing |
                 | port                                 port |
   r.send(obj) ->*->[incoming queue]     Ractor.yield(obj) ->*-> r.take
                 |                |                          |
                 |                v                          |
                 |           Ractor.receive                  |
                 +-------------------------------------------+


Connection example: r2.send obj on r1, Ractor.receive on r2
  +----+     +----+
  * r1 |---->* r2 *
  +----+     +----+


Connection example: Ractor.yield(obj) on r1, r1.take on r2
  +----+     +----+
  * r1 *---->- r2 *
  +----+     +----+

Connection example: Ractor.yield(obj) on r1 and r2,
                    and waiting for both simultaneously by Ractor.select(r1, r2)

  +----+
  * r1 *------+
  +----+      |
              +----> Ractor.select(r1, r2)
  +----+      |
  * r2 *------+
  +----+

r = Ractor.new do
  msg = Ractor.receive # Receive from r's incoming queue
  msg # send back msg as block return value
end
r.send 'ok' # Send 'ok' to r's incoming port -> incoming queue
r.take      # Receive from r's outgoing port

The last example shows the following Ractor network.

  +------+        +---+
  * main |------> * r *---+
  +------+        +---+   |
      ^                   |
      +-------------------+

This code can be simplified by passing an argument to Ractor.new.

# Actual argument 'ok' for `Ractor.new()` will be sent to created Ractor.
r = Ractor.new 'ok' do |msg|
  # Values for formal parameters will be received from incoming queue.
  # Similar to: msg = Ractor.receive

  msg # Return value of the given block will be sent via outgoing port
end

# receive from the r's outgoing port.
r.take #=> 'ok'
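Because the incoming queue is unbounded, a long-running Ractor can loop on Ractor.receive and serve many push-type requests. Below is a minimal sketch of such an echo server. By convention, each request carries its own reply target. The `[reply_to, payload]` message shape and the `:stop` sentinel are assumptions made for this example only.

```ruby
main = Ractor.current

echo = Ractor.new do
  loop do
    reply_to, payload = Ractor.receive   # blocks on this Ractor's incoming queue
    break if payload == :stop            # hypothetical shutdown sentinel
    reply_to.send(payload.upcase)
  end
  :stopped
end

replies = %w[a b c].map do |s|
  echo.send([main, s])   # the Ractor object is shareable; the String is copied
  Ractor.receive
end
echo.send([main, :stop])

p replies #=> ["A", "B", "C"]
```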

Return value of a block for Ractor.new

As already explained, the return value of Ractor.new (the evaluated value of expr in Ractor.new{ expr }) can be taken with Ractor#take.

Ractor.new{ 42 }.take #=> 42

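When the block's return value is available, the Ractor is already dead. No Ractor other than the one taking it can touch the return value, so any value can be sent through this communication path without modification.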

r = Ractor.new do
  a = "hello"
  binding
end

r.take.eval("p a") #=> "hello" (other communication path can not send a Binding object directly)

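Waiting for multiple Ractors with Ractor.select

You can wait for yields from multiple Ractors with Ractor.select(*ractors). The return value of Ractor.select() is [r, msg], where r is the yielding Ractor and msg is the yielded message.

Waiting for a single Ractor (same as r1.take):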

r1 = Ractor.new{'r1'}

r, obj = Ractor.select(r1)
r == r1 and obj == 'r1' #=> true

Waiting for two Ractors:

r1 = Ractor.new{'r1'}
r2 = Ractor.new{'r2'}
rs = [r1, r2]
as = []

# Wait for r1 or r2's Ractor.yield
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj

# Second try (rs only contain not-closed ractors)
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj
as.sort == ['r1', 'r2'] #=> true

Complex example:

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
rs = RN.times.map{|i|
  Ractor.new pipe, i do |pipe, i|
    msg = pipe.take
    msg # ping-pong
  end
}
RN.times{|i|
  pipe << i
}
RN.times.map{
  r, n = Ractor.select(*rs)
  rs.delete r
  n
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Multiple Ractors can send to one Ractor:

# Create 10 ractors and they send objects to pipe ractor.
# pipe ractor yield received objects

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
rs = RN.times.map{|i|
  Ractor.new pipe, i do |pipe, i|
    pipe << i
  end
}

RN.times.map{
  pipe.take
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
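The same fan-in can be sketched with push-type messages only. Several sender Ractors push into one aggregator's incoming queue, and the aggregator reports a single result. The aggregator, the `WORKERS` count, and the "sum everything" logic are illustrative assumptions.

```ruby
main = Ractor.current
WORKERS = 5

# Aggregator: receives WORKERS numbers from any senders, then reports their sum.
aggregator = Ractor.new(main, WORKERS) do |reply_to, expected|
  sum = 0
  expected.times { sum += Ractor.receive }
  reply_to.send(sum)
end

WORKERS.times do |i|
  Ractor.new(aggregator, i) do |target, n|
    target << n * 10   # many senders push into the same incoming queue
  end
end

total = Ractor.receive
p total #=> 100
```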

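TODO: The current Ractor.select() has the same issue as select(2), so this interface should be improved.

TODO: The select syntax of the Go language uses a round-robin technique for fair scheduling. Ractor.select() does not use it yet.

Closing Ractor's ports

Example (trying to take from a closed Ractor):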

r = Ractor.new do
  'finish'
end
r.take # success (will return 'finish')
begin
  o = r.take # try to take from closed Ractor
rescue Ractor::ClosedError
  'ok'
else
  "ng: #{o}"
end

Example (trying to send to a closed (terminated) Ractor):

r = Ractor.new do
end

r.take # wait terminate

begin
  r.send(1)
rescue Ractor::ClosedError
  'ok'
else
  'ng'
end

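When multiple Ractors are waiting on Ractor.yield(), Ractor#close_outgoing cancels all of the blocking by raising an exception (Ractor::ClosedError).

Sending a message by copying

Ractor#send(obj) or Ractor.yield(obj) deep-copies obj if obj is an unshareable object.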

obj = 'str'.dup
r = Ractor.new obj do |msg|
  # return received msg's object_id
  msg.object_id
end

obj.object_id == r.take #=> false

Some objects do not support copying and raise an exception.

obj = Thread.new{}
begin
  Ractor.new obj do |msg|
    msg
  end
rescue TypeError => e
  e.message #=> 'allocator undefined for Thread'
else
  'ng' # unreachable here
end
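Copying applies only to unshareable objects. Shareable objects, such as a frozen String, are passed by reference. This sketch compares the object_id values seen inside the Ractor with the caller's. The variable names are illustrative.

```ruby
main = Ractor.current
frozen_str   = 'shared'.freeze   # frozen String: shareable, passed by reference
unfrozen_str = 'copied'.dup      # unfrozen String: unshareable, deep-copied

Ractor.new(main, frozen_str, unfrozen_str) do |reply_to, a, b|
  reply_to.send([a.object_id, b.object_id])
end

a_id, b_id = Ractor.receive
p a_id == frozen_str.object_id    #=> true  (same object)
p b_id == unfrozen_str.object_id  #=> false (a copy)
```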

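Sending a message by moving

Ractor#send(obj, move: true) or Ractor.yield(obj, move: true) moves obj to the destination Ractor. If the source Ractor touches the moved object (for example, by calling a method such as obj.foo()), an error is raised.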

# move with Ractor#send
r = Ractor.new do
  obj = Ractor.receive
  obj << ' world'
end

str = 'hello'
r.send str, move: true
modified = r.take #=> 'hello world'

# str is moved, and accessing str from this Ractor is prohibited

begin
  # Error because it touches moved str.
  str << ' exception' # raise Ractor::MovedError
rescue Ractor::MovedError
  modified #=> 'hello world'
else
  raise 'unreachable'
end
# move with Ractor.yield
r = Ractor.new do
  obj = 'hello'
  Ractor.yield obj, move: true
  obj << 'world'  # raise Ractor::MovedError
end

str = r.take
begin
  r.take
rescue Ractor::RemoteError
  p str #=> "hello"
end
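Moving works in both directions. This sketch moves an Array into a worker, lets the worker mutate it, and moves it back. In the meantime, the sender's reference becomes unusable. The worker's role and the variable names are illustrative.

```ruby
main = Ractor.current

worker = Ractor.new(main) do |reply_to|
  buf = Ractor.receive            # receives the moved Array without copying
  buf << 4
  reply_to.send(buf, move: true)  # move it back instead of copying
end

data = [1, 2, 3]
worker.send(data, move: true)

# `data` now refers to a moved object; any method call on it raises.
moved_error =
  begin
    data.size
    nil
  rescue Ractor::MovedError => e
    e
  end

result = Ractor.receive
p result             #=> [1, 2, 3, 4]
p moved_error.class  #=> Ractor::MovedError
```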

Some objects do not support moving and raise an exception.

r = Ractor.new do
  Ractor.receive
end

r.send(Thread.new{}, move: true) #=> allocator undefined for Thread (TypeError)

To prohibit access to moved objects, the implementation uses a class replacement technique.

Shareable objects

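The following objects are shareable: immutable objects (small integers, some Symbols, true, false, and nil; frozen native objects such as Float, Complex, Rational, and big integers, as well as all Symbols; and frozen String and Regexp objects whose instance variables refer only to shareable objects), Class and Module objects, and Ractor objects and other special objects that take care of synchronization.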

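Implementation: currently, shareable objects (RVALUE) have the FL_SHAREABLE flag. This flag can be added lazily.

To make an object shareable, the Ractor.make_shareable(obj) method is provided. It tries to make obj shareable by freezing obj and all objects reachable from it, recursively. This method accepts a copy: keyword (default: false). Ractor.make_shareable(obj, copy: true) tries to make a deep copy of obj and makes the copy shareable.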
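The difference between the two forms is easiest to see on a nested structure. The sketch below uses Ractor.shareable? to check the result. The `config` Hash and its contents are an illustrative assumption.

```ruby
config = { name: 'app'.dup, tags: ['web'.dup, 'api'.dup] }
p Ractor.shareable?(config)        #=> false

# copy: true deep-copies first, so `config` itself is left untouched.
snapshot = Ractor.make_shareable(config, copy: true)
p Ractor.shareable?(snapshot)      #=> true
p config.frozen?                   #=> false

# Without copy:, the object graph is frozen in place, recursively.
Ractor.make_shareable(config)
p config.frozen?                   #=> true
p config[:tags].frozen?            #=> true
p config[:tags].first.frozen?      #=> true
```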

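Language changes to isolate unshareable objects between Ractors

To isolate unshareable objects between Ractors, we introduced additional language semantics for multi-Ractor Ruby programs.

Note that these additional semantics are not needed if you do not use Ractors (100% compatible with Ruby 2).

Global variables

Only the main Ractor (the Ractor created when the interpreter starts) can access global variables.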

$gv = 1
r = Ractor.new do
  $gv
end

begin
  r.take
rescue Ractor::RemoteError => e
  e.cause.message #=> 'can not access global variables from non-main Ractors'
end

Note that some special global variables, such as $stdin, $stdout, and $stderr, are Ractor-local. See [Bug #17268] for more details.
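Instead of letting the error propagate to a taker as shown above, a non-main Ractor can rescue Ractor::IsolationError itself and report the outcome. This is a sketch of that approach. The global `$counter` and the `:isolation_error` symbol are illustrative.

```ruby
main = Ractor.current
$counter = 0   # an ordinary (not Ractor-local) global variable

Ractor.new(main) do |reply_to|
  outcome =
    begin
      $counter += 1          # reading a global from a non-main Ractor
      :allowed
    rescue Ractor::IsolationError
      :isolation_error
    end
  reply_to.send(outcome)
end

outcome = Ractor.receive
p outcome   #=> :isolation_error
p $counter  #=> 0
```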

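Instance variables of shareable objects

Instance variables of classes/modules can be read from non-main Ractors if the referenced values are shareable objects.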

class C
  @iv = 1
end

p Ractor.new do
  class C
     @iv
  end
end.take #=> 1

Otherwise, only the main Ractor can access instance variables of shareable objects.

class C
  @iv = [] # unshareable object
end

Ractor.new do
  class C
    begin
      p @iv
    rescue Ractor::IsolationError
      p $!.message
      #=> "can not get unshareable values from instance variables of classes/modules from non-main Ractors"
    end

    begin
      @iv = 42
    rescue Ractor::IsolationError
      p $!.message
      #=> "can not set instance variables of classes/modules by non-main Ractors"
    end
  end
end.take
shared = Ractor.new{}
shared.instance_variable_set(:@iv, 'str')

r = Ractor.new shared do |shared|
  p shared.instance_variable_get(:@iv)
end

begin
  r.take
rescue Ractor::RemoteError => e
  e.cause.message #=> can not access instance variables of shareable objects from non-main Ractors (Ractor::IsolationError)
end

Note that accessing instance variables of class/module objects is also restricted in non-main Ractors.
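The read rule for class-level instance variables depends on the value, not the variable. This sketch stores one shareable value and one unshareable value on the same class and reads both from a non-main Ractor. The `Settings` class and its ivars are hypothetical.

```ruby
class Settings
  @name = 'prod'.freeze   # shareable: a frozen String
  @tags = ['a']           # unshareable: a mutable Array
end

main = Ractor.current
Ractor.new(main) do |reply_to|
  name = Settings.instance_variable_get(:@name)  # allowed: value is shareable
  tags_outcome =
    begin
      Settings.instance_variable_get(:@tags)
      :allowed
    rescue Ractor::IsolationError
      :isolation_error
    end
  reply_to.send([name, tags_outcome])
end

name, tags_outcome = Ractor.receive
p name          #=> "prod"
p tags_outcome  #=> :isolation_error
```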

Class variables

Only the main Ractor can access class variables.

class C
  @@cv = 'str'
end

r = Ractor.new do
  class C
    p @@cv
  end
end


begin
  r.take
rescue => e
  e.class #=> Ractor::IsolationError
end
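The same restriction applies to reflective access with Module#class_variable_get. This sketch rescues the error inside the Ractor and reports it back. The `Counter` class and its `@@cache` variable are hypothetical.

```ruby
class Counter
  @@cache = []   # class variable holding an unshareable Array
end

main = Ractor.current
Ractor.new(main) do |reply_to|
  outcome =
    begin
      Counter.class_variable_get(:@@cache)
      :allowed
    rescue Ractor::IsolationError
      :isolation_error
    end
  reply_to.send(outcome)
end

cvar_outcome = Ractor.receive
p cvar_outcome #=> :isolation_error
```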

Constants

Only the main Ractor can read constants that refer to an unshareable object.

class C
  CONST = 'str'
end
r = Ractor.new do
  C::CONST
end
begin
  r.take
rescue => e
  e.class #=> Ractor::IsolationError
end

Only the main Ractor can define constants that refer to an unshareable object.

class C
end
r = Ractor.new do
  C::CONST = 'str'
end
begin
  r.take
rescue => e
  e.class #=> Ractor::IsolationError
end

To write a library that supports multiple Ractors, constants should refer only to shareable objects.

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}

In this case, TABLE refers to an unshareable Hash object, so other Ractors cannot refer to the TABLE constant. To make it shareable, we can use Ractor.make_shareable() as follows.

TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )
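The effect is visible from a non-main Ractor. This sketch defines one plain Hash constant and one made shareable, then reads both. The constant names are illustrative.

```ruby
MUTABLE_TABLE = { a: 'ko1'.dup }                         # unshareable Hash
FROZEN_TABLE  = Ractor.make_shareable({ a: 'ko1'.dup })  # deeply frozen

main = Ractor.current
Ractor.new(main) do |reply_to|
  mutable_outcome =
    begin
      MUTABLE_TABLE[:a]      # constant refers to an unshareable object
      :allowed
    rescue Ractor::IsolationError
      :isolation_error
    end
  reply_to.send([mutable_outcome, FROZEN_TABLE[:a]])
end

const_outcome, value = Ractor.receive
p const_outcome  #=> :isolation_error
p value          #=> "ko1"
```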

To make this easier, Ruby 3.0 introduced a new shareable_constant_value directive.

# shareable_constant_value: literal

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}
#=> Same as: TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )

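The shareable_constant_value directive accepts the following modes (described using the example CONST = expr): none (do nothing, same as CONST = expr); literal (if expr consists only of literals, it is replaced with CONST = Ractor.make_shareable(expr); otherwise it is replaced with CONST = expr.tap{|o| raise unless Ractor.shareable?(o)}); experimental_everything (replaced with CONST = Ractor.make_shareable(expr)); and experimental_copy (replaced with CONST = Ractor.make_shareable(expr, copy: true)).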

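Except for the none mode (the default), the assigned constants are guaranteed to refer only to shareable objects.

See doc/syntax/comments.rdoc for more details.

Implementation notes

Examples

Traditional ring example in the Actor model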
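As a sketch of the literal mode, the directive below sits on a comment-only line before the constant it affects. The Hash literal is then frozen deeply at assignment, and a non-main Ractor can read it. The `GREETINGS` constant is an illustrative name.

```ruby
# shareable_constant_value: literal

GREETINGS = { en: 'hello', ja: 'konnichiwa' }   # made shareable at assignment

p Ractor.shareable?(GREETINGS)   #=> true
p GREETINGS[:en].frozen?         #=> true

main = Ractor.current
Ractor.new(main) { |reply_to| reply_to.send(GREETINGS[:ja]) }
greeting = Ractor.receive
p greeting #=> "konnichiwa"
```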

RN = 1_000
CR = Ractor.current

r = Ractor.new do
  p Ractor.receive
  CR << :fin
end

RN.times{
  r = Ractor.new r do |next_r|
    next_r << Ractor.receive
  end
}

p :setup_ok
r << 1
p Ractor.receive

Fork-join

def fib n
  if n < 2
    1
  else
    fib(n-2) + fib(n-1)
  end
end

RN = 10
rs = (1..RN).map do |i|
  Ractor.new i do |i|
    [i, fib(i)]
  end
end

until rs.empty?
  r, v = Ractor.select(*rs)
  rs.delete r
  p answer: v
end
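The same fork-join can also be written with push-type messages only, collecting exactly one reply per forked Ractor instead of using Ractor.select. This is a sketch under that assumption. Each Ractor tags its result with its input so the join step can reorder the replies.

```ruby
def fib(n)
  n < 2 ? 1 : fib(n - 2) + fib(n - 1)
end

main = Ractor.current
inputs = (1..10).to_a

# fork: one Ractor per input, each reporting [input, result]
inputs.each do |i|
  Ractor.new(main, i) do |reply_to, n|
    reply_to.send([n, fib(n)])
  end
end

# join: replies arrive in any order, so sort them by input
answers = inputs.map { Ractor.receive }.sort.to_h
p answers[10] #=> 89
```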

Worker pool

require 'prime'

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

N = 1000
RN = 10
workers = (1..RN).map do
  Ractor.new pipe do |pipe|
    while n = pipe.take
      Ractor.yield [n, n.prime?]
    end
  end
end

(1..N).each{|i|
  pipe << i
}

pp (1..N).map{
  _r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}
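A variant of the pool that dispatches jobs round-robin with Ractor#send is sketched below. It uses a small hand-written primality check instead of the prime library. The `prime?` helper, the round-robin policy, and the use of nil as a stop signal are all assumptions made for this example.

```ruby
# Hypothetical helper: trial division up to the integer square root.
def prime?(n)
  return false if n < 2
  (2..Integer.sqrt(n)).none? { |d| (n % d).zero? }
end

main = Ractor.current
POOL_SIZE = 4
N = 30

workers = POOL_SIZE.times.map do
  Ractor.new(main) do |reply_to|
    while (n = Ractor.receive)      # nil stops the worker
      reply_to.send([n, prime?(n)])
    end
  end
end

(1..N).each { |i| workers[i % POOL_SIZE].send(i) }   # round-robin dispatch
results = N.times.map { Ractor.receive }.sort.to_h
workers.each { |w| w.send(nil) }                       # shut the pool down

primes = results.select { |_, is_prime| is_prime }.keys
p primes #=> [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```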

Pipeline

# pipeline with yield/take
r1 = Ractor.new do
  'r1'
end

r2 = Ractor.new r1 do |r1|
  r1.take + 'r2'
end

r3 = Ractor.new r2 do |r2|
  r2.take + 'r3'
end

p r3.take #=> 'r1r2r3'
# pipeline with send/receive

r3 = Ractor.new Ractor.current do |cr|
  cr.send Ractor.receive + 'r3'
end

r2 = Ractor.new r3 do |r3|
  r3.send Ractor.receive + 'r2'
end

r1 = Ractor.new r2 do |r2|
  r2.send Ractor.receive + 'r1'
end

r1 << 'r0'
p Ractor.receive #=> "r0r1r2r3"

Supervise

# ring example again

r = Ractor.current
(1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    r.send Ractor.receive + "r#{i}"
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
# ring example with an error

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current) #=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
r.send "e0"
p Ractor.select(*rs, Ractor.current)
#=>
#<Thread:0x000056262de28bd8 run> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
        2: from /home/ko1/src/ruby/trunk/test.rb7:in `block (2 levels) in <main>'
        1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
/home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
Traceback (most recent call last):
        2: from /home/ko1/src/ruby/trunk/test.rb7:in `block (2 levels) in <main>'
        1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
/home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
        1: from /home/ko1/src/ruby/trunk/test.rb21:in `<main>'
<internal:ractor>:69:in `select': thrown by remote Ractor. (Ractor::RemoteError)
# resend non-error message

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current)
#=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
msg = 'e0'
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  msg = 'r0'
  retry
end

#=> <internal:ractor>:100:in `send': The incoming-port is already closed (Ractor::ClosedError)
# because r == rs[-1] is terminated.
# ring example with supervisor and re-start

def make_ractor r, i
  Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
end

r = Ractor.current
rs = (1..10).map{|i|
  r = make_ractor(r, i)
}

msg = 'e0' # error causing message
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  r = rs[-1] = make_ractor(rs[-2], rs.size-1)
  msg = 'x0'
  retry
end

#=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]