Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/raw-rev/c1091715f8a3
Дата изменения: Unknown
Дата индексирования: Tue Oct 2 07:50:38 2012
Кодировка:

# HG changeset patch
# User Daniil Alexeyevsky
# Date 1338543782 -14400
# Node ID c1091715f8a3bb21f2282ef5bfcfc0be608ddb2d
# Parent a3e439d09322959035dcda8467e3e65c11c8b51d
Added allpy.util.Silence to facilitate running most code with as little noise as possible

This class is intended for use around noisy chunks of code using `with`, e.g.::

    with Silence():
        structure = PDBParser().load_pdb(file)

This will produce no output to stderr if no exceptions are raised in the code.
Otherwise, all output will be printed as if the code was run without this
wrapper. See docstrings for more documentation.

diff -r a3e439d09322 -r c1091715f8a3 allpy/util.py
--- a/allpy/util.py Thu May 31 15:23:37 2012 +0400
+++ b/allpy/util.py Fri Jun 01 13:43:02 2012 +0400
@@ -2,6 +2,9 @@
"""
import sys
import warnings
+import os
+from tempfile import mkstemp
+from StringIO import StringIO

def unzip(seq):
"""The oppozite of zip() builtin."""
@@ -44,4 +47,81 @@
"""Warn about function being deprecated."""
warnings.warn(message, DeprecationWarning, stacklevel=2)

class Silence(object):
    """Context manager that mutes output unless the wrapped code fails.

    Run code in the context without any message on the screen, but replay
    everything that was captured to stderr if an exception escapes the
    block. E.g.::

        with Silence():
            structure = PDBParser().get_structure(name, file)

    There are two mutually exclusive ways of silencing:

    1. `dup`: replace the OS-level stdout/stderr descriptors with a
       temporary file.
    2. `hide`: replace python's `sys.stdout`/`sys.stderr` with a `StringIO`.

    For each of the ways you may specify 'stdout', 'stderr' or 'both'
    (any other value leaves both streams untouched). E.g.::

        with Silence(dup="both"):
            check_call(["muscle", alignment])

    `Silence()` is equivalent to `Silence(hide='stderr')`.
    """

    def __init__(self, hide=None, dup=None):
        """Choose which streams to silence and how.

        Raises ValueError if both `hide` and `dup` are given.
        """
        if dup is None and hide is None:
            hide = "stderr"
        if dup is not None and hide is not None:
            # A real exception rather than `assert`, so that the check
            # survives `python -O`.
            raise ValueError("`hide` and `dup` are mutually exclusive")
        # These flags only say *what* to silence. The saved streams and
        # descriptors live in separate private attributes, so a saved
        # descriptor of 0 can never be mistaken for "nothing to restore".
        self.stdout = hide == "stdout" or hide == "both"
        self.stderr = hide == "stderr" or hide == "both"
        self.dup_stdout = dup == "stdout" or dup == "both"
        self.dup_stderr = dup == "stderr" or dup == "both"

    def __enter__(self):
        """Start capturing; return the manager itself."""
        self._redirected = []
        self._saved_stdout = self._saved_stderr = None
        if self.dup_stdout or self.dup_stderr:
            fd, name = mkstemp(prefix="allpy-silent-")
            self.captured_dup = os.fdopen(fd)
            self.captured_dup_name = name
            try:
                if self.dup_stdout:
                    self._redirect_fd(sys.stdout)
                if self.dup_stderr:
                    self._redirect_fd(sys.stderr)
            except Exception:
                # __exit__ is not called when __enter__ fails, so undo
                # whatever was already redirected and drop the temp file.
                try:
                    self._restore_fds()
                finally:
                    self._discard_tempfile()
                raise
        if self.stdout or self.stderr:
            self.captured_output = StringIO()
            if self.stdout:
                self._saved_stdout = sys.stdout
                sys.stdout = self.captured_output
            if self.stderr:
                self._saved_stderr = sys.stderr
                sys.stderr = self.captured_output
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Undo the redirection; replay captured output on failure.

        Returns None, so an exception raised in the block propagates.
        """
        failed = exc_type is not None
        if self.stdout or self.stderr:
            if self._saved_stdout is not None:
                sys.stdout = self._saved_stdout
            if self._saved_stderr is not None:
                sys.stderr = self._saved_stderr
            self._saved_stdout = self._saved_stderr = None
            if failed:
                sys.stderr.write(self.captured_output.getvalue())
        if self.dup_stdout or self.dup_stderr:
            try:
                self._restore_fds()
                if failed:
                    self.captured_dup.seek(0)
                    sys.stderr.write(self.captured_dup.read())
            finally:
                # Always close and remove the temp file, even if the
                # replay above raised.
                self._discard_tempfile()

    def _redirect_fd(self, stream):
        """Point the descriptor behind `stream` at the temporary file."""
        # Flush first: text buffered before the block belongs on the
        # screen, not in the capture file.
        stream.flush()
        target = stream.fileno()
        saved = os.dup(target)
        # Remember the target descriptor now; `sys.stdout`/`sys.stderr`
        # may be rebound by the time the block exits.
        self._redirected.append((stream, target, saved))
        os.dup2(self.captured_dup.fileno(), target)

    def _restore_fds(self):
        """Put back every descriptor replaced by `_redirect_fd`."""
        while self._redirected:
            stream, target, saved = self._redirected.pop()
            try:
                # Flush while still redirected: text buffered inside the
                # block must end up in the capture file, not on screen.
                stream.flush()
            finally:
                os.dup2(saved, target)
                os.close(saved)

    def _discard_tempfile(self):
        """Close and delete the temporary capture file."""
        try:
            self.captured_dup.close()
        finally:
            os.unlink(self.captured_dup_name)
+
# vim: set et ts=4 sts=4 sw=4:
diff -r a3e439d09322 -r c1091715f8a3 test/test_silence.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/test_silence.py Fri Jun 01 13:43:02 2012 +0400
@@ -0,0 +1,95 @@
+"""Test for allpy.util.Silence
+
+>>> print check_output(["python", __file__], stderr=STDOUT)
+Example1
+Stdout must be seen (only stderr silenced)
+Example2
+Example3
+Stdout must be seen (error raised)
+Example4
+Stdout must be seen (only stderr silenced)
+Example5
+Example6
+Stdout must be seen (error raised)
+Stderr must be seen too
+...
+done
+
+
+"""
+import sys
+from subprocess import call, check_call, Popen, PIPE, STDOUT
+from allpy.util import Silence
+
def check_output(*args, **kws):
    """Run a command and return what it wrote to stdout.

    Stand-in for `subprocess.check_output`, which is only introduced in
    Python 2.7. Arguments are passed on to `Popen`; the command's stdout
    is always piped. An AssertionError is raised if the command exits
    with a non-zero status.
    """
    process = Popen(stdout=PIPE, *args, **kws)
    captured = process.communicate()[0]
    # communicate() has already waited for the process to finish, so
    # poll() simply reports the final exit status here.
    status = process.poll()
    assert status == 0
    return captured
+
class Unbuffered(object):
    """Stream wrapper that flushes after every write.

    Python uses different buffering strategies in interactive and
    non-interactive invocations, which makes the output of this test look
    ugly and difficult to understand. Wrapping stdout/stderr in this class
    is a crutch for that problem.
    """

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        """Write `data` to the wrapped stream and flush it immediately."""
        wrapped = self.stream
        wrapped.write(data)
        wrapped.flush()

    def __getattr__(self, attr):
        # Only reached for attributes missing on the wrapper itself;
        # everything else is delegated to the wrapped stream.
        wrapped = self.stream
        return getattr(wrapped, attr)
+
+if __name__ == "__main__":
+
+ # Only make uniform buffering when running as __main__ program.
+ # Otherwise, importing this module would cripple interactive interpreter.
+ sys.stdout = Unbuffered(sys.stdout)
+ sys.stderr = Unbuffered(sys.stderr)
+
+ print "Example1"
+
+ with Silence():
+ print "Stdout must be seen (only stderr silenced)"
+ sys.stderr.write("Stderr must not be seen")
+
+ print "Example2"
+
+ with Silence(hide="both"):
+ print "Stdout must not be seen (both silenced)"
+
+ print "Example3"
+
+ try:
+ with Silence(hide="both"):
+ print "Stdout must be seen (error raised)"
+ raise Exception()
+ except Exception:
+ pass
+
+ call(["echo", "Example4"])
+
+ with Silence(dup="stderr"):
+ check_call(["echo", "Stdout must be seen (only stderr silenced)"])
+ check_call(["echo", "Stderr must not be seen"], stdout=sys.stderr)
+
+ call(["echo", "Example5"])
+
+ with Silence(dup="both"):
+ check_call(["echo", "Stdout must not be seen (both silenced)"])
+
+ call(["echo", "Example6"])
+
+ try:
+ with Silence(dup="both"):
+ check_call(["echo", "Stdout must be seen (error raised)"])
+ check_call(["echo", "Stderr must be seen too"], stdout=sys.stderr)
+ check_call(["false"])
+ except Exception:
+ pass
+
+ print "..."
+ call(["echo", "done"])