Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/raw-rev/737b52785e5e
Дата изменения: Unknown
Дата индексирования: Tue Oct 2 08:01:43 2012
Кодировка:

# HG changeset patch
# User Daniil Alexeyevsky
# Date 1300988766 -10800
# Node ID 737b52785e5ebed58eaa1f40ecc146e8176416fa
# Parent 9054c1aae06cec5270fe81d49ca610e89cd5b880
Clean rewrite of fileio

diff -r 9054c1aae06c -r 737b52785e5e allpy/base.py
--- a/allpy/base.py Thu Mar 24 20:45:32 2011 +0300
+++ b/allpy/base.py Thu Mar 24 20:46:06 2011 +0300
@@ -201,13 +201,8 @@
`gaps` set), treat them accordingly.
"""
sequences = []
- if format == 'fasta':
- sequences = fileio.FastaIo(file).get_all_strings()
- elif format == 'msf':
- sequences = fileio.MsfIo(file).get_all_strings()
- else:
- raise Exception("We don't support other formats yet")
- for (name, description, body) in sequences:
+ io = fileio.File(file, format)
+ for name, description, body in io.read_strings():
self.append_row_from_string(body, name, description, file.name, gaps)
return self

@@ -217,16 +212,11 @@
if monomer:
return monomer.code1
return gap
- if format == 'fasta':
- io = fileio.FastaIo(file)
- elif format == 'msf':
- io = fileio.MsfIo(file)
- else:
- raise Exception("We don't support other formats yet")
+ io = fileio.File(file, format)
for row in self.rows_as_lists():
seq = row.sequence
line = "".join(map(char, row))
- io.save_string(line, seq.name, seq.description)
+ io.write_string(line, seq.name, seq.description)

# Data access methods for alignment
# =================================
diff -r 9054c1aae06c -r 737b52785e5e allpy/fileio.py
--- a/allpy/fileio.py Thu Mar 24 20:45:32 2011 +0300
+++ b/allpy/fileio.py Thu Mar 24 20:46:06 2011 +0300
@@ -1,107 +1,82 @@
import os
from tempfile import NamedTemporaryFile
-
import util

-class BaseIo(object):
- """ Base class providing alignment/sequence import and export
+class File(object):
+ """Automatic file IO: return a FastaFile for "fasta", an EmbossFile otherwise."""
+ def __new__(cls, file, format="fasta"):
+ if format == "fasta":
+ return FastaFile(file)
+ else:
+ return EmbossFile(file, format)

- Data:
- * file - file object
- """
+class FastaFile(object):
+ """Fasta parser & writer."""

- def __init__(self, file):
+ def __init__(self, file, wrap_column=70):
self.file = file
+ self.wrap_column = wrap_column

- def save_string(self, string, name, description=''):
- """ Saves given string to file
-
- Splits long lines to substrings of length=long_line
- To prevent this, set long_line=None
- """
- pass
-
- def get_all_strings(self):
- """Parse fasta file, remove spaces and newlines from sequence bodies.
-
- Return a list of tuples (name, description, sequence_body).
- """
- pass
-
- def get_string(self, name):
- """ return tuple (name, description, string) for sequence with name name """
- for name_test, description, body in self.get_all_strings():
- if name_test == name:
- return (name_test, description, body)
-
-class FastaIo(BaseIo):
- """ Fasta import and export
-
- Additional data:
- * long_line - max length of file line while export
- Splits long lines to substrings of length=long_line
- To prevent this, set long_line=None
- """
-
- def __init__(self, file, long_line=70):
- BaseIo.__init__(self, file)
- self.long_line = long_line
-
- def save_string(self, string, name, description=''):
+ def write_string(self, string, name, description=''):
+ """Append one sequence to file."""
if description:
name += " " + description
self.file.write(">%s\n" % name)
- if self.long_line:
- for i in range(0, len(string) // self.long_line + 1):
- start = i*self.long_line
- end = i*self.long_line + self.long_line
- self.file.write("%s\n" % string[start:end])
+ if self.wrap_column:
+ while string:
+ self.file.write(string[:self.wrap_column]+"\n")
+ string = string[self.wrap_column:]
else:
- self.file.write("%s\n" % string)
+ self.file.write(string+"\n")
+ self.file.flush()

- def get_all_strings(self):
+ def write_strings(self, sequences):
+ """Write sequences to file, one write_string() call per sequence.
+
+ Sequences are given as list of tuples (string, name, description).
+ """
+ for string, name, description in sequences:
+ self.write_string(string, name, description)
+
+ def read_strings(self):
for part in self.file.read().split("\n>"):
header, _, body = part.partition("\n")
- header = header.lstrip(">").strip()
+ header = header.lstrip(">")
name, _, description = header.partition(" ")
name = name.strip()
description = description.strip()
body = util.remove_each(body, " \n\r\t\v")
yield (name, description, body)

- def get_string(self, name):
- for name_test, description, body in self.get_all_strings():
- if name_test == name:
- return (name_test, description, body)
+class EmbossFile(object):
+ """Parser & writer for file formats supported by EMBOSS."""

-class MsfIo(BaseIo):
- """ Msf import and export """
+ def __init__(self, file, format):
+ self.file = file
+ self.format = format

- def __init__(self, file):
- BaseIo.__init__(self, file)
+ def write_strings(self, sequences):
+ """Write sequences as fasta to a temporary file, then convert it with seqret."""
+ # XXX: in case of exceptions files are not closed, nor unlinked
+ tmpfile = NamedTemporaryFile('w', delete=False)
+ FastaFile(tmpfile).write_strings(self.fix_sequences(sequences))
+ tmpfile.close()
+ os.system("seqret fasta::%s %s::%s" % (tmpfile.name, self.format, self.file.name))
+ os.unlink(tmpfile.name)

- def save_string(self, string, name, description=''):
- name = name.replace(':', '_') # seqret bug
- tmp_fasta = NamedTemporaryFile('w', delete=False)
- tmp_fasta.close()
- os.system("seqret %(msf)s %(fasta)s" % \
- {'msf': self.file.name, 'fasta': tmp_fasta.name})
- tmp_fasta = open(tmp_fasta.name, 'a')
- fasta = FastaIo(tmp_fasta)
- fasta.save_string(string, name, description)
- tmp_fasta.close()
- self.file.close()
- os.system("seqret %(fasta)s msf::%(msf)s" % \
- {'msf': self.file.name, 'fasta': tmp_fasta.name})
- os.unlink(tmp_fasta.name)
- self.file = open(self.file.name)
+ def fix_sequences(self, sequences):
+ """EMBOSS does not permit : in sequence names. Yield (string, name, description) with : replaced by _ in name."""
+ for string, name, description in sequences:
+ yield string, name.replace(':', '_'), description

- def get_all_strings(self):
- tmp_fasta = NamedTemporaryFile(delete=False)
- os.system("seqret %(msf)s %(fasta)s" % \
- {'msf': self.file.name, 'fasta': tmp_fasta.name})
- fasta = FastaIo(tmp_fasta)
- strings = list(fasta.get_all_strings())
- os.unlink(tmp_fasta.name)
- return strings
+ def read_strings(self):
+ """Read sequences from file: convert it to fasta with seqret, return a list of (name, description, body)."""
+ # XXX: in case of exceptions files are not closed, nor unlinked
+ tmpfile = NamedTemporaryFile(delete=False)
+ self.file.flush()
+ os.system("seqret %s::%s fasta::%s" % (self.format, self.file.name, tmpfile.name))
+ sequences = list(FastaFile(tmpfile).read_strings())
+ os.unlink(tmpfile.name)
+ return sequences

+# vim: set et ts=4 sts=4 sw=4:
diff -r 9054c1aae06c -r 737b52785e5e geometrical_core/geometrical-core
--- a/geometrical_core/geometrical-core Thu Mar 24 20:45:32 2011 +0300
+++ b/geometrical_core/geometrical-core Thu Mar 24 20:46:06 2011 +0300
@@ -104,10 +104,10 @@
IOs = []
if args.f:
block.to_file(args.f, format='fasta')
- IOs.append(fileio.FastaIo(args.f))
+ IOs.append(fileio.File(args.f, format='fasta'))
if args.g:
block.to_file(args.g, format='msf')
- IOs.append(fileio.MsfIo(args.g))
+ IOs.append(fileio.File(args.g, format='msf'))
for i, GC in enumerate(GCs):
for column in GC:
m[column] = True
@@ -116,7 +116,7 @@
description = 'Main geometrical core' if i==0 \
else 'Alternative geometrical core %i' % i
for io in IOs:
- io.save_string(string, name, description)
+ io.write_string(string, name, description)
m.clear()

if args.p: