allpy
changeset 632:cc1959669928
Rewritten of test/usecase2 to make it work. It works, but is still hacky and ugly since it relies on markups, which are not sufficiently designed/implemented yet.
author | Daniil Alexeyevsky <dendik@kodomo.fbb.msu.ru> |
---|---|
date | Thu, 19 May 2011 22:29:38 +0400 |
parents | eb41f6fa0c41 |
children | 434024b451be |
files | test/usecase2.py |
diffstat | 1 files changed, 47 insertions(+), 61 deletions(-) [+] |
line diff
1.1 --- a/test/usecase2.py Thu May 19 22:13:56 2011 +0400 1.2 +++ b/test/usecase2.py Thu May 19 22:29:38 2011 +0400 1.3 @@ -1,72 +1,58 @@ 1.4 -# Fragments are in pair_repeat.fasta 1.5 +import sys 1.6 from allpy import dna 1.7 +from allpy.processors import Needle, Left 1.8 +from allpy.fileio import FastaFile 1.9 +from collections import deque 1.10 + 1.11 width = 15 1.12 -treshold = 14 1.13 +threshold = 10 1.14 1.15 -def my_column_mark(column, threshold): 1.16 - """Helper to mark column (given as dict) by identity.""" 1.17 - count = {} 1.18 - for sequence, monomer in column: 1.19 - code = monomer.code1 1.20 - count[code] = count.get(code, 0) + 1 1.21 - for code in count: 1.22 - if count[code] > threshold: 1.23 - return "+" 1.24 - return "-" 1.25 +def has_identity(column): 1.26 + as_list = column.values() 1.27 + return len(column) == 2 and as_list[0] == as_list[1] 1.28 1.29 -def my_pair_mark(column): 1.30 - """Helper to mark column of 2 sequences (given as list) by identity.""" 1.31 - if column[0] is None or column[1] is None: 1.32 - return "-" 1.33 - if column[0].code1 == column[0].code1: 1.34 - return "+" 1.35 - return "-" 1.36 +def is_good_window(window): 1.37 + sum_id = sum(int(has_identity(column)) for column in window) 1.38 + return len(window) == width and sum_id >= threshold 1.39 1.40 -def find_runs(markup): 1.41 - """Fund long positive runs. 1.42 - 1.43 - This obscure and probably broken function has nothing to do with allpy, 1.44 - so it's presence in the example is unnecessary. 1.45 - """ 1.46 - position = 0 1.47 - count = 0 1.48 - plus_positions=[] 1.49 - for i in range(len(markup)): 1.50 - position += 1 1.51 - if position < width : 1.52 - if markup[i]=="+": 1.53 - count += 1 1.54 - continue 1.55 - if position > width: 1.56 - if markup[i-width] == "+": 1.57 - count -=1 1.58 - if count >= treshold: 1.59 - plus_positions.append(position) 1.60 - if len(plus_positions)==0: 1.61 - raise Exception("No blocks in alignment") 1.62 - 1.63 - blocks=[] 1.64 - start = plus_positions[0]-width + 1 1.65 - stop = plus_positions[0] 1.66 - for p in plus_positions[1:]: 1.67 - if p == stop +1: 1.68 - stop = p 1.69 - continue 1.70 - blocks.append((start,stop)) 1.71 - start = p - width + 1 1.72 - stop = p 1.73 +def find_runs(alignment): 1.74 + window = deque([], width) 1.75 + blocks = [] 1.76 + in_block = False 1.77 + for column in alignment.columns: 1.78 + window.append(column) 1.79 + in_block, was_in_block = is_good_window(window), in_block 1.80 + if in_block and not was_in_block: 1.81 + block = dna.Block.from_alignment(alignment, columns=list(window)) 1.82 + blocks.append(block) 1.83 + elif in_block: 1.84 + block.columns.append(column) 1.85 return blocks 1.86 1.87 +def blocks_markup(alignment, blocks): 1.88 + for column in alignment.columns: 1.89 + column.in_block = "-" 1.90 + for block in blocks: 1.91 + for column in block.columns: 1.92 + column.in_block = "+" 1.93 + return "".join(column.in_block for column in alignment.columns) 1.94 + 1.95 def main(): 1.96 - alignment = dna.Alignment().append_file(open("pair_repeat.fasta")) 1.97 - if len(alignment.sequences) != 2: 1.98 - raise Exception("Input must have exactly 2 sequences!") 1.99 - alignment.realign("needle", gap_open = 0) 1.100 - markup = [] 1.101 - for column in alignment.columns_as_lists(): 1.102 - markup.append(my_pair_mark(column)) 1.103 - markup = alignment.map_columns(my_pair_mark) 1.104 - print find_runs(markup) 1.105 + alignment = dna.Alignment().append_file(sys.stdin) 1.106 + assert len(alignment.sequences) == 2, "Input must have TWO sequences!" 1.107 + alignment.realign(Left()) 1.108 + alignment.realign(Needle()) 1.109 + blocks = find_runs(alignment) 1.110 + 1.111 + for n, block in enumerate(blocks, 1): 1.112 + block.to_file(open("block_%02d.fasta" % n, "w")) 1.113 + 1.114 + alignment.to_file(sys.stdout) 1.115 + FastaFile(sys.stdout).write_string( 1.116 + blocks_markup(alignment, blocks), 1.117 + "markup", 1.118 + "In run with window %s and threshold %s" % (width, threshold) 1.119 + ) 1.120 1.121 try: 1.122 main()