Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/file/4f896db3531d/allpy/fileio.py
Дата изменения: Unknown
Дата индексирования: Mon Feb 4 03:28:42 2013
Кодировка:
allpy: 4f896db3531d allpy/fileio.py

allpy

view allpy/fileio.py @ 825:4f896db3531d

allpy.fileio. markup filetype now allows either - or _ to be used in headers interchangeably [closes #89]
author Daniil Alexeyevsky <dendik@kodomo.fbb.msu.ru>
date Fri, 15 Jul 2011 18:01:31 +0400
parents c76ccff11df5
children 18119191a4c8
line source
1 import os
2 from subprocess import Popen, PIPE
3 from tempfile import NamedTemporaryFile
4 import util
def get_markups_class(classname):
    """Return the attribute of the `markups` module named `classname`.

    The `markups` module is imported at call time, not at module import
    time, to avoid an import loop.
    """
    import markups as markups_module
    return getattr(markups_module, classname)
class File(object):
    """Factory choosing a reader/writer class by format name.

    Instantiating `File` never yields a `File` object; `__new__` returns
    an instance of one of the concrete classes instead:

    - ``"fasta"`` -- `FastaFile`
    - ``"markup"`` -- `MarkupFile` with its default payload format
    - ``"markup:<subformat>"`` -- `MarkupFile` with `<subformat>` as format
    - anything else -- `EmbossFile`, with the format name passed through

    Extra keyword arguments are forwarded to the chosen class.
    """

    def __new__(cls, file, format="fasta", **kw):
        markup_prefix = 'markup:'
        if format == "fasta":
            return FastaFile(file, **kw)
        if format == 'markup':
            return MarkupFile(file, **kw)
        if format.startswith(markup_prefix):
            # Everything after the first colon names the payload format.
            payload_format = format[len(markup_prefix):]
            return MarkupFile(file, format=payload_format, **kw)
        return EmbossFile(file, format, **kw)
class AlignmentFile(object):
    """Common base for alignment readers/writers.

    Stores the file object and formatting options, and implements
    alignment-level reading and writing on top of `read_strings` and
    `write_strings`, which subclasses are expected to provide.
    """

    def __init__(self, file, format='fasta', gaps='-', wrap_column=70):
        """Remember the file object and formatting options.

        `file` is the file object to read from or write to, `format` is
        the format name, `gaps` is passed on to the alignment when rows
        are converted to/from strings, and `wrap_column` is the line width
        used by writers that wrap sequence lines.
        """
        self.file = file
        self.format = format
        self.gaps = gaps
        self.wrap_column = wrap_column

    def write_alignment(self, alignment):
        """Append alignment to the file.

        Every row returned by `alignment.rows_as_strings` is handed to
        `write_strings` as a `(string, name, description)` tuple.
        """
        self.write_strings(
            (row, row.sequence.name, row.sequence.description)
            for row in alignment.rows_as_strings(self.gaps)
        )

    def read_alignment(self, alignment):
        """Read alignment from the file.

        Every `(name, description, body)` tuple produced by `read_strings`
        is appended to `alignment` via `append_row_from_string`, together
        with the name of the file it came from and the gap characters.
        """
        append_row = alignment.append_row_from_string
        # The source is the name of *our* file object.  The bare name
        # `file` would refer to the builtin type, not to the file being
        # read.  File-like objects without a name (in-memory buffers,
        # pipes) yield None.
        # NOTE(review): assumes append_row_from_string accepts None as
        # the source -- confirm against the alignment class.
        source = getattr(self.file, 'name', None)
        for name, description, body in self.read_strings():
            append_row(body, name, description, source, self.gaps)
class FastaFile(AlignmentFile):
    """Fasta parser & writer."""

    def write_string(self, string, name, description=''):
        """Append one sequence to file.

        The header line is ``>name`` followed, when `description` is
        non-empty, by a space and the description.  The sequence body is
        wrapped at `self.wrap_column` characters per line; a false
        `wrap_column` writes it on a single line.  The file is flushed
        after each sequence.
        """
        if description:
            name += " " + description
        self.file.write(">%s\n" % name)
        if self.wrap_column:
            while string:
                self.file.write(string[:self.wrap_column] + "\n")
                string = string[self.wrap_column:]
        else:
            self.file.write(string + "\n")
        self.file.flush()

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as list of tuples (string, name, description).
        """
        for string, name, description in sequences:
            self.write_string(string, name, description)

    def read_parts(self):
        """Read parts beginning with > in FASTA file.

        This is a drop-in replacement for self.file.read().split("\n>")
        It is required for markup format, which combines parts read with
        different parsers. Python prohibits combining iterators and file.read
        methods on the same file.

        Blank lines before the first header are skipped.  Non-blank text
        before the first header starts a part of its own, as it would with
        the split-based approach.
        """
        lines = None
        for line in self.file:
            if line.startswith(">"):
                if lines:
                    yield "".join(lines)
                lines = []
            elif lines is None:
                # Nothing has been started yet: tolerate leading blank
                # lines instead of failing on them.
                if not line.strip():
                    continue
                lines = []
            lines.append(line)
        if lines:
            yield "".join(lines)

    def read_strings(self):
        """Yield a (name, description, body) tuple for every FASTA record.

        The first line of each part is the header: leading ``>`` characters
        are dropped, the text up to the first space is the name and the
        rest is the description, both stripped of surrounding whitespace.
        The remaining lines form the body, which is passed through
        `util.remove_each` with the whitespace characters ``" \\n\\r\\t\\v"``.
        """
        for part in self.read_parts():
            header, _, body = part.partition("\n")
            header = header.lstrip(">")
            name, _, description = header.partition(" ")
            name = name.strip()
            description = description.strip()
            body = util.remove_each(body, " \n\r\t\v")
            yield (name, description, body)
class MarkupFile(AlignmentFile):
    """Parser & writer for our own marked alignment file format.

    Marked alignment file consists of a list of records, separated with one or
    more empty lines. Each record consists of type name, header and optional
    contents. Type name is a line, containing just one word, describing the
    record type. Header is a sequence of lines, each in format `key: value`.
    Content, if present, is separated from header with an empty line.

    Type names and header key names are case-insensitive and '-' and '_' in
    them are equivalent.

    The header key `type` must not be used: on reading, headers share one
    dictionary with the record type name, so such a header would replace it.
    The class of a markup is stored under the key `class`.

    Known record types now are:

    - `alignment` -- this must be the last record in file for now
    - `sequence_markup`
    - `alignment_markup`

    Example::

        sequence-markup
        sequence-name: cyb5_mouse
        sequence-description:
        name: pdb_residue_number
        class: SequencePDBResidueNumberMarkup
        markup: -,12,121,122,123,124,13,14,15,-,-,16

        alignment-markup
        name: geometrical_core
        class: AlignmentGeometricalCoreMarkup
        markup: -,-,-,-,+,+,+,-,-,-,+,+,-,-,-,-

        alignment
        format: fasta

        > cyb5_mouse
        seqvencemouse
    """

    # Helper attribute for write_empty_line: the text to emit on the next
    # call.  Empty at first, so that the first record is not preceded by an
    # empty line.
    _empty_line = ''

    def write_alignment(self, alignment):
        """Write alignment to file.

        Alignment markups are written first, then the markups of every
        sequence, and finally the `alignment` record followed by the
        alignment itself in `self.format`.
        """
        self.write_markups(alignment.markups, 'alignment_markup')
        for sequence in alignment.sequences:
            record = {
                'sequence_name': sequence.name,
                'sequence_description': sequence.description,
            }
            self.write_markups(sequence.markups, 'sequence_markup', record)
        record = {'type': 'alignment', 'format': self.format}
        self.write_record(record)
        self.write_empty_line()
        alignment.to_file(self.file, format=self.format, gap=self.gaps)

    def write_markups(self, markups, type, pre_record=None):
        """Write a dictionary of markups as series of records.

        `markups` maps markup names to markup objects, `type` is the record
        type to write, and `pre_record` holds optional extra header fields
        added to every record.
        """
        if pre_record is None:
            pre_record = {}
        for name, markup in markups.items():
            record = markup.to_record()
            record.update(pre_record)
            # These keys are set last, so they override anything of the
            # same name coming from the markup or from pre_record.
            record['type'] = type
            record['name'] = name
            record['class'] = markup.__class__.__name__
            self.write_record(record)

    def write_record(self, record):
        """Write record to file. Add new line before every but first record.

        `record['type']` is written as the type name line; every other item
        is written as a `key: value` header line.  `record` is not modified.
        """
        self.write_empty_line()
        self.file.write('%s\n' % self.normalize('write', record['type']))
        for key, value in record.items():
            if key == 'type':
                # Already written as the type name line above.
                continue
            key = self.normalize('write', key)
            self.file.write('%s: %s\n' % (key, value))

    def write_empty_line(self):
        """Add empty line every time except the first call."""
        self.file.write(self._empty_line)
        self._empty_line = '\n'

    def read_alignment(self, alignment):
        """Read alignment from file.

        Each record is dispatched to the method named `add_<record type>`.
        """
        # All records are read before any handler runs: reading the
        # `alignment` record loads the sequences (see read_payload), and
        # add_sequence_markup looks sequences up in the alignment.
        for record in list(self.read_records(alignment)):
            handler = getattr(self, 'add_%s' % record['type'])
            handler(alignment, record)

    def add_sequence_markup(self, alignment, record):
        """Found sequence markup record in file. Do something about it.

        The markup is attached to the first sequence whose name equals
        `record['sequence_name']`.

        Raises AssertionError if the record carries a non-empty sequence
        description that differs from the description of that sequence, or
        if no sequence with that name exists in the alignment.
        """
        for sequence in alignment.sequences:
            if sequence.name != record['sequence_name']:
                continue
            description = record.get('sequence_description')
            # Raised explicitly rather than with `assert`, so the check
            # is not stripped when Python runs with -O.
            if description and sequence.description != description:
                raise AssertionError(
                    "Sequence description does not match markup record"
                )
            cls = get_markups_class(record['class'])
            cls.from_record(sequence, record, name=record.get('name'))
            return
        raise AssertionError("Could not find sequence in alignment")

    def add_alignment_markup(self, alignment, record):
        """Found alignment markup record in file. Do something about it."""
        cls = get_markups_class(record['class'])
        cls.from_record(alignment, record, name=record.get('name'))

    def add_alignment(self, alignment, record):
        """Found alignment record. It has been handled in read_payload."""
        pass

    def read_records(self, alignment):
        """Read records and yield them one by one as dicts.

        Blank lines between records are skipped; the first non-blank line
        is taken as the type name of the next record.
        """
        for line in self.file:
            if line.strip() == "":
                continue
            yield self.read_record(alignment, line)

    def read_record(self, alignment, type):
        """Read record headers and record payload.

        `type` is the raw type name line.  Header lines are read up to the
        first blank line, after which the payload, if any, is read.  If the
        file ends before a blank line, the record is returned without
        reading a payload.

        Raises ValueError for a header line that contains no colon.
        """
        type = self.normalize('read', type)
        record = {'type': type}
        for line in self.file:
            if line.strip() == "":
                self.read_payload(alignment, record, type)
                return record
            key, separator, value = line.partition(':')
            if not separator:
                raise ValueError(
                    "Malformed header line in markup file: %r" % line
                )
            key = self.normalize('read', key)
            value = value.strip()
            record[key] = value
        return record

    def read_payload(self, alignment, record, type):
        """Read record payload, if necessary.

        Only `alignment` records have a payload: the rest of the file is
        read into `alignment` using the format named in the `format`
        header, `fasta` if there is none.
        """
        if type == 'alignment':
            io = File(self.file, record.get('format', 'fasta'), gaps=self.gaps)
            io.read_alignment(alignment)

    @staticmethod
    def normalize(for_what, string):
        """Convert a type name or header key between file and internal form.

        For 'read': strip, replace '-' with '_' and lowercase.
        For 'write': strip, replace '_' with '-' and capitalize.
        Any other `for_what` returns None.
        """
        if for_what == 'read':
            return string.strip().replace('-', '_').lower()
        if for_what == 'write':
            return string.strip().replace('_', '-').capitalize()
class EmbossFile(AlignmentFile):
    """Parser & writer for file formats supported by EMBOSS.

    Conversion is delegated to the `seqret` program, with FASTA as the
    intermediate format on our side of the pipe.
    """

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as tuples (string, name, description).  They
        are fed to `seqret` as FASTA, and its output in `self.format` is
        copied to `self.file`.
        """
        # NOTE(review): all input is written before any output is read;
        # if seqret emits output while still reading, a large alignment
        # could fill the pipe and block -- consider Popen.communicate.
        pipe = Popen(['seqret', 'stdin', '%s::stdout' % self.format],
            stdin=PIPE, stdout=PIPE
        )
        FastaFile(pipe.stdin).write_strings(self.fix_sequences(sequences))
        pipe.stdin.close()
        for line in pipe.stdout:
            self.file.write(line)
        # Output has been read to the end: release the pipe and reap the
        # child process.
        pipe.stdout.close()
        pipe.wait()

    def fix_sequences(self, sequences):
        """EMBOSS does not permit : in sequence names. Fix sequences for that.

        Takes and yields (string, name, description) tuples, the order used
        by `write_strings`; only the name is changed.
        """
        for string, name, description in sequences:
            yield string, name.replace(':', '_'), description

    def read_strings(self):
        """Read sequences from file.

        The contents of `self.file` are piped through `seqret`, and its
        FASTA output is parsed by `FastaFile.read_strings`.
        """
        # NOTE(review): same write-everything-then-read pattern as in
        # write_strings; the child is also never waited for here, because
        # its output is consumed lazily by the returned generator.
        pipe = Popen(['seqret', '%s::stdin' % self.format, 'stdout'],
            stdin=PIPE, stdout=PIPE
        )
        for line in self.file:
            pipe.stdin.write(line)
        pipe.stdin.close()
        return FastaFile(pipe.stdout).read_strings()
265 # vim: set et ts=4 sts=4 sw=4: