類別 Fiber::Scheduler
這不是現有的類別,而是 Scheduler
物件應遵守的介面文件,才能用作 Fiber.scheduler
的引數,並處理非封鎖纖維。請參閱 Fiber
類別文件中的「非封鎖纖維」區段,以了解一些概念的說明。
預期排程器的行為和用法如下
-
當非封鎖
Fiber
中的執行到達某些封鎖操作(例如睡眠、等待處理程序或未準備好的 I/O)時,它會呼叫排程器的一些掛鉤方法,如下所列。 -
Scheduler
以某種方式註冊目前纖維正在等待的內容,並使用Fiber.yield
將控制權讓給其他纖維(因此纖維在等待等待結束時會暫停,而同一個執行緒中的其他纖維可以執行) -
在目前執行緒執行結束時,會呼叫排程器的 scheduler_close 方法
-
排程器會進入等待迴圈,檢查所有封鎖的纖維(已在掛鉤呼叫中註冊),並在等待的資源準備好時(例如 I/O 準備好或睡眠時間已過)恢復它們。
這樣一來,每個個別 Fiber 的程式碼都能透明地實現並行執行。
Scheduler
實作是由 gem 提供,例如 Async。
掛鉤方法為
除非另有說明,否則掛鉤實作是強制性的:如果未實作,嘗試呼叫掛鉤的方法會失敗。為了提供向後相容性,未來掛鉤將是可選的(如果未實作,由於排程器是為舊版 Ruby 版本建立,需要此掛鉤的程式碼不會失敗,只會以封鎖方式運作)。
強烈建議排程器實作fiber
方法,它委派給Fiber.schedule
。
排程器的範例玩具實作可在 Ruby 的程式碼中找到,在test/fiber/scheduler.rb
公開實例方法
由執行非反向 DNS 查詢的任何方法呼叫。最顯著的方法是 Addrinfo.getaddrinfo,但還有許多其他方法。
預期該方法傳回一個字串陣列,對應於hostname
解析出的 IP 位址,或在無法解析時傳回nil
。
所有可能呼叫站點的相當詳盡清單
-
Addrinfo.getaddrinfo
-
Addrinfo.tcp
-
Addrinfo.udp
-
Addrinfo.ip
-
Addrinfo.new
-
Addrinfo.marshal_load
-
SOCKSSocket.new
-
TCPServer.new
-
TCPSocket.new
-
IPSocket.getaddress
-
TCPSocket.gethostbyname
-
UDPSocket#connect
-
UDPSocket#bind
-
UDPSocket#send
-
Socket.getaddrinfo
-
Socket.gethostbyname
-
Socket.pack_sockaddr_in
-
Socket.sockaddr_in
-
Socket.unpack_sockaddr_in
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname) { VALUE arguments[] = { hostname }; return rb_check_funcall(scheduler, id_address_resolve, 1, arguments); }
由Thread.join
等方法和 Mutex 呼叫,表示目前的Fiber
已封鎖,直到進一步通知(例如unblock
)或直到timeout
已過。
blocker
是我們正在等待的項目,僅供參考(用於偵錯和記錄)。無法保證其值。
預期傳回布林值,指定封鎖作業是否成功。
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout) { return rb_funcall(scheduler, id_block, 2, blocker, timeout); }
當目前的執行緒結束時呼叫。預期排程器實作此方法,以允許所有等待的纖維完成其執行。
建議的模式是在close
方法中實作主要事件迴圈。
VALUE rb_fiber_scheduler_close(VALUE scheduler) { VM_ASSERT(ruby_thread_has_gvl_p()); VALUE result; // The reason for calling `scheduler_close` before calling `close` is for // legacy schedulers which implement `close` and expect the user to call // it. Subsequently, that method would call `Fiber.set_scheduler(nil)` // which should call `scheduler_close`. If it were to call `close`, it // would create an infinite loop. result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL); if (!UNDEF_P(result)) return result; result = rb_check_funcall(scheduler, id_close, 0, NULL); if (!UNDEF_P(result)) return result; return Qnil; }
Fiber.schedule
的實作。預期該方法會立即在一個獨立的非封鎖纖維中執行指定的程式碼區塊,並傳回該Fiber
。
建議的最小實作是
def fiber(&block) fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end
由IO#pread
或IO::Buffer#pread
呼叫,從io
的偏移量from
讀取length
位元組,並寫入指定的buffer
(請參閱IO::Buffer
)中的指定offset
。
此方法在語意上與io_read
相同,但它允許指定要讀取的偏移量,而且通常更適合於同一個檔案上的非同步IO
。
此方法應視為實驗性質。
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; return rb_check_funcall(scheduler, id_io_pread, 5, arguments); }
由 IO#pwrite
或 IO::Buffer#pwrite
呼叫,以將 length
位元組寫入 io
,其偏移量為 from
,並寫入指定 buffer
(請參閱 IO::Buffer
) 的指定 offset
。
此方法在語意上與 io_write
相同,但它允許指定要寫入的偏移量,而且通常更適合同一個檔案上的非同步 IO
。
此方法應視為實驗性質。
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments); }
由 IO#read
或 IO#Buffer.read 呼叫,以從 io
讀取 length
位元組,並讀入指定 buffer
(請參閱 IO::Buffer
) 的指定 offset
。
length
參數是「要讀取的最小長度」。如果 IO
緩衝區大小為 8KiB,但 length
為 1024
(1KiB),則可能會讀取多達 8KiB,但至少會讀取 1KiB。一般來說,讀取的資料少於 length
的唯一情況是讀取資料時發生錯誤。
指定 length
為 0 是有效的,表示至少嘗試讀取一次並傳回任何可用的資料。
建議的實作應嘗試以非封鎖方式從 io
讀取,如果 io
尚未準備好,則呼叫 io_wait
(這會將控制權讓給其他執行緒)。
請參閱 IO::Buffer
,以取得可用的介面來傳回資料。
預期傳回已讀取的位元組數,或者在發生錯誤時傳回 -errno
(與系統錯誤碼對應的負數)。
此方法應視為實驗性質。
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; return rb_check_funcall(scheduler, id_io_read, 4, arguments); }
由 IO.select
呼叫,以詢問指定的描述符是否在指定的 timeout
內已準備好執行指定的事件。
預期傳回已準備好的 IO 陣列
的 3 元組。
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout) { VALUE arguments[] = { readables, writables, exceptables, timeout }; return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments); }
由 IO#wait
、IO#wait_readable
、IO#wait_writable
呼叫,以詢問指定的描述符是否在指定的 timeout
內已準備好執行指定的事件。
events
是 IO::READABLE
、IO::WRITABLE
和 IO::PRIORITY
的位元遮罩。
建議實作應註冊哪些 Fiber
正在等待哪些資源,並立即呼叫 Fiber.yield
以將控制權傳遞給其他 fiber。然後,在 close
方法中,排程器可能會將所有 I/O 資源分派給正在等待它的 fiber。
預期傳回立即準備好的事件子集。
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout) { return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout); }
由 IO#write
或 IO::Buffer#write
呼叫,以將 length
位元組寫入 io
,從指定的 buffer
(請參閱 IO::Buffer
) 的指定 offset
。
length
參數是「要寫入的最小長度」。如果 IO
緩衝區大小為 8KiB,但指定的 length
為 1024 (1KiB),最多會寫入 8KiB,但至少會寫入 1KiB。一般而言,寫入的資料少於 length
的唯一情況是寫入資料時發生錯誤。
指定 length
為 0 是有效的,表示至少嘗試寫入一次,盡可能多的資料。
建議實作應嘗試以非封鎖方式寫入 io
,如果 io
尚未準備好,則呼叫 io_wait
(這會將控制權讓給其他 fiber)。
請參閱 IO::Buffer
,以取得可從緩衝區有效率取得資料的介面。
預期傳回寫入的位元組數,或者在發生錯誤時傳回 -errno
(與系統錯誤碼對應的負數)。
此方法應視為實驗性質。
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; return rb_check_funcall(scheduler, id_io_write, 4, arguments); }
由 Kernel#sleep
和 Mutex#sleep 呼叫,預期提供以非封鎖方式休眠的實作。實作可能會在「哪些 fiber 等待到哪個時間點」的清單中註冊目前的 fiber,呼叫 Fiber.yield
以傳遞控制權,然後在 close
中繼續等待時間已過的 fiber。
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout) { return rb_funcall(scheduler, id_kernel_sleep, 1, timeout); }
由 Process::Status.wait
呼叫,用於等待指定的程序。有關參數說明,請參閱該方法說明。
建議的最小實作
Thread.new do Process::Status.wait(pid, flags) end.value
此掛勾是選用的:如果它不存在於目前的排程器中,Process::Status.wait
將會表現為封鎖方法。
預期傳回 Process::Status
實例。
VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags) { VALUE arguments[] = { PIDT2NUM(pid), RB_INT2NUM(flags) }; return rb_check_funcall(scheduler, id_process_wait, 2, arguments); }
由 Timeout.timeout 呼叫,以便在指定的 duration
內執行指定的 block
。它也可以由排程器或使用者程式碼直接呼叫。
嘗試將指定的 block
的執行時間限制在指定的 duration
內(如果可能)。當非封鎖作業導致 block
的執行時間超過指定的 duration
時,該非封鎖作業應中斷,方法是引發使用指定的 exception_arguments
建構的指定 exception_class
。
通常會將一般執行逾時視為有風險。此實作只會中斷非封鎖作業。這是設計使然,因為預期非封鎖作業可能會因為各種無法預測的原因而失敗,因此應用程式應已具備處理這些情況的穩健性,並因此具備處理逾時的穩健性。
然而,由於此設計,如果 block
沒有呼叫任何非封鎖作業,將無法中斷它。如果您希望提供可預測的逾時點,請考慮加入 +sleep(0)+。
如果成功執行區塊,將傳回其結果。
例外通常會使用 Fiber#raise
引發。
VALUE rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message) { VALUE arguments[] = { timeout, exception, message }; return rb_check_funcall(scheduler, id_timeout_after, 3, arguments); }
呼叫以喚醒先前使用 block
封鎖的 Fiber
(例如,Mutex#lock 會呼叫 block
,而 Mutex#unlock 會呼叫 unblock
)。排程器應使用 fiber
參數來了解哪個 fiber 已解除封鎖。
blocker
是等待的對象,但它僅供參考(用於除錯和記錄),而且不保證與 block
的 blocker
為相同值。
VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) { VM_ASSERT(rb_obj_is_fiber(fiber)); return rb_funcall(scheduler, id_unblock, 2, blocker, fiber); }