| # Copyright 2015 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """RPC compatible subprocess-type module. |
| |
| This module defined both a task-side process class as well as a controller-side |
| process wrapper for easier access and usage of the task-side process. |
| """ |
| |
| import logging |
| import os |
| import subprocess |
| import sys |
| import threading |
| import time |
| |
| from legion.lib import common_lib |
| from utils import subprocess42 |
| |
| |
| class TimeoutError(Exception): |
| pass |
| |
| |
| class ControllerProcessWrapper(object): |
| """Controller-side process wrapper class. |
| |
| This class provides a more intuitive interface to task-side processes |
| than calling the methods directly using the RPC object. |
| """ |
| |
| def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None, |
| key=None, shell=None): |
| logging.debug('Creating a process with cmd=%s', cmd) |
| self._rpc = rpc |
| self._key = rpc.subprocess.Process(cmd, key) |
| logging.debug('Process created with key=%s', self._key) |
| if verbose: |
| self._rpc.subprocess.SetVerbose(self._key) |
| if detached: |
| self._rpc.subprocess.SetDetached(self._key) |
| if cwd: |
| self._rpc.subprocess.SetCwd(self._key, cwd) |
| if shell: |
| self._rpc.subprocess.SetShell(self._key) |
| self._rpc.subprocess.Start(self._key) |
| |
| @property |
| def key(self): |
| return self._key |
| |
| def Terminate(self): |
| logging.debug('Terminating process %s', self._key) |
| return self._rpc.subprocess.Terminate(self._key) |
| |
| def Kill(self): |
| logging.debug('Killing process %s', self._key) |
| self._rpc.subprocess.Kill(self._key) |
| |
| def Delete(self): |
| return self._rpc.subprocess.Delete(self._key) |
| |
| def GetReturncode(self): |
| return self._rpc.subprocess.GetReturncode(self._key) |
| |
| def ReadStdout(self): |
| """Returns all stdout since the last call to ReadStdout. |
| |
| This call allows the user to read stdout while the process is running. |
| However each call will flush the local stdout buffer. In order to make |
| multiple calls to ReadStdout and to retain the entire output the results |
| of this call will need to be buffered in the calling code. |
| """ |
| return self._rpc.subprocess.ReadStdout(self._key) |
| |
| def ReadStderr(self): |
| """Returns all stderr read since the last call to ReadStderr. |
| |
| See ReadStdout for additional details. |
| """ |
| return self._rpc.subprocess.ReadStderr(self._key) |
| |
| def ReadOutput(self): |
| """Returns the (stdout, stderr) since the last Read* call. |
| |
| See ReadStdout for additional details. |
| """ |
| return self._rpc.subprocess.ReadOutput(self._key) |
| |
| def Wait(self, timeout=None): |
| return self._rpc.subprocess.Wait(self._key, timeout) |
| |
| def Poll(self): |
| return self._rpc.subprocess.Poll(self._key) |
| |
| def GetPid(self): |
| return self._rpc.subprocess.GetPid(self._key) |
| |
| |
| class Process(object): |
| """Implements a task-side non-blocking subprocess. |
| |
| This non-blocking subprocess allows the caller to continue operating while |
| also able to interact with this subprocess based on a key returned to |
| the caller at the time of creation. |
| |
| Creation args are set via Set* methods called after calling Process but |
| before calling Start. This is due to a limitation of the XML-RPC |
| implementation not supporting keyword arguments. |
| """ |
| |
| _processes = {} |
| _process_next_id = 0 |
| _creation_lock = threading.Lock() |
| |
| def __init__(self, cmd, key): |
| self.stdout = '' |
| self.stderr = '' |
| self.key = key |
| self.cmd = cmd |
| self.proc = None |
| self.cwd = None |
| self.shell = False |
| self.verbose = False |
| self.detached = False |
| self.complete = False |
| self.data_lock = threading.Lock() |
| self.stdout_file = open(self._CreateOutputFilename('stdout'), 'wb+') |
| self.stderr_file = open(self._CreateOutputFilename('stderr'), 'wb+') |
| |
| def _CreateOutputFilename(self, fname): |
| return os.path.join(common_lib.GetOutputDir(), '%s.%s' % (self.key, fname)) |
| |
| def __str__(self): |
| return '%r, cwd=%r, verbose=%r, detached=%r' % ( |
| self.cmd, self.cwd, self.verbose, self.detached) |
| |
| def _reader(self): |
| for pipe, data in self.proc.yield_any(): |
| with self.data_lock: |
| if pipe == 'stdout': |
| self.stdout += data |
| self.stdout_file.write(data) |
| self.stdout_file.flush() |
| if self.verbose: |
| sys.stdout.write(data) |
| else: |
| self.stderr += data |
| self.stderr_file.write(data) |
| self.stderr_file.flush() |
| if self.verbose: |
| sys.stderr.write(data) |
| self.complete = True |
| |
| @classmethod |
| def KillAll(cls): |
| for key in cls._processes: |
| cls.Kill(key) |
| |
| @classmethod |
| def Process(cls, cmd, key=None): |
| with cls._creation_lock: |
| if not key: |
| key = 'Process%d' % cls._process_next_id |
| cls._process_next_id += 1 |
| if key in cls._processes: |
| raise KeyError('Key %s already in use' % key) |
| logging.debug('Creating process %s with cmd %r', key, cmd) |
| cls._processes[key] = cls(cmd, key) |
| return key |
| |
| def _Start(self): |
| logging.info('Starting process %s', self) |
| self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE, |
| stderr=subprocess42.PIPE, |
| detached=self.detached, cwd=self.cwd, |
| shell=self.shell) |
| threading.Thread(target=self._reader).start() |
| |
| @classmethod |
| def Start(cls, key): |
| cls._processes[key]._Start() |
| |
| @classmethod |
| def SetCwd(cls, key, cwd): |
| """Sets the process's cwd.""" |
| logging.debug('Setting %s cwd to %s', key, cwd) |
| cls._processes[key].cwd = cwd |
| |
| @classmethod |
| def SetShell(cls, key): |
| """Sets the process's shell arg to True.""" |
| logging.debug('Setting %s.shell = True', key) |
| cls._processes[key].shell = True |
| |
| @classmethod |
| def SetDetached(cls, key): |
| """Creates a detached process.""" |
| logging.debug('Setting %s.detached = True', key) |
| cls._processes[key].detached = True |
| |
| @classmethod |
| def SetVerbose(cls, key): |
| """Sets the stdout and stderr to be emitted locally.""" |
| logging.debug('Setting %s.verbose = True', key) |
| cls._processes[key].verbose = True |
| |
| @classmethod |
| def Terminate(cls, key): |
| logging.debug('Terminating process %s', key) |
| cls._processes[key].proc.terminate() |
| |
| @classmethod |
| def Kill(cls, key): |
| logging.debug('Killing process %s', key) |
| cls._processes[key].proc.kill() |
| |
| @classmethod |
| def Delete(cls, key): |
| if cls.GetReturncode(key) is None: |
| logging.warning('Killing %s before deleting it', key) |
| cls.Kill(key) |
| logging.debug('Deleting process %s', key) |
| cls._processes.pop(key) |
| |
| @classmethod |
| def GetReturncode(cls, key): |
| return cls._processes[key].proc.returncode |
| |
| @classmethod |
| def ReadStdout(cls, key): |
| """Returns all stdout since the last call to ReadStdout. |
| |
| This call allows the user to read stdout while the process is running. |
| However each call will flush the local stdout buffer. In order to make |
| multiple calls to ReadStdout and to retain the entire output the results |
| of this call will need to be buffered in the calling code. |
| """ |
| proc = cls._processes[key] |
| with proc.data_lock: |
| # Perform a "read" on the stdout data |
| stdout = proc.stdout |
| proc.stdout = '' |
| return stdout |
| |
| @classmethod |
| def ReadStderr(cls, key): |
| """Returns all stderr read since the last call to ReadStderr. |
| |
| See ReadStdout for additional details. |
| """ |
| proc = cls._processes[key] |
| with proc.data_lock: |
| # Perform a "read" on the stderr data |
| stderr = proc.stderr |
| proc.stderr = '' |
| return stderr |
| |
| @classmethod |
| def ReadOutput(cls, key): |
| """Returns the (stdout, stderr) since the last Read* call. |
| |
| See ReadStdout for additional details. |
| """ |
| return cls.ReadStdout(key), cls.ReadStderr(key) |
| |
| @classmethod |
| def Wait(cls, key, timeout=None): |
| """Wait for the process to complete. |
| |
| We wait for all of the output to be written before returning. This solves |
| a race condition found on Windows where the output can lag behind the |
| wait call. |
| |
| Raises: |
| TimeoutError if the process doesn't finish in the specified timeout. |
| """ |
| end = None if timeout is None else timeout + time.time() |
| while end is None or end > time.time(): |
| if cls._processes[key].complete: |
| return |
| time.sleep(0.05) |
| raise TimeoutError() |
| |
| @classmethod |
| def Poll(cls, key): |
| return cls._processes[key].proc.poll() |
| |
| @classmethod |
| def GetPid(cls, key): |
| return cls._processes[key].proc.pid |