python - How to output the progress of a parallel process on a regular basis?
I'm learning the Python multiprocessing module, and I've found this example (this is a bit modified version):
#!/bin/env python
import multiprocessing as mp
import random
import string
import time

# define output queue
output = mp.Queue()

# define example function
def rand_string(length, output):
    """Generates a random string of numbers, lower- and uppercase chars."""
    time.sleep(1)
    rand_str = ''.join(random.choice(
                           string.ascii_lowercase
                           + string.ascii_uppercase
                           + string.digits)
                       for i in range(length))
    result = (len(rand_str), rand_str)
    # print result
    time.sleep(1)
    output.put(result)

def queue_size(queue):
    size = int(queue.qsize())
    print size

# setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(3, output))
             for x in range(1, 1000)]

# run processes
for p in processes:
    p.start()

# exit the completed processes
for p in processes:
    p.join()

# process the results from the output queue
# results = [output.get() for p in processes]
# print(results)
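One caveat worth flagging here (my note, not from the original post): the multiprocessing docs warn that a process which has put items on a Queue may block at exit until those items are consumed, so joining all workers before draining output can deadlock once the underlying pipe buffer fills. A minimal sketch of the safer order, assuming the names from the example above:

# a sketch: drain the queue before joining, to avoid the documented
# "joining processes that use queues" pitfall
results = [output.get() for p in processes]  # consume every result first
for p in processes:
    p.join()                                 # workers can now exit cleanly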
I want to add a progress bar to this code, but I don't know the best way to do it. Here is my first approach, which runs an infinite loop in the parent process and counts the elements in the queue:
#!/bin/env python
import multiprocessing as mp
import random
import string
import time

# define output queue
output = mp.Queue()

# define example function
def rand_string(length, output):
    """Generates a random string of numbers, lower- and uppercase chars."""
    time.sleep(1)
    rand_str = ''.join(random.choice(
                           string.ascii_lowercase
                           + string.ascii_uppercase
                           + string.digits)
                       for i in range(length))
    result = (len(rand_str), rand_str)
    # print result
    time.sleep(1)
    output.put(result)

def queue_size(queue):
    size = int(queue.qsize())
    print size

# setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(3, output))
             for x in range(1, 1000)]

# run processes
for p in processes:
    p.start()

while True:
    current_size = output.qsize()
    max_size = len(processes)
    print "%s %%" % (current_size * 100 / float(max_size))
    if current_size == max_size:
        break
    time.sleep(0.5)

# exit the completed processes
for p in processes:
    p.join()

# process the results from the output queue
# results = [output.get() for p in processes]
# print(results)
The output looks like the following:
71.3713713714 %
79.8798798799 %
86.8868868869 %
93.3933933934 %
99.7997997998 %
100.0 %
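As an aside (my addition, not from the original post): if an actual progress bar is acceptable, the third-party tqdm package can wrap the same polling idea. A sketch, assuming output and processes from the example above:

# a sketch using the third-party `tqdm` package (pip install tqdm)
from tqdm import tqdm
import time

total = len(processes)
bar = tqdm(total=total)
done = 0
while done < total:
    time.sleep(0.5)
    now = output.qsize()    # approximate number of finished workers
    bar.update(now - done)  # advance the bar by the delta since last poll
    done = now
bar.close()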
My second approach creates a separate process to handle the progress counting (basically the same as the first approach, counting elements in the queue):
#!/bin/env python
import multiprocessing as mp
import random
import string
import time

# define output queue
output = mp.Queue()

# define example function
def rand_string(length, output):
    """Generates a random string of numbers, lower- and uppercase chars."""
    time.sleep(1)
    rand_str = ''.join(random.choice(
                           string.ascii_lowercase
                           + string.ascii_uppercase
                           + string.digits)
                       for i in range(length))
    result = (len(rand_str), rand_str)
    # print result
    time.sleep(1)
    output.put(result)

def count_progress(queue, max_size):
    current_size = queue.qsize()
    if current_size <= max_size:    # while current_size <= max_size:
        print "%s %%" % (current_size * 100 / float(max_size))
        # time.sleep(1)

# setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(3, output))
             for x in range(1, 1000)]

# run processes
for p in processes:
    p.start()

p_progress_count = mp.Process(target=count_progress,
                              args=(output, len(processes)))
p_progress_count.start()
p_progress_count.join()

# exit the completed processes
for p in processes:
    p.join()

# process the results from the output queue
# results = [output.get() for p in processes]
# print(results)
The output is just a single line, since count_progress checks the queue size exactly once (the if runs once instead of the commented-out while loop):
69.5695695696 %
Questions:

- As you can see, both progress outputs look ugly (there are too few of them). Is it possible to print out the progress on a regular basis, e.g. every second?
- Is there a way to prefer running one process over another (run count_progress more often than rand_string)? I think it would be possible by adding a sleep at the end of rand_string, but that seems like a waste of performance to me.
- What is the best way of handling similar problems?
- Is writing/reading to/from output known as inter-process communication using shared memory? (See the sketch after this list.)
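A note on the last question (my addition, not from the original post): a multiprocessing.Queue exchanges pickled objects over a pipe, which is message passing rather than shared memory. Actual shared-memory progress counting would look more like this sketch with multiprocessing.Value:

# a sketch (not from the original post): progress via true shared memory,
# using multiprocessing.Value instead of a Queue
import multiprocessing as mp
import time

def worker(counter):
    time.sleep(1)                      # simulate some work
    with counter.get_lock():           # Value carries a lock for safe updates
        counter.value += 1             # increment the shared counter in place

if __name__ == '__main__':
    counter = mp.Value('i', 0)         # 'i' = a signed int in shared memory
    procs = [mp.Process(target=worker, args=(counter,)) for _ in range(10)]
    for p in procs:
        p.start()
    while counter.value < len(procs):  # the parent reads shared memory directly
        print('%d %%' % (counter.value * 100 / len(procs)))
        time.sleep(0.5)
    print('100 %')
    for p in procs:
        p.join()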
By sub-classing multiprocessing.Process, the progress counter can report progress whenever you want. I have added a Pipe and a Queue to receive progress from the other working processes. This adds some complication, but the progress is more reliable than checking the queue size. Also, starting a large number (1000) of rand_string-like processes may not be advisable; instead, make one process that produces the 'random strings'. I have written sample code showing how to sub-class the Process class to make a progress counter.
import multiprocessing as mp
import random
import string
import time

# define example function
def rand_string(length, output, progress_sender):
    """Generates a random string of numbers, lower- and uppercase chars."""
    time.sleep(random.randint(1, 5))
    rand_str = ''.join(random.choice(
                           string.ascii_lowercase
                           + string.ascii_uppercase
                           + string.digits)
                       for i in range(length))
    result = (len(rand_str), rand_str)
    # print result
    output.put(result)
    progress_sender.send(1)

# sub-classed progress counter
class count_progress(mp.Process):
    def __init__(self, pipe_recv, max_size, progress_queue):
        super(count_progress, self).__init__()
        self.current_size = 0
        self.progress = 0
        self.max_size = max_size
        self.pipe_recv = pipe_recv.recv
        self.progress_queue = progress_queue

    def run(self):
        while True:
            self.current_size += self.pipe_recv()
            self.progress = (self.current_size * 100 / float(self.max_size))
            self.put_progress()
            # if int(self.progress) % 10 == 0:
            #     print self.progress
            if self.progress >= 100:
                break
        return

    def put_progress(self):
        if not self.progress_queue.empty():
            self.progress_queue.get()
        self.progress_queue.put(self.progress)

if __name__ == '__main__':
    # define output queue
    output = mp.Queue()
    progress_queue = mp.Queue(1)  # queue to poll for progress
    recv, sendr = mp.Pipe()       # pipe for the example function to send progress

    # setup a list of processes that we want to run
    processes = [mp.Process(target=rand_string, args=(3, output, sendr))
                 for x in range(1, 20)]

    # p_progress_count = mp.Process(target=count_progress, args=(recv, len(processes)))
    p_progress_count = count_progress(recv, len(processes), progress_queue)
    p_progress_count.start()

    # run processes
    for p in processes:
        p.start()

    # poll the progress as often as you want
    while p_progress_count.is_alive():
        print 'progress @ %.2f%%' % progress_queue.get()
        time.sleep(1)
    print 'progress @ %d%%' % 100
    p_progress_count.join()

    # exit the completed processes
    for p in processes:
        p.join()

    # process the results from the output queue
    # results = [output.get() for p in processes]
    # print(results)
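As a side note (my own sketch, not part of the original answer): a Pool with imap_unordered makes this kind of progress reporting much simpler, since results arrive in the parent as each worker finishes:

# a sketch: progress reporting with a multiprocessing.Pool
import multiprocessing as mp
import random
import string
import time

def rand_string(length):
    time.sleep(random.randint(1, 5))   # simulate work
    return ''.join(random.choice(string.ascii_letters + string.digits)
                   for _ in range(length))

if __name__ == '__main__':
    tasks = [3] * 20
    pool = mp.Pool()
    results = []
    # imap_unordered yields each result as soon as a worker finishes it
    for i, res in enumerate(pool.imap_unordered(rand_string, tasks), 1):
        results.append(res)
        print('progress @ %.2f%%' % (i * 100.0 / len(tasks)))
    pool.close()
    pool.join()

This also avoids managing ~1000 Process objects by hand; the pool reuses a fixed number of worker processes.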