Ruby 中的多进程与多线程

June 04, 2016 10:30


目标

Ruby 线程的一些前置知识

  1. 线程可以共享程序内存,相对来说使用的资源更少
  2. 相对于进程,线程更加轻量,启动速度更快
  3. 相互之间通信也非常简单
  4. Ruby 由于 GIL(Global interpreter lock) 的原因,多线程并不能同时在多个 CPU 上执行

Ruby 进程的一些前置知识

  1. 进程间无法共享内存数据进行读写
  2. 2.0 开始 Copy On Write 功能可以让 fork 的进程共享内存数据,只在数据修改时才会复制数据
  3. 每个进程可以运行于不同的 CPU 核心上,更充分的利用多核 CPU
  4. 进程间的数据隔离的同时也提高了安全性,避免了像多线程间数据错乱的风险
  5. 同样由于进程间的数据隔离,在进程间的通信相对来说更加困难

多线程与多进程的应用场景

场景一

我们先假想一个场景: A 先生需要向同一房间中的 20 位员工收集个人信息,需要先给员工发送纸质表格,等待员工填写好后再收集起来。

首先来看看我们的任务场景,向 20 位员工收集个人信息,就是我们要处理的任务,给员工发送表格,是 A 先生要做的事件可以看做是 CPU 任务,而等待员工填写完成这个过程可以看做是 IO 等待,而 A 先生呢,就是那个处理任务的进程了。在这个场景中,A 先生要做的事件比较少,更多的时间是在等待员工完成表格。

接下来,我们拟定几个方案来一一分析:

这个方案基本就是一个单进程 + 单线程模式了,一个完成上交了再找另一个,一看就知道效率是非常低的

这个方案相当于使用了多进程来处理,我们可以把每个雇佣来的人看成一个进程,此方案中,通过对更多资源的利用来达到快速完成任务

此方案可以看作一个多线程的模式,在 IO 阻塞时(员工填写表格),我们并不是等待 IO 操作完成后才去执行后面的工作,而是继续执行计算任务(发送表格),完成所有计算任务后,再去等待 IO 并收集结果。

场景二

接下来,我们把场景稍加改变: 每位员工都在距离几公里到几十公里的办公室中

改变后的场景中,从一位员工到另一位员工的这个路途是 A 先生要去做的事,也就还是 CPU 任务,在这个场景中,在路程中的时间可能是一位员工完成表格填写时间的数倍到数十倍。我们继续来分析上面三个方案

一个完成上交了再找另一个,效率还是比较低的

多进程处理,可以明显的看出,完成效率提升了数倍,本来要由一个人走很远的路途,分给 4 个人一起执行后,只需要花 1/4 的时间就可以走完所有路程

多线程模式,上面的方案在计算机中执行一般会是这个情形:找到一位员工后,记录此员工办公室坐标,下次再来时,直接传送过来,不需要再去跑一段路程了。由于一次路程的时间(一个任务的 CPU 计算时间)就相当于几位或数十位员工的表格填写时间了,所以总体上来看,节约的时间几乎可以忽略不计了

场景总结

从上面的两个场景中可以看出:

在使用中,我们也可以把多进程与多线程结合起来使用,也就是第二方案中,每个雇佣来的人也使用第三方案的方式来执行任务,他们会在不同场景中得到不同的效果

在实际应用中,有更多的因素需要考虑,也会不同程度的影响方案的选择:

代码时间

Ruby 写一段简单的多线程

a = [1, 2, 3, 4]
b = []
mutex = Mutex.new

a.length.times.map do |i|
  Thread.new do
    v = [i, i ** 2].join(' - ')
    mutex.synchronize { b << v }
  end
end.map(&:join)

puts b
# => 2 - 4
# 1 - 1
# 0 - 0
# 3 - 9

多线程操作时,要为共享资源加锁,同时不要把可以在锁外完成的操作放到锁中间去执行,长时间占用锁会降低处理能力

Ruby 写一段多进程

require 'socket'

MAX_RECV = 100

sockets = 3.times.map do |i|
  parent_socket, child_socket = Socket.pair(:UNIX, :DGRAM, 0)
  fork do
    pid = Process.pid
    parent_socket.close
    number = child_socket.recv(MAX_RECV).to_i
    puts "#{Time.now} process #{pid}# receive #{number}"
    sleep 1
    child_socket.write("#{number} - #{number * 2}")
    child_socket.close
  end
  child_socket.close
  parent_socket
end

puts "send jobs"
sockets.each_with_index.each do |socket, index|
  socket.send((index + 1).to_s, 0)
end

puts "read result"
sockets.map do |socket|
  puts socket.recv(MAX_RECV)
  socket.close
end

# => send jobs
# read result
# 2016-04-03 11:30:34 +0800 process 21723# receive 12016-04-03 11:30:34 +0800 process 21724# receive 2
# 2016-04-03 11:30:34 +0800 process 21725# receive 3
# 1 - 2
# 2 - 4
# 3 - 6

由于进程间无法直接通信,也没有共享资源,所以我们不会做 array << result 的操作,上面使用 UnixSocket 进程通信

更简单的使用多线程与多进程

使用 Parallel Gem 可以更简单的使用多进程与多线程

require 'parallel'

list = 10.times.to_a
a = Proc.new { list.pop || Parallel::Stop }
result = Parallel.map(a, in_threads: 3) do |number|
  sleep 0.5
  puts "process #{Process.pid} receive #{number}\n"

  number = number.to_i
  number * 2
end

puts "result: #{result.join('-')}"

# => process 21738 receive 9
# process 21738 receive 7
# process 21738 receive 8
# process 21738 receive 5
# process 21738 receive 6
# process 21738 receive 4
# process 21738 receive 1
# process 21738 receive 2
# process 21738 receive 3
# process 21738 receive 0
# result: 18-16-14-12-10-8-6-4-2-0

更多详细的使用请自行围观使用文档

结语

本文中谈到的内容都比较基础的东西,但可能有不少的童鞋不了解,因为业务场景不同,可能一般不会涉及到这些,希望能帮助到大家

抛开业务场景谈技术都是耍流氓!每种技术都有他适用的场景。

Comments: