並行處理¶
並行處理 vs. 平行處理¶
「並行處理」和「平行處理」的定義有時會混淆,但它們並不相同。
並行系統是可以處理許多任務的系統,儘管不一定同時執行這些任務。您可以想像自己在廚房做飯:您切洋蔥,放入鍋中煎,在煎洋蔥的同時您切番茄,但您不是同時做所有這些事情:您在這些任務之間分配您的時間。平行處理會是一隻手炒洋蔥,另一隻手切番茄。
在撰寫本文時,Crystal 支援並行處理但不支持平行處理:可以執行多個任務,並且每個任務都會花費一些時間,但兩個程式碼路徑永遠不會在完全相同的時間執行。
Crystal 程式預設在單一作業系統執行緒中執行,除了垃圾收集器(目前是 Boehm GC)。支援平行處理,但目前被視為實驗性的。請查看 這篇關於 Crystal 平行處理的部落格文章 以了解更多資訊。
纖程¶
為了實現並行處理,Crystal 擁有纖程。纖程在某種程度上類似於作業系統執行緒,只是它更輕量級,並且其執行由程序內部管理。因此,程式會產生多個纖程,而 Crystal 會確保在適當的時間執行它們。
事件迴圈¶
對於所有與 I/O 相關的事務,都有一個事件迴圈。一些耗時的操作會委派給它,當事件迴圈等待該操作完成時,程式可以繼續執行其他纖程。一個簡單的例子是等待資料通過 socket 傳輸。
通道¶
Crystal 擁有受 CSP 啟發的通道。它們允許在纖程之間傳遞資料,而無需共享記憶體,也無需擔心鎖定、號誌或其他特殊結構。
程式的執行¶
當程式啟動時,它會啟動一個主纖程,該纖程將執行您的頂層程式碼。在那裡,可以產生許多其他纖程。程式的組成部分是
- 執行時期排程器,負責在適當的時間執行所有纖程。
- 事件迴圈,它只是另一個纖程,負責處理非同步任務,例如檔案、socket、管道、訊號和計時器(例如執行
sleep
)。 - 通道,用於在纖程之間傳遞資料。執行時期排程器將協調纖程和通道以進行通訊。
- 垃圾收集器:用於清理「不再使用」的記憶體。
纖程¶
纖程是一個比執行緒更輕量級的執行單元。它是一個小型物件,具有 8MB 的相關 堆疊,這通常是分配給作業系統執行緒的大小。
纖程與執行緒不同,它們是協作式的。執行緒是搶佔式的:作業系統可能會隨時中斷執行緒並開始執行另一個執行緒。纖程必須明確告訴執行時期排程器切換到另一個纖程。例如,如果需要等待 I/O,纖程會告訴排程器「聽著,我必須等待此 I/O 可用,您可以繼續執行其他纖程,當該 I/O 準備就緒時再回來找我」。
協作的優勢在於,切換上下文(在執行緒之間切換)的大部分開銷都消失了。
纖程比執行緒輕量得多:即使分配了 8MB,它也以 4KB 的小堆疊開始。
在 64 位元機器上,我們可以產生數百萬個纖程。在 32 位元機器上,我們只能產生 512 個纖程,這並不多。但是,由於 32 位元機器正開始過時,我們將賭未來,並更專注於 64 位元機器。
執行時期排程器¶
排程器有一個佇列,包含
- 準備好執行的纖程:例如,當您產生一個纖程時,它就準備好執行了。
- 事件迴圈:它是另一個纖程。當沒有其他準備好執行的纖程時,事件迴圈會檢查是否有任何準備就緒的非同步操作,然後執行等待該操作的纖程。事件迴圈目前使用
libevent
實作,它是其他事件機制的抽象,例如epoll
和kqueue
。 - 自願要求等待的纖程:這是使用
Fiber.yield
完成的,這意味著「我可以繼續執行,但如果您願意,我可以給您一些時間來執行其他纖程」。
傳遞資料¶
因為目前只有單一執行緒在執行您的程式碼,所以在不同的纖程中存取和修改類別變數會運作良好。然而,一旦語言中引入多執行緒(平行處理),這可能會出錯。這就是為什麼建議的資料傳遞機制是使用通道,並在它們之間傳送訊息。在內部,通道會實作所有的鎖定機制來避免資料競爭,但從外部來看,您將它們用作通訊基元,因此您(使用者)不必使用鎖定。
範例程式碼¶
產生纖程¶
要產生纖程,您可以使用帶有程式碼區塊的 spawn
spawn do
# ...
socket.gets
# ...
end
spawn do
# ...
sleep 5.seconds
# ...
end
這裡我們有兩個纖程:一個從 socket 讀取,另一個執行 sleep
。當第一個纖程到達 socket.gets
行時,它會被暫停,事件迴圈會被告知當 socket 中有資料時繼續執行這個纖程,並且程式會繼續執行第二個纖程。這個纖程想要休眠 5 秒,因此事件迴圈會被告知在 5 秒後繼續執行這個纖程。如果沒有其他纖程要執行,事件迴圈將會等待直到這些事件之一發生,而不會消耗 CPU 時間。
socket.gets
和 sleep
的行為如此的原因是因為它們的實作直接與執行時排程器和事件迴圈溝通,這沒有什麼神奇之處。一般而言,標準函式庫已經處理了所有這些,所以您不必擔心。
但是請注意,纖程不會立即執行。例如
spawn do
loop do
puts "Hello!"
end
end
執行上面的程式碼將不會產生任何輸出,並立即退出。
這樣的原因是纖程不會在產生後立即執行。因此,主纖程(產生上述纖程的那個)會完成其執行,程式就會退出。
一種解決方法是執行 sleep
spawn do
loop do
puts "Hello!"
end
end
sleep 1.second
這個程式現在會列印 "Hello!" 一秒鐘,然後退出。這是因為 sleep
呼叫會排程主纖程在一秒後執行,然後執行另一個「準備執行」的纖程,也就是上面的那個。
另一種方法是這樣
spawn do
loop do
puts "Hello!"
end
end
Fiber.yield
這次 Fiber.yield
會告知排程器執行另一個纖程。這會列印 "Hello!" 直到標準輸出阻塞(系統呼叫會告訴我們必須等待直到輸出準備好),然後執行會繼續回到主纖程,並且程式退出。這裡標準輸出可能永遠不會阻塞,因此程式會永遠執行下去。
如果我們想要永遠執行產生的纖程,我們可以使用不帶引數的 sleep
spawn do
loop do
puts "Hello!"
end
end
sleep
當然,上面的程式可以完全不使用 spawn
來撰寫,只需一個迴圈即可。當產生多個纖程時,sleep
更有用。
產生呼叫¶
您也可以通過傳遞方法呼叫而不是程式碼區塊來產生。為了理解為什麼這很有用,讓我們看看這個例子
i = 0
while i < 10
spawn do
puts(i)
end
i += 1
end
Fiber.yield
上面的程式會列印 "10" 十次。問題是只有一個變數 i
是所有產生的纖程都參考的,而且當執行 Fiber.yield
時,它的值是 10。
為了解決這個問題,我們可以這樣做
i = 0
while i < 10
proc = ->(x : Int32) do
spawn do
puts(x)
end
end
proc.call(i)
i += 1
end
Fiber.yield
現在它可以運作了,因為我們正在建立一個 Proc,並且傳遞 i
來調用它,所以這個值會被複製,現在產生的纖程會收到一個副本。
為了避免所有這些樣板程式碼,標準函式庫提供了一個 spawn
巨集,它接受一個呼叫表達式,並基本上重寫它來執行上述操作。使用它,我們會得到
i = 0
while i < 10
spawn puts(i)
i += 1
end
Fiber.yield
這對於在迭代中改變的區域變數非常有用。區塊引數則不會發生這種情況。例如,這會如預期般運作
10.times do |i|
spawn do
puts i
end
end
Fiber.yield
產生纖程並等待它完成¶
我們可以為此使用通道
channel = Channel(Nil).new
spawn do
puts "Before send"
channel.send(nil)
puts "After send"
end
puts "Before receive"
channel.receive
puts "After receive"
這會列印出
Before receive
Before send
After send
After receive
首先,程式會產生一個纖程,但不會立即執行它。當我們調用 channel.receive
時,主纖程會阻塞,並且執行會繼續到產生的纖程。然後會調用 channel.send(nil)
。請注意,這個 send
不會佔用通道中的空間,因為在第一個 send
之前已經調用了 receive
,send
不會被阻塞。纖程只有在阻塞或執行完成時才會切換。因此,產生的纖程會在 send
之後繼續執行,並且一旦執行 puts "After send"
,執行就會切換回主纖程。
然後,主纖程會在 channel.receive
處恢復執行,它一直在等待一個值。接著,主纖程會繼續執行並完成。
在上面的例子中,我們使用 nil
只是為了傳達纖程已經結束。我們也可以使用通道來在纖程之間傳遞值
channel = Channel(Int32).new
spawn do
puts "Before first send"
channel.send(1)
puts "Before second send"
channel.send(2)
end
puts "Before first receive"
value = channel.receive
puts value # => 1
puts "Before second receive"
value = channel.receive
puts value # => 2
輸出
Before first receive
Before first send
Before second send
1
Before second receive
2
請注意,當程式執行 receive
時,目前的纖程會阻塞,並且執行會繼續到另一個纖程。當執行 channel.send(1)
時,執行會繼續,因為如果通道尚未滿,send
是非阻塞的。然而,channel.send(2)
會導致纖程阻塞,因為通道(預設大小為 1)已滿,因此執行會繼續到等待該通道的纖程。
這裡我們傳送的是字面值,但是產生的纖程可能會透過例如讀取檔案,或從 socket 取得來計算這個值。當這個纖程必須等待 I/O 時,其他纖程可以繼續執行程式碼,直到 I/O 準備好,最後當值準備好並透過通道傳送時,主纖程會收到它。例如
require "socket"
channel = Channel(String).new
spawn do
server = TCPServer.new("0.0.0.0", 8080)
socket = server.accept
while line = socket.gets
channel.send(line)
end
end
spawn do
while line = gets
channel.send(line)
end
end
3.times do
puts channel.receive
end
上面的程式會產生兩個纖程。第一個會建立一個 TCPServer,接受一個連線,並從中讀取行,將它們傳送到通道。第二個纖程則會從標準輸入讀取行。主纖程會讀取傳送到通道的前 3 則訊息,無論是來自 socket 還是 stdin,然後程式會退出。gets
呼叫會阻塞纖程,並告知事件迴圈如果資料進來就從那裡繼續。
同樣地,我們可以等待多個纖程完成執行,並收集它們的值
channel = Channel(Int32).new
10.times do |i|
spawn do
channel.send(i * 2)
end
end
sum = 0
10.times do
sum += channel.receive
end
puts sum # => 90
當然,您可以在產生的纖程中使用 receive
channel = Channel(Int32).new
spawn do
puts "Before send"
channel.send(1)
puts "After send"
end
spawn do
puts "Before receive"
puts channel.receive
puts "After receive"
end
puts "Before yield"
Fiber.yield
puts "After yield"
輸出
Before yield
Before send
Before receive
1
After receive
After send
After yield
這裡會先執行 channel.send
,但是由於還沒有人在等待值(目前),執行會繼續到其他纖程。第二個纖程會執行,通道上有一個值,它會被取得,並且執行會繼續,首先是第一個纖程,然後是主纖程,因為 Fiber.yield
會將纖程放在執行佇列的末尾。
緩衝通道¶
上面的範例使用的是非緩衝通道:當傳送一個值時,如果一個纖程正在等待該通道,那麼執行就會繼續到該纖程。
使用緩衝通道,調用 send
不會切換到另一個纖程,除非緩衝區已滿
# A buffered channel of capacity 2
channel = Channel(Int32).new(2)
spawn do
puts "Before send 1"
channel.send(1)
puts "Before send 2"
channel.send(2)
puts "Before send 3"
channel.send(3)
puts "After send"
end
3.times do |i|
puts channel.receive
end
輸出
Before send 1
Before send 2
Before send 3
After send
1
2
3
請注意,第一個 send
不會佔用通道中的空間。這是因為在第一個 send
之前調用了 receive
,而其他 2 個 send
調用發生在它們各自的 receive
之前。send
呼叫的次數沒有超過緩衝區的界限,因此 send 纖程會不間斷地執行到完成。
這裡有一個例子,所有緩衝區中的空間都被佔用了
# A buffered channel of capacity 1
channel = Channel(Int32).new(1)
spawn do
puts "Before send 1"
channel.send(1)
puts "Before send 2"
channel.send(2)
puts "Before send 3"
channel.send(3)
puts "End of send fiber"
end
3.times do |i|
puts channel.receive
end
輸出
Before send 1
Before send 2
Before send 3
1
2
3
請注意,「End of send fiber」沒有出現在輸出中,因為我們 receive
了 3 個 send
呼叫,這表示 3.times
會執行到完成,進而解除主纖程的阻塞,使其執行到完成。
這是我們剛才看到的相同程式碼片段,但在最底部新增了 Fiber.yield
呼叫
# A buffered channel of capacity 1
channel = Channel(Int32).new(1)
spawn do
puts "Before send 1"
channel.send(1)
puts "Before send 2"
channel.send(2)
puts "Before send 3"
channel.send(3)
puts "End of send fiber"
end
3.times do |i|
puts channel.receive
end
Fiber.yield
輸出
Before send 1
Before send 2
Before send 3
1
2
3
End of send fiber
在程式碼片段的末尾新增 Fiber.yield
呼叫後,我們會在輸出中看到「End of send fiber」訊息,如果主纖程執行到完成,這個訊息原本會被遺漏。