python - How to output the progress of a parallel process on a regular basis?


I'm learning the Python multiprocessing module, and I've found this example (this is a bit modified version of it):

#!/bin/env python
import multiprocessing as mp
import random
import string
import time

# define an output queue
output = mp.Queue()

# define an example function
def rand_string(length, output):
    time.sleep(1)
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                    string.ascii_lowercase
                    + string.ascii_uppercase
                    + string.digits)
               for i in range(length))
    result = (len(rand_str), rand_str)
    # print result
    time.sleep(1)
    output.put(result)


def queue_size(queue):
    size = int(queue.qsize())
    print size


# set up a list of the processes we want to run
processes = [mp.Process(target=rand_string, args=(3, output)) for x in range(1, 1000)]

# run processes
for p in processes:
    p.start()

# exit the completed processes
for p in processes:
    p.join()

# process results from the output queue
# results = [output.get() for p in processes]
# print(results)

I want to add a progress bar to this code, but I do not know the best way to do it. Here is my 1st approach, which runs an infinite loop in the parent process and counts the elements in the queue:

#!/bin/env python
import multiprocessing as mp
import random
import string
import time

# define an output queue
output = mp.Queue()

# define an example function
def rand_string(length, output):
    time.sleep(1)
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                    string.ascii_lowercase
                    + string.ascii_uppercase
                    + string.digits)
               for i in range(length))
    result = (len(rand_str), rand_str)
    # print result
    time.sleep(1)
    output.put(result)


def queue_size(queue):
    size = int(queue.qsize())
    print size


# set up a list of the processes we want to run
processes = [mp.Process(target=rand_string, args=(3, output)) for x in range(1, 1000)]

# run processes
for p in processes:
    p.start()

# poll the queue until it holds as many results as there are processes
while True:
    current_size = output.qsize()
    max_size     = len(processes)
    print "%s %%" % (current_size * 100 / float(max_size))
    if current_size == max_size:
        break
    time.sleep(0.5)

# exit the completed processes
for p in processes:
    p.join()

# process results from the output queue
# results = [output.get() for p in processes]
# print(results)

The output is the following:

71.3713713714 %
79.8798798799 %
86.8868868869 %
93.3933933934 %
99.7997997998 %
100.0 %

My 2nd approach is to create a separate process that handles the progress counting (basically the same way as the 1st approach, by counting the elements in the queue):

#!/bin/env python
import multiprocessing as mp
import random
import string
import time

# define an output queue
output = mp.Queue()

# define an example function
def rand_string(length, output):
    time.sleep(1)
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                    string.ascii_lowercase
                    + string.ascii_uppercase
                    + string.digits)
               for i in range(length))
    result = (len(rand_str), rand_str)
    # print result
    time.sleep(1)
    output.put(result)


def count_progress(queue, max_size):
    current_size = queue.qsize()
    if current_size <= max_size:
    # while current_size <= max_size:
        print "%s %%" % (current_size * 100 / float(max_size))
        # time.sleep(1)


# set up a list of the processes we want to run
processes = [mp.Process(target=rand_string, args=(3, output)) for x in range(1, 1000)]

# run processes
for p in processes:
    p.start()

p_progress_count = mp.Process(target=count_progress, args=(output, len(processes)))
p_progress_count.start()
p_progress_count.join()

# exit the completed processes
for p in processes:
    p.join()

# process results from the output queue
# results = [output.get() for p in processes]
# print(results)

The output is the following:

69.5695695696 % 

Questions:

  1. As you can see, both progress outputs look ugly (there are too few outputs). Is it possible to print out the progress on a regular basis, e.g. every second? (A sketch of what I mean follows this list.)
  2. Is there a way to prefer running one process over another (i.e. run count_progress more often than rand_string)? I think it is possible by adding a sleep to the end of rand_string, but that seems like a waste of performance to me.
  3. What is the best way of handling similar problems?
  4. Is writing to / reading from output known as inter-process communication using shared memory?
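
For question 1, here is a sketch of what I mean: it is basically my 2nd approach with the commented-out while and time.sleep lines enabled, so the counter keeps polling once per second instead of checking only once.

def count_progress(queue, max_size):
    # poll the queue once per second until all results have arrived
    current_size = queue.qsize()
    while current_size < max_size:
        print "%s %%" % (current_size * 100 / float(max_size))
        time.sleep(1)
        current_size = queue.qsize()
    print "100.0 %"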

By sub-classing multiprocessing.Process for the progress counter, you can get the progress whenever you want. I have added a pipe and a queue to receive the progress from the other working processes. This adds some complication, but the progress is more reliable than checking the queue size. Also, starting a large number (1000) of rand_string-like processes may not be advisable; instead, make one process that produces the 'random strings'. I have written sample code showing how to sub-class the Process class to make the progress counter.

import multiprocessing as mp
import random
import string
import time


# define an example function
def rand_string(length, output, progress_sender):
    time.sleep(random.randint(1, 5))
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                    string.ascii_lowercase
                    + string.ascii_uppercase
                    + string.digits)
               for i in range(length))
    result = (len(rand_str), rand_str)
    # print result
    output.put(result)
    progress_sender.send(1)  # tell the counter that one more task is done

# sub-classed progress counter
class count_progress(mp.Process):
    def __init__(self, pipe_recv, max_size, progress_queue):
        super(count_progress, self).__init__()
        self.current_size = 0
        self.progress = 0
        self.max_size = max_size
        self.pipe_recv = pipe_recv.recv
        self.progress_queue = progress_queue

    def run(self):
        while True:
            self.current_size += self.pipe_recv()
            self.progress = (self.current_size * 100 / float(self.max_size))
            self.put_progress()
            # if int(self.progress) % 10 == 0:
            #     print self.progress
            if self.progress >= 100:
                break
        return

    def put_progress(self):
        # keep only the latest progress value in the size-1 queue
        if not self.progress_queue.empty():
            self.progress_queue.get()
        self.progress_queue.put(self.progress)


if __name__ == '__main__':
    # define an output queue
    output = mp.Queue()
    progress_queue = mp.Queue(1)  # queue to poll for progress
    recv, sendr = mp.Pipe()       # pipe for the example function to send progress

    # set up a list of the processes we want to run
    processes = [mp.Process(target=rand_string, args=(3, output, sendr)) for x in range(1, 20)]

    # p_progress_count = mp.Process(target=count_progress, args=(recv, len(processes)))
    p_progress_count = count_progress(recv, len(processes), progress_queue)
    p_progress_count.start()
    # run processes
    for p in processes:
        p.start()

    # poll the progress any time you want
    while p_progress_count.is_alive():
        print 'progress at %.2f%%' % progress_queue.get()
        time.sleep(1)
    print 'progress at %d%%' % 100
    p_progress_count.join()

    # exit the completed processes
    for p in processes:
        p.join()

    # process results from the output queue
    # results = [output.get() for p in processes]
    # print(results)
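
As a side note, and not part of the sample above: if each task can be expressed as a plain function call, a multiprocessing.Pool bounds the number of worker processes (one per CPU core by default) and makes progress reporting almost trivial, because imap_unordered yields each result as soon as some worker finishes. A minimal sketch of that alternative (the task count of 100 is just an illustrative value):

import multiprocessing as mp
import random
import string
import time


def rand_string(length):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    time.sleep(1)
    return ''.join(random.choice(string.ascii_lowercase
                                 + string.ascii_uppercase
                                 + string.digits)
                   for i in range(length))


if __name__ == '__main__':
    n_tasks = 100
    pool = mp.Pool()  # defaults to one worker per CPU core
    # imap_unordered yields results as soon as any worker finishes,
    # so the consuming loop doubles as the progress reporter
    for done, _ in enumerate(pool.imap_unordered(rand_string, [3] * n_tasks), 1):
        print '%.1f %%' % (done * 100 / float(n_tasks))
    pool.close()
    pool.join()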
