Friday, April 11, 2014

Ruby Concurrency Primer - Part 1


In today's multi-core CPU world, concurrent programming is required to scale and fully utilize shared resources. Writing concurrent programs is hard, and the choice of concurrency model can greatly affect an application's scalability.

Consider nginx vs. the Apache HTTP Server. nginx uses non-blocking, event based concurrency to fully utilize a single core, and process based concurrency to scale across multiple cores. The Apache HTTP Server, on the other hand, uses thread based concurrency (via the worker MPM) to scale on a single core and process based concurrency to scale across multiple cores. The higher resource consumption per thread incurred by Apache leaves it far behind nginx when scaling to thousands of requests.


While building concurrent applications in Ruby, you will want to consider the following aspects:


- Task profile: Is the task CPU bound? Does it make system calls or do network I/O? Is the I/O synchronous or asynchronous?

- Choice of concurrency model: Ruby provides Process and Thread based concurrency constructs as part of its core library, while various gems add event based (eventmachine, rev etc.) and actor based (celluloid) concurrency constructs to the language.

- Choice of Ruby interpreter: multi-core support, GVL (GIL), Copy on Write (COW) support

* The choice of OS matters as well, but we will assume you are constrained to one of the Linux variants.


Let's explore all of these aspects practically with some code examples:

Threads and the GVL (GIL)

First, let us consider the case where we need to run some busy CPU work concurrently.

  def busy_work
    a=i=0; loop { break if 10000000 < (i=i+1); a += (i << 32) }
  end

Driver

without_threading = measure_time_taken do
  ITERATIONS.times.each do |i|
    busy_work
  end
end

with_threading = measure_time_taken do
  ITERATIONS.times.each_slice(NUM_THREADS) do |i|
    NUM_THREADS.times.collect { Thread.new { busy_work } }.each {|t| t.join}
  end
end
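The driver relies on a measure_time_taken helper and on the ITERATIONS and NUM_THREADS constants, none of which are shown in this post. A minimal sketch of what they might look like follows; the helper body and the constant values (taken from the "10 iters / 2 thrds" labels in the output) are assumptions, not the original code.

```ruby
# Assumed values, matching the "10 iters / 2 thrds" labels in the output.
ITERATIONS  = 10
NUM_THREADS = 2

# Hypothetical helper: runs the given block and returns the elapsed
# wall-clock time in seconds as a Float.
def measure_time_taken
  started = Time.now
  yield
  Time.now - started
end

elapsed = measure_time_taken { sleep 0.05 }
puts "took %.3f seconds" % elapsed
```

Returning the elapsed time from the helper is what would let the driver compare without_threading and with_threading afterwards.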

Output

(YARV 1.9.3)

--------------------------------------------------------------------------------
 RUBY_PLATFORM: x86_64-darwin13.0.0  RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
                                                    user     system      total        real
busy work (10 iterations)                      19.050000   0.010000  19.060000 ( 19.059950)
threaded busy work (10 iters / 2 thrds)        19.690000   0.020000  19.710000 ( 19.709079)
--------------------------------------------------------------------------------
Percentage increase in time taken with Thread concurrency: 3.41

(JRuby 1.9.3)

--------------------------------------------------------------------------------
 RUBY_PLATFORM: java  RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: jruby
--------------------------------------------------------------------------------
                                                    user     system      total        real
busy work (10 iterations)                      18.610000   0.830000  19.440000 ( 17.609000)
threaded busy work (10 iters / 2 thrds)        21.910000   0.980000  22.890000 ( 11.353000)
--------------------------------------------------------------------------------
Percentage decrease in time taken with Thread concurrency: 35.53
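The percentage lines in the output can be reproduced from the "real" column. The sketch below shows the arithmetic; percent_change is a hypothetical helper name, not part of the original driver.

```ruby
# Hypothetical helper: percentage change in elapsed time going from
# `before` to `after`. Positive means slower, negative means faster.
def percent_change(before, after)
  ((after - before) / before.to_f * 100).round(2)
end

# "real" times taken from the two benchmark runs above.
yarv_change  = percent_change(19.059950, 19.709079)
jruby_change = percent_change(17.609000, 11.353000)

puts "YARV 1.9.3: #{yarv_change}%"
puts "JRuby:      #{jruby_change}%"
```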
  

Right off the bat, we see that the choice of interpreter makes a difference to our concurrent implementation. With two threads, the JRuby run finished about 35% faster, whereas the YARV 1.9.3 run was about 3% slower. Ruby 2.0 didn't fare any better either.

In this code snippet, you saw YARV's GVL (Global VM Lock, also known as the GIL) come into play for CPU bound tasks. Even though YARV maps its threads to native OS threads, only a single thread gets scheduled to run at a time, as it has to acquire and hold the GIL for the duration of execution.

If you are stuck with YARV due to gem requirements etc., the only way to scale pure Ruby work is by using process based concurrency, a heavier form of concurrency.

  
ITERATIONS.times.each_slice(num_processes) do |i|
   # fork one child per slot, then wait for the whole batch to finish
   pids = num_processes.times.collect { fork { busy_work } }
   Process.waitall
end

--------------------------------------------------------------------------------
 RUBY_PLATFORM: x86_64-darwin13.0.0  RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
                                                    user     system      total        real
busy work (10 iterations)                      18.730000   0.010000  18.740000 ( 18.734536)
threaded busy work (10 iters / 2 thrds)        18.810000   0.010000  18.820000 ( 18.816004)
process based busy work (10 iters / 2 processes)  0.000000   0.000000  18.900000 (  9.457859)
--------------------------------------------------------------------------------
Percentage increase in time taken with Thread concurrency: 0.43
--------------------------------------------------------------------------------
Percentage decrease in time taken with Process concurrency (2 processes): 49.52


Using process based concurrency made the whole thing go faster, but as concurrency is increased, it will do so with diminishing returns as forking related system costs start to dominate.
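Part of what makes processes heavier than threads is that a forked child gets its own copy of memory, so any result has to be sent back to the parent explicitly. The sketch below is one way to do that with a pipe and Marshal; fork_with_result is a hypothetical helper, and the summing block is only a stand-in for real work.

```ruby
# Hypothetical helper: forks a child that runs the block and writes the
# marshalled return value into a pipe that the parent can read later.
def fork_with_result
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(Marshal.dump(yield))
    writer.close
    exit!(0) # leave immediately, skipping at_exit handlers copied from the parent
  end
  writer.close # the parent only reads
  { :pid => pid, :reader => reader }
end

# Start both children first so that they run concurrently...
children = [10, 20].map { |n| fork_with_result { (1..n).inject(:+) } }

# ...then collect each result and reap the child process.
results = children.map do |child|
  value = Marshal.load(child[:reader].read)
  child[:reader].close
  Process.wait(child[:pid])
  value
end

puts results.inspect
```

Forking all the children before reading any result is what keeps the work concurrent; reading straight after each fork would serialize it.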


The good news, though, is that real world tasks are usually not as CPU bound as our test case. Whenever a task makes a blocking system call or does network I/O, the YARV thread releases the GIL, allowing other threads to be scheduled. So thread based concurrency can still be useful, depending on your task profile.

  def mixed_work
    start = Time.now
    while (Time.now - start) < 5.0
      i=0; loop { break if 400000 < (i=i+1); dirs = Dir.glob(ENV['HOME']) }
    end
  end


--------------------------------------------------------------------------------
 RUBY_PLATFORM: x86_64-darwin13.0.0  RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
                                                    user     system      total        real
mixed work (10 iterations)                     47.600000  12.350000  59.950000 ( 59.942870)
threaded mixed work (10 iters / 2 thrds)       32.960000   8.690000  41.650000 ( 41.650272)
--------------------------------------------------------------------------------
Percentage decrease in time taken with Thread concurrency: 30.52
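The effect of the GIL being released can also be seen in isolation. In the sketch below, sleep stands in for a blocking system call; the timed and blocking_work helpers are hypothetical and are not part of the benchmark code.

```ruby
# Hypothetical helper: returns the wall-clock seconds taken by the block.
def timed
  started = Time.now
  yield
  Time.now - started
end

# Stand-in for a blocking call; the thread gives up the GIL while it sleeps.
def blocking_work
  sleep 0.2
end

serial   = timed { 2.times { blocking_work } }
threaded = timed { 2.times.collect { Thread.new { blocking_work } }.each {|t| t.join} }

puts "serial:   %.2f seconds" % serial
puts "threaded: %.2f seconds" % threaded
```

Because neither thread needs the GIL while it is sleeping, the two waits overlap and the threaded run should take roughly half as long as the serial one.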


This also works with synchronous I/O tasks like connecting to the database or Net::HTTP calls:

def sync_io_work
   ::PGconn.new({:dbname => 'dev', :user => 'me', :password => 'cccccc', :host => '127.0.0.1'}).exec("SELECT pg_sleep(3)")
end


--------------------------------------------------------------------------------
 RUBY_PLATFORM: x86_64-darwin13.0.0  RUBY_VERSION: 1.9.3 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
                                                    user     system      total        real
sync_io work (10 iterations)                    0.000000   0.010000   0.010000 ( 30.161611)
threaded sync_io work (10 iters / 2 thrds)      0.010000   0.000000   0.010000 ( 15.029913)
--------------------------------------------------------------------------------
Percentage decrease in time taken with Thread concurrency: 50.17


Unfortunately, if you are on an older implementation like MRI 1.8, the results are not so rosy (MRI 1.8 does not map its threads to native OS threads):


--------------------------------------------------------------------------------
 RUBY_PLATFORM: i686-darwin13.0.0  RUBY_VERSION: 1.8.7 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
                                                   user     system      total        real
sync_io work (10 iterations)                   0.000000   0.000000   0.000000 ( 30.067002)
threaded sync_io work (10 iters / 2 thrds)     0.010000   0.000000   0.010000 ( 30.071508)
--------------------------------------------------------------------------------
Percentage increase in time taken with Thread concurrency: 0.02

MRI 1.8 uses green threads, which allow for portability. The cost is that every Ruby thread runs inside a single OS thread, so pure Ruby work never runs in parallel, and a synchronous I/O call like the database query above blocks all the threads in the process.

However, one can use non-blocking i/o to achieve some form of thread based concurrency in MRI.

  # ...
  # utils.rb

      def async_io_work
        conn = ::PGconn.new({:dbname => 'dev', :user => 'me', :password => 'cccccc', :host => '127.0.0.1'})
        conn.setnonblocking(true)
        conn.send_query("SELECT pg_sleep(3)")
        conn
      end

      def wait_for_results(conns)
        hsh = conns.inject({}) {|a,c| a[IO.new(c.socket)] = { :conn => c, :done => false}; a}
        loop do
          break if hsh.values.collect {|v| v[:done]}.all?
          res = select(hsh.keys.select {|k| !hsh[k][:done]}, nil, nil, 0.1)
          res.first.each {|s| hsh[s][:done] = process_non_blocking_event(hsh[s][:conn])} if res
        end
      end

      def process_non_blocking_event(conn)
        conn.consume_input
        unless conn.is_busy
          res, data = 0, []
          while res != nil
            res = conn.get_result
            res.each {|d| data.push d} unless res.nil?
          end
          return true
        end
        return false
      end

 #  ...
 #  driver.rb
    b.report("async_io work (#{ITERATIONS} iterations)") do
      without_threading = measure_time_taken do
        wait_for_results(ITERATIONS.times.collect do |i|
                           async_io_work
                         end)
      end
    end
   

--------------------------------------------------------------------------------
 RUBY_PLATFORM: i686-darwin13.0.0  RUBY_VERSION: 1.8.7 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
                                                   user     system      total        real
sync_io work (10 iterations)                   0.000000   0.000000   0.000000 ( 30.070065)
async_io work (10 iterations)                  0.010000   0.000000   0.010000 (  3.036897)
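The select loop in wait_for_results can be tried without a database. In the sketch below, pipes take the place of the PostgreSQL sockets and short-lived writer threads take the place of the server; both are assumptions made purely for illustration.

```ruby
started = Time.now

# Each "query" gets its own pipe. A writer thread plays the part of the
# server and delivers a result after a short delay.
readers = 3.times.collect do |n|
  r, w = IO.pipe
  Thread.new { sleep 0.2; w.write("result #{n}"); w.close }
  r
end

pending = readers.dup
results = []
until pending.empty?
  # Wait up to 0.1 seconds for any pending handle to become readable.
  ready, = IO.select(pending, nil, nil, 0.1)
  next unless ready
  ready.each do |io|
    # read blocks until EOF, which arrives almost immediately because the
    # writer closes its end right after writing.
    results << io.read
    io.close
    pending.delete(io)
  end
end

elapsed = Time.now - started
puts results.sort.inspect
puts "waited %.2f seconds" % elapsed
```

All three waits overlap, so the total should be close to a single 0.2 second delay rather than three of them, which mirrors the drop from about 30 seconds to about 3 seconds in the output above.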


Process based concurrency and Pre-forking  

 

The process based concurrency model, though a heavy option due to the cost incurred in forking a process, remains the model of choice in many popular applications like Unicorn. As we saw above, the GVL is one reason why we might want to continue to consider process based concurrency.

One additional reason why these applications support this model is that thread safety may not be guaranteed once the user's application code is loaded, so a single thread of execution per process needs to be maintained. If your application has thread unsafe code, your only option may be process based concurrency.

A common architecture for process based concurrency is a master process forking a worker process to handle each request. However this can be expensive as the process forking costs start to add up.
    
ITERATIONS.times.each_slice(num_processes) do |i|
  pids = num_processes.times.collect { fork { busy_work } }
  Process.waitall
end

Pre-forking is a common server side scaling technique that spins up a pool of persistent worker sub-processes, which are then handed the tasks that need to be executed concurrently. This complicates the implementation, though, as some sort of bi-directional communication needs to be maintained between the master server process and the worker processes. The master brokers each request and decides which worker should be sent the work.


# ----------------------
# process_concurrency.rb
# -----------------------

    b.report("with pre-forking workers, busy load (#{ITERATIONS} iters / #{num_processes} procs)") do
      pool = []
      time_taken = measure_time_taken do
        pool = create_worker_pool(num_processes)
        ITERATIONS.times.each { |i| send_task_to_free_worker(pool, :busy_work) }
      end
      begin
        worker = send_task_to_free_worker(pool, :exit); pool.delete(worker)
      end while pool.length > 0
    end

# ----------------------
# utils.rb
# -----------------------

  module ProcessPool
    def create_worker_pool(num_workers)
      num_workers.times.collect do |i|
        m_r,m_w, w_r,w_w= IO.pipe + IO.pipe
        if fork
          m_r.close; w_w.close
          { :r => w_r, :w => m_w }
        else
          # worker code
          m_w.close; w_r.close; r = m_r; w = w_w
          loop { w.write "ready"; cmd_len = r.read(2); cmd=r.read(cmd_len.to_i); send(cmd.to_sym) }
        end
      end
    end

    def send_task_to_free_worker(pool, task)
      free_worker = wait_for_free_worker(pool)
      free_worker[:w].write(((cmd_len = task.to_s.length) > 9) ? cmd_len.to_s : "0#{cmd_len}")
      free_worker[:w].write task.to_s
      free_worker
    end

    def wait_for_free_worker(pool)
      read_handles = pool.collect {|v| v[:r]}
      ready_read_handle = IO.select(read_handles).first.first; ready_read_handle.read(5)
      pool.detect {|v| v[:r] == ready_read_handle}
    end
  end


The results show only a modest decrease (about 5% here), but over thousands of connections the savings start to add up.

--------------------------------------------------------------------------------
 RUBY_PLATFORM: i686-darwin13.0.0  RUBY_VERSION: 1.8.7 RUBY_INSTALL_NAME: ruby
--------------------------------------------------------------------------------
                                                                  user     system      total        real
no pre-forking workers, busy work (30 iters / 4 procs)        0.000000   0.000000 320.460000 ( 81.602852)
with pre-forking workers, busy load (30 iters / 4 procs)      0.000000   0.000000   0.000000 ( 77.597048)
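The master and the workers in the pool above talk over pipes using a tiny protocol: a two-digit, zero-padded length followed by the command name. The sketch below isolates that framing; encode_command and read_command are hypothetical names, and the 99 byte limit simply follows from the two-digit prefix.

```ruby
# Hypothetical helper: frames a command as a two-digit, zero-padded
# length followed by the command name, e.g. :exit becomes "04exit".
def encode_command(task)
  name = task.to_s
  raise ArgumentError, "command too long for a two-digit prefix" if name.bytesize > 99
  format("%02d", name.bytesize) + name
end

# Hypothetical helper: reads one framed command from the handle and
# returns it as a symbol.
def read_command(io)
  len = io.read(2).to_i
  io.read(len).to_sym
end

r, w = IO.pipe
w.write(encode_command(:busy_work))
w.write(encode_command(:exit))

decoded = [read_command(r), read_command(r)]
puts decoded.inspect
```

A fixed-width length prefix lets the worker know exactly how many bytes to read for each command, so two commands written back to back are never run together.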


That is a lot of code to digest in one sitting. We will continue with process based concurrency constructs and explore event based and actor based concurrency models in the next post in this series.

The code is available on GitHub.





