I used RabbitMQ as the AMQP server and py-amqplib as client.
To use it, you should run a master: python dotprod.py -r 0 -n 2 and a worker:
python dotprod.py -r 1 -n 2. The -r option works like the "RANK" attribute of MPI. The -n option is how many processes are involved.
import sys
from cPickle import dumps, loads
from amqplib import client_0_8 as amqp
from optparse import OptionParser
my_rank = None
num_procs = None
DEBUG = True
# As there is no strict passing by reference I use "boxing" :-)
class Box(object):
def __init__(self):
self.data = None
def pack(self, data):
self.data = data
def unpack(self):
return self.data
def __str__(self):
return '%s' % self.data
def read_vector(conn, chan, prompt, vector_order, my_rank, num_procs):
n_bar = vector_order / num_procs
if (my_rank == 0):
s = raw_input(("Enter a list of integers of size %d:" % vector_order))
try:
vector = [int(i) for i in s.split(",")]
except ValueError, e:
print e
chan.close()
conn.close()
sys.exit(1)
local_vector = vector[0:n_bar]
fst, snd = n_bar, n_bar * 2
for q in xrange(1, num_procs):
slz = vector[fst:snd]
fst = snd
snd += n_bar
# Send
msg = amqp.Message(dumps(slz))
msg.properties['delivery_mode'] = 2
chan.basic_publish(msg, exchange="sorting_room",
routing_key=("key_%d" % q))
return local_vector
else:
# Receive
local_data = Box()
def recv_callback(msg):
print ('Received, from channel #'
+ str(msg.channel.channel_id)),
local_data.pack(loads(msg.body))
print 'data:', local_data
chan.basic_consume(queue=('po_box_%d' % my_rank), no_ack=True,
callback=recv_callback,
consumer_tag=("tag_%d" % my_rank))
chan.wait()
chan.basic_cancel("tag_%d" % my_rank)
return local_data.unpack()
def main(conn, chan, my_rank, num_procs):
# Queue and exchange for broadcasting
chan.exchange_declare(exchange="bcast", type="fanout", durable=True,
auto_delete=False)
chan.queue_declare(queue=("bcast_queue_%d" % my_rank), durable=True,
exclusive=False, auto_delete=False)
chan.queue_bind(queue=("bcast_queue_%d" % my_rank), exchange="bcast")
# Queue and exchange for point-to-point communication
chan.exchange_declare(exchange="sorting_room", type="direct",
durable=True, auto_delete=False)
chan.queue_declare(queue=("po_box_%d" % my_rank), durable=True,
exclusive=False, auto_delete=False)
chan.queue_bind(queue=("po_box_%d" % my_rank),
exchange="sorting_room",
routing_key=("key_%d" % my_rank))
if (my_rank == 0):
try:
vector_order = int(raw_input("Enter an integer number:"))
except ValueError, e:
print e
chan.close()
conn.close()
sys.exit(1)
# Broadcasting to a set of programs
msg = amqp.Message(dumps(vector_order))
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg, exchange="bcast")
else:
# Receive Bcast
local_data = Box()
def recv_callback(msg, ):
print ('Received, from channel #'
+ str(msg.channel.channel_id)),
print 'data:', loads(msg.body)
local_data.pack(loads(msg.body))
chan.basic_consume(queue=('bcast_queue_%d' % my_rank),
no_ack=True,
callback=recv_callback,
consumer_tag=("bcast_tag_%d" % my_rank))
chan.wait()
chan.basic_cancel("bcast_tag_%d" % my_rank)
print 'Done'
vector_order = local_data.unpack()
local_x = read_vector(conn, chan, "read first vector", vector_order,
my_rank, num_procs)
local_y = read_vector(conn, chan, "read second vector", vector_order,
my_rank, num_procs)
print 'Vectors:', local_x, local_y
# local dot prod
acc = 0.0
for idx in xrange(0, len(local_x)):
acc += local_x[idx] * local_y[idx]
local_dot = acc
print '[rank: %d] Local dot product: %f' % (my_rank, local_dot)
if my_rank != 0:
# send to master
msg = amqp.Message(dumps(local_dot))
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg, exchange="sorting_room",
routing_key="key_0")
print '[rank: %d] done' % my_rank
else:
# Receive messages
local_data = Box()
def recv_callback(msg):
print ('Received, from channel #'
+ str(msg.channel.channel_id)),
print 'data:', loads(msg.body)
local_data.pack(loads(msg.body))
chan.basic_consume(queue=('po_box_%d' % my_rank),
no_ack=True,
callback=recv_callback,
consumer_tag=("tag_%d" % my_rank))
acc = 0.0
for i in xrange(1, num_procs):
chan.wait()
acc += local_data.unpack()
print '[rank: %d] acc: %f' % (my_rank, acc)
chan.basic_cancel("tag_%d" % my_rank)
print '[rank: %d] done, result %f' % (my_rank, acc + local_dot)
if __name__ == '__main__':
# Initializing connection
conn = amqp.Connection(host="localhost:5672", userid="guest",
password="guest", virtual_host="/",
insist=False)
chan = conn.channel()
parser = OptionParser()
parser.add_option("-r", "--rank", dest="rank",
help="Gives the rank to the process",
default=1)
parser.add_option("-n", "--number_procs", dest="np", default=2,
help="Number of procs to use")
(options, args) = parser.parse_args()
try:
my_rank = int(options.rank)
num_procs = int(options.np)
except ValueError, e:
print e
chan.close()
conn.close()
sys.exit(1)
print 'Running with rank:', my_rank, 'and', num_procs, 'processes'
main(conn, chan, my_rank, num_procs)
# close channel and connection
chan.close()
conn.close()