Tuesday, May 25, 2010

This is an implementation of a parallel dot product of two vectors. It seems obvious that one could use AMQP as a messaging interface for parallel computing, so that's what I did here. I "ported" the MPI dot product that was shown to me in an HPC class. The version of the algorithm is supposedly from Parallel Programming with MPI by Peter Pacheco.

I used RabbitMQ as the AMQP server and py-amqplib as client.

To use it, you should run a master: python dotprod.py -r 0 -n 2 and a worker:
python dotprod.py -r 1 -n 2. The -r option works like the "RANK" attribute of MPI. The -n option is the total number of processes involved (master included).




import sys

from cPickle import dumps, loads
from amqplib import client_0_8 as amqp
from optparse import OptionParser

# Rank of this process and total process count.  Both are placeholders here
# and are reassigned from the -r / -n options in the __main__ section.
my_rank = None
num_procs = None

# Debug switch; nothing in this listing reads it.
DEBUG = True

# As there is no strict passing by reference I use "boxing" :-)
class Box(object):
    """Mutable single-slot container for handing a value out of a callback.

    The message callbacks in this file cannot rebind a local variable of the
    function that registered them, so they store the received payload in a
    Box and the enclosing function reads it back with unpack().
    """

    def __init__(self):
        # Stays None until pack() is called.
        self.data = None

    def pack(self, data):
        """Store *data*, replacing whatever was stored before."""
        self.data = data

    def unpack(self):
        """Return the stored value, or None if nothing has been packed."""
        return self.data

    def __str__(self):
        # Format a 1-tuple rather than the payload itself: a bare
        # ``'%s' % self.data`` treats a tuple payload as the argument list,
        # raising TypeError for (1, 2) and printing '7' for (7,).
        return '%s' % (self.data,)

def read_vector(conn, chan, prompt, vector_order, my_rank, num_procs):
    """Return this process's chunk of a vector typed in on rank 0.

    Rank 0 reads a comma-separated list of integers from stdin, keeps the
    first chunk for itself and publishes one pickled chunk per other rank to
    the "sorting_room" exchange with routing key "key_<rank>".  Every other
    rank waits for one message on its "po_box_<rank>" queue and returns the
    unpickled payload.

    Parameters:
        conn, chan   -- AMQP connection and channel; both are closed here
                        before exiting on bad input.
        prompt       -- label for the vector being read.  Currently not
                        displayed; kept so existing callers keep working.
        vector_order -- total number of elements in the full vector.
        my_rank      -- rank of the calling process (0 is the master).
        num_procs    -- total number of processes sharing the vector.

    Returns the list of elements assigned to this rank.

    On rank 0, input that is not a list of integers, or that does not
    contain exactly vector_order values, is reported on stdout and the
    process exits with status 1.
    """
    if my_rank != 0:
        # Receive
        local_data = Box()

        def recv_callback(msg):
            # NOTE(security): the body is unpickled as-is, so anything able
            # to publish to this queue can execute code here.  Only use with
            # a trusted broker.
            local_data.pack(loads(msg.body))
            print('Received, from channel #%s data: %s'
                  % (msg.channel.channel_id, local_data))

        consumer_tag = "tag_%d" % my_rank
        chan.basic_consume(queue=('po_box_%d' % my_rank), no_ack=True,
                           callback=recv_callback,
                           consumer_tag=consumer_tag)
        chan.wait()
        chan.basic_cancel(consumer_tag)

        return local_data.unpack()

    s = raw_input(("Enter a list of integers of size %d:" % vector_order))

    try:
        vector = [int(i) for i in s.split(",")]
    except ValueError as e:
        print(e)
        chan.close()
        conn.close()
        sys.exit(1)

    # A wrong element count would hand out short or misaligned chunks, so
    # reject it the same way as malformed input.
    if len(vector) != vector_order:
        print('Expected %d integers, got %d' % (vector_order, len(vector)))
        chan.close()
        conn.close()
        sys.exit(1)

    # Floor division: chunk size per rank.  Any remainder goes to the last
    # rank below instead of being dropped.
    n_bar = vector_order // num_procs
    last_rank = num_procs - 1

    for q in xrange(1, num_procs):
        start = q * n_bar
        stop = vector_order if q == last_rank else start + n_bar

        # Send
        msg = amqp.Message(dumps(vector[start:stop]))
        msg.properties['delivery_mode'] = 2
        chan.basic_publish(msg, exchange="sorting_room",
                           routing_key=("key_%d" % q))

    # With a single process rank 0 is also the last rank; n_bar then equals
    # vector_order, so this slice is the whole vector.
    return vector[0:n_bar]


def _make_receiver():
    """Return (box, callback); the callback stores a message's payload in box."""
    box = Box()

    def on_message(msg):
        # NOTE(security): the body is unpickled as-is, so anything able to
        # publish to the queue can execute code here.  Only use with a
        # trusted broker.
        payload = loads(msg.body)
        print('Received, from channel #%s data: %s'
              % (msg.channel.channel_id, payload))
        box.pack(payload)

    return box, on_message


def _declare_topology(chan, my_rank):
    """Declare the exchanges and this rank's queues, and bind them."""
    bcast_queue = "bcast_queue_%d" % my_rank
    po_box = "po_box_%d" % my_rank

    # Queue and exchange for broadcasting
    # NOTE(review): rank 0 binds a broadcast queue too but never consumes
    # from it, so its own broadcasts presumably accumulate there -- confirm.
    chan.exchange_declare(exchange="bcast", type="fanout", durable=True,
                          auto_delete=False)
    chan.queue_declare(queue=bcast_queue, durable=True,
                       exclusive=False, auto_delete=False)
    chan.queue_bind(queue=bcast_queue, exchange="bcast")

    # Queue and exchange for point-to-point communication
    chan.exchange_declare(exchange="sorting_room", type="direct",
                          durable=True, auto_delete=False)
    chan.queue_declare(queue=po_box, durable=True,
                       exclusive=False, auto_delete=False)
    chan.queue_bind(queue=po_box, exchange="sorting_room",
                    routing_key=("key_%d" % my_rank))


def _share_vector_order(conn, chan, my_rank):
    """Read the vector size on rank 0 and broadcast it; receive it elsewhere."""
    if my_rank == 0:
        try:
            vector_order = int(raw_input("Enter an integer number:"))
        except ValueError as e:
            print(e)
            chan.close()
            conn.close()
            sys.exit(1)

        # Broadcasting to a set of programs
        msg = amqp.Message(dumps(vector_order))
        msg.properties["delivery_mode"] = 2
        chan.basic_publish(msg, exchange="bcast")
        return vector_order

    # Receive Bcast
    consumer_tag = "bcast_tag_%d" % my_rank
    box, on_message = _make_receiver()
    chan.basic_consume(queue=('bcast_queue_%d' % my_rank),
                       no_ack=True,
                       callback=on_message,
                       consumer_tag=consumer_tag)
    chan.wait()
    chan.basic_cancel(consumer_tag)
    print('Done')
    return box.unpack()


def _collect_partials(chan, my_rank, num_procs):
    """Sum the num_procs - 1 partial results sent to this rank's po_box queue."""
    consumer_tag = "tag_%d" % my_rank
    box, on_message = _make_receiver()
    chan.basic_consume(queue=('po_box_%d' % my_rank),
                       no_ack=True,
                       callback=on_message,
                       consumer_tag=consumer_tag)

    # The consumer stays registered for the whole loop; every wait() is
    # expected to dispatch one delivery into the box.
    acc = 0.0
    for _ in xrange(1, num_procs):
        chan.wait()
        acc += box.unpack()
        print('[rank: %d] acc: %f' % (my_rank, acc))

    chan.basic_cancel(consumer_tag)
    return acc


def main(conn, chan, my_rank, num_procs):
    """Run one process of the distributed dot product.

    Steps: declare the exchanges and queues, agree on the vector size
    (entered on rank 0 and broadcast to the others), obtain this rank's chunk
    of both vectors through read_vector(), compute the local dot product,
    then either send it to rank 0 or, on rank 0, add up everybody's partial
    result and print the total.

    Parameters:
        conn, chan -- AMQP connection and channel; closed here before
                      exiting on invalid input.
        my_rank    -- rank of this process (0 is the master).
        num_procs  -- total number of cooperating processes.

    Invalid input on rank 0, or vector chunks of different lengths, are
    reported on stdout and the process exits with status 1.
    """
    _declare_topology(chan, my_rank)
    vector_order = _share_vector_order(conn, chan, my_rank)

    local_x = read_vector(conn, chan, "read first vector", vector_order,
                          my_rank, num_procs)
    local_y = read_vector(conn, chan, "read second vector", vector_order,
                          my_rank, num_procs)

    print('Vectors: %s %s' % (local_x, local_y))

    # Pairing element by element only makes sense for equally long chunks;
    # stop instead of failing half-way or ignoring trailing elements.
    if len(local_x) != len(local_y):
        print('[rank: %d] vector chunks differ in length: %d vs %d'
              % (my_rank, len(local_x), len(local_y)))
        chan.close()
        conn.close()
        sys.exit(1)

    # local dot prod
    local_dot = 0.0
    for x, y in zip(local_x, local_y):
        local_dot += x * y

    print('[rank: %d] Local dot product: %f' % (my_rank, local_dot))

    if my_rank != 0:
        # send to master
        msg = amqp.Message(dumps(local_dot))
        msg.properties["delivery_mode"] = 2
        chan.basic_publish(msg, exchange="sorting_room",
                           routing_key="key_0")
        print('[rank: %d] done' % my_rank)
        return

    # Receive messages
    acc = _collect_partials(chan, my_rank, num_procs)
    print('[rank: %d] done, result %f' % (my_rank, acc + local_dot))


if __name__ == '__main__':
    # Parse and validate the command line before touching the broker, so
    # that -h, an unknown option or a bad value cannot leave an open
    # connection behind.
    parser = OptionParser()
    parser.add_option("-r", "--rank", dest="rank",
                      help="Gives the rank to the process",
                      default=1)
    parser.add_option("-n", "--number_procs", dest="np", default=2,
                      help="Number of procs to use")

    (options, args) = parser.parse_args()

    try:
        my_rank = int(options.rank)
        num_procs = int(options.np)
    except ValueError as e:
        print(e)
        sys.exit(1)

    # A process count below 1 breaks the chunk-size division, and a rank
    # outside 0..num_procs-1 is never sent any data and would wait forever.
    if num_procs < 1 or not 0 <= my_rank < num_procs:
        print('Invalid rank %d for %d processes' % (my_rank, num_procs))
        sys.exit(1)

    print('Running with rank: %s and %s processes' % (my_rank, num_procs))

    # Initializing connection
    conn = amqp.Connection(host="localhost:5672", userid="guest",
                           password="guest", virtual_host="/",
                           insist=False)
    chan = conn.channel()

    # main() closes the channel and connection itself before calling
    # sys.exit(), so close them here only when it returned normally or
    # failed in some other way (including Ctrl-C while waiting).
    closed_by_main = False
    try:
        main(conn, chan, my_rank, num_procs)
    except SystemExit:
        closed_by_main = True
        raise
    finally:
        if not closed_by_main:
            # close channel and connection
            chan.close()
            conn.close()

No comments:

Post a Comment