-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathd_process_task.py
58 lines (41 loc) · 1.68 KB
/
d_process_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Distributed Processing Module Worker Task
__author__ = 'Sandesh'
from celery import Celery
from CustomClasses.ATree import *
from Bio import SeqIO, Seq
import os
import subprocess
from itertools import product
def break_sequence(sequence, depth):
    """Yield one slice of ``sequence`` for every start index.

    Each slice begins at index ``start`` and is ``depth`` items long while
    that many items remain; once ``start + depth`` runs past the end, the
    remaining tail ``sequence[start:]`` is yielded instead, so the final
    slices get progressively shorter.

    Args:
        sequence: Any sliceable object supporting ``len()``.
        depth: Desired slice length.

    Yields:
        ``len(sequence)`` slices, one per start index, in order.
    """
    total = len(sequence)
    for start in range(total):
        stop = start + depth
        # Past the end there are fewer than ``depth`` items left: emit the tail.
        yield sequence[start:stop] if stop <= total else sequence[start:]
# Celery application that the task functions in this module register with.
# The same Redis URL is used as the message broker and as the result backend.
app = Celery(
    'tasks',
    backend='redis://192.168.6.4:6379/0',
    broker='redis://192.168.6.4:6379/0',
)
# Worker log lines carry only the process name followed by the message.
app.conf.CELERYD_LOG_FORMAT = '[%(processName)s] %(message)s'
@app.task
def process(filename):
    """Build an ATree from a FASTA file, persist it, and sync the output.

    Steps, in order:
      1. Parse ``filename`` as FASTA and concatenate every record's sequence
         into one string.
      2. Feed every slice produced by ``break_sequence(..., 8)`` to a fresh
         ``ATree`` via ``process_subsequence``.
      3. Call ``dump_to_file`` with ``filename + "_TREE"`` and
         ``pickle_into_file`` with a path under ``GenomeDataset/Processing/``
         (relative to the current working directory).
      4. Run ``rsync -az`` on ``GenomeDataset/Processing/`` towards the
         remote destination; its exit status is not inspected.

    Args:
        filename: Path of the FASTA file to process.

    Returns:
        Length of the concatenated sequence string.
    """
    print("Processing " + filename)

    extracted = [entry.seq for entry in SeqIO.parse(filename, "fasta")]
    print("Sequences Extracted!")

    sequence_record = ''.join(map(str, extracted))
    atree = ATree()
    total_length = len(sequence_record)
    print(str(total_length) + "-->" + str(atree))

    for window in break_sequence(sequence_record, 8):
        atree.process_subsequence(window)

    dump_target = filename + "_TREE"
    atree.dump_to_file(dump_target)

    # The pickle path below is relative, so the working directory matters.
    print("Ensuring correct File System Navigation: " + os.getcwd())
    processing_dir = "GenomeDataset/Processing/"
    pickle_target = processing_dir + os.path.basename(filename) + "_pTREE"
    atree.pickle_into_file(pickle_target)

    # NOTE(review): the destination literal looks like an obfuscated
    # user@host address -- confirm the real value before deploying.
    rsync_command = [
        "rsync",
        "-az",
        processing_dir,
        "[email protected]:~/Documents/master-GSAFv2/gsaf-2.0/GenomeDataset/Processing/",
    ]
    subprocess.call(rsync_command)

    return total_length
@app.task
def unique_pattern_generation(depth):
    """Return every string of length ``depth`` over the alphabet ``ACGT``.

    The strings are produced in the order ``itertools.product`` emits them
    (last position varies fastest), giving ``4 ** depth`` entries.

    Args:
        depth: Number of characters in each generated pattern.

    Returns:
        A list of pattern strings.
    """
    print("I am doing - " + str(depth))
    # One copy of the alphabet per position; product() combines them.
    alphabet_per_position = ['ACGT'] * depth
    return list(map(''.join, product(*alphabet_per_position)))