類別 Fiber::Scheduler

這不是現有的類別,而是 Scheduler 物件應遵守的介面文件,才能用作 Fiber.scheduler 的引數,並處理非封鎖纖維。請參閱 Fiber 類別文件中的「非封鎖纖維」區段,以了解一些概念的說明。

預期排程器的行為和用法如下

  • 當非封鎖 Fiber 中的執行到達某些封鎖操作(例如睡眠、等待處理程序或未準備好的 I/O)時,它會呼叫排程器的一些掛鉤方法,如下所列。

  • Scheduler 以某種方式註冊目前纖維正在等待的內容,並使用 Fiber.yield 將控制權讓給其他纖維(因此纖維在等待等待結束時會暫停,而同一個執行緒中的其他纖維可以執行)

  • 在目前執行緒執行結束時,會呼叫排程器的 scheduler_close 方法

  • 排程器會進入等待迴圈,檢查所有封鎖的纖維(已在掛鉤呼叫中註冊),並在等待的資源準備好時(例如 I/O 準備好或睡眠時間已過)恢復它們。

這樣一來,每個個別 Fiber 的程式碼都能透明地實現並行執行。

Scheduler 實作是由 gem 提供,例如 Async

掛鉤方法為

除非另有說明,否則掛鉤實作是強制性的:如果未實作,嘗試呼叫掛鉤的方法會失敗。為了提供向後相容性,未來掛鉤將是可選的(如果未實作,由於排程器是為舊版 Ruby 版本建立,需要此掛鉤的程式碼不會失敗,只會以封鎖方式運作)。

強烈建議排程器實作fiber方法,它委派給Fiber.schedule

排程器的範例玩具實作可在 Ruby 的程式碼中找到,在test/fiber/scheduler.rb

公開實例方法

address_resolve(hostname) → array_of_strings or nil 按一下以切換來源

由執行非反向 DNS 查詢的任何方法呼叫。最顯著的方法是 Addrinfo.getaddrinfo,但還有許多其他方法。

預期該方法傳回一個字串陣列,對應於hostname解析出的 IP 位址,或在無法解析時傳回nil

所有可能呼叫站點的相當詳盡清單

  • Addrinfo.getaddrinfo

  • Addrinfo.tcp

  • Addrinfo.udp

  • Addrinfo.ip

  • Addrinfo.new

  • Addrinfo.marshal_load

  • SOCKSSocket.new

  • TCPServer.new

  • TCPSocket.new

  • IPSocket.getaddress

  • TCPSocket.gethostbyname

  • UDPSocket#connect

  • UDPSocket#bind

  • UDPSocket#send

  • Socket.getaddrinfo

  • Socket.gethostbyname

  • Socket.pack_sockaddr_in

  • Socket.sockaddr_in

  • Socket.unpack_sockaddr_in

VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
    VALUE arguments[] = {
        hostname
    };

    return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
block(blocker, timeout = nil) 按一下以切換來源

Thread.join等方法和 Mutex 呼叫,表示目前的Fiber已封鎖,直到進一步通知(例如unblock)或直到timeout已過。

blocker是我們正在等待的項目,僅供參考(用於偵錯和記錄)。無法保證其值。

預期傳回布林值,指定封鎖作業是否成功。

VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
    return rb_funcall(scheduler, id_block, 2, blocker, timeout);
}
close() 按一下以切換來源

當目前的執行緒結束時呼叫。預期排程器實作此方法,以允許所有等待的纖維完成其執行。

建議的模式是在close方法中實作主要事件迴圈。

VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
    VM_ASSERT(ruby_thread_has_gvl_p());

    VALUE result;

    // The reason for calling `scheduler_close` before calling `close` is for
    // legacy schedulers which implement `close` and expect the user to call
    // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
    // which should call `scheduler_close`. If it were to call `close`, it
    // would create an infinite loop.

    result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    result = rb_check_funcall(scheduler, id_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    return Qnil;
}
fiber(&block)

Fiber.schedule的實作。預期該方法會立即在一個獨立的非封鎖纖維中執行指定的程式碼區塊,並傳回該Fiber

建議的最小實作是

def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end
io_pread(io, buffer, from, length, offset) → read length or -errno 按一下以切換來源

IO#preadIO::Buffer#pread呼叫,從io的偏移量from讀取length位元組,並寫入指定的buffer(請參閱IO::Buffer)中的指定offset

此方法在語意上與io_read相同,但它允許指定要讀取的偏移量,而且通常更適合於同一個檔案上的非同步IO

此方法應視為實驗性質

VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
}
io_pwrite(io, buffer, from, length, offset) → 已寫入長度或 -errno 按一下以切換來源

IO#pwriteIO::Buffer#pwrite 呼叫,以將 length 位元組寫入 io,其偏移量為 from,並寫入指定 buffer (請參閱 IO::Buffer) 的指定 offset

此方法在語意上與 io_write 相同,但它允許指定要寫入的偏移量,而且通常更適合同一個檔案上的非同步 IO

此方法應視為實驗性質

VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
}
io_read(io, buffer, length, offset) → 已讀取長度或 -errno 按一下以切換來源

IO#read 或 IO#Buffer.read 呼叫,以從 io 讀取 length 位元組,並讀入指定 buffer (請參閱 IO::Buffer) 的指定 offset

length 參數是「要讀取的最小長度」。如果 IO 緩衝區大小為 8KiB,但 length1024 (1KiB),則可能會讀取多達 8KiB,但至少會讀取 1KiB。一般來說,讀取的資料少於 length 的唯一情況是讀取資料時發生錯誤。

指定 length 為 0 是有效的,表示至少嘗試讀取一次並傳回任何可用的資料。

建議的實作應嘗試以非封鎖方式從 io 讀取,如果 io 尚未準備好,則呼叫 io_wait (這會將控制權讓給其他執行緒)。

請參閱 IO::Buffer,以取得可用的介面來傳回資料。

預期傳回已讀取的位元組數,或者在發生錯誤時傳回 -errno (與系統錯誤碼對應的負數)。

此方法應視為實驗性質

VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}
io_select(readables, writables, exceptables, timeout) 按一下以切換來源

IO.select 呼叫,以詢問指定的描述符是否在指定的 timeout 內已準備好執行指定的事件。

預期傳回已準備好的 IO 陣列 的 3 元組。

VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
    VALUE arguments[] = {
        readables, writables, exceptables, timeout
    };

    return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
io_wait(io, events, timeout) 按一下以切換來源

IO#waitIO#wait_readableIO#wait_writable 呼叫,以詢問指定的描述符是否在指定的 timeout 內已準備好執行指定的事件。

eventsIO::READABLEIO::WRITABLEIO::PRIORITY 的位元遮罩。

建議實作應註冊哪些 Fiber 正在等待哪些資源,並立即呼叫 Fiber.yield 以將控制權傳遞給其他 fiber。然後,在 close 方法中,排程器可能會將所有 I/O 資源分派給正在等待它的 fiber。

預期傳回立即準備好的事件子集。

VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
    return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
}
io_write(io, buffer, length, offset) → 寫入長度或 -errno 按一下以切換來源

IO#writeIO::Buffer#write 呼叫,以將 length 位元組寫入 io,從指定的 buffer (請參閱 IO::Buffer) 的指定 offset

length 參數是「要寫入的最小長度」。如果 IO 緩衝區大小為 8KiB,但指定的 length 為 1024 (1KiB),最多會寫入 8KiB,但至少會寫入 1KiB。一般而言,寫入的資料少於 length 的唯一情況是寫入資料時發生錯誤。

指定 length 為 0 是有效的,表示至少嘗試寫入一次,盡可能多的資料。

建議實作應嘗試以非封鎖方式寫入 io,如果 io 尚未準備好,則呼叫 io_wait (這會將控制權讓給其他 fiber)。

請參閱 IO::Buffer,以取得可從緩衝區有效率取得資料的介面。

預期傳回寫入的位元組數,或者在發生錯誤時傳回 -errno (與系統錯誤碼對應的負數)。

此方法應視為實驗性質

VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_write, 4, arguments);
}
kernel_sleep(duration = nil) 按一下以切換來源

Kernel#sleep 和 Mutex#sleep 呼叫,預期提供以非封鎖方式休眠的實作。實作可能會在「哪些 fiber 等待到哪個時間點」的清單中註冊目前的 fiber,呼叫 Fiber.yield 以傳遞控制權,然後在 close 中繼續等待時間已過的 fiber。

VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
    return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
}
process_wait(pid, flags) 按一下以切換來源

Process::Status.wait 呼叫,用於等待指定的程序。有關參數說明,請參閱該方法說明。

建議的最小實作

Thread.new do
  Process::Status.wait(pid, flags)
end.value

此掛勾是選用的:如果它不存在於目前的排程器中,Process::Status.wait 將會表現為封鎖方法。

預期傳回 Process::Status 實例。

VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
    VALUE arguments[] = {
        PIDT2NUM(pid), RB_INT2NUM(flags)
    };

    return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
timeout_after(duration, exception_class, *exception_arguments, &block) → result of block 按一下以切換來源

由 Timeout.timeout 呼叫,以便在指定的 duration 內執行指定的 block。它也可以由排程器或使用者程式碼直接呼叫。

嘗試將指定的 block 的執行時間限制在指定的 duration 內(如果可能)。當非封鎖作業導致 block 的執行時間超過指定的 duration 時,該非封鎖作業應中斷,方法是引發使用指定的 exception_arguments 建構的指定 exception_class

通常會將一般執行逾時視為有風險。此實作只會中斷非封鎖作業。這是設計使然,因為預期非封鎖作業可能會因為各種無法預測的原因而失敗,因此應用程式應已具備處理這些情況的穩健性,並因此具備處理逾時的穩健性。

然而,由於此設計,如果 block 沒有呼叫任何非封鎖作業,將無法中斷它。如果您希望提供可預測的逾時點,請考慮加入 +sleep(0)+。

如果成功執行區塊,將傳回其結果。

例外通常會使用 Fiber#raise 引發。

VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
    VALUE arguments[] = {
        timeout, exception, message
    };

    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
unblock(blocker, fiber) 按一下以切換來源

呼叫以喚醒先前使用 block 封鎖的 Fiber(例如,Mutex#lock 會呼叫 block,而 Mutex#unlock 會呼叫 unblock)。排程器應使用 fiber 參數來了解哪個 fiber 已解除封鎖。

blocker 是等待的對象,但它僅供參考(用於除錯和記錄),而且不保證與 blockblocker 為相同值。

VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
    VM_ASSERT(rb_obj_is_fiber(fiber));

    return rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
}