Alignment deduplication#
import oxbow as ox
import polars as pl
Walkthrough#
To illustrate step by step, let’s grab a small sample SAM file and materialize it in memory as a Polars DataFrame.
url = "https://oxbow-ngs.s3.us-east-2.amazonaws.com/Col0_C1.100k.sam"
# Let's use oxbow to read in the SAM file as a polars dataframe
df = ox.from_sam(url).to_polars()
df.head()
| qname | flag | rname | pos | mapq | cigar | rnext | pnext | tlen | seq | qual | end |
|---|---|---|---|---|---|---|---|---|---|---|---|
| str | u16 | cat | i32 | u8 | str | cat | i32 | i32 | str | str | i32 |
| "HWI-ST486:305:C0RH5ACXX:1:2104… | 99 | "1" | 1150 | 40 | "43S13M6872N45M" | "1" | 8048 | 170 | "CAGACAGGAACTAGCAATGCTTGAAATCAA… | "CCCFFFFFHHHHHJIJJJJIJJJJJJJJJJ… | 8079 |
| "HWI-ST486:305:C0RH5ACXX:1:2301… | 163 | "1" | 3654 | 40 | "96M" | "1" | 3658 | 105 | "GAGAAAACAAATACATAATCGGAGAAATAC… | "@@BDFFFFHGHHHJJJJGIIIJGEIGFEHB… | 3749 |
| "HWI-ST486:305:C0RH5ACXX:1:2301… | 83 | "1" | 3658 | 40 | "101M" | "1" | 3654 | -105 | "AAACAAATACATAATCGGAGAAATACAGAT… | "C?CDDACDDDDDB?5DDDEEEDCDCDCDCA… | 3758 |
| "HWI-ST486:305:C0RH5ACXX:1:2105… | 99 | "1" | 3666 | 40 | "98M" | "1" | 3743 | 178 | "ACATAATCGGAGAAATACAGATTACAGAGA… | "@C@FFFFFHHDHHJJDGGIJJIFIJJGIIG… | 3763 |
| "HWI-ST486:305:C0RH5ACXX:1:2304… | 99 | "1" | 3675 | 40 | "91M" | "1" | 3737 | 163 | "GAGAAATACAGATTACAGAGAGCGAGAGAG… | "@@@FBDEDHHFHFGBGBECGGIGEHEEEDE… | 3765 |
Helper functions#
# If the bit 0x10 is set, the read is on the reverse strand
STRAND_BIT = 0x10
def parse_cigar(cigar_str: str) -> list[tuple[str, int]]:
"""Parse the CIGAR string into a list of tuples (operation, length)
Example:
>>> parse_cigar("76M")
[('M', 76)]
>>> parse_cigar("10M1I65M")
[('M', 10), ('I', 1), ('M', 65)]
"""
result = []
current_number = ""
for char in cigar_str:
if char.isdigit():
current_number += char
else:
if current_number:
result.append((char, int(current_number)))
current_number = ""
return result
def get_unclipped_5p_start(row) -> int:
"""
Get the unclipped 5′ start position from the CIGAR string.
Accounts for both soft clips (S) and hard clips (H), matching the
reference implementation in htsjdk.
Args:
row: Row from the SAM file
Returns:
int: Unclipped 5′ start position
"""
pos = row["pos"]
cigar = row["cigar"]
flag = row["flag"]
# Parse CIGAR string
cigar_ops = parse_cigar(cigar)
# Check if reverse strand (bit 0x10 set)
is_reverse = flag & STRAND_BIT
if not is_reverse:
# Forward strand: 5′ end = POS - (number of leading soft/hard-clipped bases)
leading_clips = 0
for op, length in cigar_ops:
if op in ("S", "H"):
leading_clips += length
else:
break # Stop at first non-clip operation
return pos - leading_clips
else:
# Reverse strand: 5′ end is at POS + (aligned length) + (trailing soft/hard-clipped bases) - 1
aligned_length = 0
trailing_clips = 0
# Calculate aligned length (M, =, X, D, N operations)
for op, length in cigar_ops:
if op in ["M", "=", "X", "D", "N"]:
aligned_length += length
# Find trailing soft/hard clips (S or H operations at the end)
for op, length in reversed(cigar_ops):
if op in ("S", "H"):
trailing_clips += length
else:
break # Stop at first non-clip operation from the end
return pos + aligned_length + trailing_clips - 1
def get_quality_score_sum(qual_str):
"""Calculate the sum of quality scores from a string of quality scores"""
return sum(ord(c) - 33 for c in qual_str if c != " ")
def build_dedup_key(rnames, positions, strands):
"""Makes a dedup key for a read pair"""
items = sorted(zip(rnames, positions, strands))
if len(items) < 2:
print(f"WARNING: read is missing pair: {items}")
return None
return f"{items[0][0]}:{items[0][1]}:{items[0][2]}__{items[1][0]}:{items[1][1]}:{items[1][2]}"
Compute derived fields#
df = df.with_columns(
pl.struct(["pos", "cigar", "flag"])
.map_elements(get_unclipped_5p_start, return_dtype=pl.Int64)
.alias("5p_start"),
pl.when((pl.col("flag") & STRAND_BIT) == 0)
.then(pl.lit("+"))
.otherwise(pl.lit("-"))
.alias("strand"),
pl.col("qual").map_elements(get_quality_score_sum, return_dtype=pl.Int64)
.alias("total_quality")
)
df.head()
| qname | flag | rname | pos | mapq | cigar | rnext | pnext | tlen | seq | qual | end | 5p_start | strand | total_quality |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| str | u16 | cat | i32 | u8 | str | cat | i32 | i32 | str | str | i32 | i64 | str | i64 |
| "HWI-ST486:305:C0RH5ACXX:1:2104… | 99 | "1" | 1150 | 40 | "43S13M6872N45M" | "1" | 8048 | 170 | "CAGACAGGAACTAGCAATGCTTGAAATCAA… | "CCCFFFFFHHHHHJIJJJJIJJJJJJJJJJ… | 8079 | 1107 | "+" | 3909 |
| "HWI-ST486:305:C0RH5ACXX:1:2301… | 163 | "1" | 3654 | 40 | "96M" | "1" | 3658 | 105 | "GAGAAAACAAATACATAATCGGAGAAATAC… | "@@BDFFFFHGHHHJJJJGIIIJGEIGFEHB… | 3749 | 3654 | "+" | 3385 |
| "HWI-ST486:305:C0RH5ACXX:1:2301… | 83 | "1" | 3658 | 40 | "101M" | "1" | 3654 | -105 | "AAACAAATACATAATCGGAGAAATACAGAT… | "C?CDDACDDDDDB?5DDDEEEDCDCDCDCA… | 3758 | 3758 | "-" | 3690 |
| "HWI-ST486:305:C0RH5ACXX:1:2105… | 99 | "1" | 3666 | 40 | "98M" | "1" | 3743 | 178 | "ACATAATCGGAGAAATACAGATTACAGAGA… | "@C@FFFFFHHDHHJJDGGIJJIFIJJGIIG… | 3763 | 3666 | "+" | 3371 |
| "HWI-ST486:305:C0RH5ACXX:1:2304… | 99 | "1" | 3675 | 40 | "91M" | "1" | 3737 | 163 | "GAGAAATACAGATTACAGAGAGCGAGAGAG… | "@@@FBDEDHHFHFGBGBECGGIGEHEEEDE… | 3765 | 3675 | "+" | 2766 |
Group the reads into pairs#
We assume that the qname corresponds to read pairs. We group by qname and carry the original alignment records through along with the fields needed for deduplication.
pairs_df = df.group_by("qname").agg(
[
pl.col("rname").alias("rnames"),
pl.col("5p_start").alias("5p_starts"),
pl.col("strand").alias("strands"),
pl.col("total_quality").alias("total_qualities"),
pl.struct("*").alias("alignments"),
]
)
pairs_df.head()
| qname | rnames | 5p_starts | strands | total_qualities | alignments |
|---|---|---|---|---|---|
| str | list[cat] | list[i64] | list[str] | list[i64] | list[struct[14]] |
| "HWI-ST486:305:C0RH5ACXX:1:1303… | ["1", "1"] | [93524, 93683] | ["+", "-"] | [3366, 3505] | [{163,"1",93524,40,"101M","1",93583,160,"TCACATCCCGAGCGAAGATCTTGGAACTCCAGAGAGGTTCAGGTTCATGCTTCCTGATAGGCATTGTCTTTGGGAGGTCCCACTAGTGGGACATAAGGGAA","B@CDFBA;FHFFFGGEHEGGGICC9CGFHGEEBDG7(?DF?FE=F>@<FGGEIJDECHIID<=AHF>CDFDD9ABC>B?CD:ADC:5:AB?A@>@A>A??<",93624,93524,"+",3366}, {83,"1",93583,40,"101M","1",93524,-160,"GGCATTGTCTTTGGGAGGTCCCACTAGTGGGACATAAGGGAAGAGTGATTGTGTATTGTGGTCTCCATGACAATCCAAAGAACTCAATTCATAAAGATGGA","CA>>?;;;==CCCCCAB@;EFEHFHGHCAD==CFBEIIIIGFBBDGGGIGIHCGHEGCGBFEC?<FBF@A9HEGGCIHGFFCAF?GIIFDHHFD?DDD@?<",93683,93683,"-",3505}] |
| "HWI-ST486:305:C0RH5ACXX:1:2104… | ["1", "1"] | [50481, 50621] | ["+", "-"] | [3056, 3122] | [{99,"1",50481,40,"97M","1",50521,141,"CTGCAACCGGAGCACCACCTCCACCACCAGCACCAACGTTCATGATGAGATCAGTCACGTTACGTTTCTCAGCCATCTTGGCGAATAGCATTGGCCA","@?<DDDDDD?:CCGHHIGIIICGGGHBBF=DFDBD;F@F==;=CGI9F@GGICG@AEHH>DC;A;;;6;>;>5=ACCC@(5:?B59?3(4>AA:9@C",50577,50481,"+",3056}, {147,"1",50521,40,"101M","1",50481,-141,"CATGATGAGATCAGTCACGTTACGTTTCTCAGCCATCTTGGCGAATAGCATTGGCCAGTATGACTCAATACTAACACCAGCAGCTTTCACCAAGGTCGCGA",":AC>3::>44(@C8<<C?1A95B>?A@BA?C>DE@FFHE>B@HC;CF>GCB=8=FBFGEFF@DGEHD??*FAEF?1<<EC9GCEBFFE<DFA8<<DB=@@@",50621,50621,"-",3122}] |
| "HWI-ST486:305:C0RH5ACXX:1:1104… | ["1", "1"] | [50552, 50925] | ["+", "-"] | [3681, 3535] | [{163,"1",50552,40,"80M251N21M","1",50574,123,"GCCATCTTGGCGAATAGCATTGGCCAGTATGACTCAATACTAACACCAGCAGCTTTCACCAAGGTCGCGATTTTGTCAGCCGTGATAGCGATACCCTCGTC","CCCFFFFFHGHHHIJJEIIJJJJIJII?EAHBGIIIIGGIGGIIIEGGG?=FDGIJEHJJGIIJCEHFFDBBDDCACDDDDDBDDDEDABBDDBCCC?@28",50903,50552,"+",3681}, {83,"1",50574,40,"58M251N43M","1",50552,-123,"GCCAGTATGACTCAATACTAACACCAGCAGCTTTCACCAAGGTCGCGATTTTGTCAGCCGTGATAGCGATACCCTCGTCCTCGAGGATCATAACAGCGTAG","8CDCDDAC@>CCADB>4@:<2C?ADCDCA=CACADCDDCDBDDBCA>CFFEFHCGIJJJIJJIIIIIHGB8IHG<IGEJIIJJIJJIJGHDDAFFFDFC@@",50925,50925,"-",3535}] |
| "HWI-ST486:305:C0RH5ACXX:1:2307… | ["1", "1"] | [50594, 50970] | ["+", "-"] | [3772, 3786] | [{163,"1",50595,40,"1S37M251N63M","1",50619,126,"GCACCAGCAGCTTTCACCAAGGTCGCGATTTTGTCAGCCGTGATAGCGATACCCTCGTCCTCGAGGATCATAACAGCGTAGCTGCAAGCAAGCTCTCCAAC","CCCFFFFFHHGHHJJIIJJJJJFHHJJHIJJJJHIJIJJJHIIIJJIIGIBEHHHFFEDDDEDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD",50945,50594,"+",3772}, {83,"1",50619,40,"13M251N83M5S","1",50595,-126,"CGATTTTGTCAGCCGTGATAGCGATACCCTCGTCCTCGAGGATCATAACAGCGTAGCTGCAAGCAAGCTCTCCAACTGTCGACATTTTTTTATCCTAAGAG","<CADDDDDDDDDDDDDDDDDDDDC?<BBB?DDBDDDFFFFHHHHHHHHJJJJJJJJJJJJJJJJJJJJIJJJJIIJHIGIIHIIIJJJHGHHHFFFFFCCC",50965,50970,"-",3786}] |
| "HWI-ST486:305:C0RH5ACXX:1:1206… | ["1", "1"] | [50488, 50602] | ["+", "-"] | [3457, 2871] | [{99,"1",50488,40,"101M","1",50513,115,"CGGAGCACCACCTCCACCACCAGCACCAACGTTCATGATGAGATCAGTCACGTTACGTTTCTCAGCCATCTTGGCGAATAGCATTGGCCAGTATGACTCAA","<@@=DDFFD?FAHEEEHEGHGGIHHGD@GCBB@GH?B?DHFHA@FBFBCEGGGHGCGGGHHEEEHE=?CEDFC>A<>>@?@AC;>>@ACCB4:CDCCCCCC",50588,50488,"+",3457}, {147,"1",50513,40,"90M","1",50488,-115,"CCAACGTTCATGATGAGATCAGTCCCGTTACGTTTCTCAGCCATCTTGGCGAATAGCATTGGCCAGTATGACTCAATACTAACACCAGCA","BA<9BDC>;;(3A@>>CCCCDBA:'CAC;8@JIIGCGBFB<DB8)HGG>ABDIFEE3FEGFF<?+EJAFFFEGE@>FAHF?DDDDD?;??",50602,50602,"-",2871}] |
Build deduplication keys#
Build a deduplication key for each read pair and filter out unpaired reads.
pairs_df = pairs_df.with_columns(
pl.struct(["rnames", "5p_starts", "strands"])
.map_elements(
lambda s: build_dedup_key(s["rnames"], s["5p_starts"], s["strands"]),
return_dtype=pl.String
)
.alias("dedup_key"),
).filter(pl.col("dedup_key").is_not_null())
pairs_df[("dedup_key",)].head()
WARNING: read is missing pair: [('1', 99981, '+')]
WARNING: read is missing pair: [('1', 99987, '+')]
| dedup_key |
|---|
| str |
| "1:93524:+__1:93683:-" |
| "1:50481:+__1:50621:-" |
| "1:50552:+__1:50925:-" |
| "1:50594:+__1:50970:-" |
| "1:50488:+__1:50602:-" |
Resolve duplicates#
We choose the best read pair across duplicates by the highest total quality score. Sorting by dedup_key first minimizes the shuffle when the data is already coordinate-sorted.
best_pairs_df = pairs_df.sort(
["dedup_key", "total_qualities"], descending=[False, True]
).unique(
subset=["dedup_key"]
)
# Get the total number of duplicates
total_pair_dups = pairs_df.height - best_pairs_df.height
print("Total pair duplicates:", total_pair_dups)
best_pairs_df.head()
Total pair duplicates: 3123
| qname | rnames | 5p_starts | strands | total_qualities | alignments | dedup_key |
|---|---|---|---|---|---|---|
| str | list[cat] | list[i64] | list[str] | list[i64] | list[struct[14]] | str |
| "HWI-ST486:305:C0RH5ACXX:1:1105… | ["1", "1"] | [10761, 10937] | ["+", "-"] | [3934, 4016] | [{163,"1",10761,40,"101M","1",10837,177,"GAAAATTATGATCCGTAGAGACAGCATTTAAAAGTTCCTTACGTCCACGTAAAATAATATATCAATTTATACATATACATGTGTAAACTGTGTATATATAG","CCCFFFFFHHGHHJJGIJIJJIHHGGIJJJJJJIHHIJJJJJIGIJJJIIGHFHIJJJJJJJJJIJJJIJJJJJJHHHHHHFHFFFFFFFCE>CFEFEEFC",10861,10761,"+",3934}, {83,"1",10837,40,"101M","1",10761,-177,"ACATGTGTAAACTGTGTATATATAGGGTAGGTATATGTGTATATATATAGTAATTGACAAATGATTTAGGTTCTAACATATATTCTAAAAGTACTCATGAG","ADFFFFFFHHEHHJIIJJJIJJJJJJIIJIJGJJJJJJIJJJIJJJIJJIJJJIJIJJJJJJJJJJJJJJJJIJJJJJJJJJJJJIJJHHHHFFFFFFCC@",10937,10937,"-",4016}] | "1:10761:+__1:10937:-" |
| "HWI-ST486:305:C0RH5ACXX:1:2104… | ["1", "1"] | [1107, 8148] | ["+", "-"] | [3909, 3977] | [{99,"1",1150,40,"43S13M6872N45M","1",8048,170,"CAGACAGGAACTAGCAATGCTTGAAATCAAGAACTTGAATTGAAATAGTTTTTTACTGGATCAGAGACTACTCAATATCCCCAAACTTGGAAATTAGTTTG","CCCFFFFFHHHHHJIJJJJIJJJJJJJJJJJIJJJJJJIJJJJJJJJJBGGJJJJJJJJIIJJJJJJJIHHHHHFFFFFFFDEDEDDDDDCDCCDDDCCED",8079,1107,"+",3909}, {147,"1",8048,40,"101M","1",1150,-170,"ACTCAATATCCCCAAACTTGGAAATTAGTTTGTTGCTTGAGGTCTAAGATACTTCTATATATGGAAAAAGATTTTCAAAGCCAGATATTTCCACAAGTTTG","ADDEFDDDDDDFFFFEHEHHHJIJJJJJJJJIJJJJJJJJJIJJJJJJIHHJJJIJJIJJJJJJJJJJJJJJJJJJJJJJJJIJJJJJHHHHFFFFFFCCC",8148,8148,"-",3977}] | "1:1107:+__1:8148:-" |
| "HWI-ST486:305:C0RH5ACXX:1:1206… | ["1", "1"] | [11368, 11668] | ["+", "-"] | [3522, 3900] | [{163,"1",11369,40,"1S100M","1",11567,301,"TGGCACAGAGAGTACAATTCATGAAATTTATAAGCTTTTTTCCCACTCATCAATTATAATCTCAAGTTATAAATATCAAAACTGAAAAAAGAAGAAGATGA","?B@ADDFFFDDF+A:EGGHEDF@HEHEHIGHGGCD?CFGGDAD<FFH@FGCFHEDHG>BGBGBHCHFHGGIIGIECE>;AEH;@BCEFEDBDD?>CCCD@:",11468,11368,"+",3522}, {83,"1",11567,40,"90M1D11M","1",11369,-301,"AGTTATAGCGAATATTATGGATATAATTAGCTAACATTTGAGGCATGTGAACCTGTTATTTTATGTAAATTATATATATAGTTTATATACAAATTGAAAAG","DDDFFFFEEHHGIIJIHIGEGIJJJIIIJHGIHIEGHEGIGGEJJIHEIJIHEGGGIHIIHIJIIHIJIJJGGIJJHIJGJGIJIIHGAHHHFFFFFF@@C",11668,11668,"-",3900}] | "1:11368:+__1:11668:-" |
| "HWI-ST486:305:C0RH5ACXX:1:2302… | ["1", "1"] | [11483, 11608] | ["+", "-"] | [3953, 3986] | [{99,"1",11483,40,"101M","1",11508,126,"ATGAAATGCTGTAGGCACAGAGGTTCACCTAGTGTCAAGTATTAAGATTAACATATAACTTATGAAGATGATGAGTAACCAACGAGTTATAGCGAATATTA","CCCFFFFFHHHHHJJJJJJJIJJGGIJJJIJJFHHIJJJIIJIJJJJIJJJJJJJJJJJJJJJJJJJJJIIJJJIGHIJIJJHGFF@DEEECEDBDDDEEC",11583,11483,"+",3953}, {147,"1",11508,40,"101M","1",11483,-126,"CACCTAGTGTCAAGTATTAAGATTAACATATAACTTATGAAGATGATGAGTAACCAACGAGTTATAGCGAATATTATGGATATAATTAGCTAACATTTGAG","AADEEEEFFFFFFHHHHHHHJJJJIHJJJJJIGJIIIIJJJJJJIJJJIJIHFJIJIJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJHHHHHFFFFFCCC",11608,11608,"-",3986}] | "1:11483:+__1:11608:-" |
| "HWI-ST486:305:C0RH5ACXX:1:2304… | ["1", "1"] | [11582, 11716] | ["+", "-"] | [4000, 4000] | [{163,"1",11583,40,"1S18M1D82M","1",11616,135,"CATGGATATAATTAGCTAAATTTGAGGCATGTGAACCTGTTATTTTATGTAAATTATATATATAGTTTATATACAAAATTGAAAAGATGCGAGTTTCAACA","CCCFFFFFHHHHHJJJJJJJJJJJJJJIJIIHIJJJJJJIJJJJJJJJJIIIJJJJJJJJJJJJJIIJJJJJJJJJIJIJJGJJJHHHHHFFDCCDEEDDC",11683,11582,"+",4000}, {83,"1",11616,40,"101M","1",11583,-135,"AACCTGTTATTTTATGTAAATTATATATATAGTTTATATACAAAATTGAAAAGATGCGAGTTTCAACATGGTGACAAAAGCCTAATGATGATGAACATCAA","DCCEEEFFFFFFHHGHHHHHJJIJJJIJJJIJJJJJJJJIJJJIJJJIJJIJJJJJJJJJJIIIJJJJJJJJJHJIJJJJJJJJJJJJHHHGHFFFFFCCC",11716,11716,"-",4000}] | "1:11582:+__1:11716:-" |
Recover deduplicated alignments#
Explode and unnest the carried alignment records to recover the original fields.
deduped_df = best_pairs_df.select(
"qname", "alignments"
).explode("alignments").select(
pl.col("alignments").struct.unnest()
)
deduped_df.head()
| flag | rname | pos | mapq | cigar | rnext | pnext | tlen | seq | qual | end | 5p_start | strand | total_quality |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| u16 | cat | i32 | u8 | str | cat | i32 | i32 | str | str | i32 | i64 | str | i64 |
| 163 | "1" | 10761 | 40 | "101M" | "1" | 10837 | 177 | "GAAAATTATGATCCGTAGAGACAGCATTTA… | "CCCFFFFFHHGHHJJGIJIJJIHHGGIJJJ… | 10861 | 10761 | "+" | 3934 |
| 83 | "1" | 10837 | 40 | "101M" | "1" | 10761 | -177 | "ACATGTGTAAACTGTGTATATATAGGGTAG… | "ADFFFFFFHHEHHJIIJJJIJJJJJJIIJI… | 10937 | 10937 | "-" | 4016 |
| 99 | "1" | 1150 | 40 | "43S13M6872N45M" | "1" | 8048 | 170 | "CAGACAGGAACTAGCAATGCTTGAAATCAA… | "CCCFFFFFHHHHHJIJJJJIJJJJJJJJJJ… | 8079 | 1107 | "+" | 3909 |
| 147 | "1" | 8048 | 40 | "101M" | "1" | 1150 | -170 | "ACTCAATATCCCCAAACTTGGAAATTAGTT… | "ADDEFDDDDDDFFFFEHEHHHJIJJJJJJJ… | 8148 | 8148 | "-" | 3977 |
| 163 | "1" | 11369 | 40 | "1S100M" | "1" | 11567 | 301 | "TGGCACAGAGAGTACAATTCATGAAATTTA… | "?B@ADDFFFDDF+A:EGGHEDF@HEHEHIG… | 11468 | 11368 | "+" | 3522 |
Full streaming pipeline#
Here’s the entire deduplication pipeline chained together on a Polars LazyFrame:
ds = ox.from_sam(url)
ldf = ds.to_polars(lazy=True).with_columns(
pl.struct(["pos", "cigar", "flag"])
.map_elements(get_unclipped_5p_start, return_dtype=pl.Int64)
.alias("5p_start"),
pl.when((pl.col("flag") & STRAND_BIT) == 0)
.then(pl.lit("+"))
.otherwise(pl.lit("-"))
.alias("strand"),
pl.col("qual").map_elements(get_quality_score_sum, return_dtype=pl.Int64)
.alias("total_quality")
).group_by("qname").agg(
[
pl.col("rname").alias("rnames"),
pl.col("5p_start").alias("5p_starts"),
pl.col("strand").alias("strands"),
pl.col("total_quality").alias("total_qualities"),
pl.struct(ds.schema.names).alias("alignments"),
]
).with_columns(
pl.struct(["rnames", "5p_starts", "strands"])
.map_elements(
lambda s: build_dedup_key(s["rnames"], s["5p_starts"], s["strands"]),
return_dtype=pl.String
)
.alias("dedup_key"),
).filter(
pl.col("dedup_key").is_not_null()
).sort(
["dedup_key", "total_qualities"], descending=[False, True]
).unique(
subset=["dedup_key"]
).select(
"qname", "alignments"
).explode(
"alignments"
).select(
pl.col("alignments").struct.unnest()
)
ldf.show_graph()
Let’s execute the query plan in streaming mode, writing the results to a Parquet file:
ldf.sink_parquet("data/Col0_C1.100k.dedup.pq")
WARNING: read is missing pair: [('1', 99981, '+')]
WARNING: read is missing pair: [('1', 99987, '+')]