Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/file/c421263ef000/allpy/fileio.py
Дата изменения: Unknown
Дата индексирования: Mon Feb 4 03:48:23 2013
Кодировка:
allpy: c421263ef000 allpy/fileio.py

allpy

view allpy/fileio.py @ 736:c421263ef000

html viewer of blocks: now compatible with jquery-1.4.2 (jquery-1.4.2 is used in Ubuntu Maverick). The methods .mouseover( [eventData,] handler(eventObject) ) and .mouseleave( [eventData,] handler(eventObject) ) were added in jquery-1.4.3, but .bind( eventType, [eventData,] handler(eventObject) ) has been available since jquery-1.0. The viewer may be compatible with jquery-1.0 as well, but I have not tested this.
author boris <bnagaev@gmail.com>
date Fri, 08 Jul 2011 21:13:54 +0200
parents 80043822a41e
children d16e8559b6dd
line source
1 import os
2 from subprocess import Popen, PIPE
3 from tempfile import NamedTemporaryFile
4 import util
def get_markups_class(classname):
    """Return the attribute of the `markups` module named `classname`.

    The import happens at call time rather than at module load; this helper
    exists to avoid import loops between this module and `markups`.
    """
    import markups as markups_module
    markup_class = getattr(markups_module, classname)
    return markup_class
class File(object):
    """Factory choosing the reader/writer class that matches a format name.

    Instantiating `File` never yields a `File` object: `__new__` returns

    - `FastaFile` for ``"fasta"``;
    - `MarkupFile` for ``"markup"`` and for ``"markup:<subformat>"`` (in the
      latter case the part after the first colon is passed on as `format`);
    - `EmbossFile` for every other format name.

    Extra keyword arguments are forwarded to the chosen class.
    """

    def __new__(cls, file, format="fasta", **kw):
        if format == "fasta":
            return FastaFile(file, **kw)
        if format == 'markup':
            return MarkupFile(file, **kw)
        if not format.startswith('markup:'):
            # Anything we do not handle ourselves is delegated to EMBOSS.
            return EmbossFile(file, format, **kw)
        _, _, subformat = format.partition(':')
        return MarkupFile(file, format=subformat, **kw)
class AlignmentFile(object):
    """Base class with helpers shared by alignment readers and writers.

    This class only stores the settings and converts between alignments and
    (string, name, description) tuples; it calls `self.write_strings` and
    `self.read_strings`, which are not defined here and must be supplied by
    subclasses.
    """

    def __init__(self, file, format='fasta', gaps='-', wrap_column=70):
        """Remember the file object and the formatting options.

        `file` is the file-like object to read from or write to, `format` is
        the format name, `gaps` is passed to the alignment when rows are
        converted to/from strings, and `wrap_column` is the line width used
        by writers that wrap sequence lines (a false value disables it).
        """
        self.file = file
        self.format = format
        self.gaps = gaps
        self.wrap_column = wrap_column

    def write_alignment(self, alignment):
        """Append alignment to the file."""
        rows = alignment.rows_as_strings(self.gaps)
        self.write_strings(
            (row, row.sequence.name, row.sequence.description)
            for row in rows
        )

    def read_alignment(self, alignment):
        """Read alignment from the file.

        Every record produced by `self.read_strings()` is appended to
        `alignment` through `alignment.append_row_from_string`.
        """
        # The original code referred to a bare `file` here, which is not the
        # file being read (it is the builtin type on Python 2 and undefined
        # on Python 3).  Use the name of our own file object; file-like
        # objects without a `name` attribute yield None.
        # NOTE(review): assumes append_row_from_string accepts None as the
        # source argument -- confirm against the alignment class.
        source = getattr(self.file, 'name', None)
        append_row = alignment.append_row_from_string
        for name, description, body in self.read_strings():
            append_row(body, name, description, source, self.gaps)
class FastaFile(AlignmentFile):
    """Fasta parser & writer."""

    def write_string(self, string, name, description=''):
        """Append one sequence to file.

        The header line is ``>name`` or ``>name description``.  The sequence
        is wrapped to `self.wrap_column` characters per line; a false
        `wrap_column` writes it on a single line.  The file is flushed after
        every sequence.
        """
        header = name
        if description:
            header = name + " " + description
        self.file.write(">%s\n" % header)
        width = self.wrap_column
        if width:
            # Step through the string instead of repeatedly re-slicing the
            # remainder, which was quadratic in the sequence length.
            for start in range(0, len(string), width):
                self.file.write(string[start:start + width] + "\n")
        else:
            self.file.write(string + "\n")
        self.file.flush()

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as an iterable of tuples
        (string, name, description).
        """
        for string, name, description in sequences:
            self.write_string(string, name, description)

    def read_strings(self):
        """Parse the rest of the file and yield (name, description, body).

        The whole remaining content is read at once and split on lines
        starting with ``>``.  The name is the first space-delimited word of
        the header, the description is the rest of the header, and the body
        has spaces, tabs and line breaks removed.
        """
        for part in self.file.read().split("\n>"):
            # An empty file or blank lines before the first header used to
            # produce a phantom record with empty name and body; skip them.
            if not part.strip():
                continue
            header, _, body = part.partition("\n")
            # Strip whitespace around the header so that "> name desc"
            # (space after the marker) is not parsed as an empty name.
            header = header.lstrip(">").strip()
            name, _, description = header.partition(" ")
            name = name.strip()
            description = description.strip()
            body = util.remove_each(body, " \n\r\t\v")
            yield (name, description, body)
class MarkupFile(AlignmentFile):
    """Parser & writer for our own marked alignment file format.

    Marked alignment file consists of a list of records, separated with one or
    more empty lines. Each record consists of type name, header and optional
    contents. Type name is a line, containing just one word, describing the
    record type. Header is a sequence of lines, each in format `key: value`.
    Content, if present, is separated from header with an empty line.

    Type names and header key names are case-insensitive.

    Known record types now are:

    - `alignment` -- this must be the last record in file for now
    - `sequence_markup`
    - `alignment_markup`

    The markup class is stored under the `class` header key.  The key `type`
    is used internally for the record type name and must not appear in a
    header.

    Example::

        sequence_markup
        sequence_name: cyb5_mouse
        sequence_description:
        name: pdb_residue_number
        class: SequencePDBResidueNumberMarkup
        markup: -,12,121,122,123,124,13,14,15,-,-,16

        alignment_markup
        name: geometrical_core
        class: AlignmentGeometricalCoreMarkup
        markup: -,-,-,-,+,+,+,-,-,-,+,+,-,-,-,-

        alignment
        format: fasta

        > cyb5_mouse
        seqvencemouse
    """

    # Separator written by write_empty_line: empty for the first call, then
    # replaced on the instance by a newline.
    _empty_line = ''

    def write_alignment(self, alignment):
        """Write alignment to file.

        Alignment markups are written first, then the markups of every
        sequence, and finally the `alignment` record followed by the
        alignment itself (written by `alignment.to_file`).
        """
        self.write_markups(alignment.markups, 'alignment_markup')
        for sequence in alignment.sequences:
            record = {
                'sequence_name': sequence.name,
                'sequence_description': sequence.description,
            }
            self.write_markups(sequence.markups, 'sequence_markup', record)
        record = {'type': 'alignment', 'format': self.format}
        self.write_record(record)
        self.write_empty_line()
        alignment.to_file(self.file)

    def write_markups(self, markups, type, pre_record=None):
        """Write a dictionary of markups as series of records.

        `markups` maps markup names to markup objects, `type` is the record
        type name to emit, and `pre_record` holds optional extra header
        fields added to every record.
        """
        for name, markup in markups.items():
            # Copy, so that we never modify a dict owned by the markup.
            record = dict(markup.to_record())
            if pre_record:
                record.update(pre_record)
            record['type'] = type
            record['name'] = name
            record['class'] = markup.__class__.__name__
            self.write_record(record)

    def write_record(self, record):
        """Write record to file. Add new line before every but first record.

        `record['type']` becomes the type-name line; every other item is
        written as a `key: value` header line.  The dict passed in is left
        unmodified.
        """
        self.write_empty_line()
        self.file.write('%s\n' % record['type'])
        for key, value in record.items():
            if key == 'type':
                continue
            self.file.write('%s: %s\n' % (key, value))

    def write_empty_line(self):
        """Add empty line every time except the first call."""
        self.file.write(self._empty_line)
        self._empty_line = '\n'

    def read_alignment(self, alignment):
        """Read alignment from file.

        Each record is dispatched to the method named `add_<record type>`.
        """
        # All records are read before any handler runs: the alignment record
        # comes last in the file and its payload is loaded while reading,
        # whereas the sequence markup handlers need the sequences to be
        # present in the alignment already.
        for record in list(self.read_records(alignment)):
            handler = getattr(self, 'add_%s' % record['type'])
            handler(alignment, record)

    def add_sequence_markup(self, alignment, record):
        """Found sequence markup record in file. Do something about it.

        Raises AssertionError if no sequence of the alignment has the
        recorded name, or if the recorded description is non-empty and
        differs from the description of the sequence.
        """
        for sequence in alignment.sequences:
            if sequence.name != record['sequence_name']:
                continue
            description = record.get('sequence_description')
            # Explicit raise instead of `assert`, which is removed under -O.
            if description and sequence.description != description:
                raise AssertionError(
                    "Sequence description does not match markup record")
            cls = get_markups_class(record['class'])
            cls.from_record(sequence, record, name=record.get('name'))
            return
        raise AssertionError("Could not find sequence in alignment")

    def add_alignment_markup(self, alignment, record):
        """Found alignment markup record in file. Do something about it."""
        cls = get_markups_class(record['class'])
        cls.from_record(alignment, record, name=record.get('name'))

    def add_alignment(self, alignment, record):
        """Found alignment record. It has been handled in read_payload."""
        pass

    def read_records(self, alignment):
        """Read records and yield them one by one as dicts.

        Empty lines between records are skipped; the first non-empty line
        is taken as the record type name.
        """
        for line in self.file:
            if line.strip() == "":
                continue
            yield self.read_record(alignment, line)

    def read_record(self, alignment, type):
        """Read record headers and record payload.

        `type` is the type-name line of the record.  Header lines are read
        until an empty line, after which the payload (if the record type has
        one) is read, or until the end of file.  Raises ValueError for a
        header line without a colon.
        """
        type = type.strip().lower()
        record = {'type': type}
        for line in self.file:
            if line.strip() == "":
                self.read_payload(alignment, record, type)
                return record
            key, colon, value = line.partition(':')
            if not colon:
                raise ValueError(
                    "Malformed header line in %r record: %r" % (type, line))
            record[key.strip().lower()] = value.strip()
        return record

    def read_payload(self, alignment, record, type):
        """Read record payload, if necessary.

        Only `alignment` records have a payload: the rest of the file is
        read into `alignment` using the format named in the record header
        (fasta when the header has no `format` key).
        """
        if type == 'alignment':
            reader = File(self.file, record.get('format', 'fasta'))
            reader.read_alignment(alignment)
class EmbossFile(AlignmentFile):
    """Parser & writer for file formats supported by EMBOSS.

    Conversion is delegated to the external `seqret` program, using fasta
    as the intermediate format on our side of the pipe.
    """

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as an iterable of tuples
        (string, name, description).  They are sent to `seqret` as fasta and
        its output, in `self.format`, is copied to `self.file`.
        """
        pipe = Popen(['seqret', 'stdin', '%s::stdout' % self.format],
                     stdin=PIPE, stdout=PIPE)
        # NOTE(review): the input is written completely before any output is
        # read; this could block if seqret emits much output before it has
        # consumed its input -- TODO confirm seqret's buffering behaviour.
        try:
            try:
                FastaFile(pipe.stdin).write_strings(
                    self.fix_sequences(sequences))
            finally:
                # seqret needs EOF on its input even if writing failed,
                # otherwise waiting for it below would never return.
                pipe.stdin.close()
            for line in pipe.stdout:
                self.file.write(line)
        finally:
            # Release the pipe and reap the child process.
            pipe.stdout.close()
            pipe.wait()

    def fix_sequences(self, sequences):
        """EMBOSS does not permit : in file names. Fix sequences for that.

        Yields (string, name, description) tuples with every ':' in the name
        replaced by '_'.  (The tuple used to be unpacked in the wrong order,
        so the replacement was applied to the sequence string instead.)
        """
        for string, name, description in sequences:
            yield string, name.replace(':', '_'), description

    def read_strings(self):
        """Read sequences from file.

        The content of `self.file` is sent to `seqret`, which converts it
        from `self.format` to fasta.  Returns an iterator over
        (name, description, body) tuples.
        """
        pipe = Popen(['seqret', '%s::stdin' % self.format, 'stdout'],
                     stdin=PIPE, stdout=PIPE)
        try:
            try:
                for line in self.file:
                    pipe.stdin.write(line)
            finally:
                pipe.stdin.close()
            # Parse everything now, so that the pipe can be closed and the
            # child process reaped before returning.
            records = list(FastaFile(pipe.stdout).read_strings())
        finally:
            pipe.stdout.close()
            pipe.wait()
        return iter(records)
240 # vim: set et ts=4 sts=4 sw=4: