類別 Ractor
Ractor 是 Ruby 的 Actor 模型抽象,提供執行緒安全的平行執行。
Ractor.new
建立新的 Ractor,可以在平行執行。
# The simplest ractor r = Ractor.new {puts "I am in Ractor!"} r.take # wait for it to finish # Here, "I am in Ractor!" is printed
Ractor 彼此不會共用所有物件。這有兩個主要好處:跨 Ractor 時,不會有資料競爭和競爭條件等執行緒安全問題。另一個好處是平行處理。
為了達成這個目標,物件共用在 Ractor 之間受到限制。例如,與執行緒不同,Ractor 無法存取其他 Ractor 中可用的所有物件。即使是通常可透過外部範圍中的變數存取的物件,也禁止跨 Ractor 使用。
a = 1 r = Ractor.new {puts "I am in Ractor! a=#{a}"} # fails immediately with # ArgumentError (can not isolate a Proc because it accesses outer variables (a).)
物件必須明確共用
a = 1 r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"}
在 CRuby(預設實作)上,每個 Ractor 都持有 Global Virtual Machine Lock (GVL),因此 Ractor 可以平行執行,而不會彼此鎖定。這與 CRuby 上執行緒的情況不同。
物件不應存取共用狀態,而應透過傳送和接收訊息,將物件傳送至 Ractor 和從 Ractor 傳送出來。
a = 1 r = Ractor.new do a_in_ractor = receive # receive blocks until somebody passes a message puts "I am in Ractor! a=#{a_in_ractor}" end r.send(a) # pass it r.take # Here, "I am in Ractor! a=1" is printed
有兩對方法可用於傳送/接收訊息
-
Ractor#send
和Ractor.receive
,用於傳送者知道接收者(推); -
Ractor.yield
和Ractor#take
,用於接收者知道傳送者(拉);
此外,傳遞給 Ractor.new
的任何引數都會傳遞給區塊,並在那裡提供,就像由 Ractor.receive
接收一樣,而最後的區塊值會傳送到 Ractor 外部,就像由 Ractor.yield
傳送一樣。
經典乒乓球的小示範
server = Ractor.new(name: "server") do puts "Server starts: #{self.inspect}" puts "Server sends: ping" Ractor.yield 'ping' # The server doesn't know the receiver and sends to whoever interested received = Ractor.receive # The server doesn't know the sender and receives from whoever sent puts "Server received: #{received}" end client = Ractor.new(server) do |srv| # The server is sent to the client, and available as srv puts "Client starts: #{self.inspect}" received = srv.take # The client takes a message from the server puts "Client received from " \ "#{srv.inspect}: #{received}" puts "Client sends to " \ "#{srv.inspect}: pong" srv.send 'pong' # The client sends a message to the server end [client, server].each(&:take) # Wait until they both finish
這會輸出類似以下的內容
Server starts: #<Ractor:#2 server test.rb:1 running> Server sends: ping Client starts: #<Ractor:#3 test.rb:8 running> Client received from #<Ractor:#2 server test.rb:1 blocking>: ping Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong Server received: pong
Ractor 透過輸入埠接收訊息,並將訊息傳送至輸出埠。可以使用 Ractor#close_incoming
和 Ractor#close_outgoing
分別停用任一埠。當 Ractor 終止時,其埠會自動關閉。
可分享和不可分享的物件¶ ↑
當物件傳送至 Ractor 並從 Ractor 傳送出來時,了解物件是否可分享或不可分享非常重要。大多數 Ruby 物件都是不可分享的物件。即使是凍結的物件,如果包含(透過其實例變數)未凍結的物件,也可能不可分享。
可分享的物件是可以由多個執行緒使用,而不會影響執行緒安全性,例如數字、true
和 false
。您可以使用 Ractor.shareable?
檢查這一點,而 Ractor.make_shareable
會嘗試將物件設為可分享(如果尚未設為可分享),如果無法執行此操作,則會傳回錯誤訊息。
Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are shareable Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true Ractor.shareable?('foo'.freeze) #=> true Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen ary = ['hello', 'world'] ary.frozen? #=> false ary[0].frozen? #=> false Ractor.make_shareable(ary) ary.frozen? #=> true ary[0].frozen? #=> true ary[1].frozen? #=> true
當可分享的物件傳送(透過 send
或 Ractor.yield
)時,不會對其執行其他處理。它只是可以同時由兩個 Ractor 使用。當傳送不可分享的物件時,可以複製或移動它。第一個是預設值,它會透過深度複製(Object#clone
)其結構中不可分享的部分,完整複製物件。
data = ['foo', 'bar'.freeze] r = Ractor.new do data2 = Ractor.receive puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}" end r.send(data) r.take puts "Outside : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"
這會輸出類似以下的內容
In ractor: 340, 360, 320 Outside : 380, 400, 320
請注意,Ractor 中陣列和陣列內部未凍結字串的物件 ID 已變更,因為它們是不同的物件。第二個陣列的元素(可分享的凍結字串)是同一個物件。
深度複製物件可能會很慢,有時甚至不可能。或者,可以在傳送期間使用 move: true
。這會將不可分享的物件移動至接收 Ractor,讓傳送 Ractor 無法存取它。
data = ['foo', 'bar'] r = Ractor.new do data_in_ractor = Ractor.receive puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}" end r.send(data, move: true) r.take puts "Outside: moved? #{Ractor::MovedObject === data}" puts "Outside: #{data.inspect}"
這會輸出
In ractor: 100, 120 Outside: moved? true test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)
請注意,即使是 inspect
(以及更基本的函式,例如 __id__
)也無法在已移動的物件上存取。
Class
和 Module
物件可分享,因此類別/模組定義會在 Ractor 之間分享。Ractor 物件也可分享。對可分享物件的所有操作都是執行緒安全的,因此會保留執行緒安全性屬性。我們無法在 Ruby 中定義可變動的可分享物件,但 C 擴充功能可以加入它們。
如果變數的值不可分享,則禁止在其他 Ractor 中存取(取得)可分享物件的實例變數。這可能會發生,因為模組/類別可分享,但它們可以有值不可分享的實例變數。在非主要 Ractor 中,也禁止設定類別/模組的實例變數(即使值可分享)。
class C class << self attr_accessor :tricky end end C.tricky = "unshareable".dup r = Ractor.new(C) do |cls| puts "I see #{cls}" puts "I can't see #{cls.tricky}" cls.tricky = true # doesn't get here, but this would also raise an error end r.take # I see C # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
如果常數可共用,Ractor 可以存取這些常數。只有主 Ractor 可以存取不可共用的常數。
GOOD = 'good'.freeze BAD = 'bad'.dup r = Ractor.new do puts "GOOD=#{GOOD}" puts "BAD=#{BAD}" end r.take # GOOD=good # can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError) # Consider the same C class from above r = Ractor.new do puts "I see #{C}" puts "I can't see #{C.tricky}" end r.take # I see C # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
另請參閱 註解語法 說明中的 # shareable_constant_value
準則說明。
Ractor 與執行緒¶ ↑
每個 ractor 都有自己的主 Thread
。可以在 ractor 內部建立新的執行緒(在 CRuby 中,它們與此 ractor 的其他執行緒共用 GVL)。
r = Ractor.new do a = 1 Thread.new {puts "Thread in ractor: a=#{a}"}.join end r.take # Here "Thread in ractor: a=1" will be printed
程式碼範例注意事項¶ ↑
在以下範例中,我們有時會使用下列方法來等待目前未封鎖的 ractor 完成(或進行)。
def wait sleep(0.1) end
這 **僅供示範目的**,不應在實際程式碼中使用。大部分時間,take
用於等待 ractor 完成。
參考¶ ↑
請參閱 Ractor 設計文件 以取得更多詳細資料。
公開類別方法
傳回目前正在執行或封鎖(等待)的 Ractor 數目。
Ractor.count #=> 1 r = Ractor.new(name: 'example') { Ractor.yield(1) } Ractor.count #=> 2 (main + example ractor) r.take # wait for Ractor.yield(1) r.take # wait until r will finish Ractor.count #=> 1
# File ruby_3_3_0/ractor.rb, line 302 def self.count __builtin_cexpr! %q{ ULONG2NUM(GET_VM()->ractor.cnt); } end
傳回目前正在執行的 Ractor
。
Ractor.current #=> #<Ractor:#1 running>
# File ruby_3_3_0/ractor.rb, line 288 def self.current __builtin_cexpr! %q{ rb_ractor_self(rb_ec_ractor_ptr(ec)); } end
傳回主 ractor
# File ruby_3_3_0/ractor.rb, line 848 def self.main __builtin_cexpr! %q{ rb_ractor_self(GET_VM()->ractor.main_ractor); } end
使用 args 和區塊建立新的 Ractor。
指定的區塊 (Proc
) 會被隔離(無法存取任何外部變數)。區塊內的 self
會參照目前的 Ractor。
r = Ractor.new { puts "Hi, I am #{self.inspect}" } r.take # Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"
傳遞的任何 args
都會透過與透過 send
/Ractor.receive 傳送的物件相同的規則傳播到區塊參數。如果 args
中的參數不可共用,它將會被複製(透過深度複製,這可能會很低效)。
arg = [1, 2, 3] puts "Passing: #{arg} (##{arg.object_id})" r = Ractor.new(arg) {|received_arg| puts "Received: #{received_arg} (##{received_arg.object_id})" } r.take # Prints: # Passing: [1, 2, 3] (#280) # Received: [1, 2, 3] (#300)
Ractor 的 name
可設定為除錯目的
r = Ractor.new(name: 'my ractor') {}; r.take p r #=> #<Ractor:#3 my ractor test.rb:1 terminated>
# File ruby_3_3_0/ractor.rb, line 273 def self.new(*args, name: nil, &block) b = block # TODO: builtin bug raise ArgumentError, "must be called with a block" unless block if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)") warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \ "Also there are many implementation issues.", uplevel: 0, category: :experimental) end loc = caller_locations(1, 1).first loc = "#{loc.path}:#{loc.lineno}" __builtin_ractor_create(loc, name, args, b) end
從目前 ractor 的輸入埠接收訊息(由另一個 ractor 的 send
傳送至該埠)。
r = Ractor.new do v1 = Ractor.receive puts "Received: #{v1}" end r.send('message1') r.take # Here will be printed: "Received: message1"
或者,可以使用私有執行個體方法 receive
r = Ractor.new do v1 = receive puts "Received: #{v1}" end r.send('message1') r.take # This prints: "Received: message1"
如果佇列為空,此方法會區塊。
r = Ractor.new do puts "Before first receive" v1 = Ractor.receive puts "Received: #{v1}" v2 = Ractor.receive puts "Received: #{v2}" end wait puts "Still not received" r.send('message1') wait puts "Still received only one" r.send('message2') r.take
輸出
Before first receive Still not received Received: message1 Still received only one Received: message2
如果對 ractor 呼叫 close_incoming
,且輸入佇列中沒有更多訊息,此方法會引發 Ractor::ClosedError
Ractor.new do close_incoming receive end wait # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)
# File ruby_3_3_0/ractor.rb, line 430 def self.receive __builtin_cexpr! %q{ ractor_receive(ec, rb_ec_ractor_ptr(ec)) } end
僅接收特定訊息。
Ractor.receive_if
可以取代 Ractor.receive
,並在區塊中提供樣式(或任何篩選器),您可以選擇接受 ractor 輸入佇列中可用的訊息。
r = Ractor.new do p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3" p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1" p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2" end r << "bar1" r << "baz2" r << "foo3" r.take
這會輸出
foo3 bar1 baz2
如果區塊傳回真值,訊息會從輸入佇列中移除並傳回。否則,訊息會保留在輸入佇列中,且下一個訊息會由指定的區塊檢查。
如果輸入佇列中沒有訊息,此方法會區塊,直到有新訊息到達。
如果區塊被 break/return/exception/throw 跳脫,訊息會從輸入佇列中移除,就像傳回真值一樣。
r = Ractor.new do val = Ractor.receive_if{|msg| msg.is_a?(Array)} puts "Received successfully: #{val}" end r.send(1) r.send('test') wait puts "2 non-matching sent, nothing received" r.send([1, 2, 3]) wait
列印
2 non-matching sent, nothing received Received successfully: [1, 2, 3]
請注意,您無法在指定的區塊中遞迴呼叫 receive/receive_if。您不應在區塊中執行訊息過濾以外的任何工作。
Ractor.current << true Ractor.receive_if{|msg| Ractor.receive} #=> `receive': can not call receive/receive_if recursively (Ractor::Error)
# File ruby_3_3_0/ractor.rb, line 509 def self.receive_if &b Primitive.ractor_receive_if b end
等待任何 ractor 在其輸出埠中有內容,從此 ractor 讀取,然後傳回該 ractor 和收到的物件。
r1 = Ractor.new {Ractor.yield 'from 1'} r2 = Ractor.new {Ractor.yield 'from 2'} r, obj = Ractor.select(r1, r2) puts "received #{obj.inspect} from #{r.inspect}" # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running> # But could just as well print "from r2" here, either prints could be first.
如果指定的反應器之一是目前的反應器,且已選取,r
將包含 :receive
符號,而不是反應器物件。
r1 = Ractor.new(Ractor.current) do |main| main.send 'to main' Ractor.yield 'from 1' end r2 = Ractor.new do Ractor.yield 'from 2' end r, obj = Ractor.select(r1, r2, Ractor.current) puts "received #{obj.inspect} from #{r.inspect}" # Could print: received "to main" from :receive
如果提供 yield_value
,如果另一個反應器呼叫 take
,則可能會產生該值。在此情況下,會傳回配對 [:yield, nil]
r1 = Ractor.new(Ractor.current) do |main| puts "Received from main: #{main.take}" end puts "Trying to select" r, obj = Ractor.select(r1, Ractor.current, yield_value: 123) wait puts "Received #{obj.inspect} from #{r.inspect}"
這會列印
Trying to select Received from main: 123 Received nil from :yield
move
布林旗標定義是否會複製 (預設) 或移動產生的值。
# File ruby_3_3_0/ractor.rb, line 358 def self.select(*ractors, yield_value: yield_unspecified = true, move: false) raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty? if ractors.delete Ractor.current do_receive = true else do_receive = false end __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move end
傳送訊息至目前的反應器傳出埠,以供 take
接受。
r = Ractor.new {Ractor.yield 'Hello from ractor'} puts r.take # Prints: "Hello from ractor"
此方法會封鎖,且僅在有人使用傳送的訊息時才會傳回。
r = Ractor.new do Ractor.yield 'Hello from ractor' puts "Ractor: after yield" end wait puts "Still not taken" puts r.take
這會列印
Still not taken Hello from ractor Ractor: after yield
如果傳出埠已使用 close_outgoing
關閉,此方法會引發
r = Ractor.new do close_outgoing Ractor.yield 'Hello from ractor' end wait # `yield': The outgoing-port is already closed (Ractor::ClosedError)
move
參數的意義與 send
相同。
# File ruby_3_3_0/ractor.rb, line 643 def self.yield(obj, move: false) __builtin_cexpr! %q{ ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) } end
公開實例方法
從反應器本機儲存空間取得值
# File ruby_3_3_0/ractor.rb, line 838 def [](sym) Primitive.ractor_local_value(sym) end
在反應器本機儲存空間設定值
# File ruby_3_3_0/ractor.rb, line 843 def []=(sym, val) Primitive.ractor_local_value_set(sym, val) end
關閉傳入埠,並傳回是否已關閉。在反應器中進一步嘗試 Ractor.receive
,以及傳送至反應器的 send
,將會因 Ractor::ClosedError
而失敗。
r = Ractor.new {sleep(500)} r.close_incoming #=> false r.close_incoming #=> true r.send('test') # Ractor::ClosedError (The incoming-port is already closed)
# File ruby_3_3_0/ractor.rb, line 749 def close_incoming __builtin_cexpr! %q{ ractor_close_incoming(ec, RACTOR_PTR(self)); } end
關閉傳出埠,並傳回是否已關閉。在反應器中進一步嘗試 Ractor.yield
,以及從反應器 take
,將會因 Ractor::ClosedError
而失敗。
r = Ractor.new {sleep(500)} r.close_outgoing #=> false r.close_outgoing #=> true r.take # Ractor::ClosedError (The outgoing-port is already closed)
# File ruby_3_3_0/ractor.rb, line 767 def close_outgoing __builtin_cexpr! %q{ ractor_close_outgoing(ec, RACTOR_PTR(self)); } end
# File ruby_3_3_0/ractor.rb, line 716 def inspect loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } id = __builtin_cexpr! %q{ UINT2NUM(rb_ractor_id(RACTOR_PTR(self))) } status = __builtin_cexpr! %q{ rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_)) } "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>" end
在 Ractor.new
中設定的名稱,或為 nil
。
# File ruby_3_3_0/ractor.rb, line 729 def name __builtin_cexpr! %q{RACTOR_PTR(self)->name} end
傳送訊息至 Ractor 的輸入佇列,以供 Ractor.receive
接受。
r = Ractor.new do value = Ractor.receive puts "Received #{value}" end r.send 'message' # Prints: "Received: message"
此方法為非封鎖式(即使 ractor 尚未準備好接收任何內容,也會立即傳回)
r = Ractor.new {sleep(5)} r.send('test') puts "Sent successfully" # Prints: "Sent successfully" immediately
嘗試傳送至已完成執行的 ractor 會引發 Ractor::ClosedError
。
r = Ractor.new {} r.take p r # "#<Ractor:#6 (irb):23 terminated>" r.send('test') # Ractor::ClosedError (The incoming-port is already closed)
如果對 ractor 呼叫 close_incoming
,此方法也會引發 Ractor::ClosedError
。
r = Ractor.new do sleep(500) receive end r.close_incoming r.send('test') # Ractor::ClosedError (The incoming-port is already closed) # The error is raised immediately, not when the ractor tries to receive
如果 obj
為不可共用,預設會透過深度複製將其複製至接收 ractor 中。如果傳遞 move: true
,物件會移動至接收 ractor 中,且傳送者將無法存取。
r = Ractor.new {puts "Received: #{receive}"} msg = 'message' r.send(msg, move: true) r.take p msg
這會列印
Received: message in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>
物件及其部分的所有參照都會對傳送者失效。
r = Ractor.new {puts "Received: #{receive}"} s = 'message' ary = [s] copy = ary.dup r.send(ary, move: true) s.inspect # Ractor::MovedError (can not send any methods to a moved object) ary.class # Ractor::MovedError (can not send any methods to a moved object) copy.class # => Array, it is different object copy[0].inspect # Ractor::MovedError (can not send any methods to a moved object) # ...but its item was still a reference to `s`, which was moved
如果物件可共用,move: true
對其沒有影響
r = Ractor.new {puts "Received: #{receive}"} s = 'message'.freeze r.send(s, move: true) s.inspect #=> "message", still available
# File ruby_3_3_0/ractor.rb, line 599 def send(obj, move: false) __builtin_cexpr! %q{ ractor_send(ec, RACTOR_PTR(self), obj, move) } end
從 ractor 的輸出埠取得訊息,該訊息是由 Ractor.yield
或在 ractor 終止時放入的。
r = Ractor.new do Ractor.yield 'explicit yield' 'last value' end puts r.take #=> 'explicit yield' puts r.take #=> 'last value' puts r.take # Ractor::ClosedError (The outgoing-port is already closed)
最後一個值也會傳送到輸出埠,表示 take
可用作 Thread#join
的類比(「等到 ractor 完成」)。但是,如果有人已使用該訊息,它會引發錯誤。
如果輸出埠已使用 close_outgoing
關閉,此方法會引發 Ractor::ClosedError
。
r = Ractor.new do sleep(500) Ractor.yield 'Hello from ractor' end r.close_outgoing r.take # Ractor::ClosedError (The outgoing-port is already closed) # The error would be raised immediately, not when ractor will try to receive
如果在 Ractor
中引發未捕捉的例外狀況,它會透過 take 作為 Ractor::RemoteError
進行傳播。
r = Ractor.new {raise "Something weird happened"} begin r.take rescue => e p e # => #<Ractor::RemoteError: thrown by remote Ractor.> p e.ractor == r # => true p e.cause # => #<RuntimeError: Something weird happened> end
Ractor::ClosedError
是 StopIteration
的子類別,因此 ractor 的終止會中斷接收此訊息且未傳播錯誤的任何迴圈
r = Ractor.new do 3.times {|i| Ractor.yield "message #{i}"} "finishing" end loop {puts "Received: " + r.take} puts "Continue successfully"
這會列印
Received: message 0 Received: message 1 Received: message 2 Received: finishing Continue successfully
# File ruby_3_3_0/ractor.rb, line 710 def take __builtin_cexpr! %q{ ractor_take(ec, RACTOR_PTR(self)) } end
私人執行個體方法
與 Ractor.receive
相同
# File ruby_3_3_0/ractor.rb, line 441 def receive __builtin_cexpr! %q{ ractor_receive(ec, rb_ec_ractor_ptr(ec)) } end
與 Ractor.receive_if
相同
# File ruby_3_3_0/ractor.rb, line 514 def receive_if &b Primitive.ractor_receive_if b end