diff options
author | root <root@gmona.(none)> | 2010-05-26 10:01:43 +0300 |
---|---|---|
committer | root <root@gmona.(none)> | 2010-05-26 10:01:43 +0300 |
commit | 2c4497353a913c7669d0c51ce617967878b1ac65 (patch) | |
tree | 59fe61e351010aa2085c0237c6b348a71061acef /twrapper | |
parent | Some dirs added (diff) | |
download | idfetch-2c4497353a913c7669d0c51ce617967878b1ac65.tar.gz idfetch-2c4497353a913c7669d0c51ce617967878b1ac65.tar.bz2 idfetch-2c4497353a913c7669d0c51ce617967878b1ac65.zip |
twrapper kills child processes on exit. Log file for wget replaced with pipe
Diffstat (limited to 'twrapper')
-rwxr-xr-x | twrapper/twrapper.py | 197 |
1 files changed, 129 insertions, 68 deletions
diff --git a/twrapper/twrapper.py b/twrapper/twrapper.py index 9e0951c..224f637 100755 --- a/twrapper/twrapper.py +++ b/twrapper/twrapper.py @@ -12,6 +12,7 @@ import os import re import time import sys +import signal from portage.process import spawn import idfetch_settings import pickle @@ -20,7 +21,7 @@ import time from threading import Thread CLEAN_LINE=" " MAX_ACTIVE_DOWNLOADS=8 -TASK_SPACE=4 +TASK_SPACE=3 def time_msg(): stdscr.addstr(0,50,"["+time.ctime()+"] q - Quit") @@ -31,16 +32,11 @@ def fit_string(text): return text[0:max_x-1] def msg(index,msg_text): global exit_flag -# print(msg_text) if exit_flag: pass else: stdscr.addstr(index,0,fit_string(msg_text)) stdscr.refresh() -# stdscr.nodelay(1) -# key = stdscr.getch() -# if key== ord('q'): -# sys.exit() def download_msg(index,msg_text): msg(TASK_SPACE*index,str(index)+")"+msg_text) @@ -53,13 +49,8 @@ def total_msg(msg_text): def progress_msg(index,msg_text1,msg_text2): -# global exit_flag -# if exit_flag: -# pass -# else: - stdscr.addstr(TASK_SPACE*index+1,0,fit_string(" "+msg_text1)) - stdscr.addstr(TASK_SPACE*index+2,0,fit_string(" "+msg_text2)) - stdscr.refresh() + msg(TASK_SPACE*index+1,msg_text1) + def error_msg(msg_text): msg(max_y-1,"WARNING: "+msg_text) @@ -91,40 +82,102 @@ class fetchit(Thread): self.trials_log_file_name = idfetch_settings.TASK_DIR+'/logs/log_trials/'+self.download_file+".log" def start(self,place_in_the_list): self.place_in_the_list=place_in_the_list -# msg(14+place_in_the_list,"lkajkldfjjlsdkjflaskjdfkalskdjallllllllfalskdjflaskdjffff"+str(self.place_in_the_list)) Thread.start(self) def run(self): # msg(0,'DOWNLOADING: '+self.download_file) + global exit_flag self.download_file_url_list_file=open(idfetch_settings.TASK_DIR+'/urls/'+self.download_file+".urllist") self.download_file_url_list=pickle.load(self.download_file_url_list_file) self.download_file_url_list_file.close() -# print(self.download_file_url_list) -# while downloadfile_url_list: -# print(downloadfile_url_list.pop()) -# return -# a=os.spawnl(os.P_NOWAIT,"/bin/touch","/usr/lib/portage/swrapper/1.txt") self.trials_log_file= open (self.trials_log_file_name,"w") - #wget_output = os.popen("/usr/bin/wget "+self.download_file_url_list.pop()+" -o "+log_file,"r") - while (self.download_file_url_list) and (self.status !=0) : + while (self.download_file_url_list) and (self.status !=0) and (not(exit_flag)) : self.current_url=self.download_file_url_list.pop() self.trials_log_file.write("[DL "+str(self.index)+"]: "+self.current_url+"\n") download_msg(self.place_in_the_list,"[DL "+str(self.index)+"]: "+self.current_url) -# wget_run_status=os.spawnl(os.P_WAIT,"/usr/bin/wget","wget","--connect-timeout=1","--progress=bar:force",self.current_url, "-o",self.wget_log_file_name) - wget_run_status=os.spawnl(os.P_WAIT,"/usr/bin/wget","wget","--connect-timeout=1",\ - self.current_url,"--directory-prefix="+idfetch_settings.DIST_DIR,\ - '--tries='+str(idfetch_settings.WGET_TRIES),\ - "--read-timeout="+str(idfetch_settings.WGET_READ_TIMEOUT),"-o",self.wget_log_file_name) -# wget_run_status = os.popen("/usr/bin/wget "+self.download_file_url_list.pop(),"r") - wget_output= open(self.wget_log_file_name,"r") - while 1: - line = wget_output.readline() - if not line: break - igot = re.findall('saved',line) - if igot: - self.status = 0 - break -# int(igot[0]) -# msg(10+self.index,"exited") + # preparing to fork + #if fd_pipes is None: + self.fd_pipes = { + 0:sys.stdin.fileno(), + 1:sys.stdout.fileno(), + 2:sys.stderr.fileno(), + } + + + if 1: + # Using a log file requires that stdout and stderr + # are assigned to the process we're running. + if 1 not in self.fd_pipes or 2 not in self.fd_pipes: + raise ValueError(self.fd_pipes) + + # Create a pipe + (self.pr, self.pw) = os.pipe() + + # Create a tee process, giving it our stdout and stderr + # as well as the read end of the pipe. + # mypids.extend(spawn(('tee', '-i', '-a', logfile), + # returnpid=True, fd_pipes={0:pr, + # 1:fd_pipes[1], 2:fd_pipes[2]})) + + # We don't need the read end of the pipe, so close it. + ### os.close(pr) + + # Assign the write end of the pipe to our stdout and stderr. + self.fd_pipes[1] = self.pw + self.fd_pipes[2] = self.pw + self.pid = os.fork() + if self.pid: + # we are the parent +# global + mypids.append(self.pid) + os.close(self.pw) # use os.close() to close a file descriptor + self.pr = os.fdopen(self.pr) # turn r into a file object +# print "parent: reading" +# print "--------------->" + # for i in range(1,50): + self.line=self.pr.readline() + while self.line: + progress_msg(self.place_in_the_list, self.line,"") +# time.ctime(1) + self.line=self.pr.readline() + self.igot = re.findall('saved',self.line) + if self.igot: + self.status = 0 + break +### os.waitpid(self.pid, 0) # make sure the child process gets cleaned up + try: + os.waitpid(self.pid, os.WNOHANG) +# os.waitpid(self.pid, 0) + except OSError: + # This pid has been cleaned up outside + # of spawn(). + pass + else: + # we are the child + os.close(self.pr) + self.w = os.fdopen(self.pw, 'w') +# print "child: writing" + self.my_fds = {} + # To protect from cases where direct assignment could + # clobber needed fds ({1:2, 2:1}) we first dupe the fds + # into unused fds. + for self.fd in self.fd_pipes: + self.my_fds[self.fd] = os.dup(self.fd_pipes[self.fd]) + # Then assign them to what they should be. + for self.fd in self.my_fds: + os.dup2(self.my_fds[self.fd], self.fd) + # Then close _all_ fds that haven't been explictly + # requested to be kept open. + # for fd in get_open_fds(): + if self.fd not in self.my_fds: + try: + os.close(self.fd) + except OSError: + pass + os.execv('/usr/bin/wget', ["wget","--connect-timeout=1", self.current_url,\ + "--directory-prefix="+idfetch_settings.DIST_DIR,\ + "--tries="+str(idfetch_settings.WGET_TRIES),\ + "--read-timeout="+str(idfetch_settings.WGET_READ_TIMEOUT)]) + sys.exit(0) if self.status ==0: self.trials_log_file.write("[FIN "+str(self.index)+"]: "+self.current_url+"\n") download_msg(self.place_in_the_list,"[FIN "+str(self.index)+"]: "+self.current_url) @@ -136,24 +189,10 @@ class fetchit(Thread): download_msg(self.place_in_the_list,"[ERROR+LIST_IS_EMPTY "+str(self.index)+"]: "+self.current_url) self.trials_log_file.close() -#### a=os.spawnl(os.P_NOWAIT,"/usr/bin/wget","wget",downloadfile_url_list.pop(), "-o",log_file) -# a=os.system("/bin/touch /usr/lib/portage/swrapper/touch.txt") -# a=os.system("wget www.mail.ru") -# print("boom:",a) -# spawn(["/bin/bash", "-c", "exec \"$@\""],'','') - def show_fetch_progress(self): - try: - fileHandle = open(self.wget_log_file_name,"r") - lineList = fileHandle.readlines() - fileHandle.close() - if len(lineList)>1: - progress_msg(self.place_in_the_list,lineList[-2],lineList[-1]) - except: - #error_msg("ERROR while reading"+self.wget_log_file_name) - pass def get_place_in_the_list(self): return self.place_in_the_list -def do_tasks(task_list,exit_flag): +def do_tasks(task_list): + global exit_flag msg(1,"TASK DIR: "+idfetch_settings.TASK_DIR) msg(0,"DOWNLOADING with twrapper...") @@ -185,10 +224,8 @@ def do_tasks(task_list,exit_flag): key='x' stdscr.nodelay(1) while (key != ord('q')) and (key != ord('Q')): - key = stdscr.getch() + for current_fetch_distfile_thread in running_fetch_distfile_thread_list: -# status_msg(current_fetch.index,"Status: "+report[current_fetch.status]) - current_fetch_distfile_thread.show_fetch_progress() if current_fetch_distfile_thread.status==0: if to_start_fetch_distfile_thread_list: #start next one on the same place in the list @@ -200,36 +237,60 @@ def do_tasks(task_list,exit_flag): starting_fetch_distfile_thread.start(current_fetch_distfile_thread.get_place_in_the_list()) time_msg() time.sleep(0.5) + key = stdscr.getch() + if (key == ord('q')) or (key == ord('Q')): + msg(28,">>>>>>>>>>>>>>>>> EXITING <<<<<<<<<<<<<<<<<<<<<<<<<<") exit_flag=1 -def main(task_list,exit_flag): +def main(task_list): try: + task_list=open_task_list() - do_tasks(task_list,exit_flag) + do_tasks(task_list) + msg(30,">>>>>>>>>>>> more exited <<<<<<<<<<<<<<") + finally: + for pid in mypids: + try: + if os.waitpid(pid, os.WNOHANG) == (0, 0): + os.kill(pid, signal.SIGTERM) + os.waitpid(pid, 0) + except OSError: + # This pid has been cleaned up outside + # of spawn(). + pass curses.nocbreak() stdscr.keypad(0) curses.echo() curses.endwin() print("twrapper exited") - sys.exit() -# if key== ord('q'): -# for current_fetch in fetchlist: -# current_fetch.join() -## print("Status for",current_fetch.download_file,"is",report[current_fetch.status]) +def cleanup(): + while spawned_pids: + pid = spawned_pids.pop() + try: + if os.waitpid(pid, os.WNOHANG) == (0, 0): + os.kill(pid, signal.SIGTERM) + os.waitpid(pid, 0) + except OSError: + # This pid has been cleaned up outside + # of spawn(). + pass + if __name__ == '__main__': exit_flag=0 task_list=[] + # mypids will hold the pids of all processes created. + mypids = [] stdscr = curses.initscr() max_y,max_x=stdscr.getmaxyx() curses.noecho() curses.cbreak() stdscr.keypad(1) -# win = curses.newwin(10, 30, 20, 0) -# win.addstr(1, 1, "Current") -# win.refresh() -# time.sleep(5) - main(task_list,exit_flag) + curses.curs_set(0) + try: + main(task_list) + finally: + sys.exit() |