Alignment deduplication#

import oxbow as ox
import polars as pl

Walkthrough#

To illustrate step by step, let’s grab a small sample SAM file and materialize it in memory as a Polars DataFrame.

url = "https://oxbow-ngs.s3.us-east-2.amazonaws.com/Col0_C1.100k.sam"

# Let's use oxbow to read in the SAM file as a polars dataframe
df = ox.from_sam(url).to_polars()
df.head()
shape: (5, 12)
qnameflagrnameposmapqcigarrnextpnexttlenseqqualend
stru16cati32u8strcati32i32strstri32
"HWI-ST486:305:C0RH5ACXX:1:2104…99"1"115040"43S13M6872N45M""1"8048170"CAGACAGGAACTAGCAATGCTTGAAATCAA…"CCCFFFFFHHHHHJIJJJJIJJJJJJJJJJ…8079
"HWI-ST486:305:C0RH5ACXX:1:2301…163"1"365440"96M""1"3658105"GAGAAAACAAATACATAATCGGAGAAATAC…"@@BDFFFFHGHHHJJJJGIIIJGEIGFEHB…3749
"HWI-ST486:305:C0RH5ACXX:1:2301…83"1"365840"101M""1"3654-105"AAACAAATACATAATCGGAGAAATACAGAT…"C?CDDACDDDDDB?5DDDEEEDCDCDCDCA…3758
"HWI-ST486:305:C0RH5ACXX:1:2105…99"1"366640"98M""1"3743178"ACATAATCGGAGAAATACAGATTACAGAGA…"@C@FFFFFHHDHHJJDGGIJJIFIJJGIIG…3763
"HWI-ST486:305:C0RH5ACXX:1:2304…99"1"367540"91M""1"3737163"GAGAAATACAGATTACAGAGAGCGAGAGAG…"@@@FBDEDHHFHFGBGBECGGIGEHEEEDE…3765

Helper functions#

# If the bit 0x10 is set, the read is on the reverse strand
STRAND_BIT = 0x10


def parse_cigar(cigar_str: str) -> list[tuple[str, int]]:
    """Parse the CIGAR string into a list of tuples (operation, length)

    Example:
        >>> parse_cigar("76M")
        [('M', 76)]
        >>> parse_cigar("10M1I65M")
        [('M', 10), ('I', 1), ('M', 65)]
    """
    result = []
    current_number = ""

    for char in cigar_str:
        if char.isdigit():
            current_number += char
        else:
            if current_number:
                result.append((char, int(current_number)))
                current_number = ""

    return result


def get_unclipped_5p_start(row) -> int:
    """
    Get the unclipped 5′ start position from the CIGAR string.

    Accounts for both soft clips (S) and hard clips (H), matching the
    reference implementation in htsjdk.

    Args:
        row: Row from the SAM file

    Returns:
        int: Unclipped 5′ start position
    """
    pos = row["pos"]
    cigar = row["cigar"]
    flag = row["flag"]

    # Parse CIGAR string
    cigar_ops = parse_cigar(cigar)

    # Check if reverse strand (bit 0x10 set)
    is_reverse = flag & STRAND_BIT

    if not is_reverse:
        # Forward strand: 5′ end = POS - (number of leading soft/hard-clipped bases)
        leading_clips = 0
        for op, length in cigar_ops:
            if op in ("S", "H"):
                leading_clips += length
            else:
                break  # Stop at first non-clip operation
        return pos - leading_clips
    else:
        # Reverse strand: 5′ end is at POS + (aligned length) + (trailing soft/hard-clipped bases) - 1
        aligned_length = 0
        trailing_clips = 0

        # Calculate aligned length (M, =, X, D, N operations)
        for op, length in cigar_ops:
            if op in ["M", "=", "X", "D", "N"]:
                aligned_length += length

        # Find trailing soft/hard clips (S or H operations at the end)
        for op, length in reversed(cigar_ops):
            if op in ("S", "H"):
                trailing_clips += length
            else:
                break  # Stop at first non-clip operation from the end

        return pos + aligned_length + trailing_clips - 1


def get_quality_score_sum(qual_str):
    """Calculate the sum of quality scores from a string of quality scores"""
    return sum(ord(c) - 33 for c in qual_str if c != " ")


def build_dedup_key(rnames, positions, strands):
    """Makes a dedup key for a read pair"""
    items = sorted(zip(rnames, positions, strands))
    if len(items) < 2:
        print(f"WARNING: read is missing pair: {items}")
        return None
    return f"{items[0][0]}:{items[0][1]}:{items[0][2]}__{items[1][0]}:{items[1][1]}:{items[1][2]}"

Compute derived fields#

df = df.with_columns(
    pl.struct(["pos", "cigar", "flag"])
    .map_elements(get_unclipped_5p_start, return_dtype=pl.Int64)
    .alias("5p_start"),

    pl.when((pl.col("flag") & STRAND_BIT) == 0)
    .then(pl.lit("+"))
    .otherwise(pl.lit("-"))
    .alias("strand"),

    pl.col("qual").map_elements(get_quality_score_sum, return_dtype=pl.Int64)
    .alias("total_quality")
)

df.head()
shape: (5, 15)
qnameflagrnameposmapqcigarrnextpnexttlenseqqualend5p_startstrandtotal_quality
stru16cati32u8strcati32i32strstri32i64stri64
"HWI-ST486:305:C0RH5ACXX:1:2104…99"1"115040"43S13M6872N45M""1"8048170"CAGACAGGAACTAGCAATGCTTGAAATCAA…"CCCFFFFFHHHHHJIJJJJIJJJJJJJJJJ…80791107"+"3909
"HWI-ST486:305:C0RH5ACXX:1:2301…163"1"365440"96M""1"3658105"GAGAAAACAAATACATAATCGGAGAAATAC…"@@BDFFFFHGHHHJJJJGIIIJGEIGFEHB…37493654"+"3385
"HWI-ST486:305:C0RH5ACXX:1:2301…83"1"365840"101M""1"3654-105"AAACAAATACATAATCGGAGAAATACAGAT…"C?CDDACDDDDDB?5DDDEEEDCDCDCDCA…37583758"-"3690
"HWI-ST486:305:C0RH5ACXX:1:2105…99"1"366640"98M""1"3743178"ACATAATCGGAGAAATACAGATTACAGAGA…"@C@FFFFFHHDHHJJDGGIJJIFIJJGIIG…37633666"+"3371
"HWI-ST486:305:C0RH5ACXX:1:2304…99"1"367540"91M""1"3737163"GAGAAATACAGATTACAGAGAGCGAGAGAG…"@@@FBDEDHHFHFGBGBECGGIGEHEEEDE…37653675"+"2766

Group the reads into pairs#

We assume that the qname corresponds to read pairs. We group by qname and carry the original alignment records through along with the fields needed for deduplication.

pairs_df = df.group_by("qname").agg(
    [
        pl.col("rname").alias("rnames"),
        pl.col("5p_start").alias("5p_starts"),
        pl.col("strand").alias("strands"),
        pl.col("total_quality").alias("total_qualities"),
        pl.struct("*").alias("alignments"),
    ]
)
pairs_df.head()
shape: (5, 6)
qnamernames5p_startsstrandstotal_qualitiesalignments
strlist[cat]list[i64]list[str]list[i64]list[struct[14]]
"HWI-ST486:305:C0RH5ACXX:1:1103…["1", "1"][50419, 50551]["+", "-"][3438, 3411][{99,"1",50419,40,"101M","1",50451,133,"CTTCTTCTTCTCCTCAGCAGCAGGAGCAGCTGCCGCACCACCGCCAGCAGCTGGAGCAGCAGCTGCAACCGGAGCACCACCTCCACCACCAGCACCAACGT","@CCFFFFFFFHFGGIJJJIGIGIGHB@@EHIGGHFHHGEHAFGBF8FECCHHGFAHFD@>BEE@ACC>@A:=?/9?A?<<??BDDCBDDBAABC?AB<2<2",50519,50419,"+",3438}, {147,"1",50451,40,"101M","1",50419,-133,"CCGCACCACCGCCAGCAGCTGGAGCAGCAGCTGCAACCGGAGCACCACCTCCACCACCAGCACCAACGTTCATGATGAGATCAGTCACGTTACGTTTCTCA","9@>8)90)5@?::CDCA?C@DCDDCADCC>@>CCB<EHEGHC@4D@5FB;GEDHGFIHF9DBGIGFJIIJJJIEJHGIGGEGHHGFEIFDHHHFEDFF@BB",50551,50551,"-",3411}]
"HWI-ST486:305:C0RH5ACXX:1:1202…["1", "1"][34525, 34641]["+", "-"][3861, 3678][{163,"1",34525,40,"101M","1",34541,117,"TCCTCTTTATCTTTCTCCATTTTATCTAATGCATCAGTTTCTGCGTCACTCCCTGAAGGTGTATTTGAGCCACAGGATGAGCGGTCCACAAGATTTTTCTT","CC@FFFFFHHHHHIJJIJGHIIJGJJJJJJIJJJIJJIIJIJGIIGGIJJJIIIGHIJI8C@GIIJIIJJIIJJJHGHGHFFFDCBDDDDDDCCCACDDCD",34625,34525,"+",3861}, {83,"1",34541,40,"101M","1",34525,-117,"CCATTTTATCTAATGCATCAGTTTCTGCGTCACTCCCTGAAGGTGTATTTGAGCCACAGGATGAGCGGTCCACAAGATTTTTCTTCTGGGCAGTGTTTGAG","4:DDC@ACDDCEDCACCC@>C@AB@@BCA>;,@7FHH@HFEIGGIGGJIGIIGIHHGJJJJJIIGJJJIJGGJJJIJJJJIIJJJIJJHHHHHFFFFFCCC",34641,34641,"-",3678}]
"HWI-ST486:305:C0RH5ACXX:1:2204…["1", "1"][50617, 51160]["+", "-"][3584, 3673][{163,"1",50617,40,"15M251N83M3S","1",50898,131,"CGCGATTTTGTCAGCCGTGATAGCGATACCCTCGTCCTCGAGGATCATAACAGCGTAGCTGCAAGCAAGCTCTCCAACTGTCGACATTTTTTTATCCTAAG","@CCFFFDFHFDFHIGJJFHIJIIIJJGGHIIIIGHIHGHIBHJIEGHEEHAHGHG5;8ACECEDDDDDCDDDCCDDDDCD:@BBB55>DCC<@AACCCCCC",50965,50617,"+",3584}, {83,"1",50898,40,"66M162N35M","1",50617,-131,"CTCGTCCTCGAGGATCATAACAGCGTAGCTGCAAGCAAGCTCTCCAACTGTCGACATTTTTTTATCCTAAGAGTAAGATCGAAAATGCTTTTCGCAAGGCG","B<@A<D@@DCCCCCACDCC<B?BBDCCADDACDDBEECDDFFFFHHHHIGEEF@GIIIJJIIGGGIIJJIIIIIGGGGGIHGJJIJJIFHHHHFFFFFC@C",51160,51160,"-",3673}]
"HWI-ST486:305:C0RH5ACXX:1:1301…["1", "1"][73998, 74197]["+", "-"][3833, 3778][{163,"1",73998,40,"101M","1",74097,200,"CGCAAACCGTGTTGTAATGTATTGTTATTTAGTACAAGCTCGGATTTAGAAAACATCGTCATCGTTCATCTTATTCTGAAAATCAAAGCATTGAACTTATT","CCCFFFFFHFHHHJHHIJJFHIJIIGGHJJJJHIJIJIJHIJJGIIJJJGFHIIIIJJDHIJJJEHHFHFFFFFFFE>CEEEEDDDDDDDCCDDDDDDDDC",74098,73998,"+",3833}, {83,"1",74097,40,"101M","1",73998,-200,"TTATTTTCTCATTCCGGACGAGTCTTGACGATGAAATTCTGACGGCTAATGGGATTCATAACAACGGGAGGACCAGCGGTTCTTGGCCATGCTTGGAATTT","CDCDDCCDDCBB<B>B@ABDDCDDDBDDCDEEEFFFFFFHHHJIJJJJHDJJJIIIJJJGHHGBJJJJIIHFJJGJJJJJJIJJJJFJGHHHHFFFFFCCC",74197,74197,"-",3778}]
"HWI-ST486:305:C0RH5ACXX:1:2202…["1", "1"][50499, 50957]["+", "-"][3610, 3320][{163,"1",50499,40,"101M","1",50610,208,"CTCCACCACCAGCACCAACGTTCATGATGAGATCAGTCACGTTACGTTTCTCAGCCATCTTGGCGAATAGCATTGGCCAGTATGACTCAATACTAACACCA","@C@FFFFFHHHHHGIJJIJJGHIFJJIJGIEGIJIGCGHGI?GHIHFHIIDHG>FGHIIEECEGHHEFE@CC;CE;ACC=5>BCCDD@>9:@@:ACDC@??",50599,50499,"+",3610}, {83,"1",50610,40,"22M251N75M","1",50499,-208,"CCAAGGTCGCGATTTTGTCAGCCGTGATAGCGATCCCCTCGTCCTCGAGGATCATAACAGCGTAGCTGCAAGCAAGCTCTCCAACTGTCGACATTTT","C:4&BBB<B?::DCC>9<BDBD@CA>DB?88=/3'CEDEHIIIIIIHGIIIIIIIIIIIIHGGGH@GFEIIIIGHDEGIGEHFAAHHDHDDFFFCC@",50957,50957,"-",3320}]

Build deduplication keys#

Build a deduplication key for each read pair and filter out unpaired reads.

pairs_df = pairs_df.with_columns(
    pl.struct(["rnames", "5p_starts", "strands"])
    .map_elements(
        lambda s: build_dedup_key(s["rnames"], s["5p_starts"], s["strands"]),
        return_dtype=pl.String
    )
    .alias("dedup_key"),
).filter(pl.col("dedup_key").is_not_null())

pairs_df[("dedup_key",)].head()
WARNING: read is missing pair: [('1', 99987, '+')]
WARNING: read is missing pair: [('1', 99981, '+')]
shape: (5, 1)
dedup_key
str
"1:50419:+__1:50551:-"
"1:34525:+__1:34641:-"
"1:50617:+__1:51160:-"
"1:73998:+__1:74197:-"
"1:50499:+__1:50957:-"

Resolve duplicates#

We choose the best read pair across duplicates by the highest total quality score. Sorting by dedup_key first minimizes the shuffle when the data is already coordinate-sorted.

best_pairs_df = pairs_df.sort(
    ["dedup_key", "total_qualities"], descending=[False, True]
).unique(
    subset=["dedup_key"]
)

# Get the total number of duplicates
total_pair_dups = pairs_df.height - best_pairs_df.height

print("Total pair duplicates:", total_pair_dups)

best_pairs_df.head()
Total pair duplicates: 3123
shape: (5, 7)
qnamernames5p_startsstrandstotal_qualitiesalignmentsdedup_key
strlist[cat]list[i64]list[str]list[i64]list[struct[14]]str
"HWI-ST486:305:C0RH5ACXX:1:1105…["1", "1"][10761, 10937]["+", "-"][3934, 4016][{163,"1",10761,40,"101M","1",10837,177,"GAAAATTATGATCCGTAGAGACAGCATTTAAAAGTTCCTTACGTCCACGTAAAATAATATATCAATTTATACATATACATGTGTAAACTGTGTATATATAG","CCCFFFFFHHGHHJJGIJIJJIHHGGIJJJJJJIHHIJJJJJIGIJJJIIGHFHIJJJJJJJJJIJJJIJJJJJJHHHHHHFHFFFFFFFCE>CFEFEEFC",10861,10761,"+",3934}, {83,"1",10837,40,"101M","1",10761,-177,"ACATGTGTAAACTGTGTATATATAGGGTAGGTATATGTGTATATATATAGTAATTGACAAATGATTTAGGTTCTAACATATATTCTAAAAGTACTCATGAG","ADFFFFFFHHEHHJIIJJJIJJJJJJIIJIJGJJJJJJIJJJIJJJIJJIJJJIJIJJJJJJJJJJJJJJJJIJJJJJJJJJJJJIJJHHHHFFFFFFCC@",10937,10937,"-",4016}]"1:10761:+__1:10937:-"
"HWI-ST486:305:C0RH5ACXX:1:2104…["1", "1"][1107, 8148]["+", "-"][3909, 3977][{99,"1",1150,40,"43S13M6872N45M","1",8048,170,"CAGACAGGAACTAGCAATGCTTGAAATCAAGAACTTGAATTGAAATAGTTTTTTACTGGATCAGAGACTACTCAATATCCCCAAACTTGGAAATTAGTTTG","CCCFFFFFHHHHHJIJJJJIJJJJJJJJJJJIJJJJJJIJJJJJJJJJBGGJJJJJJJJIIJJJJJJJIHHHHHFFFFFFFDEDEDDDDDCDCCDDDCCED",8079,1107,"+",3909}, {147,"1",8048,40,"101M","1",1150,-170,"ACTCAATATCCCCAAACTTGGAAATTAGTTTGTTGCTTGAGGTCTAAGATACTTCTATATATGGAAAAAGATTTTCAAAGCCAGATATTTCCACAAGTTTG","ADDEFDDDDDDFFFFEHEHHHJIJJJJJJJJIJJJJJJJJJIJJJJJJIHHJJJIJJIJJJJJJJJJJJJJJJJJJJJJJJJIJJJJJHHHHFFFFFFCCC",8148,8148,"-",3977}]"1:1107:+__1:8148:-"
"HWI-ST486:305:C0RH5ACXX:1:1206…["1", "1"][11368, 11668]["+", "-"][3522, 3900][{163,"1",11369,40,"1S100M","1",11567,301,"TGGCACAGAGAGTACAATTCATGAAATTTATAAGCTTTTTTCCCACTCATCAATTATAATCTCAAGTTATAAATATCAAAACTGAAAAAAGAAGAAGATGA","?B@ADDFFFDDF+A:EGGHEDF@HEHEHIGHGGCD?CFGGDAD<FFH@FGCFHEDHG>BGBGBHCHFHGGIIGIECE>;AEH;@BCEFEDBDD?>CCCD@:",11468,11368,"+",3522}, {83,"1",11567,40,"90M1D11M","1",11369,-301,"AGTTATAGCGAATATTATGGATATAATTAGCTAACATTTGAGGCATGTGAACCTGTTATTTTATGTAAATTATATATATAGTTTATATACAAATTGAAAAG","DDDFFFFEEHHGIIJIHIGEGIJJJIIIJHGIHIEGHEGIGGEJJIHEIJIHEGGGIHIIHIJIIHIJIJJGGIJJHIJGJGIJIIHGAHHHFFFFFF@@C",11668,11668,"-",3900}]"1:11368:+__1:11668:-"
"HWI-ST486:305:C0RH5ACXX:1:2302…["1", "1"][11483, 11608]["+", "-"][3953, 3986][{99,"1",11483,40,"101M","1",11508,126,"ATGAAATGCTGTAGGCACAGAGGTTCACCTAGTGTCAAGTATTAAGATTAACATATAACTTATGAAGATGATGAGTAACCAACGAGTTATAGCGAATATTA","CCCFFFFFHHHHHJJJJJJJIJJGGIJJJIJJFHHIJJJIIJIJJJJIJJJJJJJJJJJJJJJJJJJJJIIJJJIGHIJIJJHGFF@DEEECEDBDDDEEC",11583,11483,"+",3953}, {147,"1",11508,40,"101M","1",11483,-126,"CACCTAGTGTCAAGTATTAAGATTAACATATAACTTATGAAGATGATGAGTAACCAACGAGTTATAGCGAATATTATGGATATAATTAGCTAACATTTGAG","AADEEEEFFFFFFHHHHHHHJJJJIHJJJJJIGJIIIIJJJJJJIJJJIJIHFJIJIJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJHHHHHFFFFFCCC",11608,11608,"-",3986}]"1:11483:+__1:11608:-"
"HWI-ST486:305:C0RH5ACXX:1:2304…["1", "1"][11582, 11716]["+", "-"][4000, 4000][{163,"1",11583,40,"1S18M1D82M","1",11616,135,"CATGGATATAATTAGCTAAATTTGAGGCATGTGAACCTGTTATTTTATGTAAATTATATATATAGTTTATATACAAAATTGAAAAGATGCGAGTTTCAACA","CCCFFFFFHHHHHJJJJJJJJJJJJJJIJIIHIJJJJJJIJJJJJJJJJIIIJJJJJJJJJJJJJIIJJJJJJJJJIJIJJGJJJHHHHHFFDCCDEEDDC",11683,11582,"+",4000}, {83,"1",11616,40,"101M","1",11583,-135,"AACCTGTTATTTTATGTAAATTATATATATAGTTTATATACAAAATTGAAAAGATGCGAGTTTCAACATGGTGACAAAAGCCTAATGATGATGAACATCAA","DCCEEEFFFFFFHHGHHHHHJJIJJJIJJJIJJJJJJJJIJJJIJJJIJJIJJJJJJJJJJIIIJJJJJJJJJHJIJJJJJJJJJJJJHHHGHFFFFFCCC",11716,11716,"-",4000}]"1:11582:+__1:11716:-"

Recover deduplicated alignments#

Explode and unnest the carried alignment records to recover the original fields.

deduped_df = best_pairs_df.select(
    "qname", "alignments"
).explode("alignments").select(
    pl.col("alignments").struct.unnest()
)

deduped_df.head()
shape: (5, 14)
flagrnameposmapqcigarrnextpnexttlenseqqualend5p_startstrandtotal_quality
u16cati32u8strcati32i32strstri32i64stri64
163"1"1076140"101M""1"10837177"GAAAATTATGATCCGTAGAGACAGCATTTA…"CCCFFFFFHHGHHJJGIJIJJIHHGGIJJJ…1086110761"+"3934
83"1"1083740"101M""1"10761-177"ACATGTGTAAACTGTGTATATATAGGGTAG…"ADFFFFFFHHEHHJIIJJJIJJJJJJIIJI…1093710937"-"4016
99"1"115040"43S13M6872N45M""1"8048170"CAGACAGGAACTAGCAATGCTTGAAATCAA…"CCCFFFFFHHHHHJIJJJJIJJJJJJJJJJ…80791107"+"3909
147"1"804840"101M""1"1150-170"ACTCAATATCCCCAAACTTGGAAATTAGTT…"ADDEFDDDDDDFFFFEHEHHHJIJJJJJJJ…81488148"-"3977
163"1"1136940"1S100M""1"11567301"TGGCACAGAGAGTACAATTCATGAAATTTA…"?B@ADDFFFDDF+A:EGGHEDF@HEHEHIG…1146811368"+"3522

Full streaming pipeline#

Here’s the entire deduplication pipeline chained together on a Polars LazyFrame:

ds = ox.from_sam(url)

ldf = ds.to_polars(lazy=True).with_columns(
    pl.struct(["pos", "cigar", "flag"])
    .map_elements(get_unclipped_5p_start, return_dtype=pl.Int64)
    .alias("5p_start"),

    pl.when((pl.col("flag") & STRAND_BIT) == 0)
    .then(pl.lit("+"))
    .otherwise(pl.lit("-"))
    .alias("strand"),

    pl.col("qual").map_elements(get_quality_score_sum, return_dtype=pl.Int64)
    .alias("total_quality")
).group_by("qname").agg(
    [
        pl.col("rname").alias("rnames"),
        pl.col("5p_start").alias("5p_starts"),
        pl.col("strand").alias("strands"),
        pl.col("total_quality").alias("total_qualities"),
        pl.struct(ds.schema.names).alias("alignments"),
    ]
).with_columns(
    pl.struct(["rnames", "5p_starts", "strands"])
    .map_elements(
        lambda s: build_dedup_key(s["rnames"], s["5p_starts"], s["strands"]),
        return_dtype=pl.String
    )
    .alias("dedup_key"),
).filter(
    pl.col("dedup_key").is_not_null()
).sort(
    ["dedup_key", "total_qualities"], descending=[False, True]
).unique(
    subset=["dedup_key"]
).select(
    "qname", "alignments"
).explode(
    "alignments"
).select(
    pl.col("alignments").struct.unnest()
)

ldf.show_graph()
../../_images/e589b95ebbaf5bb696b48e5f5e9fcc76d192f7117bf76c2f5a0e2a3745a1190a.svg

Let’s execute the query plan in streaming mode, writing the results to a Parquet file:

ldf.sink_parquet("data/Col0_C1.100k.dedup.pq")
WARNING: read is missing pair: [('1', 99981, '+')]
WARNING: read is missing pair: [('1', 99987, '+')]