Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/raw-rev/21cfc7897a8f
Дата изменения: Unknown
Дата индексирования: Tue Oct 2 08:12:25 2012
Кодировка:

# HG changeset patch
# User Daniil Alexeyevsky
# Date 1310063541 -14400
# Node ID 21cfc7897a8feeca4a4925e878f5c2803ca56e13
# Parent 4e0312f00a6cc4294bb91016f941c9af526a9621
Implemented markup fileIO (closes #56)

This is done by adding the file format 'markup' or 'markup:formatname', where
'formatname' is any otherwise known alignment format.

The file format is described briefly in the fileio.MarkupFile docstring.

This commit also contains an example of defining a markup-saving mixin,
markups.IntMarkupMixin, and a test for it.

diff -r 4e0312f00a6c -r 21cfc7897a8f allpy/base.py
--- a/allpy/base.py Thu Jul 07 22:27:14 2011 +0400
+++ b/allpy/base.py Thu Jul 07 22:32:21 2011 +0400
@@ -504,6 +504,13 @@
def refresh(self):
pass

+ @classmethod
+ def from_record(cls, alignment, record, name=None):
+ return cls(alignment, name)
+
+ def to_record(self):
+ return {}
+
class SequenceMarkup(Markup):

def __init__(self, sequence, name=None):
diff -r 4e0312f00a6c -r 21cfc7897a8f allpy/fileio.py
--- a/allpy/fileio.py Thu Jul 07 22:27:14 2011 +0400
+++ b/allpy/fileio.py Thu Jul 07 22:32:21 2011 +0400
@@ -3,11 +3,21 @@
from tempfile import NamedTemporaryFile
import util

+def get_markups_class(classname):
+ """This ugly helper is to avoid bad untimely import loops."""
+ import markups
+ return getattr(markups, classname)
+
class File(object):
"""Automatical file IO."""
def __new__(cls, file, format="fasta", **kw):
if format == "fasta":
return FastaFile(file, **kw)
+ elif format == 'markup':
+ return MarkupFile(file, **kw)
+ elif format.startswith('markup:'):
+ subformat = format.split(':',1)[1]
+ return MarkupFile(file, format=subformat, **kw)
else:
return EmbossFile(file, format, **kw)

@@ -67,6 +77,138 @@
body = util.remove_each(body, " \n\r\t\v")
yield (name, description, body)

+class MarkupFile(AlignmentFile):
+ """Parser & writer for our own marked alignment file format.
+
+ Marked alignment file consists of a list of records, separated with one or
+ more empty lines. Each record consists of type name, header and optional
+ contents. Type name is a line, containing just one word, describing the
+ record type. Header is a sequence of lines, each in format `key: value`.
+ Content, if present, is separated from header with an empty line.
+
+ Type names and header key names are case-insensitive.
+
+ Known record types now are:
+
+ - `alignment` -- this must be the last record in file for now
+ - `sequence_markup`
+ - `alignment_markup`
+
+ Example::
+
+ sequence_markup
+ sequence_name: cyb5_mouse
+ sequence_description:
+ name: pdb_residue_number
+ class: SequencePDBResidueNumberMarkup
+ markup: -,12,121,122,123,124,13,14,15,-,-,16
+
+ alignment_markup
+ name: geometrical_core
+ class: AlignmentGeometricalCoreMarkup
+ markup: -,-,-,-,+,+,+,-,-,-,+,+,-,-,-,-
+
+ alignment
+ format: fasta
+
+ > cyb5_mouse
+ seqvencemouse
+ """
+
+ _empty_line = ''
+ """Helper attribute for write_empty_line."""
+
+ def write_alignment(self, alignment):
+ """Write alignment to file."""
+ self.write_markups(alignment.markups, 'alignment_markup')
+ for sequence in alignment.sequences:
+ record = {
+ 'sequence_name': sequence.name,
+ 'sequence_description': sequence.description,
+ }
+ self.write_markups(sequence.markups, 'sequence_markup', record)
+ record = {'type': 'alignment', 'format': self.format}
+ self.write_record(record)
+ self.write_empty_line()
+ alignment.to_file(self.file)
+
+ def write_markups(self, markups, type, pre_record={}):
+ """Write a dictionary of markups as series of records."""
+ for name, markup in markups.items():
+ record = markup.to_record()
+ record.update(pre_record)
+ record['type'] = type
+ record['name'] = name
+ record['class'] = markup.__class__.__name__
+ self.write_record(record)
+
+ def write_record(self, record):
+ """Write record to file. Add an empty line before every record but the first."""
+ self.write_empty_line()
+ self.file.write('%s\n' % record['type'])
+ del record['type']
+ for key, value in record.items():
+ self.file.write('%s: %s\n' % (key, value))
+
+ def write_empty_line(self):
+ """Add empty line every time except the first call."""
+ self.file.write(self._empty_line)
+ self._empty_line = '\n'
+
+ def read_alignment(self, alignment):
+ """Read alignment from file."""
+ for record in list(self.read_records(alignment)):
+ handler = getattr(self, 'add_%s' % record['type'])
+ handler(alignment, record)
+
+ def add_sequence_markup(self, alignment, record):
+ """Found sequence markup record in file. Do something about it."""
+ for sequence in alignment.sequences:
+ if sequence.name == record['sequence_name']:
+ description = record.get('sequence_description')
+ if description:
+ assert sequence.description == description
+ cls = get_markups_class(record['class'])
+ cls.from_record(sequence, record, name=record.get('name'))
+ return
+ raise AssertionError("Could not find sequence in alignment")
+
+ def add_alignment_markup(self, alignment, record):
+ """Found alignment markup record in file. Do something about it."""
+ cls = get_markups_class(record['class'])
+ cls.from_record(alignment, record, name=record.get('name'))
+
+ def add_alignment(self, alignment, record):
+ """Found alignment record. It has been handled in read_payload."""
+ pass
+
+ def read_records(self, alignment):
+ """Read records from file and yield each of them as a dict."""
+ for line in self.file:
+ if line.strip() == "":
+ continue
+ yield self.read_record(alignment, line)
+
+ def read_record(self, alignment, type):
+ """Read record headers and record payload."""
+ type = type.strip().lower()
+ record = {'type': type}
+ for line in self.file:
+ if line.strip() == "":
+ self.read_payload(alignment, record, type)
+ return record
+ key, value = line.split(':', 1)
+ key = key.strip().lower()
+ value = value.strip()
+ record[key] = value
+ return record
+
+ def read_payload(self, alignment, record, type):
+ """Read record payload, if necessary."""
+ if type == 'alignment':
+ io = File(self.file, record.get('format', 'fasta'))
+ io.read_alignment(alignment)
+
class EmbossFile(AlignmentFile):
"""Parser & writer for file formats supported by EMBOSS."""

diff -r 4e0312f00a6c -r 21cfc7897a8f allpy/markups.py
--- a/allpy/markups.py Thu Jul 07 22:27:14 2011 +0400
+++ b/allpy/markups.py Thu Jul 07 22:32:21 2011 +0400
@@ -1,5 +1,27 @@
import base

+class IntMarkupMixin(base.Markup):
+
+ @classmethod
+ def from_record(cls, container, record, name=None):
+ assert record['io-class'] == 'IntMarkup'
+ result = cls(container, name=name)
+ separator = record.get('separator', ',')
+ values = record['markup'].split(separator)
+ assert len(values) == len(result.sorted_keys())
+ for key, value in zip(result.sorted_keys(), values):
+ if value:
+ result[key] = int(value)
+ return result
+
+ def to_record(self):
+ def fmt(value):
+ if value is None:
+ return ""
+ return str(value)
+ values = [fmt(self.get(key)) for key in self.sorted_keys()]
+ return {'markup': ','.join(values), 'io-class': 'IntMarkup'}
+
class SequenceNumberMarkup(base.SequenceMarkup):

name = 'number'
diff -r 4e0312f00a6c -r 21cfc7897a8f test/test_markups.py
--- a/test/test_markups.py Thu Jul 07 22:27:14 2011 +0400
+++ b/test/test_markups.py Thu Jul 07 22:32:21 2011 +0400
@@ -1,3 +1,4 @@
+from StringIO import StringIO
from allpy import base
from allpy import protein
from allpy import markups
@@ -34,4 +35,40 @@
seq[2].my_markup = 4
assert markup[seq[2]] == 4

+def test_io():
+ aln = (protein.Alignment().
+ append_row_from_string('aseq-vence', name='a').
+ append_row_from_string('ase-qret--', name='b')
+ )
+
+ class MySequenceMarkup(base.SequenceMarkup, markups.IntMarkupMixin):
+ pass
+ class MyAlignmentMarkup(base.AlignmentMarkup, markups.IntMarkupMixin):
+ pass
+ markups.MySequenceMarkup = MySequenceMarkup
+ markups.MyAlignmentMarkup = MyAlignmentMarkup
+
+ s = aln.sequences[0]
+ c = aln.columns
+ m1 = MySequenceMarkup(s, name='m1')
+ m1[s[5]] = 5
+ m1[s[3]] = 3
+ m2 = MyAlignmentMarkup(aln, name='m2')
+ m2[c[5]] = 5
+ m2[c[6]] = 6
+
+ file = StringIO()
+ aln.to_file(file, format='markup')
+ import sys
+ file.seek(0)
+
+ out = protein.Alignment().append_file(file, format='markup')
+ s = out.sequences[0]
+ c = out.columns
+ print out.markups
+ print s.markups
+ print out.sequences[1].markups
+ assert s[5].m1 == 5 and s[3].m1 == 3
+ assert out.markups['m2'][c[5]] == 5 and out.markups['m2'][c[6]] == 6
+
# vim: set ts=4 sts=4 sw=4 et: