"""
Comparison engine
"""
import sys
import types
import logging
from collections import Counter, defaultdict
from functools import total_ordering
import pysam
import truvari
[docs]
@total_ordering
class MatchResult(): # pylint: disable=too-many-instance-attributes
"""
A base/comp match holder
"""
__slots__ = ["base", "comp", "base_gt", "base_gt_count", "comp_gt", "comp_gt_count",
"state", "seqsim", "sizesim", "ovlpct", "sizediff", "st_dist", "ed_dist",
"gt_match", "multi", "score", "matid"]
def __init__(self):
self.base = None
self.comp = None
self.base_gt = None
self.base_gt_count = 0
self.comp_gt = None
self.comp_gt_count = 0
self.matid = None
self.seqsim = None
self.sizesim = None
self.ovlpct = None
self.sizediff = None
self.st_dist = None
self.ed_dist = None
self.gt_match = None
self.multi = None
self.state = False
self.score = 0
[docs]
def calc_score(self):
"""
Unite the similarity measures and make a score
"""
if None not in [self.seqsim, self.sizesim, self.ovlpct]:
self.score = (self.seqsim + self.sizesim + self.ovlpct) / 3.0 * 100
def __lt__(self, other):
# Trues are always worth more
if self.state != other.state:
return self.state < other.state
return self.score < other.score
def __eq__(self, other):
return self.state == other.state and self.score == other.score
def __str__(self):
return f'{self.state} {self.score} ->\n {self.base} {self.comp}'
def __repr__(self):
sc = round(self.score, 3) if self.score is not None else None
return f'<truvari.MatchResult ({self.state} {sc})>'
[docs]
class Matcher():
"""
Holds matching parameters. Allows calls to be checked for filtering and matches to be made
Example
>>> import pysam
>>> import truvari
>>> mat = truvari.Matcher()
>>> mat.params.pctseq = 0
>>> v = pysam.VariantFile('repo_utils/test_files/variants/input1.vcf.gz')
>>> one = next(v); two = next(v)
>>> mat.build_match(one, two)
<truvari.bench.MatchResult (False 2.381)>
Look at `Matcher.make_match_params()` for a list of all params and their defaults
"""
def __init__(self, args=None):
"""
Initalize. args is a Namespace from argparse
"""
if args is not None:
self.params = self.make_match_params_from_args(args)
else:
self.params = self.make_match_params()
self.reference = None
if self.params.reference is not None:
#sys.stderr.write("WARNING `--reference` is no longer recommended for use with bench/collapse ")
#sys.stderr.write("results will be slower and less accurate.\n")
self.reference = pysam.FastaFile(self.params.reference)
[docs]
@staticmethod
def make_match_params():
"""
Makes a simple namespace of matching parameters. Holds defaults
"""
params = types.SimpleNamespace()
params.reference = None
params.refdist = 500
params.pctseq = 0.70
params.minhaplen = 50
params.pctsize = 0.70
params.pctovl = 0.0
params.typeignore = False
params.chunksize = 1000
params.bSample = 0
params.cSample = 0
params.dup_to_ins = False
params.sizemin = 50
params.sizefilt = 30
params.sizemax = 50000
params.passonly = False
params.no_ref = False
params.pick = 'single'
params.ignore_monref = True
params.check_multi = True
params.check_monref = True
return params
[docs]
@staticmethod
def make_match_params_from_args(args):
"""
Makes a simple namespace of matching parameters
"""
ret = types.SimpleNamespace()
ret.reference = args.reference
ret.refdist = args.refdist
ret.pctseq = args.pctseq
ret.minhaplen = args.minhaplen
ret.pctsize = args.pctsize
ret.pctovl = args.pctovl
ret.typeignore = args.typeignore
ret.chunksize = args.chunksize
ret.bSample = args.bSample if args.bSample else 0
ret.cSample = args.cSample if args.cSample else 0
ret.dup_to_ins = args.dup_to_ins if "dup_to_ins" in args else False
# filtering properties
ret.sizemin = args.sizemin
ret.sizefilt = args.sizefilt
ret.sizemax = args.sizemax
ret.passonly = args.passonly
ret.no_ref = args.no_ref
ret.pick = args.pick if "pick" in args else "single"
ret.check_monref = True
ret.check_multi = True
return ret
[docs]
def filter_call(self, entry, base=False):
"""
Returns True if the call should be filtered
Base has different filtering requirements, so let the method know
"""
if self.params.check_monref and entry.alts in (None, '*'): # ignore monomorphic reference
return True
if self.params.check_multi and len(entry.alts) > 1:
logging.error("Cannot compare multi-allelic records. Please split")
logging.error("line %s", str(entry))
sys.exit(10)
if self.params.passonly and truvari.entry_is_filtered(entry):
return True
size = truvari.entry_size(entry)
if (size > self.params.sizemax) \
or (base and size < self.params.sizemin) \
or (not base and size < self.params.sizefilt):
return True
prefix = 'b' if base else 'c'
if self.params.no_ref in ["a", prefix] or self.params.pick == 'ac':
samp = self.params.bSample if base else self.params.cSample
if not truvari.entry_is_present(entry, samp):
return True
return False
[docs]
def build_match(self, base, comp, matid=None, skip_gt=False, short_circuit=False):
"""
Build a MatchResult
if skip_gt, don't do genotype comparison
if short_circuit, return after first failure
"""
ret = MatchResult()
ret.base = base
ret.comp = comp
ret.matid = matid
ret.state = True
if not self.params.typeignore and not truvari.entry_same_variant_type(base, comp, self.params.dup_to_ins):
logging.debug("%s and %s are not the same SVTYPE",
str(base), str(comp))
ret.state = False
if short_circuit:
return ret
bstart, bend = truvari.entry_boundaries(base)
cstart, cend = truvari.entry_boundaries(comp)
if not truvari.overlaps(bstart - self.params.refdist, bend + self.params.refdist, cstart, cend):
logging.debug("%s and %s are not within REFDIST",
str(base), str(comp))
ret.state = False
if short_circuit:
return ret
ret.sizesim, ret.sizediff = truvari.entry_size_similarity(base, comp)
if ret.sizesim < self.params.pctsize:
logging.debug("%s and %s size similarity is too low (%.3f)",
str(base), str(comp), ret.sizesim)
ret.state = False
if short_circuit:
return ret
if not skip_gt:
if "GT" in base.samples[self.params.bSample]:
ret.base_gt = base.samples[self.params.bSample]["GT"]
ret.base_gt_count = sum(1 for _ in ret.base_gt if _ == 1)
if "GT" in comp.samples[self.params.cSample]:
ret.comp_gt = comp.samples[self.params.cSample]["GT"]
ret.comp_gt_count = sum(1 for _ in ret.comp_gt if _ == 1)
ret.gt_match = abs(ret.base_gt_count - ret.comp_gt_count)
ret.ovlpct = truvari.entry_reciprocal_overlap(base, comp)
if ret.ovlpct < self.params.pctovl:
logging.debug("%s and %s overlap percent is too low (%.3f)",
str(base), str(comp), ret.ovlpct)
ret.state = False
if short_circuit:
return ret
if self.params.pctseq > 0:
ret.seqsim = truvari.entry_seq_similarity(
base, comp, self.reference, self.params.minhaplen)
if ret.seqsim < self.params.pctseq:
logging.debug("%s and %s sequence similarity is too low (%.3ff)",
str(base), str(comp), ret.seqsim)
ret.state = False
if short_circuit:
return ret
else:
ret.seqsim = 0
ret.st_dist, ret.ed_dist = bstart - cstart, bend - cend
ret.calc_score()
return ret
############################
# Parsing and set building #
############################
[docs]
def file_zipper(*start_files):
"""
Zip files to yield the entries in order.
Each file must be sorted in the same order.
start_files is a tuple of ('key', iterable)
where key is the identifier (so we know which file the yielded entry came from)
and iterable is usually a pysam.VariantFile
yields key, pysam.VariantRecord
"""
markers = [] # list of lists: [name, file_handler, top_entry]
file_counts = Counter()
for name, i in start_files:
try:
markers.append([name, i, next(i)])
except StopIteration:
# For when there are no variants in the file
pass
while markers:
# Get the first entry among each of the files' tops
first_idx = 0
for idx, mk in enumerate(markers[1:]):
idx += 1
if mk[2].chrom < markers[first_idx][2].chrom or \
(mk[2].chrom == markers[first_idx][2].chrom and mk[2].start < markers[first_idx][2].start):
first_idx = idx
name, fh, entry = markers[first_idx]
file_counts[name] += 1
try:
# update this file's top
markers[first_idx][2] = next(fh)
except StopIteration:
# This file is done
markers.pop(first_idx)
yield name, entry
logging.info("Zipped %d variants %s", sum(
file_counts.values()), file_counts)
[docs]
def chunker(matcher, *files):
"""
Given a Matcher and multiple files, zip them and create chunks
Yields tuple of the chunk of calls, and an identifier of the chunk
"""
call_counts = Counter()
chunk_count = 0
cur_chrom = None
cur_end = 0
cur_chunk = defaultdict(list)
unresolved_warned = False
for key, entry in file_zipper(*files):
if matcher.params.pctseq != 0 and (entry.alts and entry.alts[0].startswith('<')):
if not unresolved_warned:
logging.warning(
"Unresolved SVs (e.g. ALT=<DEL>) are filtered when `--pctseq != 0`")
unresolved_warned = True
cur_chunk['__filtered'].append(entry)
call_counts['__filtered'] += 1
continue
new_chrom = cur_chrom and entry.chrom != cur_chrom
new_chunk = cur_end and cur_end + matcher.params.chunksize < entry.start
if new_chunk or new_chrom:
chunk_count += 1
yield cur_chunk, chunk_count
# Reset
cur_chrom = None
cur_end = 0
cur_chunk = defaultdict(list)
cur_chrom = entry.chrom
if not matcher.filter_call(entry, key == 'base'):
cur_end = max(entry.stop, cur_end)
cur_chunk[key].append(entry)
call_counts[key] += 1
else:
cur_chunk['__filtered'].append(entry)
call_counts['__filtered'] += 1
chunk_count += 1
logging.info("%d chunks of %d variants %s", chunk_count,
sum(call_counts.values()), call_counts)
yield cur_chunk, chunk_count