In today's multi-core CPU world, concurrent programming is required to scale and fully utilize shared resources. Writing concurrent programs is hard, and the concurrency model you choose can greatly affect your application's scalability.
Consider nginx vs the Apache HTTP Server: nginx uses non-blocking, event-based concurrency to fully utilize a single core, and process-based concurrency to scale across multiple cores. The Apache HTTP Server, on the other hand, uses thread-based concurrency (via the worker MPM module) to scale on a single core and process-based concurrency to scale across multiple cores. The higher resource consumption per thread leaves Apache far behind nginx when scaling to thousands of requests.
While building concurrent applications in Ruby, you will want to consider the following aspects:
- Task profile: Is the task CPU-bound? Does it make system calls or do network I/O? Is that I/O synchronous or asynchronous?
- Choice of concurrency model: Ruby provides process- and thread-based concurrency constructs in its standard library, while various gems add event-based (EventMachine, rev, etc.) and actor-based (Celluloid) constructs to the language.
- Choice of Ruby interpreter: multi-core support, the GVL (GIL), Copy-on-Write (COW) support
- The choice of OS matters as well, but this post assumes you are constrained to one of the Unix-like variants.
Let's explore all of these aspects practically with some code examples.
Threads and the GVL (GIL)
First, let us consider the case where we need to run some busy CPU work concurrently.
def busy_work
  # Pure-Ruby, CPU-bound work: ten million integer increments and shifts.
  a = i = 0
  loop { break if 10_000_000 < (i += 1); a += (i << 32) }
end
Driver
without_threading = measure_time_taken do
  ITERATIONS.times { busy_work }
end

with_threading = measure_time_taken do
  # Run the iterations in batches of NUM_THREADS, joining each batch
  # before starting the next.
  ITERATIONS.times.each_slice(NUM_THREADS) do |slice|
    slice.collect { Thread.new { busy_work } }.each { |t| t.join }
  end
end
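These drivers lean on a timing helper and two constants that the post doesn't show. A minimal sketch that would make them runnable (the constant values are assumptions inferred from the benchmark labels below):

require 'benchmark'

ITERATIONS  = 10  # assumed: matches "(10 iterations)" in the output
NUM_THREADS = 2   # assumed: matches "2 thrds" in the output

# Hypothetical helper (the original isn't shown): run the block and
# return the wall-clock seconds it took.
def measure_time_taken
  start = Time.now
  yield
  Time.now - start
end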
Output
(YARV 1.9.3)
--------------------------------------------------------------------------------
RUBY_PLATFORM: x86_64-darwin13.0.0 RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
user system total real
busy work (10 iterations) 19.050000 0.010000 19.060000 ( 19.059950)
threaded busy work (10 iters / 2 thrds) 19.690000 0.020000 19.710000 ( 19.709079)
--------------------------------------------------------------------------------
Percentage increase in time taken with Thread concurrency: 3.41
(JRuby 1.9.3)
--------------------------------------------------------------------------------
RUBY_PLATFORM: java RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: jruby
--------------------------------------------------------------------------------
user system total real
busy work (10 iterations) 18.610000 0.830000 19.440000 ( 17.609000)
threaded busy work (10 iters / 2 thrds) 21.910000 0.980000 22.890000 ( 11.353000)
--------------------------------------------------------------------------------
Percentage decrease in time taken with Thread concurrency: 35.53
Right off the bat, we see that the choice of interpreter makes a difference to our concurrent implementation: per the runs above, the JRuby version ran about 36% faster with threads, whereas the YARV 1.9.3 version actually ran slightly slower. Ruby 2.0 didn't fare any better either.
In this code snippet, you saw YARV's GVL (GIL) come into play for CPU-bound tasks. Even though YARV maps its threads to native OS threads, only a single thread gets scheduled to run at a time, because each thread has to acquire and hold the GVL for the duration of its execution.
If you are stuck with YARV due to gem requirements and the like, the only way to scale pure-Ruby work is process-based concurrency, a heavier form of concurrency.
ITERATIONS.times.each_slice(num_processes) do |slice|
  # Fork one child per item in the slice, then reap them all.
  pids = slice.collect { fork { busy_work } }
  Process.waitall
end
--------------------------------------------------------------------------------
RUBY_PLATFORM: x86_64-darwin13.0.0 RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
user system total real
busy work (10 iterations) 18.730000 0.010000 18.740000 ( 18.734536)
threaded busy work (10 iters / 2 thrds) 18.810000 0.010000 18.820000 ( 18.816004)
process based busy work (10 iters / 2 processes) 0.000000 0.000000 18.900000 ( 9.457859)
--------------------------------------------------------------------------------
Percentage increase in time taken with Thread concurrency: 0.43
--------------------------------------------------------------------------------
Percentage decrease in time taken with Process concurrency (2 processes): 49.52
Using process-based concurrency roughly halved the wall-clock time. But as concurrency is increased, the returns diminish: fork-related system costs start to dominate.
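You can get a feel for those costs with a quick micro-benchmark (a sketch, Unix-only): fork children that do nothing and time the round trip.

require 'benchmark'

# Fork 100 no-op children and reap each one; the per-iteration time
# approximates the raw fork/wait overhead that starts to dominate once
# the work per process gets small.
forks = 100
secs = Benchmark.realtime do
  forks.times { Process.wait(fork { exit! }) }
end
puts "~%.2f ms per fork/wait" % (secs / forks * 1000)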
The good news is that real-world tasks are rarely as purely CPU-bound as our test case. Whenever a task makes a system call or does network I/O, the YARV thread releases the GVL, allowing other threads to be scheduled. So thread-based concurrency can still pay off, depending on your task profile.
def mixed_work
  start = Time.now
  while (Time.now - start) < 5.0
    # Interleave pure-Ruby counting with a filesystem call; Dir.glob
    # releases the GVL while it is inside the system call.
    i = 0
    loop { break if 400_000 < (i += 1); dirs = Dir.glob(ENV['HOME']) }
  end
end
--------------------------------------------------------------------------------
RUBY_PLATFORM: x86_64-darwin13.0.0 RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
user system total real
mixed work (10 iterations) 47.600000 12.350000 59.950000 ( 59.942870)
threaded mixed work (10 iters / 2 thrds) 32.960000 8.690000 41.650000 ( 41.650272)
--------------------------------------------------------------------------------
Percentage decrease in time taken with Thread concurrency: 30.52
This also works with synchronous I/O tasks, like connecting to a database or making Net::HTTP calls:
def sync_io_work
  # A synchronous query that sleeps server-side for 3 seconds.
  conn = ::PGconn.new(:dbname => 'dev', :user => 'me',
                      :password => 'cccccc', :host => '127.0.0.1')
  conn.exec("SELECT pg_sleep(3)")
end
--------------------------------------------------------------------------------
RUBY_PLATFORM: x86_64-darwin13.0.0 RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
user system total real
sync_io work (10 iterations) 0.000000 0.010000 0.010000 ( 30.161611)
threaded sync_io work (10 iters / 2 thrds) 0.010000 0.000000 0.010000 ( 15.029913)
--------------------------------------------------------------------------------
Percentage decrease in time taken with Thread concurrency: 50.17
With two threads, each slice of two 3-second queries overlaps, halving the total time. Unfortunately, if you are on an older implementation like MRI 1.8, the results are not so rosy (MRI 1.8 does not map its threads to native OS threads):
--------------------------------------------------------------------------------
RUBY_PLATFORM: i686-darwin13.0.0 RUBY_VERSION: 1.8.7 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
user system total real
sync_io work (10 iterations) 0.000000 0.000000 0.000000 ( 30.067002)
threaded sync_io work (10 iters / 2 thrds) 0.010000 0.000000 0.010000 ( 30.071508)
--------------------------------------------------------------------------------
Percentage increase in time taken with Thread concurrency: 0.02
MRI 1.8 uses green threads, which buy portability but at a cost: pure-Ruby work never runs in parallel, and when any thread blocks inside a synchronous system call (as the pg extension's query does here), the whole process blocks with it, since the OS sees only one native thread.
However, one can fall back on non-blocking I/O plus select to overlap the queries, even on MRI 1.8, without using threads at all.
# ...
# utils.rb
def async_io_work
  conn = ::PGconn.new(:dbname => 'dev', :user => 'me',
                      :password => 'cccccc', :host => '127.0.0.1')
  conn.setnonblocking(true)
  # Dispatch the query without waiting for the result.
  conn.send_query("SELECT pg_sleep(3)")
  conn
end

def wait_for_results(conns)
  # Map each connection's underlying socket to its state.
  hsh = conns.inject({}) { |a, c| a[IO.new(c.socket)] = { :conn => c, :done => false }; a }
  loop do
    break if hsh.values.all? { |v| v[:done] }
    # Wait (up to 100ms) for any unfinished connection to become readable.
    res = IO.select(hsh.keys.reject { |k| hsh[k][:done] }, nil, nil, 0.1)
    res.first.each { |s| hsh[s][:done] = process_non_blocking_event(hsh[s][:conn]) } if res
  end
end

def process_non_blocking_event(conn)
  conn.consume_input
  unless conn.is_busy
    # All input is in; drain every pending result.
    res, data = 0, []
    while res != nil
      res = conn.get_result
      res.each { |d| data.push d } unless res.nil?
    end
    return true
  end
  false
end
# ...
# driver.rb
b.report("async_io work (#{ITERATIONS} iterations)") do
  measure_time_taken do
    # Fire off all the queries at once, then multiplex on their sockets.
    wait_for_results(ITERATIONS.times.collect { async_io_work })
  end
end
--------------------------------------------------------------------------------
RUBY_PLATFORM: i686-darwin13.0.0 RUBY_VERSION: 1.8.7 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
user system total real
sync_io work (10 iterations) 0.000000 0.000000 0.000000 ( 30.070065)
async_io work (10 iterations) 0.010000 0.000000 0.010000 ( 3.036897)
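With non-blocking queries, all ten pg_sleep(3) calls are in flight at once, so the total time collapses to roughly the duration of a single query.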
Process-based concurrency and pre-forking
The process-based concurrency model, though a heavy option due to the cost of forking a process, remains a model of choice in popular applications like Unicorn. As we saw above, the GVL is one reason to keep considering it.
Another reason these applications favor this model is thread safety: they cannot guarantee that the user application code they load is thread safe, so they keep a single thread of execution per process. If your application has thread-unsafe code, your only option may be process-based concurrency.
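For instance, a classic check-then-act race is enough to make code thread-unsafe. A contrived sketch (expensive_lookup stands in for any slow computation and is hypothetical):

@cache = {}

def fetch(key)
  # Not atomic: two threads can both observe nil for the same key and
  # both run the expensive call; on some interpreters the concurrent
  # Hash write itself is unsafe as well.
  @cache[key] ||= expensive_lookup(key)
end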
A common architecture for process-based concurrency is a master process that forks a worker process to handle each request. This can get expensive, though, as the per-request forking costs add up.
ITERATIONS.times.each_slice(num_processes) do |slice|
  pids = slice.collect { fork { busy_work } }
  Process.waitall
end
Pre-forking is a common server-side scaling technique: spin up a pool of persistent worker sub-processes up front, then dispatch tasks to them as work arrives. It complicates the implementation, though, since some form of bi-directional communication has to be maintained between the master process and the workers; the master brokers each request and decides which worker gets the work.
# ----------------------
# process_concurrency.rb
# -----------------------
b.report("with pre-forking workers, busy load (#{ITERATIONS} iters / #{num_processes} procs)") do
  pool = []
  time_taken = measure_time_taken do
    pool = create_worker_pool(num_processes)
    ITERATIONS.times { send_task_to_free_worker(pool, :busy_work) }
  end
  # Drain the pool: hand each ready worker an :exit and drop it.
  begin
    worker = send_task_to_free_worker(pool, :exit)
    pool.delete(worker)
  end while pool.length > 0
end
# ----------------------
# utils.rb
# -----------------------
module ProcessPool
  def create_worker_pool(num_workers)
    num_workers.times.collect do |i|
      # Two pipes per worker: master->worker (m_*) and worker->master (w_*).
      m_r, m_w, w_r, w_w = IO.pipe + IO.pipe
      if fork
        # Master keeps the worker's read end and its own write end.
        m_r.close; w_w.close
        { :r => w_r, :w => m_w }
      else
        # Worker: announce readiness, read a length-prefixed task name,
        # and invoke it via send.
        m_w.close; w_r.close
        r, w = m_r, w_w
        loop do
          w.write "ready"       # 5-byte readiness marker
          cmd_len = r.read(2)   # 2-digit, zero-padded length prefix
          cmd = r.read(cmd_len.to_i)
          send(cmd.to_sym)      # e.g. busy_work, or exit to quit
        end
      end
    end
  end

  def send_task_to_free_worker(pool, task)
    free_worker = wait_for_free_worker(pool)
    # Length-prefix the task name (zero-padded; task names are assumed
    # to stay under 100 characters).
    free_worker[:w].write(((cmd_len = task.to_s.length) > 9) ? cmd_len.to_s : "0#{cmd_len}")
    free_worker[:w].write task.to_s
    free_worker
  end

  def wait_for_free_worker(pool)
    read_handles = pool.collect { |v| v[:r] }
    # Block until some worker writes its "ready" marker, then consume it.
    ready_read_handle = IO.select(read_handles).first.first
    ready_read_handle.read(5)
    pool.detect { |v| v[:r] == ready_read_handle }
  end
end
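If you want to exercise the pool outside the benchmark harness, a minimal standalone run might look like this (a sketch, assuming utils.rb above is loaded and busy_work is defined at the top level):

include ProcessPool

pool = create_worker_pool(4)
30.times { send_task_to_free_worker(pool, :busy_work) }

# Shut down: give each ready worker an :exit and drop it from the pool.
pool.delete(send_task_to_free_worker(pool, :exit)) while pool.length > 0
Process.waitall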
The results don't show a huge decrease here, but over thousands of requests the saved fork overhead starts to add up.
--------------------------------------------------------------------------------
RUBY_PLATFORM: i686-darwin13.0.0 RUBY_VERSION: 1.8.7 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
user system total real
no pre-forking workers, busy work (30 iters / 4 procs) 0.000000 0.000000 320.460000 ( 81.602852)
with pre-forking workers, busy load (30 iters / 4 procs) 0.000000 0.000000 0.000000 ( 77.597048)
That is a lot of code to digest in one sitting. We will continue with process-based concurrency constructs and explore the event-based and actor-based concurrency models in the next post in this series.
The code is available on GitHub.