Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/rev/cc1959669928
Дата изменения: Unknown
Дата индексирования: Tue Oct 2 00:57:01 2012
Кодировка:
allpy: cc1959669928

allpy

changeset 632:cc1959669928

Rewritten of test/usecase2 to make it work. It works, but is still hacky and ugly since it relies on markups, which are not sufficiently designed/implemented yet.
author Daniil Alexeyevsky <dendik@kodomo.fbb.msu.ru>
date Thu, 19 May 2011 22:29:38 +0400
parents eb41f6fa0c41
children 434024b451be
files test/usecase2.py
diffstat 1 files changed, 47 insertions(+), 61 deletions(-) [+]
line diff
     1.1 --- a/test/usecase2.py	Thu May 19 22:13:56 2011 +0400
     1.2 +++ b/test/usecase2.py	Thu May 19 22:29:38 2011 +0400
     1.3 @@ -1,72 +1,58 @@
     1.4 -# Fragments are in pair_repeat.fasta
     1.5 +import sys
     1.6  from allpy import dna
     1.7 +from allpy.processors import Needle, Left
     1.8 +from allpy.fileio import FastaFile
     1.9 +from collections import deque
    1.10 +
    1.11  width = 15
    1.12 -treshold = 14
    1.13 +threshold = 10
    1.14  
    1.15 -def my_column_mark(column, threshold):
    1.16 -    """Helper to mark column (given as dict) by identity."""
    1.17 -    count = {}
    1.18 -    for sequence, monomer in column:
    1.19 -        code = monomer.code1
    1.20 -        count[code] = count.get(code, 0) + 1
    1.21 -    for code in count:
    1.22 -        if count[code] > threshold:
    1.23 -            return "+"
    1.24 -    return "-"
    1.25 +def has_identity(column):
    1.26 +    as_list = column.values()
    1.27 +    return len(column) == 2 and as_list[0] == as_list[1]
    1.28  
    1.29 -def my_pair_mark(column):
    1.30 -    """Helper to mark column of 2 sequences (given as list) by identity."""
    1.31 -    if column[0] is None or column[1] is None:
    1.32 -        return "-"
    1.33 -    if column[0].code1 == column[0].code1:   
    1.34 -        return "+"
    1.35 -    return "-"
    1.36 +def is_good_window(window):
    1.37 +    sum_id = sum(int(has_identity(column)) for column in window)
    1.38 +    return len(window) == width and sum_id >= threshold
    1.39  
    1.40 -def find_runs(markup):
    1.41 -    """Fund long positive runs.
    1.42 -    
    1.43 -    This obscure and probably broken function has nothing to do with allpy,
    1.44 -    so it's presence in the example is unnecessary.
    1.45 -    """
    1.46 -    position = 0
    1.47 -    count = 0
    1.48 -    plus_positions=[]
    1.49 -    for i in range(len(markup)):
    1.50 -        position += 1
    1.51 -        if position < width :
    1.52 -            if markup[i]=="+":
    1.53 -                count += 1
    1.54 -            continue 
    1.55 -        if position > width:
    1.56 -            if markup[i-width] == "+":
    1.57 -                count -=1 
    1.58 -        if count >= treshold:
    1.59 -            plus_positions.append(position)
    1.60 -    if len(plus_positions)==0:
    1.61 -        raise Exception("No blocks in alignment")
    1.62 -
    1.63 -    blocks=[]
    1.64 -    start = plus_positions[0]-width + 1
    1.65 -    stop = plus_positions[0]
    1.66 -    for p in plus_positions[1:]:
    1.67 -        if p == stop +1:
    1.68 -            stop = p
    1.69 -            continue
    1.70 -        blocks.append((start,stop))
    1.71 -        start = p - width + 1
    1.72 -        stop = p
    1.73 +def find_runs(alignment):
    1.74 +    window = deque([], width)
    1.75 +    blocks = []
    1.76 +    in_block = False
    1.77 +    for column in alignment.columns:
    1.78 +        window.append(column)
    1.79 +        in_block, was_in_block = is_good_window(window), in_block
    1.80 +        if in_block and not was_in_block:
    1.81 +            block = dna.Block.from_alignment(alignment, columns=list(window))
    1.82 +            blocks.append(block)
    1.83 +        elif in_block:
    1.84 +            block.columns.append(column)
    1.85      return blocks
    1.86  
    1.87 +def blocks_markup(alignment, blocks):
    1.88 +    for column in alignment.columns:
    1.89 +        column.in_block = "-"
    1.90 +    for block in blocks:
    1.91 +        for column in block.columns:
    1.92 +            column.in_block = "+"
    1.93 +    return "".join(column.in_block for column in alignment.columns)
    1.94 +
    1.95  def main():
    1.96 -    alignment = dna.Alignment().append_file(open("pair_repeat.fasta"))
    1.97 -    if len(alignment.sequences) != 2:
    1.98 -        raise Exception("Input must have exactly 2 sequences!")
    1.99 -    alignment.realign("needle", gap_open = 0)
   1.100 -    markup = []
   1.101 -    for column in alignment.columns_as_lists():
   1.102 -      markup.append(my_pair_mark(column))
   1.103 -    markup = alignment.map_columns(my_pair_mark)
   1.104 -    print find_runs(markup)
   1.105 +    alignment = dna.Alignment().append_file(sys.stdin)
   1.106 +    assert len(alignment.sequences) == 2, "Input must have TWO sequences!"
   1.107 +    alignment.realign(Left())
   1.108 +    alignment.realign(Needle())
   1.109 +    blocks = find_runs(alignment)
   1.110 +
   1.111 +    for n, block in enumerate(blocks, 1):
   1.112 +        block.to_file(open("block_%02d.fasta" % n, "w"))
   1.113 +
   1.114 +    alignment.to_file(sys.stdout)
   1.115 +    FastaFile(sys.stdout).write_string(
   1.116 +      blocks_markup(alignment, blocks),
   1.117 +      "markup",
   1.118 +      "In run with window %s and threshold %s" % (width, threshold)
   1.119 +    )
   1.120  
   1.121  try:
   1.122      main()