# Logo Search packages:
# Sourcecode: sadms version File versions  Download package
#
# runner.py

#!/usr/bin/python
# -*-coding: UTF-8 -*-
# bbou@ac-toulouse.fr
# GPL License
# 2006-11-17 15:17:55    
# runner.py

import signal
import os
import commands
import string
import locale
import popen2,fcntl,select

#######################################################################

# preferred encoding of the current locale; convertIn() falls back to it
# when its input cannot be decoded as UTF-8
pe=locale.getpreferredencoding()
#print 'preferred encoding=%s' % pe

#convertIn=lambda x: x.decode('utf8')
def convertIn(x):
      """Decode a byte string into unicode.

      UTF-8 is tried first; when that fails, the locale's preferred
      encoding (module-level ``pe``) is used instead.

      x: object with a ``decode(encoding)`` method (a byte string).
      Returns the decoded text. An error raised by the fallback decoding
      is propagated to the caller.
      """
      try:
            return x.decode('utf8')
      except UnicodeError:
            # only a failed conversion triggers the fallback; a bare except
            # would also swallow KeyboardInterrupt and SystemExit
            return x.decode(pe)
            
def convertOut(x):
      """Build the command string handed to the shell.

      The command is UTF-8 encoded and prefixed with ``env LANG=POSIX ``.
      """
      encoded=x.encode('utf8')
      return 'env LANG=POSIX '+encoded

#if pe=='_UTF-8':
#convertIn=lambda x: x.decode('utf8')
#convertOut=lambda x: x.encode('utf8')
#elif pe=='_ISO-8859-15':
#convertIn=lambda x: x.decode('ISO-8859-15')
#convertOut=lambda x: x.encode('ISO-8859-15')   
#else:
#convertIn=lambda x: x
#convertOut=lambda x: x

#######################################################################

# Command-line templates used to wrap a command in an ssh call.
# %USER%, %HOST%, %DIRECTORY% and %COMMAND% are placeholders that the
# RemotableRunner.local2remote*() methods substitute.
# NOTE(review): the resulting string is run through a local shell
# (os.popen / popen2.Popen3), so the unquoted ';' looks like it ends the
# ssh command locally -- confirm that %COMMAND% really runs on the remote
# host.
sysSetting=dict(
      ssh='ssh -T -l %USER% %HOST% cd %DIRECTORY%;%COMMAND%',
      sshVerbose='ssh -T -l %USER% %HOST% echo "@running on `hostname`";cd %DIRECTORY%;%COMMAND%',
      sshNoHome='ssh -T -l %USER% %HOST% %COMMAND%',
)

# alternative syntax
# sysSetting={
#     'ssh':'ssh -T -l %USER% %HOST% <<SSHEOF\ncd %DIRECTORY%/\n%COMMAND%\nSSHEOF',
#     'sshVerbose':'ssh -T -l %USER% %HOST% 2> /dev/null <<SSHEOF\n( echo -n "@running on " && \neval hostname )\ncd %DIRECTORY%/\n%COMMAND%\nSSHEOF',
#     'sshNoHome':'ssh -T -l %USER% %HOST% 2> /dev/null <<SSHEOF\n%COMMAND%\nSSHEOF',
#}
# 
# requires prefixing function body of local2remote, local2remote_verbose, local2remotefromhome with
# command=command.replace('$','\\$')

class Context:
      """Empty class; this module defines no attributes or methods for it."""

#######################################################################
#     RUNNER
#######################################################################

class BasicRunner:
      """Run shell commands on the local machine.

      A command is either captured into a string (runToString*) or
      streamed line by line to a sink (run / pipeTo).

      Sink interface, as called from this class:
            sinkClear()
            sinkWrite(text)
            sinkWriteErr(text)
            sinkFail(context,stopped,exitstatus)
            sinkSuccess(context)

      Notifiee interface:
            notify(context,exitstatus)
      """

      def __init__(self):
            # pid -> (child,outfd,errfd) for every child started by
            # pipeBegin() and not yet reaped by pipeEnd()
            self.pid2process={}
            # set by kill(), reset by pipeBegin(); defined here so that it
            # exists before the first command is started
            self.stop=False
            return

      def toString(self):
            """Return a printable description of where commands run."""
            return "localhost:."

      #######################################################################

      def run(self,command,context,sink,notifiee):
            """Run command, streaming its output to sink.

            Blocks until the command is finished; sink and notifiee are
            told about the outcome by pipeEnd().
            """
            self.pipeTo((command,context,sink,notifiee))
            return

      #######################################################################

      def runToString(self,command):
            """Run command and return (True,output), or (False,'') on failure."""
            status,output=self.runToStringVerbose(command)
            if not status:
                  return False,''
            return True,output

      def runToStringVerbose(self,command):
            """Run command and return (succeeded,output).

            On success the output is stripped of surrounding newlines and
            blanks and decoded with convertIn(); on failure whatever was
            read is returned as is.
            """
            # Notes kept from earlier implementations:
            # - commands.getstatusoutput() returned an error with complex
            #   multi-line command lines
            # - subprocess is not used because it is unknown in Python 2.3
            #   (debian)
            command=convertOut(command)
            child=os.popen(command)
            output=''
            try:
                  output=child.read()
            except (IOError,OSError):
                  # best effort: a failed read leaves the output empty
                  pass
            # close() returns None when the command exited with status 0
            status=child.close()
            if status is not None:
                  return False,output
            output=output.strip('\n\r ')
            output=convertIn(output)
            return True,output

      #######################################################################

      def pipeTo(self,params):
            """Start the command in params, pump its output, return its exit code.

            params: tuple (command,context,sink,notifiee).
            """
            context,child,outpipe,errpipe,outfd,errfd,sink,notifiee=self.pipeBegin(params)
            stopped=self.pipeLoop(outfd,errfd,outpipe,errpipe,sink)
            return self.pipeEnd(context,child,stopped,sink,notifiee)

      def pipeBegin(self,params):
            """Start the child process and prepare its pipes.

            Returns (context,child,outpipe,errpipe,outfd,errfd,sink,notifiee).
            """
            command,context,sink,notifiee=params

            # convert
            command=convertOut(command)

            # open child, capturing both stdout and stderr
            child=popen2.Popen3(command,1)

            # don't need to talk to child's stdin
            child.tochild.close()

            # pipes
            outpipe=child.fromchild
            errpipe=child.childerr
            outfd=outpipe.fileno()
            errfd=errpipe.fileno()

            # register process
            self.stop=False
            self.pid2process[child.pid]=(child,outfd,errfd)

            # don't deadlock!
            self.makeNonBlocking(outfd)
            self.makeNonBlocking(errfd)

            # initialize sink
            sink.sinkClear()

            return context,child,outpipe,errpipe,outfd,errfd,sink,notifiee

      def _exitCode(self,status):
            """Translate a wait() status word into an exit code.

            Returns the exit code after a normal exit, minus the signal
            number when the child was terminated by a signal, -1 otherwise.
            """
            if os.WIFEXITED(status):
                  return os.WEXITSTATUS(status)
            if os.WIFSIGNALED(status):
                  # WEXITSTATUS would yield 0 here and make a crashed or
                  # killed child look successful
                  return -os.WTERMSIG(status)
            return -1

      def pipeEnd(self,context,child,stopped,sink,notifiee):
            """Reap the child, report the outcome and return the exit code.

            sink.sinkSuccess(context) is called when the child was not
            stopped and exited with code 0, sink.sinkFail(context,stopped,
            exitstatus) otherwise; notifiee.notify(context,exitstatus) is
            called in both cases.
            """
            # wait for child to terminate; the child object reaps itself
            # whether or not the run was interrupted
            status=child.wait()
            exitstatus=self._exitCode(status)

            # remove pid from list
            self.pid2process.pop(child.pid,None)

            # result
            if stopped or exitstatus!=0:
                  sink.sinkFail(context,stopped,exitstatus)
            else:
                  sink.sinkSuccess(context)

            # notify
            notifiee.notify(context,exitstatus)
            return exitstatus

      def pipeLoop(self,outfd,errfd,outpipe,errpipe,sink):
            """Pump the child's stdout and stderr into sink, line by line.

            Returns True when the loop ended because self.stop was set,
            False when both streams reached end of file.
            """
            stopped=False
            outeof=erreof=False
            while True:
                  # wait for input from either stdout or stderr
                  try:
                        ready=select.select([outfd,errfd],[],[])[0]
                  except select.error:
                        # nothing can be read this round; go on with an
                        # empty list rather than an unset or stale one
                        print('select exception')
                        ready=[]

                  # read stdout
                  if outfd in ready:
                        output=str(outpipe.readline())
                        if output=='':
                              outeof=True
                        sink.sinkWrite(convertIn(output))

                  # read stderr
                  if errfd in ready:
                        output=str(errpipe.readline())
                        if output=='':
                              erreof=True
                        sink.sinkWriteErr(convertIn(output))

                  # interrupted
                  if self.stop:
                        stopped=True
                        break

                  # check for end of streams
                  if outeof and erreof:
                        break

                  # give the child time to put more data in the buffer
                  # without this,the parent may try to read only a few chars at a time
                  # which can be very expensive
                  try:
                        select.select([],[],[],.1)
                  except select.error:
                        print('select exception')
            return stopped

      def makeNonBlocking(self,fd):
            """Try to add the O_NDELAY flag to the file descriptor fd."""
            flags=fcntl.fcntl(fd,fcntl.F_GETFL)
            try:
                  fcntl.fcntl(fd,fcntl.F_SETFL,flags | fcntl.O_NDELAY)
            except AttributeError:
                  # the fcntl module has no O_NDELAY: fd is left unchanged
                  # NOTE(review): confirm the flag really gets set on the
                  # Python versions in use
                  pass
            return

      #######################################################################

      def kill(self):
            """Ask the running pipe loop to stop and kill every registered child.

            Returns the pids that are still registered afterwards.
            """
            # stop signal
            self.stop=True

            # unstack processes; iterate over a copy of the pids
            escalation=(signal.SIGINT,signal.SIGTERM,signal.SIGQUIT,signal.SIGKILL)
            for pid in list(self.pid2process.keys()):
                  try:
                        for signum in escalation:
                              os.kill(pid,signum)
                  except OSError:
                        # os.kill failed, e.g. the process is already gone
                        print('kill exception')
            return list(self.pid2process.keys())

#######################################################################
#     REMOTABLERUNNER
#######################################################################

class RemotableRunner(BasicRunner):
      """BasicRunner that can wrap its commands in an ssh call.

      While self.runRemote is true, run() and runToString() expand the
      command with one of the sysSetting templates before handing it to
      BasicRunner.
      """

      def __init__(self,host='localhost',user='root',wdir='~'):
            BasicRunner.__init__(self)
            self.setLocal(host,user,wdir)
            return

      #######################################################################

      # overridden methods

      def run(self,command,context,sink,notifiee):
            """Run command, through the 'sshVerbose' template in remote mode."""
            if self.runRemote:
                  command=self.local2remote_verbose(command)
            return BasicRunner.run(self,command,context,sink,notifiee)

      def runToString(self,command):
            """Capture command's output, through the 'ssh' template in remote mode."""
            if self.runRemote:
                  command=self.local2remote(command)
            return BasicRunner.runToString(self,command)

      #######################################################################

      def _fillTemplate(self,key,command):
            """Return sysSetting[key] with its placeholders substituted."""
            cl=sysSetting[key]
            cl=cl.replace('%USER%',self.remoteUser)
            cl=cl.replace('%HOST%',self.remoteHost)
            cl=cl.replace('%DIRECTORY%',self.remoteDir)
            cl=cl.replace('%COMMAND%',command)
            return cl

      def local2remote(self,command):
            """Expand command with the 'ssh' template."""
            return self._fillTemplate('ssh',command)

      def local2remote_verbose(self,command):
            """Expand command with the 'sshVerbose' template."""
            return self._fillTemplate('sshVerbose',command)

      def local2remotefromhome(self,command):
            """Expand command with the 'sshNoHome' template."""
            return self._fillTemplate('sshNoHome',command)

      def get(self):
            """Return (runRemote,remoteHost,remoteUser,remoteDir)."""
            return self.runRemote,self.remoteHost,self.remoteUser,self.remoteDir

      def set(self,mode,host,user,wdir):
            """Set the remote flag and the three connection settings at once."""
            self.runRemote=mode
            self.remoteHost=host
            self.remoteUser=user
            self.remoteDir=wdir
            return

      def setRemote(self,host,user,wdir):
            """Switch to remote mode.

            Returns False and changes nothing when host, user or wdir is
            the empty string, True otherwise.
            """
            # the working directory is checked through the wdir parameter;
            # comparing the builtin dir with "" is never true
            if host=="" or user=="" or wdir=="":
                  return False
            self.remoteHost=host
            self.remoteUser=user
            self.remoteDir=wdir
            self.key=None
            self.runRemote=True
            return True

      def setLocal(self,host='localhost',user='root',wdir='~'):
            """Switch to local mode, recording the given connection settings."""
            self.remoteHost=host
            self.remoteUser=user
            self.remoteDir=wdir
            self.key=None
            self.runRemote=False
            return

      def toString(self):
            """Return 'user@host:dir' in remote mode, 'localhost:.' otherwise."""
            if self.runRemote:
                  return self.remoteUser+"@"+self.remoteHost+":"+self.remoteDir
            else:
                  return "localhost:."

#######################################################################
#     THREADEDRUNNER
#######################################################################

import thread

class ThreadedRunner(RemotableRunner):
      """RemotableRunner whose run() starts the command in a new thread."""

      def __init__(self,host='localhost',user='root',wdir='~'):
            RemotableRunner.__init__(self,host,user,wdir)
            return

      def run(self,command,context,sink,notifiee):
            """Start command in a new thread and return at once.

            The thread executes pipeTo(); the outcome is reported through
            sink and notifiee.
            """
            # same remote wrapping as RemotableRunner.run(), which this
            # override replaces; without it a runner in remote mode would
            # execute the command locally
            if self.runRemote:
                  command=self.local2remote_verbose(command)
            thread.start_new_thread(self.pipeTo,((command,context,sink,notifiee),))
            return

#######################################################################
#     GTKRUNNER
#######################################################################

import gtk
import gobject

######################################################################

class GtkRunner(RemotableRunner):
      """RemotableRunner that reads the child's output from gobject I/O watches.

      pipeTo() only registers the watches and returns; the output is then
      delivered to the sink by sinkCallback().
      """

      def __init__(self,host='localhost',user='root',wdir='~'):
            RemotableRunner.__init__(self,host,user,wdir)
            return

      def pipeTo(self,params):
            """Start the child and register one watch per output pipe.

            params: tuple (command,context,sink,notifiee).
            Returns None.
            """
            context,child,outpipe,errpipe,outfd,errfd,sink,notifiee=self.pipeBegin(params)
            conditions=gtk.gdk.INPUT_READ|gtk.gdk.INPUT_EXCEPTION
            # the trailing flag tells the callback whether the pipe is stderr
            gobject.io_add_watch(outpipe,conditions,self.sinkCallback,context,child,outpipe,sink,notifiee,False)
            gobject.io_add_watch(errpipe,conditions,self.sinkCallback,context,child,errpipe,sink,notifiee,True)
            return None

      def sinkCallback(self,fd,cond,context,child,outpipe,sink,notifiee,isErr):
            """Watch callback: forward one line; finish when the pipe is at EOF.

            Returns False once end of file was seen, True otherwise.
            """
            # defaults for a wake-up without the read condition, so that
            # eof and stopped are always bound
            eof=False
            stopped=self.stop
            if cond & gtk.gdk.INPUT_READ:
                  eof,stopped=self.read(child,outpipe,fd,sink,isErr)
            if eof:
                  self.pipeEnd(context,child,stopped,sink,notifiee,isErr)
                  return False
            return True

      def read(self,child,outpipe,fd,sink,isErr):
            """Read one line from outpipe and write it to the sink.

            The line goes to sink.sinkWriteErr() when isErr is true, to
            sink.sinkWrite() otherwise.
            Returns (eof,self.stop).
            """
            eof=False
            try:
                  ready=select.select([fd],[],[])[0]
            except select.error:
                  # nothing to read; avoid using an unset ready list
                  print('select exception')
                  ready=[]
            if fd in ready:
                  output=str(outpipe.readline())
                  if output=='':
                        eof=True
                  output=convertIn(output)
                  if isErr:
                        sink.sinkWriteErr(output)
                  else:
                        sink.sinkWrite(output)
            return eof,self.stop

      def pipeEnd(self,context,child,stopped,sink,notifiee,isErr):
            """Finish the run when the stdout pipe ends.

            End of file on the stderr pipe (isErr true) is ignored, so the
            child is reaped and reported only once.
            """
            if not isErr:
                  RemotableRunner.pipeEnd(self,context,child,stopped,sink,notifiee)
            return

######################################################################

class Runner(RemotableRunner):
      """RemotableRunner subclass that adds and overrides nothing."""

      def __init__(self,host='localhost',user='root',wdir='~'):
            # hand the three settings straight to the parent constructor
            RemotableRunner.__init__(self,host=host,user=user,wdir=wdir)

# Generated by  Doxygen 1.6.0   Back to index