Source code for truvari.matching

"""
Comparison engine
"""
import logging
from collections import Counter, defaultdict
from functools import total_ordering
import pysam


[docs] @total_ordering class MatchResult(): # pylint: disable=too-many-instance-attributes """ Holds results from a matching operation Attributes ---------- .. list-table:: :header-rows: 1 * - Attribute - Description * - `base` - The base (a.k.a. self) variant * - `comp` - The comp (a.k.a. other) variant * - `state` - Boolean of if variants match * - `seqsim` - Sequence similarity of variants * - `sizesim` - Size similarity of variants * - `ovlpct` - Reciprocal overlap ov variants * - `sizediff` - Base variant var_size minus comp variant var_size * - `st_dist` - Base variant start position minus comp variant start position * - `ed_dist` - Base variant end position minus comp variant end position * - `gt_match` - Boolean of if genotypes match * - `score` - TruScore of matches * - `matid` - Place to put MatchIds, not populated by `truvari.VariantRecord.match` """ __slots__ = ["base", "comp", "base_gt", "base_gt_count", "comp_gt", "comp_gt_count", "state", "seqsim", "sizesim", "ovlpct", "sizediff", "st_dist", "ed_dist", "gt_match", "multi", "score", "matid"] def __init__(self): self.base = None self.comp = None self.base_gt = None self.base_gt_count = 0 self.comp_gt = None self.comp_gt_count = 0 self.matid = None self.seqsim = None self.sizesim = None self.ovlpct = None self.sizediff = None self.st_dist = None self.ed_dist = None self.gt_match = None self.multi = None self.state = False self.score = None
[docs] def calc_score(self): """ Unite the similarity measures and make a score """ if None not in [self.seqsim, self.sizesim, self.ovlpct]: self.score = (self.seqsim + self.sizesim + self.ovlpct) / 3.0 * 100
def __lt__(self, other): # Trues are always worth more if self.state != other.state: return self.state < other.state m_score = self.score if self.score is not None else -float('inf') o_score = other.score if other.score is not None else -float('inf') return m_score < o_score def __eq__(self, other): return self.state == other.state and self.score == other.score def __str__(self): return f'{self.state} {self.score} ->\n {self.base} {self.comp}' def __repr__(self): sc = round(self.score, 3) if self.score is not None else None return f'<truvari.MatchResult ({self.state} {sc})>'
############################ # Parsing and set building # ############################
[docs] def file_zipper(*start_files): """ Zip multiple files to yield their entries in order. The function takes as input tuples of (`key`, `iterable`), where: - `key` is an identifier (used to track which file the yielded entry comes from). - `iterable` is an iterable object, typically a `truvari.VariantFile`. The function iterates through all input files in a coordinated manner, yielding the entries in order. :param start_files: A variable-length argument list of tuples (`key`, `iterable`). :type start_files: tuple :yields: A tuple of (`key`, `truvari.VariantRecord`), where `key` is the file identifier and the second element is the next record from the corresponding file. :rtype: tuple :raises StopIteration: Raised when all input files have been exhausted. **Logs**: - Logs a summary of the zipping process after all files have been processed. """ markers = [] # list of lists: [name, file_handler, top_entry] file_counts = Counter() for name, i in start_files: try: markers.append([name, i, next(i)]) except StopIteration: # For when there are no variants in the file pass while markers: # Get the first entry among each of the files' tops first_idx = 0 for idx, mk in enumerate(markers[1:]): idx += 1 if mk[2].chrom < markers[first_idx][2].chrom or \ (mk[2].chrom == markers[first_idx][2].chrom and mk[2].start < markers[first_idx][2].start): first_idx = idx name, fh, entry = markers[first_idx] file_counts[name] += 1 try: # update this file's top markers[first_idx][2] = next(fh) except StopIteration: # This file is done markers.pop(first_idx) yield name, entry logging.info("Zipped %d variants %s", sum(file_counts.values()), file_counts)
[docs] def chunker(params, *files): """ Given a `truvari.VariantParams` and multiple files, zip them and create chunks Yields tuple of the chunk of calls, and an identifier of the chunk """ call_counts = Counter() chunk_count = 0 cur_chrom = None cur_end = 0 cur_chunk = defaultdict(list) reference = pysam.FastaFile(params.reference) if params.reference is not None else None for key, entry in file_zipper(*files): if entry.filter_call(key == 'base'): cur_chunk['__filtered'].append(entry) call_counts['__filtered'] += 1 continue if entry.is_bnd() and params.bnddist == -1: cur_chunk['__filtered'].append(entry) call_counts['__filtered'] += 1 continue if not entry.is_bnd() and entry.filter_size(key == 'base'): cur_chunk['__filtered'].append(entry) call_counts['__filtered'] += 1 continue # check symbolic, resolve if needed/possible if entry.alts[0].startswith('<'): entry.resolve(reference) new_chrom = cur_chrom and entry.chrom != cur_chrom new_chunk = cur_end and cur_end + params.chunksize < entry.start if new_chunk or new_chrom: chunk_count += 1 yield cur_chunk, chunk_count # Reset cur_chrom = None cur_end = 0 cur_chunk = defaultdict(list) cur_chrom = entry.chrom cur_end = max(entry.end, cur_end) cur_chunk[key].append(entry) call_counts[key] += 1 chunk_count += 1 logging.info("%d chunks of %d variants %s", chunk_count, sum(call_counts.values()), call_counts) yield cur_chunk, chunk_count