Showing posts with label python. Show all posts
Showing posts with label python. Show all posts

Monday, July 19, 2010

Infinite list of primes! Yay!

or until the memory gives up...


# -*- coding: utf-8 -*-

from itertools import count
from collections import defaultdict

def seive():
    """Yield the prime numbers 2, 3, 5, ... without an upper bound.

    Incremental sieve: ``table`` maps an upcoming composite number to the
    list of primes known to divide it.  When the counter reaches a number
    that is absent from the table, that number is prime; its square is
    registered as the first composite it "owns".  When the counter reaches a
    registered composite, each of its primes is moved forward to the next
    multiple (``x + p``).

    Entries are removed as soon as they are visited, so the table holds one
    prime per yielded prime and nothing else.
    """
    table = {}
    for x in count(2):
        # pop() both reads and removes the entry, and (unlike indexing a
        # defaultdict) does not leave an empty list behind for primes.
        facts = table.pop(x, None)
        if facts is None:
            table[x * x] = [x]
            yield x
        else:
            for p in facts:
                # Append in place instead of rebuilding the list each time.
                table.setdefault(x + p, []).append(p)

if __name__ == '__main__':
    # Print every prime next to its zero-based position.  The generator
    # never ends, so this runs until the process is interrupted.
    position = 0
    for prime in seive():
        print('%d %d' % (position, prime))
        position += 1


Got the idea from here.

Tuesday, May 25, 2010

This is an implementation of a parallel dot product of two vectors. It seems obvious that one could use AMQP as a messaging interface for parallel computing, so that's what I did here. I "ported" the MPI dot product that was shown to me in an HPC class. That version of the algorithm is supposedly from Parallel Programming with MPI by Peter Pacheco.

I used RabbitMQ as the AMQP server and py-amqplib as client.

To use it, you should run a master: python dotprod.py -r 0 -n 2 and a worker:
python dotprod.py -r 1 -n 2. The -r option works like the "RANK" attribute of MPI. The -n option is how many processes are involved.




import sys

from cPickle import dumps, loads
from amqplib import client_0_8 as amqp
from optparse import OptionParser

my_rank = None
num_procs = None

DEBUG = True

# As there is no strict passing by reference I use "boxing" :-)
class Box(object):
    """Mutable one-slot container.

    Lets a nested callback hand a value back to the enclosing function:
    the callback calls ``pack`` and the outer code later calls ``unpack``.
    """

    def __init__(self):
        # Holds None until pack() is called.
        self.data = None

    def pack(self, data):
        """Store ``data``, replacing whatever was stored before."""
        self.data = data

    def unpack(self):
        """Return the stored value (None if nothing was packed)."""
        return self.data

    def __str__(self):
        # Wrap the payload in a 1-tuple: with a bare ``'%s' % self.data`` a
        # tuple payload would be treated as the format-argument list and
        # raise TypeError (or be silently unwrapped if it has one element).
        return '%s' % (self.data,)

def read_vector(conn, chan, prompt, vector_order, my_rank, num_procs):
    """Read a vector on rank 0, scatter it, and return this rank's slice.

    Rank 0 reads a comma-separated list of integers from stdin, keeps the
    first slice for itself and publishes one slice per other rank to the
    "sorting_room" exchange with routing key ``key_<rank>``.  Every other
    rank consumes a single message from its ``po_box_<rank>`` queue and
    returns the unpickled payload.

    Args:
        conn: AMQP connection; closed before exiting on invalid input.
        chan: AMQP channel used to publish and consume.
        prompt: Label printed on rank 0 before asking for input, so the
            user can tell successive reads apart.
        vector_order: Total number of elements expected in the vector.
        my_rank: Rank of the calling process (0 is the master).
        num_procs: Number of participating processes.

    Returns:
        The list of integers assigned to ``my_rank``.  Each rank gets
        ``vector_order // num_procs`` elements; the last rank also gets the
        remainder when the order is not an exact multiple.

    On rank 0, input that is not a list of exactly ``vector_order`` integers
    is reported, the channel and connection are closed, and the process
    exits with status 1.
    """
    # Floor division keeps the block size an integer under both classic and
    # true division.
    n_bar = vector_order // num_procs

    if my_rank != 0:
        # Worker: wait for the slice the master addressed to this rank.
        local_data = Box()

        def recv_callback(msg):
            # NOTE(security): unpickling is only safe while the broker and
            # every publisher are trusted; do not expose this queue.
            local_data.pack(loads(msg.body))
            print('Received, from channel #%s data: %s'
                  % (msg.channel.channel_id, local_data))

        consumer_tag = "tag_%d" % my_rank
        chan.basic_consume(queue=('po_box_%d' % my_rank), no_ack=True,
                           callback=recv_callback,
                           consumer_tag=consumer_tag)
        # Presumably one wait() dispatches exactly one delivery -- confirm
        # against the amqplib documentation.
        chan.wait()
        chan.basic_cancel(consumer_tag)

        return local_data.unpack()

    # Master: read the whole vector, then hand out the slices.
    print(prompt)
    s = raw_input("Enter a list of integers of size %d:" % vector_order)

    try:
        vector = [int(i) for i in s.split(",")]
        if len(vector) != vector_order:
            # A wrong length would give the ranks inconsistent slices.
            raise ValueError("expected %d integers, got %d"
                             % (vector_order, len(vector)))
    except ValueError as e:
        print(e)
        chan.close()
        conn.close()
        sys.exit(1)

    last_rank = num_procs - 1

    def slice_for(rank):
        # The last rank takes everything that is left, so no element is
        # dropped when vector_order is not a multiple of num_procs.
        start = rank * n_bar
        if rank == last_rank:
            return vector[start:]
        return vector[start:start + n_bar]

    for q in range(1, num_procs):
        msg = amqp.Message(dumps(slice_for(q)))
        msg.properties['delivery_mode'] = 2
        chan.basic_publish(msg, exchange="sorting_room",
                           routing_key=("key_%d" % q))

    return slice_for(0)


def main(conn, chan, my_rank, num_procs):
    """Compute the dot product of two vectors across ``num_procs`` processes.

    Rank 0 (the master) reads the vector order from stdin and publishes it
    to the "bcast" fanout exchange.  Both vectors are then read and
    scattered through ``read_vector``.  Every rank computes the dot product
    of its slices; other ranks publish their partial result to rank 0's
    ``po_box_0`` queue, and rank 0 adds them to its own and prints the total.

    Args:
        conn: AMQP connection; closed before exiting on invalid input.
        chan: AMQP channel used for all declarations and messaging.
        my_rank: Rank of the calling process (0 is the master).
        num_procs: Number of participating processes.

    Raises:
        ValueError: if the two local slices have different lengths.

    On rank 0, a non-integer vector order is reported, the channel and
    connection are closed, and the process exits with status 1.
    """

    def make_receiver(box):
        """Build a consumer callback that logs a message and stores it."""
        def recv_callback(msg):
            # NOTE(security): unpickling is only safe while the broker and
            # every publisher are trusted; do not expose these queues.
            payload = loads(msg.body)
            print('Received, from channel #%s data: %s'
                  % (msg.channel.channel_id, payload))
            box.pack(payload)
        return recv_callback

    # Queue and exchange for broadcasting
    chan.exchange_declare(exchange="bcast", type="fanout", durable=True,
                          auto_delete=False)
    if my_rank != 0:
        # Only receivers bind a broadcast queue.  The master never consumes
        # one, so binding it would let its own broadcasts pile up in a
        # durable queue run after run.
        bcast_queue = "bcast_queue_%d" % my_rank
        chan.queue_declare(queue=bcast_queue, durable=True,
                           exclusive=False, auto_delete=False)
        chan.queue_bind(queue=bcast_queue, exchange="bcast")

    # Queue and exchange for point-to-point communication
    mailbox = "po_box_%d" % my_rank
    chan.exchange_declare(exchange="sorting_room", type="direct",
                          durable=True, auto_delete=False)
    chan.queue_declare(queue=mailbox, durable=True,
                       exclusive=False, auto_delete=False)
    chan.queue_bind(queue=mailbox,
                    exchange="sorting_room",
                    routing_key=("key_%d" % my_rank))

    if my_rank == 0:
        try:
            vector_order = int(raw_input("Enter an integer number:"))
        except ValueError as e:
            print(e)
            chan.close()
            conn.close()
            sys.exit(1)

        # Broadcasting to a set of programs
        msg = amqp.Message(dumps(vector_order))
        msg.properties["delivery_mode"] = 2
        chan.basic_publish(msg, exchange="bcast")
    else:
        # Receive the broadcast vector order.
        order_box = Box()
        bcast_tag = "bcast_tag_%d" % my_rank
        chan.basic_consume(queue=bcast_queue,
                           no_ack=True,
                           callback=make_receiver(order_box),
                           consumer_tag=bcast_tag)
        chan.wait()
        chan.basic_cancel(bcast_tag)
        print('Done')

        vector_order = order_box.unpack()

    local_x = read_vector(conn, chan, "read first vector", vector_order,
                          my_rank, num_procs)
    local_y = read_vector(conn, chan, "read second vector", vector_order,
                          my_rank, num_procs)

    print('Vectors: %s %s' % (local_x, local_y))

    # Local dot product.  Fail loudly on mismatched slices instead of
    # silently ignoring the surplus elements of the longer one.
    if len(local_x) != len(local_y):
        raise ValueError("local slices differ in length: %d vs %d"
                         % (len(local_x), len(local_y)))
    # The 0.0 start value keeps the accumulator a float.
    local_dot = sum((x * y for x, y in zip(local_x, local_y)), 0.0)

    print('[rank: %d] Local dot product: %f' % (my_rank, local_dot))

    if my_rank != 0:
        # Send the partial result to the master.
        msg = amqp.Message(dumps(local_dot))
        msg.properties["delivery_mode"] = 2
        chan.basic_publish(msg, exchange="sorting_room",
                           routing_key="key_0")
        print('[rank: %d] done' % my_rank)
    else:
        # Collect one partial result from every other rank.
        result_box = Box()
        result_tag = "tag_%d" % my_rank
        chan.basic_consume(queue=mailbox,
                           no_ack=True,
                           callback=make_receiver(result_box),
                           consumer_tag=result_tag)

        acc = 0.0
        for _ in range(1, num_procs):
            # Presumably each wait() dispatches exactly one delivery --
            # confirm against the amqplib documentation.
            chan.wait()
            acc += result_box.unpack()
            print('[rank: %d] acc: %f' % (my_rank, acc))

        chan.basic_cancel(result_tag)
        print('[rank: %d] done, result %f' % (my_rank, acc + local_dot))


if __name__ == '__main__':
    # Parse and validate the command line before touching the broker, so
    # that --help or a bad option never leaves an open connection behind.
    parser = OptionParser()
    parser.add_option("-r", "--rank", dest="rank",
                      help="Gives the rank to the process",
                      default=1)
    parser.add_option("-n", "--number_procs", dest="np", default=2,
                      help="Number of procs to use")

    (options, args) = parser.parse_args()

    try:
        my_rank = int(options.rank)
        num_procs = int(options.np)
        # A process count below 1 would divide by zero when the vector is
        # split, and a rank outside [0, num_procs) would never be sent data.
        if num_procs < 1 or not 0 <= my_rank < num_procs:
            raise ValueError(
                "need procs >= 1 and 0 <= rank < procs (got rank %d, %d procs)"
                % (my_rank, num_procs))
    except ValueError as e:
        print(e)
        sys.exit(1)

    print('Running with rank: %d and %d processes' % (my_rank, num_procs))

    # Initializing connection
    conn = amqp.Connection(host="localhost:5672", userid="guest",
                           password="guest", virtual_host="/",
                           insist=False)
    chan = conn.channel()

    main(conn, chan, my_rank, num_procs)

    # close channel and connection
    chan.close()
    conn.close()

Friday, March 26, 2010

Hello, World!

I've been writing many snippets of code. Some snippets do what I consider really nifty things. Some other pieces were written while I was learning a language, a framework, or just having fun. Most of them are deleted...

Why not post them? Clean them up a bit, add a short explanation, or a long one if I can. Maybe I can get some constructive criticism, or perhaps it is useful for someone out there. Considering what I just said, I think that posting is one good incentive to keep learning languages and frameworks and computer programming in general.

So here goes a snippet: the merge sort algorithm, implemented in Python using the divide-and-conquer strategy.


def merge(xs, ys):
    """
    Take two ordered sequences and merge them into one ordered list.

    On ties the element from ``xs`` comes first.  The inputs may be any
    sequences that support ``len``, indexing and slicing (lists, tuples);
    the result is always a new list.

    >>> merge([1,3,5,8],[2,4,6,7])
    [1, 2, 3, 4, 5, 6, 7, 8]

    >>> merge([1,2,6,9],[-2,-1,6])
    [-2, -1, 1, 2, 6, 6, 9]

    >>> merge([-10,-2], [1,2,3,4,5,6,7,8])
    [-10, -2, 1, 2, 3, 4, 5, 6, 7, 8]

    >>> merge((1, 3), (2,))
    [1, 2, 3]
    """
    merged_list = []
    x_index, y_index = 0, 0
    x_len, y_len = len(xs), len(ys)

    while x_index < x_len and y_index < y_len:
        x, y = xs[x_index], ys[y_index]
        if x <= y:
            merged_list.append(x)
            x_index += 1
        else:
            merged_list.append(y)
            y_index += 1

    # The loop stops once one input is exhausted, so at most one of these
    # slices is non-empty.  extend() accepts any sequence, whereas list
    # concatenation fails for tuples.
    merged_list.extend(xs[x_index:])
    merged_list.extend(ys[y_index:])
    return merged_list

def merge_sort(xs):
    """Return a new sorted list using the divide-and-conquer merge sort.

    The input is split in two halves, each half is sorted recursively, and
    the sorted halves are combined with ``merge``.  The input is never
    modified and the result is always a fresh list.
    """
    # An empty or singleton sequence is already ordered.  Return a copy so
    # the result never aliases the caller's object.
    if len(xs) <= 1:
        return list(xs)

    # Divide the problem.  Floor division keeps the index an integer under
    # both classic and true division.
    half = len(xs) // 2

    # Conquer each half, then combine.
    return merge(merge_sort(xs[:half]), merge_sort(xs[half:]))

if __name__ == '__main__':
    # Run the examples embedded in this module's docstrings as tests.
    from doctest import testmod
    testmod()