Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/file/d60d8993ac35/allpy/fileio.py
Дата изменения: Unknown
Дата индексирования: Mon Feb 4 03:53:09 2013
Кодировка:
allpy: d60d8993ac35 allpy/fileio.py

allpy

view allpy/fileio.py @ 848:d60d8993ac35

Fixed versions in NEWS; added comment for 1.4.1
author Daniil Alexeyevsky <dendik@kodomo.fbb.msu.ru>
date Wed, 20 Jul 2011 18:29:18 +0400
parents 4f896db3531d
children 6cc007e68af6
line source
1 import os
2 from subprocess import Popen, PIPE
3 from tempfile import NamedTemporaryFile
4 import util
def get_markups_class(classname):
    """Return the attribute named `classname` from the `markups` module.

    `markups` is imported here, at call time, instead of at the top of the
    file: this ugly helper exists to avoid bad untimely import loops.

    Raises AttributeError if `markups` has no attribute called `classname`.
    """
    import markups
    return getattr(markups, classname)
class File(object):
    """Automatic file IO: pick the reader/writer class from `format`.

    Calling ``File(file, format)`` never yields a `File` instance. Instead
    `__new__` returns an instance of one of the concrete classes:

    - ``"fasta"`` -- `FastaFile`
    - ``"markup"`` -- `MarkupFile` with its default alignment format
    - ``"markup:<subformat>"`` -- `MarkupFile` whose alignment part is
      stored in `<subformat>`
    - anything else -- `EmbossFile`, with `format` passed through as is

    Extra keyword arguments are forwarded to the chosen class.
    """

    def __new__(cls, file, format="fasta", **kw):
        if format == "fasta":
            return FastaFile(file, **kw)
        if format == 'markup':
            return MarkupFile(file, **kw)
        if format.startswith('markup:'):
            # Everything after the first ':' names the nested format.
            subformat = format.split(':', 1)[1]
            return MarkupFile(file, format=subformat, **kw)
        return EmbossFile(file, format, **kw)
class AlignmentFile(object):
    """Base class with helpers shared by the alignment readers/writers.

    This class does not define `write_strings` or `read_strings` itself;
    `write_alignment` and `read_alignment` call them on `self`, so they
    must be provided by a subclass.
    """

    def __init__(self, file, format='fasta', gaps='-', wrap_column=70):
        """Remember the file object and the formatting options.

        `file` is the object to read from or write to, `format` is the
        format name, `gaps` is the gap symbol and `wrap_column` is the
        line width used when writing sequences.
        """
        self.file = file
        self.format = format
        self.gaps = gaps
        self.wrap_column = wrap_column

    def write_alignment(self, alignment):
        """Append alignment to the file.

        Every row returned by ``alignment.rows_as_strings(self.gaps)`` is
        handed to `self.write_strings` as a tuple
        ``(row, row.sequence.name, row.sequence.description)``.
        """
        rows = alignment.rows_as_strings(self.gaps)
        self.write_strings(
            (row, row.sequence.name, row.sequence.description)
            for row in rows
        )

    def read_alignment(self, alignment):
        """Read alignment from the file.

        Each ``(name, description, body)`` produced by `self.read_strings`
        is added with ``alignment.append_row_from_string``. The `name`
        attribute of the file object, or '' when it has none, is passed
        along as the source of the row.
        """
        append_row = alignment.append_row_from_string
        source = getattr(self.file, 'name', '')
        for name, description, body in self.read_strings():
            append_row(body, name, description, source, self.gaps)
class FastaFile(AlignmentFile):
    """Fasta parser & writer."""

    def write_string(self, string, name, description=''):
        """Append one sequence to file.

        The header line is ``>name`` or ``>name description``. When
        `self.wrap_column` is set, the body is split into lines of at most
        that many characters; otherwise it is written as a single line.
        The file is flushed after every sequence.
        """
        if description:
            name += " " + description
        self.file.write(">%s\n" % name)
        if self.wrap_column:
            width = self.wrap_column
            # One pass over the string instead of re-slicing the remainder
            # on every line. An empty string writes no body lines.
            for start in range(0, len(string), width):
                self.file.write(string[start:start + width] + "\n")
        else:
            self.file.write(string + "\n")
        self.file.flush()

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as list of tuples (string, name, description).
        """
        for string, name, description in sequences:
            self.write_string(string, name, description)

    def read_parts(self):
        """Read parts beginning with > in FASTA file.

        This is a replacement for self.file.read().split("\\n>").
        It is required for markup format, which combines parts read with
        different parsers. Python prohibits combining iterators and file.read
        methods on the same file.

        Each yielded part is the header line (with its leading '>') followed
        by all lines up to the next header. Blank lines before the first
        header are skipped.

        Raises ValueError if non-blank text precedes the first header.
        """
        lines = None  # None until the first '>' header has been seen
        for line in self.file:
            if line.startswith(">"):
                if lines:
                    yield "".join(lines)
                lines = []
            elif lines is None:
                if line.strip():
                    raise ValueError(
                        "Text before the first FASTA header: %r" % line
                    )
                continue
            lines.append(line)
        if lines:
            yield "".join(lines)

    def read_strings(self):
        """Parse the file and yield (name, description, body) tuples.

        The header is split at the first space into name and description.
        Whitespace between '>' and the name is ignored, so both ``>name``
        and ``> name`` give the same name. Spaces, tabs and line breaks are
        removed from the body with `util.remove_each`.
        """
        for part in self.read_parts():
            header, _, body = part.partition("\n")
            # Strip before partitioning: otherwise "> name" would split at
            # the space right after '>' and produce an empty name.
            header = header.lstrip(">").strip()
            name, _, description = header.partition(" ")
            name = name.strip()
            description = description.strip()
            body = util.remove_each(body, " \n\r\t\v")
            yield (name, description, body)
class MarkupFile(AlignmentFile):
    """Parser & writer for our own marked alignment file format.

    Marked alignment file consists of a list of records, separated with one or
    more empty lines. Each record consists of type name, header and optional
    contents. Type name is a line, containing just one word, describing the
    record type. Header is a sequence of lines, each in format `key: value`.
    Content, if present, is separated from header with an empty line.

    Type names and header key names are case-insensitive and '-' and '_' in
    them are equivalent.

    Known record types now are:

    - `alignment` -- this must be the last record in file for now
    - `sequence_markup`
    - `alignment_markup`

    The markup class is stored under the header key `class`. The key `type`
    must not be used in a header: it would replace the record type taken
    from the first line of the record.

    Example::

        sequence-markup
        sequence-name: cyb5_mouse
        sequence-description:
        name: pdb_residue_number
        class: SequencePDBResidueNumberMarkup
        markup: -,12,121,122,123,124,13,14,15,-,-,16

        alignment-markup
        name: geometrical_core
        class: AlignmentGeometricalCoreMarkup
        markup: -,-,-,-,+,+,+,-,-,-,+,+,-,-,-,-

        alignment
        format: fasta

        >cyb5_mouse
        seqvencemouse
    """

    # Helper attribute for write_empty_line: what to write on the next call.
    # Empty at class level; replaced on the instance after the first call.
    _empty_line = ''

    def write_alignment(self, alignment):
        """Write alignment to file.

        Alignment markups are written first, then the markups of every
        sequence, and finally the `alignment` record followed by the
        alignment itself in `self.format`.
        """
        self.write_markups(alignment.markups, 'alignment_markup')
        for sequence in alignment.sequences:
            record = {
                'sequence_name': sequence.name,
                'sequence_description': sequence.description,
            }
            self.write_markups(sequence.markups, 'sequence_markup', record)
        record = {'type': 'alignment', 'format': self.format}
        self.write_record(record)
        # The payload is separated from the record header by an empty line.
        self.write_empty_line()
        alignment.to_file(self.file, format=self.format, gap=self.gaps)

    def write_markups(self, markups, type, pre_record=None):
        """Write a dictionary of markups as series of records.

        `markups` maps markup names to markup objects, `type` is the record
        type to write and `pre_record`, if given, holds extra header fields
        that are added to every record.
        """
        for name, markup in markups.items():
            record = markup.to_record()
            if pre_record:
                record.update(pre_record)
            record['type'] = type
            record['name'] = name
            record['class'] = markup.__class__.__name__
            self.write_record(record)

    def write_record(self, record):
        """Write record to file. Add new line before every but first record.

        The value under `'type'` becomes the first line of the record; every
        other item is written as a `key: value` line. `record` itself is
        left unmodified.

        Raises KeyError if `record` has no `'type'` key.
        """
        self.write_empty_line()
        self.file.write('%s\n' % self.normalize('write', record['type']))
        for key, value in record.items():
            if key == 'type':
                # Already written as the record's first line.
                continue
            key = self.normalize('write', key)
            self.file.write('%s: %s\n' % (key, value))

    def write_empty_line(self):
        """Add empty line every time except the first call."""
        self.file.write(self._empty_line)
        self._empty_line = '\n'

    def read_alignment(self, alignment):
        """Read alignment from file.

        Every record is dispatched to the method `add_<record type>`.
        """
        # list() forces the whole file to be read before any handler runs.
        # Reading the `alignment` record fills `alignment` (see
        # read_payload), and add_sequence_markup looks sequences up in it.
        for record in list(self.read_records(alignment)):
            handler = getattr(self, 'add_%s' % record['type'])
            handler(alignment, record)

    def add_sequence_markup(self, alignment, record):
        """Found sequence markup record in file. Do something about it.

        The markup is attached to the first sequence whose name equals
        `record['sequence_name']`.

        Raises AssertionError if there is no such sequence, or if the record
        has a non-empty description that differs from the sequence's one.
        """
        for sequence in alignment.sequences:
            if sequence.name == record['sequence_name']:
                description = record.get('sequence_description')
                # Explicit raise instead of `assert`, so that the check is
                # not dropped when Python runs with -O.
                if description and sequence.description != description:
                    raise AssertionError(
                        "Sequence description does not match markup record"
                    )
                cls = get_markups_class(record['class'])
                cls.from_record(sequence, record, name=record.get('name'))
                return
        raise AssertionError("Could not find sequence in alignment")

    def add_alignment_markup(self, alignment, record):
        """Found alignment markup record in file. Do something about it."""
        cls = get_markups_class(record['class'])
        cls.from_record(alignment, record, name=record.get('name'))

    def add_alignment(self, alignment, record):
        """Found alignment record. It has been handled in read_payload."""
        pass

    def read_records(self, alignment):
        """Read records and yield them one by one as dicts.

        Empty lines between records are skipped; the first non-empty line
        is taken as the record type.
        """
        for line in self.file:
            if line.strip() == "":
                continue
            yield self.read_record(alignment, line)

    def read_record(self, alignment, type):
        """Read record headers and record payload.

        `type` is the raw type line of the record. Header lines are read up
        to the first empty line, after which the payload, if any, is read.
        If the file ends before an empty line, no payload is read.

        Raises ValueError if a header line contains no ':'.
        """
        type = self.normalize('read', type)
        record = {'type': type}
        for line in self.file:
            if line.strip() == "":
                self.read_payload(alignment, record, type)
                return record
            key, value = line.split(':', 1)
            key = self.normalize('read', key)
            value = value.strip()
            record[key] = value
        return record

    def read_payload(self, alignment, record, type):
        """Read record payload, if necessary.

        Only `alignment` records have a payload: the rest of the file is
        parsed in the format named by the record (fasta if not given) and
        its rows are added to `alignment`.
        """
        if type == 'alignment':
            io = File(self.file, record.get('format', 'fasta'), gaps=self.gaps)
            io.read_alignment(alignment)

    @staticmethod
    def normalize(for_what, string):
        """Convert a type or key name between file and in-memory spelling.

        With `for_what` equal to 'read', surrounding whitespace is removed,
        '-' becomes '_' and the result is lowercased. With 'write',
        surrounding whitespace is removed, '_' becomes '-' and the result is
        capitalized. For any other `for_what` None is returned.
        """
        if for_what == 'read':
            return string.strip().replace('-', '_').lower()
        if for_what == 'write':
            return string.strip().replace('_', '-').capitalize()
class EmbossFile(AlignmentFile):
    """Parser & writer for file formats supported by EMBOSS.

    Conversion is done by running the external `seqret` program with FASTA
    on our side of the pipe and `self.format` on the file's side.
    """

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as tuples (string, name, description). They are
        sent to `seqret` as FASTA, and its output is copied to `self.file`.
        """
        pipe = Popen(
            ['seqret', 'stdin', '%s::stdout' % self.format],
            stdin=PIPE, stdout=PIPE
        )
        # NOTE(review): all input is written before any output is read. The
        # subprocess documentation warns that this pattern can block once
        # the OS pipe buffer fills up -- confirm for very large alignments.
        FastaFile(pipe.stdin).write_strings(self.fix_sequences(sequences))
        pipe.stdin.close()
        for line in pipe.stdout:
            self.file.write(line)
        # Release the pipe and reap the child once its output is consumed.
        pipe.stdout.close()
        pipe.wait()

    def fix_sequences(self, sequences):
        """EMBOSS does not permit : in names. Fix sequences for that.

        Takes and yields tuples (string, name, description), the same order
        as `write_strings` receives; ':' in the name is replaced with '_'.
        """
        for string, name, description in sequences:
            yield string, name.replace(':', '_'), description

    def read_strings(self):
        """Read sequences from file.

        The whole of `self.file` is fed to `seqret` right away; the returned
        generator then parses its FASTA output and yields tuples
        (name, description, body).
        """
        pipe = Popen(
            ['seqret', '%s::stdin' % self.format, 'stdout'],
            stdin=PIPE, stdout=PIPE
        )
        for line in self.file:
            pipe.stdin.write(line)
        pipe.stdin.close()
        return FastaFile(pipe.stdout).read_strings()
266 # vim: set et ts=4 sts=4 sw=4: