diff --git a/primer_designer/designer.py b/primer_designer/designer.py index 23399be..2852382 100644 --- a/primer_designer/designer.py +++ b/primer_designer/designer.py @@ -3,8 +3,6 @@ import re import requests -from Bio.Alphabet import IUPAC -from Bio.Seq import Seq from Bio.SeqIO import SeqRecord from Bio import SeqIO @@ -115,33 +113,44 @@ def __init__(self, folder=None, taxon_for_codon_usage=None, tm="55", self.clustype = clustype self.amptype = amptype self.email = email - self.designed_primers = [] + self.report = "" def design_primers(self): - if os.path.exists(self.folder): - all_files = os.path.join(self.folder, "*") - alns = glob.glob(all_files) + alns = self.get_alignments() - if alns: - for aln in alns: - if is_fasta(aln): + if alns: + self.call_primer4clades_for_primers(alns) - if self.taxon_for_codon_usage: - aln = self.insert_taxon_in_new_fasta_file(aln) + # Write primers to alignment file + with open("primers_report.txt", "a") as handle: + handle.write(self.report) - print("\nProcessing file \"{0}\"".format(aln)) + print("\nDone.\nAll primers have been saved in the file \"primers_report.txt\"") + return self.report + else: + msg = "\nError! the folder {0} is empty.\n".format(self.folder) + raise AttributeError(msg) - r = self.request_primers(aln) - self.process_response(aln, r.text) + def call_primer4clades_for_primers(self, alns): + for aln in alns: + if is_fasta(aln): - # Write primers to alignment file - SeqIO.write(self.designed_primers, "primers.fasta", "fasta") - print("\nDone.\nAll primers have been saved in the file \"primers.fasta\"") - return self.designed_primers - else: - print("\nError! the folder {0} is empty.\n".format(self.folder)) + if self.taxon_for_codon_usage: + aln = self.insert_taxon_in_new_fasta_file(aln) + + print("\nProcessing file \"{0}\"".format(aln)) + + r = self.request_primers(aln) + self.process_response(aln, r.text) + + def get_alignments(self): + if os.path.exists(self.folder): + all_files = os.path.join(self.folder, "*") + alns = glob.glob(all_files) else: - print("\nError! the folder {0} does not exist.\n".format(self.folder)) + msg = "\nError! the folder {0} does not exist.\n".format(self.folder) + raise AttributeError(msg) + return alns def insert_taxon_in_new_fasta_file(self, aln): """primer4clades infers the codon usage table from the taxon names in the @@ -176,25 +185,87 @@ def process_response(self, aln, response_body): with open("{0}.html".format(aln), "w") as handle: handle.write(response_body) - # Show primer pair to user - html_file = response_body.split("\n") - i = 1 - while i < 4: - for line in html_file: - if "degen_corr" in line: - seq = line.split(" ")[0].strip() - - description = line.split(" ")[2].strip() - - this_id = this_file + "_" + line.split(" ")[1].strip() - this_id += "_" + str(i) - - seq = Seq(seq, IUPAC.ambiguous_dna) - seq_record = SeqRecord(seq) - seq_record.id = this_id - seq_record.description = description - self.designed_primers.append(seq_record) - i += 1 + self.make_report_from_html_file(response_body, this_file) + + def make_report_from_html_file(self, response_body, this_file): + """Processes the results from primer4clades (a html file). + + Makes a report based on the best possible primer pair (with highest + quality and longest amplicon). + """ + amplicon_tuples = self.get_amplicon_data_as_tuples(response_body) + + best_amplicon = self.choose_best_amplicon(amplicon_tuples) + + if best_amplicon is not None: + self.report += """\n\n\ +#################################################### +# Alignment {0} +""".format(this_file) + self.report += self.format_amplicon(best_amplicon) + + def get_amplicon_data_as_tuples(self, response_body): + amplicons = re.findall("(## Amplicon.+) codon", response_body) + primers_codehop = self.group_primers(re.findall("(\w+ codeh)_corr.+\n", response_body)) + primers_relaxed = self.group_primers(re.findall("(\w+ relax)_corr.+\n", response_body)) + primers_degen = self.group_primers(re.findall("(\w+ degen)_corr.+\n", response_body)) + primer_pair_qualities = re.findall("# primer pair.+= ([0-9]+)%\n", response_body) + expected_pcr_product_lengths = re.findall("# expected PCR .+= ([0-9]+)\n", response_body) + forward_temperatures = re.findall("(# fwd: minTm.+)\n", response_body) + reverse_temperatures = re.findall("(# rev: minTm.+)\n", response_body) + + amplicon_tuples = zip(amplicons, primers_codehop, primers_relaxed, + primers_degen, + primer_pair_qualities, + expected_pcr_product_lengths, + forward_temperatures, reverse_temperatures) + return amplicon_tuples + + def format_amplicon(self, best_amplicon): + best_amplicon_formatted = "" + for idx, value in enumerate(best_amplicon): + if idx == 0: + best_amplicon_formatted += "{0}".format(value).replace("##", "# Best") + elif idx in [2, 3]: + best_amplicon_formatted += "\n\n{0}".format(value) + elif idx == 4: + best_amplicon_formatted += "\n\n# primer pair quality = {0}%".format( + value) + elif idx == 5: + best_amplicon_formatted += "\n# expected PCR product length (nt) = {0}".format( + value) + else: + best_amplicon_formatted += "\n{0}".format(value) + return best_amplicon_formatted + + def group_primers(self, my_list): + """Group elements in list by certain number 'n'""" + new_list = [] + n = 2 + for i in range(0, len(my_list), n): + grouped_primers = my_list[i:i + n] + forward_primer = grouped_primers[0].split(" ") + reverse_primer = grouped_primers[1].split(" ") + formatted_primers = ">F_{0}\n{1}".format(forward_primer[1], forward_primer[0]) + formatted_primers += "\n>R_{0}\n{1}".format(reverse_primer[1], reverse_primer[0]) + new_list.append(formatted_primers) + return new_list + + def choose_best_amplicon(self, amplicon_tuples): + """Iterates over amplicon tuples and returns the one with highest quality + and amplicon length. + """ + quality = 0 + amplicon_length = 0 + best_amplicon = None + + for amplicon in amplicon_tuples: + if int(amplicon[4]) >= quality and int(amplicon[5]) >= amplicon_length: + quality = int(amplicon[4]) + amplicon_length = int(amplicon[5]) + best_amplicon = amplicon + + return best_amplicon def request_primers(self, aln): url = "http://floresta.eead.csic.es/primers4clades/primers4clades.cgi" diff --git a/tests/test_primer-designer.py b/tests/test_primer-designer.py index e9b1f0b..be99362 100644 --- a/tests/test_primer-designer.py +++ b/tests/test_primer-designer.py @@ -1,3 +1,4 @@ +import copy import os import unittest @@ -25,12 +26,33 @@ def setUp(self): amptype="dna_GTRG", email="youremail@email.com", ) + self.tmp_folder= os.path.join(TEST_FOLDER, '..', 'tmp_folder') + if not os.path.isdir(self.tmp_folder): + os.mkdir(self.tmp_folder) + self.maxDiff = None def tearDown(self): output_html_file = '{0}.html'.format(ALIGNMENT) if os.path.isfile(output_html_file): os.remove(output_html_file) + if os.path.isfile(self.tmp_folder): + os.remove(self.tmp_folder) + + def test_design_primers_from_empty_folder(self): + pd = copy.copy(self.pd) + pd.folder = self.tmp_folder + self.assertRaises(AttributeError, pd.design_primers) + + def test_get_alignments(self): + result = self.pd.get_alignments() + self.assertTrue(len(result) > 0) + + def test_get_alignments_error(self): + pd = copy.copy(self.pd) + pd.folder = "fake_folder" + self.assertRaises(AttributeError, pd.get_alignments) + @responses.activate def test_request_primers(self): url = "http://floresta.eead.csic.es/primers4clades/primers4clades.cgi" @@ -45,10 +67,32 @@ def test_request_primers(self): resp = self.pd.request_primers(ALIGNMENT) assert resp.content.decode('ascii') == response_html_body - def test_process_response(self): + def test_report_from_html_response(self): + expected = """\n\n\ +#################################################### +# Alignment Ca2.fst +# Best Amplicon 4 +>F_codeh +GACCTGAAAGAAGAACTGggvaargghgc +>R_codeh +CCAGGTGTACCAGCGaadccraacca + +>F_relax +GACCTGAAAGAAGAACTGggvaargghgc +>R_relax +CCAGGTGTACCAgcraadccraacca + +>F_degen +gahytdaargaagaaytdggvaargghgc +>R_degen +ccnggdgtdccdgcraadccraacca + +# primer pair quality = 80% +# expected PCR product length (nt) = 471 +# fwd: minTm = 62.0 maxTm = 67.5 +# rev: minTm = 65.5 maxTm = 68.8""" self.pd.process_response(ALIGNMENT, open(RESPONSE).read()) - self.assertTrue('gayaaytaygahytdaargaagaaytdggvaargghgc' in - [str(seq.seq) for seq in self.pd.designed_primers]) + self.assertEqual(expected, self.pd.report) def test_inserting_taxon_in_fasta_seq_descriptions(self): self.pd.taxon_for_codon_usage = 'Bombyx mori'