Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/file/cfcbd13f6761/allpy/fileio.py
Дата изменения: Unknown
Дата индексирования: Mon Feb 4 03:28:33 2013
Кодировка:
allpy: cfcbd13f6761 allpy/fileio.py

allpy

view allpy/fileio.py @ 864:cfcbd13f6761

Added fileio.BioPythonFile as a method to parse unknown file formats [closes #106] Biopython can parse more formats than EMBOSS, but surprisingly, it cannot do msf. Also, there is no way to see in the current tests, where a test used biopython or emboss for a particular IO task. This will likely be fixed with the 1.5.0 release with the new fileio system. For now, Biopython has precedence over EMBOSS, so an IO test of msf tests EMBOSS, and IO test of Stockholm tests Biopython.
author Daniil Alexeyevsky <dendik@kodomo.fbb.msu.ru>
date Mon, 25 Jul 2011 14:40:41 +0400
parents 6cc007e68af6
children
line source
1 import os
2 from subprocess import Popen, PIPE
3 from tempfile import NamedTemporaryFile
4 import util
# Biopython is an optional dependency: remember whether it could be imported
# so that format dispatch can decide whether BioPythonFile is usable.
try:
    from Bio import Seq, SeqRecord, Align, SeqIO, AlignIO, Alphabet
except ImportError:
    bio_python = False
else:
    bio_python = True
def get_markups_class(classname):
    """Return the attribute of the `markups` module named `classname`.

    The module is imported at call time rather than at the top of the file
    to avoid import loops.
    """
    import markups as markups_module
    return getattr(markups_module, classname)
class File(object):
    """Automatic file IO: choose a reader/writer class by format name."""

    def __new__(cls, file, format="fasta", **kw):
        """Return an IO object for `file` suited to `format`.

        Dispatch order:

        - ``fasta`` -> FastaFile
        - ``markup`` -> MarkupFile
        - ``markup:<subformat>`` -> MarkupFile with ``format=<subformat>``
        - anything Biopython supports (when it is importable) -> BioPythonFile
        - everything else -> EmbossFile

        Extra keyword arguments are passed through to the chosen class.
        """
        if format == "fasta":
            return FastaFile(file, **kw)
        if format == 'markup':
            return MarkupFile(file, **kw)
        prefix, colon, subformat = format.partition(':')
        if prefix == 'markup' and colon:
            return MarkupFile(file, format=subformat, **kw)
        if bio_python and BioPythonFile.supports(format):
            return BioPythonFile(file, format, **kw)
        return EmbossFile(file, format, **kw)
class AlignmentFile(object):
    """Common state and alignment-level helpers for the format classes.

    The methods here call `self.write_strings` and `self.read_strings`,
    which this class does not define; subclasses are expected to supply them.
    """

    def __init__(self, file, format='fasta', gaps='-', wrap_column=70):
        """Remember the file object and the formatting options."""
        self.file, self.format = file, format
        self.gaps, self.wrap_column = gaps, wrap_column

    def write_alignment(self, alignment):
        """Append alignment to the file.

        Every row of the alignment is passed to `write_strings` as a
        (string, name, description) tuple.
        """
        rows = alignment.rows_as_strings(self.gaps)
        triples = (
            (row, row.sequence.name, row.sequence.description)
            for row in rows
        )
        self.write_strings(triples)

    def read_alignment(self, alignment):
        """Read alignment from the file.

        Each (name, description, body) tuple produced by `read_strings` is
        appended to `alignment` as a new row. The file's `name` attribute
        (or '' when it has none) is passed along as the source.
        """
        add_row = alignment.append_row_from_string
        origin = getattr(self.file, 'name', '')
        for seq_name, seq_description, seq_body in self.read_strings():
            add_row(seq_body, seq_name, seq_description, origin, self.gaps)
class FastaFile(AlignmentFile):
    """Fasta parser & writer."""

    def write_string(self, string, name, description=''):
        """Append one sequence to file.

        The header line is ``>name`` or ``>name description``. The sequence
        body is wrapped at `self.wrap_column` characters per line; a false
        `wrap_column` writes the whole body on a single line. The file is
        flushed after every sequence.
        """
        if description:
            name += " " + description
        self.file.write(">%s\n" % name)
        if self.wrap_column:
            # Step through the body by offset instead of re-slicing the
            # remaining tail on every iteration.
            for start in range(0, len(string), self.wrap_column):
                self.file.write(string[start:start + self.wrap_column] + "\n")
        else:
            self.file.write(string + "\n")
        self.file.flush()

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as an iterable of tuples
        (string, name, description).
        """
        for string, name, description in sequences:
            self.write_string(string, name, description)

    def read_parts(self):
        """Read parts beginning with > in FASTA file.

        This is a replacement for self.file.read().split("\\n>").
        It is required for markup format, which combines parts read with
        different parsers. Python prohibits combining iterators and file.read
        methods on the same file.

        Each yielded part is the header line (including the leading ``>``)
        followed by all lines up to the next header. Blank lines before the
        first header are skipped.

        Raises ValueError if non-blank text precedes the first header.
        """
        lines = None  # None until the first ">" header has been seen
        for line in self.file:
            if line.startswith(">"):
                if lines:
                    yield "".join(lines)
                lines = []
            elif lines is None:
                # Nothing to attach this line to yet. Tolerate blank lines
                # (e.g. extra separators before an embedded FASTA payload)
                # and report anything else explicitly.
                if line.strip():
                    raise ValueError(
                        "FASTA data must start with a '>' header line, "
                        "got: %r" % (line,)
                    )
                continue
            lines.append(line)
        if lines:
            yield "".join(lines)

    def read_strings(self):
        """Read sequences from file.

        Yields (name, description, body) tuples. The name is the header text
        up to the first space, the description is the rest of the header,
        and the body has spaces, newlines, carriage returns, tabs and
        vertical tabs removed.
        """
        for part in self.read_parts():
            header, _, body = part.partition("\n")
            header = header.lstrip(">")
            name, _, description = header.partition(" ")
            name = name.strip()
            description = description.strip()
            body = util.remove_each(body, " \n\r\t\v")
            yield (name, description, body)
class MarkupFile(AlignmentFile):
    """Parser & writer for our own marked alignment file format.

    Marked alignment file consists of a list of records, separated with one or
    more empty lines. Each record consists of type name, header and optional
    contents. Type name is a line, containing just one word, describing the
    record type. Header is a sequence of lines, each in format `key: value`.
    Content, if present, is separated from header with an empty line.

    Type names and header key names are case-insensitive and '-' and '_' in
    them are equivalent.

    Known record types now are:

    - `alignment` -- this must be the last record in file for now
    - `sequence_markup`
    - `alignment_markup`

    Example::

        sequence-markup
        sequence-name: cyb5_mouse
        sequence-description:
        name: pdb_residue_number
        class: SequencePDBResidueNumberMarkup
        markup: -,12,121,122,123,124,13,14,15,-,-,16

        alignment-markup
        name: geometrical_core
        class: AlignmentGeometricalCoreMarkup
        markup: -,-,-,-,+,+,+,-,-,-,+,+,-,-,-,-

        alignment
        format: fasta

        >cyb5_mouse
        seqvencemouse

    Note that the header key `type` must not be used: the record type name
    is stored under that key internally, and the markup class name is stored
    under `class`.
    """

    # Helper attribute for write_empty_line: what to emit on the next call.
    # Starts empty so that nothing is written before the first record.
    _empty_line = ''

    def write_alignment(self, alignment):
        """Write alignment to file.

        Alignment markups are written first, then the markups of every
        sequence, and finally the `alignment` record followed by the
        alignment itself in `self.format`.
        """
        self.write_markups(alignment.markups, 'alignment_markup')
        for sequence in alignment.sequences:
            record = {
                'sequence_name': sequence.name,
                'sequence_description': sequence.description,
            }
            self.write_markups(sequence.markups, 'sequence_markup', record)
        record = {'type': 'alignment', 'format': self.format}
        self.write_record(record)
        self.write_empty_line()
        alignment.to_file(self.file, format=self.format, gap=self.gaps)

    def write_markups(self, markups, type, pre_record=None):
        """Write a dictionary of markups as series of records.

        `markups` maps markup names to markup objects; markups whose `save`
        attribute is false are skipped. `type` is the record type name.
        `pre_record`, if given, is a dict of extra header fields merged into
        every record; it is not modified.
        """
        for name, markup in markups.items():
            if not markup.save:
                continue
            record = markup.to_record()
            if pre_record:
                record.update(pre_record)
            record['type'] = type
            record['name'] = name
            record['class'] = markup.__class__.__name__
            self.write_record(record)

    def write_record(self, record):
        """Write record to file. Add new line before every but first record.

        The value under the 'type' key becomes the type-name line; every
        other item is written as a `key: value` header line. The `record`
        dict itself is left unchanged.
        """
        self.write_empty_line()
        self.file.write('%s\n' % self.normalize('write', record['type']))
        for key, value in record.items():
            if key == 'type':
                # Already written as the type-name line above.
                continue
            key = self.normalize('write', key)
            self.file.write('%s: %s\n' % (key, value))

    def write_empty_line(self):
        """Add empty line every time except the first call."""
        self.file.write(self._empty_line)
        self._empty_line = '\n'

    def read_alignment(self, alignment):
        """Read alignment from file.

        Every record is dispatched to the method named `add_<record type>`.
        """
        # All records are read before any handler runs: reading the
        # `alignment` record loads the sequences (see read_payload), and the
        # sequence markup handler looks sequences up in the alignment.
        for record in list(self.read_records(alignment)):
            handler = getattr(self, 'add_%s' % record['type'])
            handler(alignment, record)

    def add_sequence_markup(self, alignment, record):
        """Found sequence markup record in file. Attach it to its sequence.

        The sequence is located by `record['sequence_name']`; if the record
        carries a non-empty sequence description, it must match too.

        Raises AssertionError if the description does not match or no
        sequence with that name exists in the alignment.
        """
        for sequence in alignment.sequences:
            if sequence.name != record['sequence_name']:
                continue
            description = record.get('sequence_description')
            # Raised explicitly rather than via `assert` so that the check
            # is not stripped when Python runs with -O.
            if description and sequence.description != description:
                raise AssertionError(
                    "Sequence description does not match markup record"
                )
            cls = get_markups_class(record['class'])
            cls.from_record(sequence, record, name=record.get('name'))
            return
        raise AssertionError("Could not find sequence in alignment")

    def add_alignment_markup(self, alignment, record):
        """Found alignment markup record in file. Attach it to alignment."""
        cls = get_markups_class(record['class'])
        cls.from_record(alignment, record, name=record.get('name'))

    def add_alignment(self, alignment, record):
        """Found alignment record. It has been handled in read_payload."""
        pass

    def read_records(self, alignment):
        """Read records and yield them one by one as dicts.

        Blank lines between records are skipped; the first non-blank line
        of a record is taken as its type name.
        """
        for line in self.file:
            if line.strip() == "":
                continue
            yield self.read_record(alignment, line)

    def read_record(self, alignment, type):
        """Read record headers and record payload.

        `type` is the raw type-name line. Header lines are read until an
        empty line (after which the payload, if any, is read) or until the
        end of file. Returns a dict with the normalized type under 'type'
        and every header under its normalized key.

        Raises ValueError if a header line contains no ':'.
        """
        type = self.normalize('read', type)
        record = {'type': type}
        for line in self.file:
            if line.strip() == "":
                self.read_payload(alignment, record, type)
                return record
            key, value = line.split(':', 1)
            key = self.normalize('read', key)
            value = value.strip()
            record[key] = value
        return record

    def read_payload(self, alignment, record, type):
        """Read record payload, if necessary.

        Only `alignment` records have a payload: the rest of the file is
        read into `alignment` using the format named in the record's
        `format` header (fasta by default).
        """
        if type == 'alignment':
            io = File(self.file, record.get('format', 'fasta'), gaps=self.gaps)
            io.read_alignment(alignment)

    @staticmethod
    def normalize(for_what, string):
        """Convert a type name or header key between file and internal form.

        - 'read': strip, replace '-' with '_', lowercase.
        - 'write': strip, replace '_' with '-', capitalize.

        Raises ValueError for any other `for_what`.
        """
        if for_what == 'read':
            return string.strip().replace('-', '_').lower()
        if for_what == 'write':
            return string.strip().replace('_', '-').capitalize()
        raise ValueError("Unknown normalization mode: %r" % (for_what,))
class BioPythonFile(AlignmentFile):
    """Parser & writer for file formats supported by Biopython."""

    @staticmethod
    def supports(format):
        """Tell whether this class supports `format`.

        A format counts as supported when it appears in the writer table of
        either Bio.AlignIO or Bio.SeqIO.
        """
        if format in AlignIO._FormatToWriter:
            return True
        return format in SeqIO._FormatToWriter

    def write_strings(self, sequences):
        """Write sequences to file.

        `sequences` is an iterable of (body, name, description) tuples; they
        are written together as one alignment in `self.format`.
        """
        records = [
            SeqRecord.SeqRecord(
                Seq.Seq(body, Alphabet.single_letter_alphabet),
                id=name,
                description=description
            )
            for body, name, description in sequences
        ]
        alignment = Align.MultipleSeqAlignment(records)
        AlignIO.write(alignment, self.file, self.format)

    def read_strings(self):
        """Read sequences from file.

        Yields an (id, description, sequence string) tuple for every record
        of the alignment parsed from the file.
        """
        alignment = AlignIO.read(self.file, self.format)
        for record in alignment:
            yield record.id, record.description, str(record.seq)
class EmbossFile(AlignmentFile):
    """Parser & writer for file formats supported by EMBOSS.

    Conversion is delegated to the external `seqret` program, with FASTA
    used as the intermediate format on this side of the pipe.

    NOTE(review): in both directions all input is written to the child's
    stdin before any of its output is read; this may block on large data if
    seqret starts producing output before it has consumed all of its input.
    Confirm against seqret's behaviour.
    """

    def write_strings(self, sequences):
        """Write sequences to file.

        `sequences` is an iterable of (string, name, description) tuples.
        They are fed to `seqret` as FASTA and its output in `self.format`
        is copied to `self.file`.
        """
        pipe = Popen(['seqret', 'stdin', '%s::stdout' % self.format],
            stdin=PIPE, stdout=PIPE
        )
        FastaFile(pipe.stdin).write_strings(self.fix_sequences(sequences))
        pipe.stdin.close()
        for line in pipe.stdout:
            self.file.write(line)
        # Release the pipe and reap the child so it does not linger as a
        # zombie process.
        pipe.stdout.close()
        pipe.wait()

    def fix_sequences(self, sequences):
        """EMBOSS does not permit : in names. Fix sequences for that.

        Takes and yields (string, name, description) tuples, the same order
        that `write_strings` consumes; only the name is altered, with every
        ':' replaced by '_'.
        """
        for string, name, description in sequences:
            yield string, name.replace(':', '_'), description

    def read_strings(self):
        """Read sequences from file.

        The contents of `self.file` are passed to `seqret` for conversion
        from `self.format` to FASTA. Returns an iterator of
        (name, description, body) tuples parsed from its output.
        """
        pipe = Popen(['seqret', '%s::stdin' % self.format, 'stdout'],
            stdin=PIPE, stdout=PIPE
        )
        for line in self.file:
            pipe.stdin.write(line)
        pipe.stdin.close()
        return self._parse_and_reap(pipe)

    @staticmethod
    def _parse_and_reap(pipe):
        """Yield FASTA records from pipe.stdout, then clean up the child."""
        try:
            for item in FastaFile(pipe.stdout).read_strings():
                yield item
        finally:
            # Runs when the output is exhausted or the generator is closed.
            pipe.stdout.close()
            pipe.wait()
305 # vim: set et ts=4 sts=4 sw=4: