"""
Collection of methods with event helpers
that compare events, coordinates, or transform vcf entries
"""
import re
import hashlib
import edlib
import truvari
[docs]
def coords_within(qstart, qend, rstart, rend, end_within):
"""
Returns if a span is within the provided [start, end). All coordinates assumed 0-based
One exception is made for REPL types with anchor bases. This is for TR callers and GIABTR benchmark
where variants will span the entire includebed region.
:param `qstart`: query start position
:type `qstart`: integer
:param `qend`: query end position
:type `qend`: integer
:param `start`: start of span
:type `start`: integer
:param `end`: end of span
:type `end`: integer
:param `end_within`: if true, qend <= rend, else qend < rend
:type `end_within`: bool
:return: If the coordinates are within the span
:rtype: bool
"""
# REPL types, probably
if qstart == rstart - 1 and qend == rend:
return True
if end_within:
ending = qend <= rend
else:
ending = qend < rend
return qstart >= rstart and ending
[docs]
def create_pos_haplotype(a1, a2, ref, min_len=0):
"""
Create haplotypes of two allele's regions that are assumed to be overlapping
:param `a1`: tuple of chrom, start, end, seq
:type `a1`: tuple
:param `a2`: tuple of chrom, start, end, seq
:type `a2`: tuple
:param `ref`: Reference genome
:type `ref`: :class:`pysam.FastaFile`
:param `min_len`: Minimum length of the haplotype sequence to create
:type `min_len`: int, optional
:return: allele haplotype sequences created
:rtupe: tuple (string, string)
"""
chrom, a1_start, a1_end, a1_seq = a1
chrom, a2_start, a2_end, a2_seq = a2
start = min(a1_start, a2_start)
end = max(a1_end, a2_end)
hap_len1 = (abs(a1_start - start) + len(a1_seq) + abs(a1_end - end))
hap_len2 = (abs(a2_start - start) + len(a2_seq) + abs(a2_end - end))
min_size = min(hap_len1, hap_len2)
if min_size < min_len:
start -= (min_len - min_size) // 2
end += (min_len + min_size) // 2
# no negative fetch
start = max(0, start)
hap1_seq = ref.fetch(chrom, start, a1_start) + \
a1_seq + ref.fetch(chrom, a1_end, end)
hap2_seq = ref.fetch(chrom, start, a2_start) + \
a2_seq + ref.fetch(chrom, a2_end, end)
return str(hap1_seq), str(hap2_seq)
[docs]
def entry_boundaries(entry, ins_inflate=False):
"""
Get the start and end of an entry
:param `entry`: entry to get bounds
:type `entry`: :class:`pysam.VariantRecord`
:param `ins_inflate`: inflate INS boundaries by sv length
:type `ins_inflate`: bool, optional
:return: start/end
:rtype: tuple (int, int)
"""
start = entry.start
end = entry.stop
if ins_inflate and entry_variant_type(entry) == truvari.SV.INS:
size = entry_size(entry)
start -= size // 2
end += size // 2
return start, end
[docs]
def entry_distance(entryA, entryB):
"""
Calculate the start and end distances of the pair. Negative distances
indicate entryA is upstream of entryB
:param `entryA`: first entry
:type `entryA`: :class:`pysam.VariantRecord`
:param `entryB`: second entry
:type `entryB`: :class:`pysam.VariantRecord`
:return: starts and ends distance
:rtype: tuple (int, int)
"""
astart, aend = entry_boundaries(entryA)
bstart, bend = entry_boundaries(entryB)
return astart - bstart, aend - bend
[docs]
def entry_gt_comp(entryA, entryB, sampleA=None, sampleB=None):
"""
Compare the genotypes of two entries
:param `entryA`: first entry
:type `entryA`: :class:`pysam.VariantRecord`
:param `entryB`: second entry
:type `entryB`: :class:`pysam.VariantRecord`
:param `sampleA`: sample of entryA to check
:type `sampleA`: string, optional
:param `sampleB`: sample of entryB to check
:type `sampleB`: string, optional
:return: True if the genotypes are concordant
:rtype: bool
Example
>>> import truvari
>>> import pysam
>>> v = pysam.VariantFile('repo_utils/test_files/variants/input1.vcf.gz')
>>> a = next(v)
>>> b = next(v)
>>> truvari.entry_gt_comp(a, b)
True
"""
if not sampleA:
sampleA = entryA.samples.keys()[0]
if not sampleB:
sampleB = entryB.samples.keys()[0]
return truvari.get_gt(entryA.samples[sampleA]["GT"]) == truvari.get_gt(entryB.samples[sampleB]["GT"])
[docs]
def entry_is_filtered(entry, values=None):
"""
Checks if entry should be filtered given the filter values provided.
If values is None, assume that filter must have PASS or be blank '.'
:param `entry`: entry to check
:type `entry`: :class:`pysam.VariantRecord`
:param `values`: set of filter values for intersection
:type `values`: set, optional
:return: True if entry should be filtered
:rtype: bool
Example
>>> import truvari
>>> import pysam
>>> v = pysam.VariantFile('repo_utils/test_files/variants/input1.vcf.gz')
>>> truvari.entry_is_filtered(next(v)) # PASS shouldn't be filtered
False
>>> truvari.entry_is_filtered(next(v), set(["lowQ"])) # Call isn't lowQ, so filter
True
"""
if values is None:
return len(entry.filter) != 0 and 'PASS' not in entry.filter
return len(set(values).intersection(set(entry.filter))) == 0
[docs]
def entry_is_present(entry, sample=None, allow_missing=True):
"""
Checks if entry's sample genotype is present and is heterozygous or homozygous (a.k.a. present)
If allow_missing, just check for a 1 in the genotype. Otherwise, a missing ('.') genotype isn't
considered present
:param `entry`: entry to check
:type `entry`: :class:`pysam.VariantRecord`
:param `sample`: sample name
:type `sample`: string, optional
:return: True if variant is present in the sample
:rtype: bool
Example
>>> import truvari
>>> import pysam
>>> v = pysam.VariantFile('repo_utils/test_files/variants/input1.vcf.gz')
>>> truvari.entry_is_present(next(v))
True
"""
if sample is None:
sample = entry.samples.keys()[0]
if allow_missing:
return 1 in entry.samples[sample]["GT"]
return "GT" in entry.samples[sample] and \
truvari.get_gt(entry.samples[sample]["GT"]) in [
truvari.GT.HET, truvari.GT.HOM]
[docs]
def entry_seq_similarity(entryA, entryB, ref=None, min_len=0):
"""
Calculate sequence similarity of two entries. If reference is not None,
compare their shared reference context. Otherwise, use the unroll technique.
:param `entryA`: first entry
:type `entryA`: :class:`pysam.VariantRecord`
:param `entryB`: second entry
:type `entryB`: :class:`pysam.VariantRecord`
:param `ref`: Reference genome
:type `ref`: :class:`pysam.FastaFile`
:param `min_len`: Minimum length of reference context to generate
:type `min_len`: float, optional
:return: sequence similarity
:rtype: float
"""
# Shortcut to save compute - probably unneeded
if entryA.ref == entryB.ref and entryA.alts[0] == entryB.alts[0]:
return 1.0
# Inversions handled differently
if entry_variant_type(entryA) == truvari.SV.INV and entry_variant_type(entryB) == truvari.SV.INV:
allele1 = entryA.alts[0]
allele2 = entryB.alts[0]
return seqsim(allele1, allele2)
a_seq = entryA.ref if entry_variant_type(
entryA) == truvari.SV.DEL else entryA.alts[0]
a_seq = a_seq.upper()
b_seq = entryB.ref if entry_variant_type(
entryB) == truvari.SV.DEL else entryB.alts[0]
b_seq = b_seq.upper()
st_dist, ed_dist = entry_distance(entryA, entryB)
if st_dist == 0 or ed_dist == 0:
return seqsim(a_seq, b_seq)
if ref is not None:
allele1, allele2 = entry_shared_ref_context(
entryA, entryB, ref, min_len=min_len)
return seqsim(allele1, allele2)
# Directionality of rolling makes a difference
if st_dist < 0:
st_dist *= -1
else:
a_seq, b_seq = b_seq, a_seq
# Roll both ends and compute direct similarity
# Whichever is highest is how similar these sequences can be
return max(unroll_compare(a_seq, b_seq, st_dist), unroll_compare(b_seq, a_seq, -st_dist), seqsim(a_seq, b_seq))
[docs]
def entry_reciprocal_overlap(entry1, entry2):
"""
Calculates reciprocal overlap of two entries
:param `entry1`: First entry
:type `entry1`: :class:`pysam.VariantRecord`
:param `entry2`: Second entry
:type `entry2`: :class:`pysam.VariantRecord`
:return: The reciprocal overlap
:rtype: float
Example
>>> import truvari
>>> import pysam
>>> v = pysam.VariantFile('repo_utils/test_files/variants/input1.vcf.gz')
>>> a = next(v)
>>> b = next(v)
>>> truvari.entry_reciprocal_overlap(a, b)
0
"""
astart, aend = entry_boundaries(entry1, True)
bstart, bend = entry_boundaries(entry2, True)
return reciprocal_overlap(astart, aend, bstart, bend)
[docs]
def entry_shared_ref_context(entryA, entryB, ref, use_ref_seq=False, min_len=0):
"""
Get the shared reference context of two entries and create the haplotype
:param `entryA`: first entry
:type `entryA`: :class:`pysam.VariantRecord`
:param `entryB`: second entry
:type `entryB`: :class:`pysam.VariantRecord`
:param `ref`: Reference genome
:type `ref`: :class:`pysam.FastaFile`
:param `use_ref_seq`: If True, use the reference genome to get the variant sequence instead the entries
:type `use_ref_seq`: bool, optional
:param `min_len`: Minimum length of the reference context to create
:type `min_len`: int, optional
:return: sequences created
:rtype: tuple : (string, string)
"""
def get_props(entry):
"""
We compare the longer of the ref/alt sequence to increase comparability
"""
if use_ref_seq and (entry.alts[0] == "<DEL>" or len(entry.alts[0]) < len(entry.ref)):
return entry.chrom, entry.start, entry.stop, ref.fetch(entry.chrom, entry.start, entry.stop)
return entry.chrom, entry.start, entry.stop, entry.alts[0]
a1 = get_props(entryA)
a2 = get_props(entryB)
return create_pos_haplotype(a1, a2, ref, min_len=min_len)
[docs]
def entry_same_variant_type(entryA, entryB, dup_to_ins=False):
"""
Check if entryA svtype == entryB svtype
:param `entryA`: first entry
:type `entryA`: :class:`pysam.VariantRecord`
:param `entryB`: second entry
:type `entryB`: :class:`pysam.VariantRecord`
:param `dup_to_ins`: Convert DUP to INS types
:type `dup_to_ins`: bool
:return: True if entry SVTYPEs match
:rtype: bool
"""
a_type = entry_variant_type(entryA)
b_type = entry_variant_type(entryB)
if dup_to_ins and a_type == truvari.SV.DUP:
a_type = truvari.SV.INS
if dup_to_ins and b_type == truvari.SV.DUP:
b_type = truvari.SV.INS
return a_type == b_type
[docs]
def entry_size(entry):
"""
Determine the size of the variant.
.. note:: How size is determined
- Starts by trying to use INFO/SVLEN
- If SVLEN is unavailable and ALT field is an SV (e.g. <INS>, <DEL>, etc), \
use abs(vcf.start - vcf.end). The INFO/END tag needs to be available, \
especially for INS.
- If len of vcf.REF and vcf.ALT[0] are equal, return len of vcf.REF
- Otherwise, return the size difference of the sequence resolved call using \
abs(len(vcf.REF) - len(str(vcf.ALT[0])))
:param `entry`: entry to look at
:type `entry`: :class:`pysam.VariantRecord`
:return: the entry's size
:rtype: int
"""
if "SVLEN" in entry.info:
if type(entry.info["SVLEN"]) in [list, tuple]:
size = abs(entry.info["SVLEN"][0])
else:
size = abs(entry.info["SVLEN"])
elif entry.alts is not None and entry.alts[0].count("<"):
start, end = entry_boundaries(entry)
size = end - start
else:
r_len = len(entry.ref)
a_len = len(entry.alts[0]) if entry.alts is not None else 0
if r_len == a_len:
if r_len == 1:
size = 0 # SNPs are special
else:
size = r_len
else:
size = abs(r_len - a_len)
return size
[docs]
def entry_size_similarity(entryA, entryB):
"""
Calculate the size similarity and difference for two entries
:param `entryA`: first entry
:type `entryA`: :class:`pysam.VariantRecord`
:param `entryB`: second entry
:type `entryB`: :class:`pysam.VariantRecord`
:return: size similarity and size diff (A - B)
:rtype: (float, int)
Example
>>> import truvari
>>> import pysam
>>> v = pysam.VariantFile('repo_utils/test_files/variants/input1.vcf.gz')
>>> a = next(v)
>>> b = next(v)
>>> truvari.entry_size_similarity(a, b)
(0.07142857142857142, 13)
"""
sizeA = entry_size(entryA)
sizeB = entry_size(entryB)
return sizesim(sizeA, sizeB)
[docs]
def entry_to_hash(entry, hasher=hashlib.sha1):
"""
Turn variant into a key and hash with provided hasher
:param `entry`: entry
:type `entry`: :class:`pysam.VariantRecord`
:param `hasher`: hashing function
:type `entry`: method
:return: hash
:rtype: string
"""
return hasher(entry_to_key(entry).encode()).hexdigest()
[docs]
def entry_to_key(entry, prefix="", bounds=False):
"""
Turn a vcf entry into a hashable key string. Use the prefix (base/comp) to indicate the source
VCF when consolidating multiple files' calls. If bounds: call entry_boundaries for start/stop.
.. warning::
If a caller redundantly calls a variant exactly the same it will not have a unique key
:param `entry`: entry to stringify
:type `entry`: :class:`pysam.VariantRecord`
:param `prefix`: prefix
:type `prefix`: string, optional
:param `bounds`: use entry_boundaries
:type `bounds`: bool, optional
:return: hashable string uniquely identifying the variant
:rtype: string
"""
if prefix:
prefix += '.'
alt = entry.alts[0] if entry.alts is not None else "."
if bounds:
start, end = entry_boundaries(entry)
return f"{prefix}{entry.chrom}:{start}-{end}({entry.ref}|{alt})"
return f"{prefix}{entry.chrom}:{entry.start}-{entry.stop}.{alt}"
[docs]
def entry_variant_type(entry):
"""
Return entry's SVTYPE
.. note::
How svtype is determined:
- Starts by trying to use INFO/SVTYPE
- If SVTYPE is unavailable, infer if entry is a insertion or deletion by \
looking at the REF/ALT sequence size differences
- If REF/ALT sequences are not available, try to parse the <INS>, <DEL>, \
etc from the ALT column.
- Otherwise, assume 'UNK'
:param `entry`:
:type `entry`: :class:`pysam.VariantRecord`
:return: SV type
:rtype: :class:`truvari.SV`
"""
sv_alt_match = re.compile(r"\<(?P<SVTYPE>.*)\>")
ret_type = None
if "SVTYPE" in entry.info:
ret_type = entry.info["SVTYPE"]
if isinstance(ret_type, (list, tuple)):
ret_type = ret_type[0]
return truvari.get_svtype(ret_type)
if entry.alts is not None and not (entry.alts[0].count("<") or entry.alts[0].count(":")):
# Doesn't have <INS> or BNDs as the alt seq, then we can assume it's sequence resolved..?
if len(entry.ref) < len(entry.alts[0]):
ret_type = "INS"
elif len(entry.ref) > len(entry.alts[0]):
ret_type = "DEL"
elif len(entry.ref) == len(entry.alts[0]):
ret_type = "SNP" if len(entry.ref) == 1 else "UNK"
return truvari.get_svtype(ret_type)
mat = sv_alt_match.match(entry.alts[0]) if entry.alts is not None else None
if mat is not None:
return truvari.get_svtype(mat.groupdict()["SVTYPE"])
return truvari.get_svtype("UNK")
def entry_within_tree(entry, tree):
"""
Extract entry and tree boundaries to call `coords_within`
"""
qstart, qend = truvari.entry_boundaries(entry)
m_ovl = tree[entry.chrom].overlap(qstart, qend)
if len(m_ovl) != 1:
return False
m_ovl = list(m_ovl)[0]
end_within = truvari.entry_variant_type(entry) != truvari.SV.INS
return truvari.coords_within(qstart, qend, m_ovl.begin, m_ovl.end - 1, end_within)
def entry_within(entry, rstart, rend):
"""
Extract entry boundaries and type to call `coords_within`
"""
qstart, qend = truvari.entry_boundaries(entry)
end_within = truvari.entry_variant_type(entry) != truvari.SV.INS
return coords_within(qstart, qend, rstart, rend, end_within)
[docs]
def overlap_percent(astart, aend, bstart, bend):
"""
Calculates the percent of range A which overlaps with range B
:param `astart`: First range's start position
:type `astart`: int
:param `aend`: First range's end position
:type `aend`: int
:param `bstart`: Second range's start position
:type `bstart`: int
:param `bend`: Second range's end position
:type `bend`: int
:return: overlap percent
:rtype: float
"""
if astart >= bstart and aend <= bend:
return 1
ovl_start = max(astart, bstart)
ovl_end = min(aend, bend)
if ovl_start < ovl_end: # Otherwise, they're not overlapping
ovl_pct = float(ovl_end - ovl_start) / (aend - astart)
else:
ovl_pct = 0
return ovl_pct
[docs]
def overlaps(s1, e1, s2, e2):
"""
Check if two start/end ranges have overlap
:param `s1`: range 1 start
:type `s1`: int
:param `e1`: range 1 end
:type `e1`: int
:param `s2`: range 2 start
:type `s2`: int
:param `e2`: range 2 end
:type `e2`: int
:return: True if ranges overlap
:rtype: bool
"""
s_cand = max(s1, s2)
e_cand = min(e1, e2)
return s_cand < e_cand
[docs]
def reciprocal_overlap(astart, aend, bstart, bend):
"""
Calculates reciprocal overlap of two ranges
:param `astart`: First range's start position
:type `astart`: int
:param `aend`: First range's end position
:type `aend`: int
:param `bstart`: Second range's start position
:type `bstart`: int
:param `bend`: Second range's end position
:type `bend`: int
:return: reciprocal overlap
:rtype: float
"""
ovl_start = max(astart, bstart)
ovl_end = min(aend, bend)
if ovl_start < ovl_end: # Otherwise, they're not overlapping
ovl_pct = float(ovl_end - ovl_start) / \
max(aend - astart, bend - bstart)
else:
ovl_pct = 0
return ovl_pct
[docs]
def seqsim(allele1, allele2):
"""
Calculate similarity of two sequences
:param `allele1`: first entry
:type `allele1`: :class:`pysam.VariantRecord`
:param `allele2`: second entry
:type `allele2`: :class:`pysam.VariantRecord`
:return: sequence similarity
:rtype: float
"""
allele1 = allele1.upper()
allele2 = allele2.upper()
scr = edlib.align(allele1, allele2)
totlen = len(allele1) + len(allele2)
return (totlen - scr["editDistance"]) / totlen
[docs]
def sizesim(sizeA, sizeB):
"""
Calculate the size similarity percent and size diff for two sizes
:param `sizeA`: first size
:type `sizeA`: int
:param `sizeB`: second size
:type `sizeB`: int
:return: size similarity percent and size diff (A - B)
:rtype: (float, int)
"""
if sizeA == 0 or sizeB == 0:
if sizeA == sizeB:
return 1, 0
sizeA = max(sizeA, 1)
sizeB = max(sizeB, 1)
return min(sizeA, sizeB) / float(max(sizeA, sizeB)), sizeA - sizeB
[docs]
def unroll_compare(seqA, seqB, p):
"""
Unroll two sequences and compare.
See https://gist.github.com/ACEnglish/1e7421c46ee10c71bee4c03982e5df6c for details
:param `seqA`: sequence upstream sequence
:type `seqA`: string
:param `seqB`: sequence downstream sequence
:type `seqB`: string
:param `p`: how many bases upstream seqA is from seqB
:type `p`: integer
:return: sequence similarity of seqA vs seqB after unrolling
:rtype: float
"""
f = p % len(seqB)
uB = seqB[-f:] + seqB[:-f]
return seqsim(seqA, uB)