# -*- coding: utf-8 -*-
### Copyright (C) 2006-2012 Antonio Valentino <a_valentino@users.sf.net>
### This file is part of exectools.
### This module is free software; you can redistribute it and/or modify
### it under the terms of the GNU General Public License as published by
### the Free Software Foundation; either version 2 of the License, or
### (at your option) any later version.
### This module is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
### GNU General Public License for more details.
### You should have received a copy of the GNU General Public License
### along with this module; if not, write to the Free Software
### Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
'''Tools for running external processes in a Qt4 GUI.'''
import time
import logging
try:
from qt import QtCore, QtGui
except ImportError:
# Select the PyQt API 2
import sip
sip.setapi('QDate', 2)
sip.setapi('QDateTime', 2)
sip.setapi('QString', 2)
sip.setapi('QTextStream', 2)
sip.setapi('QTime', 2)
sip.setapi('QUrl', 2)
sip.setapi('QVariant', 2)
from PyQt4 import QtCore, QtGui
QtCore.Signal = QtCore.pyqtSignal
QtCore.Slot = QtCore.pyqtSlot
from exectools import BaseOutputHandler, BaseToolController, EX_OK, level2tag
__author__ = 'Antonio Valentino <a_valentino@users.sf.net>'
__revision__ = '$Revision$'
__date__ = '$Date$'
__all__ = ['Qt4Blinker', 'Qt4OutputPlane', 'Qt4OutputHandler',
'Qt4LoggingHandler', 'Qt4DialogLoggingHandler', 'Qt4ToolController']
[docs]class Qt4Blinker(QtGui.QLabel):
'''Qt4 linker.
:SLOTS:
* :meth:`pulse`
'''
def __init__(self, parent=None, flags=QtCore.Qt.WindowFlags(0), **kwargs):
super(Qt4Blinker, self).__init__(parent, flags, **kwargs)
#qstyle = QtGui.QApplication.style()
#pixmap = qstyle.standardPixmap(QtGui.QStyle.SP_MediaStop)
pixmap = QtGui.QPixmap(
':/trolltech/styles/commonstyle/images/standardbutton-no-32.png')
self.setPixmap(pixmap)
@QtCore.Slot()
[docs] def pulse(self):
'''A blinker pulse.
:C++ signature: `void pulse()`
'''
sensitive = self.isEnabled()
sensitive = not sensitive
self.setEnabled(sensitive)
[docs]class Qt4OutputPlane(QtGui.QTextEdit):
#: SIGNAL: emits a hide request.
#:
#: :C++ signature: `void planeHideRequest()`
planeHideRequest = QtCore.Signal()
def __init__(self, parent=None, **kwargs):
super(Qt4OutputPlane, self).__init__(parent, **kwargs)
self._setupActions()
self.banner = None
def _setupActions(self):
qstype = QtGui.QApplication.style()
# Setup actions
self.actions = QtGui.QActionGroup(self)
# Save As
icon = qstype.standardIcon(QtGui.QStyle.SP_DialogSaveButton)
self.actionSaveAs = QtGui.QAction(icon, self.tr('&Save As'), self,
shortcut=self.tr('Ctrl+S'),
statusTip=self.tr(
'Save text to file'),
triggered=self.save)
self.actions.addAction(self.actionSaveAs)
# Clear
icon = QtGui.QIcon(':/trolltech/styles/commonstyle/images/'
'standardbutton-clear-32.png')
self.actionClear = QtGui.QAction(icon, self.tr('&Clear'), self,
shortcut=self.tr('Shift+F5'),
statusTip=self.tr('Clear the text'),
triggered=self.clear)
self.actions.addAction(self.actionClear)
# Close
icon = qstype.standardIcon(QtGui.QStyle.SP_DialogCloseButton)
self.actionHide = QtGui.QAction(icon, self.tr('&Hide'), self,
shortcut=self.tr('Ctrl+W'),
statusTip=self.tr(
'Hide the text plane'),
triggered=self.planeHideRequest)
self.actions.addAction(self.actionHide)
[docs] def contextMenuEvent(self, event):
menu = QtGui.QTextEdit.createStandardContextMenu(self)
menu.addSeparator()
menu.addActions(self.actions.actions())
menu.exec_(event.globalPos())
def _report(self):
if callable(self.banner):
header = self.banner()
elif self.banner is not None:
header = self.banner
else:
header = '# Output log generated on %s' % time.asctime()
text = self.toPlainText()
return '%s\n\n%s' % (header, text)
# def clear(self): # it is a standard QtGui.QTextEdit method
[docs] def save(self):
'''Save a file.'''
filter_ = self.tr('Text files (*.txt)')
filename, _ = QtGui.QFileDialog.getSaveFileNameAndFilter(self, '', '',
filter_)
if filename:
text = self._report()
logfile = open(filename, 'w')
logfile.write(text)
logfile.close()
[docs]class Qt4OutputHandler(QtCore.QObject, BaseOutputHandler):
'''Qt4 Output Handler.
:SIGNALS:
* :attr:`pulse`
* :attr:`percentageChanged`
'''
_statusbar_timeout = 2000 # ms
#: SIGNAL: it is emitted to signal some kind of activity of the external
#: process
#:
#: :param str text:
#: an optional text describing the kind activity of the external
#: process
#:
#: :C++ signature: `void pulse(QString)`
pulse = QtCore.Signal([], [str])
#: SIGNAL: it is emitted when the progress percentage changes
#:
#: :param float percentage:
#: the new completion percentage [0, 100]
#:
#: :C++ signature: `void percentageChanged(float)`
percentageChanged = QtCore.Signal([int], [])
def __init__(self, logger=None, statusbar=None, progressbar=None,
blinker=None, parent=None, **kwargs):
QtCore.QObject.__init__(self, parent, **kwargs)
BaseOutputHandler.__init__(self, logger)
self.statusbar = statusbar
if self.statusbar:
if blinker is None:
blinker = Qt4Blinker()
statusbar.addPermanentWidget(blinker)
blinker.hide()
self.pulse.connect(blinker.show)
self.pulse.connect(blinker.pulse)
self.pulse[str].connect(lambda text:
statusbar.showMessage(text, self._statusbar_timeout))
if progressbar is None:
progressbar = QtGui.QProgressBar(self.statusbar)
progressbar.setTextVisible(True)
statusbar.addPermanentWidget(progressbar) # , 1) # stretch=1
progressbar.hide()
self.progressbar = progressbar
#self.percentageChanged[()].connect(progressbar.show)
self.percentageChanged.connect(progressbar.show)
self.percentageChanged.connect(progressbar.setValue)
self.progressbar = progressbar
self.blinker = blinker
[docs] def feed(self, data):
'''Feed some data to the parser.
It is processed insofar as it consists of complete elements;
incomplete data is buffered until more data is fed or close()
is called.
'''
if self.blinker:
self.blinker.show()
super(Qt4OutputHandler, self).feed(data)
[docs] def close(self):
'''Reset the instance.'''
if self.statusbar:
self.statusbar.clearMessage()
super(Qt4OutputHandler, self).close()
[docs] def reset(self):
'''Reset the handler instance.
Loses all unprocessed data. This is called implicitly at
instantiation time.
'''
super(Qt4OutputHandler, self).reset()
if self.progressbar:
self.progressbar.setRange(0, 100)
self.progressbar.reset()
self.progressbar.hide()
if self.blinker:
self.blinker.reset()
self.blinker.hide()
[docs] def handle_progress(self, data):
'''Handle progress data.
:param data:
a list containing an item for each named group in the
"progress" regular expression: (pulse, percentage, text)
for the default implementation.
Each item can be None.
'''
#pulse = data.get('pulse')
percentage = data.get('percentage')
text = data.get('text')
self.pulse.emit()
if text:
self.pulse[str].emit(text)
if percentage is not None:
self.percentageChanged.emit(int(percentage))
[docs]class Qt4LoggingHandler(logging.Handler):
'''Custom handler for logging on Qt4 textviews.'''
def __init__(self, textview):
logging.Handler.__init__(self)
self.textview = textview
self._formats = self._setupFormats()
def _setupFormats(self):
'''Setup a different format for the different message types.'''
fmap = {}
fmt = QtGui.QTextCharFormat()
fmt.setForeground(QtGui.QColor('red'))
fmap['error'] = fmt
fmt = QtGui.QTextCharFormat()
fmt.setForeground(QtGui.QColor('orange'))
fmap['warning'] = fmt
fmt = QtGui.QTextCharFormat()
fmt.setForeground(QtGui.QColor('blue'))
fmap['info'] = fmt
fmt = QtGui.QTextCharFormat()
fmt.setForeground(QtGui.QColor('gray'))
fmap['debug'] = fmt
fmt = QtGui.QTextCharFormat()
fmt.setFontWeight(QtGui.QFont.Bold)
fmap['cmd'] = fmt
return fmap
def _flush(self):
QtGui.qApp.processEvents()
def _write(self, data, format_=None):
'''Write data on the textview.'''
if isinstance(format_, basestring):
format_ = self._formats.get(format_, '')
if data and not data.endswith('\n'):
data += '\n'
if format_:
oldFormat = self.textview.currentCharFormat()
self.textview.setCurrentCharFormat(format_)
self.textview.insertPlainText(data)
self.textview.setCurrentCharFormat(oldFormat)
else:
self.textview.insertPlainText(data)
self.textview.ensureCursorVisible()
[docs] def emit(self, record):
try:
msg = self.format(record)
tag = getattr(record, 'tag', level2tag(record.levelno))
self._write('%s' % msg, tag)
# @TODO: check
#self._flush()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
[docs]class Qt4DialogLoggingHandler(logging.Handler):
'''Qt4 handler for the logging dialog.'''
levelsmap = {
logging.CRITICAL: QtGui.QMessageBox.Critical,
# FATAL = CRITICAL
logging.ERROR: QtGui.QMessageBox.Critical,
logging.WARNING: QtGui.QMessageBox.Warning,
# WARN = WARNING
logging.INFO: QtGui.QMessageBox.Information,
logging.DEBUG: QtGui.QMessageBox.Information,
logging.NOTSET: QtGui.QMessageBox.Information,
}
def __init__(self, dialog=None, parent=None):
logging.Handler.__init__(self)
if dialog is None:
# @TODO: check
#~ if parent is None:
#~ parent = QtGui.qApp.mainWidget()
dialog = QtGui.QMessageBox(parent)
dialog.addButton(QtGui.QMessageBox.Close)
# @TODO: set dialog title
dialog.setTextFormat(QtCore.Qt.AutoText)
self.dialog = dialog
self.formatter = None
[docs] def emit(self, record):
try:
if self.dialog.isVisible():
raise RuntimeError('trying to show again a dialog that is '
'already visible.')
msgtype = self.levelsmap[record.levelno]
self.dialog.setIcon(msgtype)
level = logging.getLevelName(record.levelno)
level = level.upper()
self.dialog.setWindowTitle(level)
msg = ['<h1>%s</h1>' % level]
if record.exc_info:
msg.append('<p><b>%s<b></p><br>' %
record.getMessage().capitalize())
# @TODO: background-color="white"
msg.append('<pre>%s</pre>' % self.format(record))
else:
msg.append('<p>%s</p>' % self.format(record).capitalize())
msg = '\n'.join(msg)
self.dialog.setText(msg)
self.dialog.exec_()
self.dialog.hide()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
[docs]class Qt4ToolController(QtCore.QObject, BaseToolController):
'''Qt4 tool controller.
:SIGNALS:
* :attr:`finished`
:SLOTS:
* :meth:`stop_tool`
* :meth:`finalize_run`
* :meth:`handle_stdout`
* :meth:`handle_stderr`
* :meth:`handle_error`
'''
_delay_after_stop = 200 # ms
#: SIGNAL: it is emitted when the processing is finished.
#:
#: :param int exitcode:
#: the external proces exit code
#:
#: :C++ signature: `void finished(int exitCode)`
finished = QtCore.Signal(int)
def __init__(self, logger=None, parent=None, **kwargs):
QtCore.QObject.__init__(self, parent, **kwargs)
BaseToolController.__init__(self, logger)
self.subprocess = QtCore.QProcess(parent)
self.subprocess.setProcessChannelMode(QtCore.QProcess.MergedChannels)
# connect process handlers and I/O handlers
self.subprocess.readyReadStandardOutput.connect(self.handle_stdout)
self.subprocess.readyReadStandardError.connect(self.handle_stderr)
self.subprocess.error.connect(self.handle_error)
self.subprocess.finished.connect(self.finalize_run)
@property
[docs] def isbusy(self):
'''If True then the controller is already running a subprocess.'''
return self.subprocess.state() != self.subprocess.NotRunning
@QtCore.Slot(int, QtCore.QProcess.ExitStatus)
[docs] def finalize_run(self, exitCode=None, exitStatus=None):
'''Perform finalization actions.
This method is called when the controlled process terminates
to perform finalization actions like:
* read and handle residual data in buffers,
* flush and close output handlers,
* close subprocess file descriptors
* run the "finalize_run_hook" method
* reset the controller instance
If one just needs to perfor some additional finalization action
it should be better to use a custom "finalize_run_hook" instead
of overriging "finalize_run".
:C++ signature: `finalize_run(int, QProcess::ExitStatus)`
'''
if not self._tool:
return
out_encoding = self._tool.output_encoding
try:
# retrieve residual data
# @TODO: check if it is actually needed
if self._tool.stdout_handler:
byteArray = self.subprocess.readAllStandardOutput()
data = byteArray.data().decode(out_encoding)
self._tool.stdout_handler.feed(data)
if self._tool.stderr_handler:
byteArray = self.subprocess.readAllStandardError()
data = byteArray.data().decode(out_encoding)
self._tool.stderr_handler.feed(data)
# close the pipe and wait for the subprocess termination
self.subprocess.close()
if self._tool.stdout_handler:
self._tool.stdout_handler.close()
if self._tool.stderr_handler:
self._tool.stderr_handler.close()
if self._userstop:
self.logger.info('Execution stopped by the user.')
elif exitCode != EX_OK:
msg = ('Process (PID=%d) exited with return code %d.' %
(self.subprocess.pid(),
self.subprocess.exitCode()))
self.logger.warning(msg)
# Call finalize hook if available
self.finalize_run_hook()
finally:
# @TODO: check
# Protect for unexpected errors in the feed and close methods of
# the stdout_handler
self._reset()
self.finished.emit(exitCode)
def _reset(self):
'''Internal reset.'''
if self.subprocess.state() != self.subprocess.NotRunning:
self._stop(force=True)
self.subprocess.waitForFinished()
stopped = self.subprocess.state() == self.subprocess.NotRunning
if not stopped:
self.logger.warning('reset running process (PID=%d)' %
self.subprocess.pid())
assert self.subprocess.state() == self.subprocess.NotRunning, \
'the process is still running'
self.subprocess.setProcessState(self.subprocess.NotRunning)
# @TODO: check
self.subprocess.closeReadChannel(QtCore.QProcess.StandardOutput)
self.subprocess.closeReadChannel(QtCore.QProcess.StandardError)
self.subprocess.closeWriteChannel()
super(Qt4ToolController, self)._reset()
self.subprocess.close()
@QtCore.Slot()
[docs] def handle_stdout(self):
'''Handle standard output.
:C++ signature: `void handle_stdout()`
'''
byteArray = self.subprocess.readAllStandardOutput()
if not byteArray.isEmpty():
data = byteArray.data().decode(self._tool.output_encoding)
self._tool.stdout_handler.feed(data)
@QtCore.Slot()
[docs] def handle_stderr(self):
'''Handle standard error.
:C++ signature: `void handle_stderr()`
'''
byteArray = self.subprocess.readAllStandardError()
if not byteArray.isEmpty():
data = byteArray.data().decode(self._tool.output_encoding)
self._tool.stderr_handler.feed(data)
@QtCore.Slot(QtCore.QProcess.ProcessError)
[docs] def handle_error(self, error):
'''Handle a error in process execution.
Can be handle different types of errors:
* starting failed
* crashing after starts successfully
* timeout elapsed
* write error
* read error
* unknow error
:C++ signature: `void handle_error(QProcess::ProcessError)`
'''
msg = ''
level = logging.DEBUG
if self.subprocess.state() == self.subprocess.NotRunning:
logging.debug('NotRunning')
exit_code = self.subprocess.exitCode()
else:
exit_code = 0
if error == QtCore.QProcess.FailedToStart:
msg = ('The process failed to start. Either the invoked program '
'is missing, or you may have insufficient permissions to '
'invoke the program.')
level = logging.ERROR
# @TODO: check
#self._reset()
elif error == QtCore.QProcess.Crashed:
if not self._userstop and self.subprocess.exitCode() == EX_OK:
msg = ('The process crashed some time after starting '
'successfully.')
level = logging.ERROR
elif error == QtCore.QProcess.Timedout:
msg = ('The last waitFor...() function timed out. The state of '
'QProcess is unchanged, and you can try calling '
'waitFor...() again.')
level = logging.DEBUG
elif error == QtCore.QProcess.WriteError:
msg = ('An error occurred when attempting to write to the process.'
' For example, the process may not be running, or it may '
'have closed its input channel.')
#level = logging.ERROR # @TODO: check
elif error == QtCore.QProcess.ReadError:
msg = ('An error occurred when attempting to read from the '
'process. For example, the process may not be running.')
#level = logging.ERROR # @TODO: check
elif error == QtCore.QProcess.UnknownError:
msg = ('An unknown error occurred. This is the default return '
'value of error().')
#level = logging.ERROR # @TODO: check
if msg:
self.logger.log(level, msg)
self.finished.emit(exit_code)
#QtCore.Slot() # @TODO: check how to handle varargs
[docs] def run_tool(self, tool, *args, **kwargs):
'''Run an external tool in controlled way.
The output of the child process is handled by the controller
and, optionally, notifications can be achieved at sub-process
termination.
'''
assert self.subprocess.state() == self.subprocess.NotRunning
self.reset()
self._tool = tool
if self._tool.stdout_handler:
self._tool.stdout_handler.reset()
if self._tool.stderr_handler:
self._tool.stderr_handler.reset()
cmd = self._tool.cmdline(*args, **kwargs)
self.prerun_hook(cmd)
cmd = ' '.join(cmd)
if self._tool.env:
qenv = ['%s=%s' % (key, val)
for key, val in self._tool.env.iteritems()]
self.subprocess.setEnvironment(qenv)
if self._tool.cwd:
self.subprocess.setWorkingDirectory(self._tool.cwd)
self.logger.debug('"shell" flag set to %s.' % self._tool.shell)
self.logger.debug('Starting: %s' % cmd)
self.subprocess.start(cmd)
self.subprocess.closeWriteChannel()
def _stop(self, force=True):
if self.subprocess.state() == self.subprocess.NotRunning:
return
self.subprocess.terminate()
self.subprocess.waitForFinished(self._delay_after_stop)
stopped = self.subprocess.state() == self.subprocess.NotRunning
if not stopped and force:
self.logger.info('Force process termination (PID=%d).' %
self.subprocess.pid())
self.subprocess.kill()
@QtCore.Slot()
@QtCore.Slot(bool)
[docs] def stop_tool(self, force=True):
'''Stop the execution of controlled subprocess.
When this method is invoked the controller instance is always
reset even if the controller is unable to stop the subprocess.
When possible the controller try to kill the subprocess in a
polite way. If this fails it also tryes brute killing by
default (force=True). This behaviour can be controlled using
the `force` parameter.
:C++ signature: `void stop_tool(bool)`
'''
if self._userstop:
return
if self.subprocess.state() != self.subprocess.NotRunning:
self.logger.debug('Execution stopped by the user.')
self._userstop = True
self._stop(force)
self.subprocess.waitForFinished()
stopped = self.subprocess.state() == self.subprocess.NotRunning
if not stopped:
msg = ('Unable to stop the sub-process (PID=%d).' %
self.subprocess.pid())
self.logger.warning(msg)