fix+perf: eliminate data races & replace all synchronization with Highway futex (zero mutex, wall -7%, sys -51%) #684
Open
KimBioInfoStudio wants to merge 18 commits into OpenGene:master
mNextSeq was a plain size_t array written by worker threads in inputPwrite() and read by setInputCompletedPwrite() with no synchronization -- a C++ data race (undefined behaviour). A stale read could produce a wrong lastSeq value, causing ftruncate() to silently truncate the output file at the wrong offset and drop the final gz member(s).

Fix: change mNextSeq to std::atomic<size_t>[].
- Worker threads write with memory_order_release after each pack, establishing a happens-before edge for the completion reader.
- setInputCompletedPwrite() opens with an acquire fence before reading with memory_order_relaxed, ensuring all prior worker writes are visible before the ftruncate() call.
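The release/acquire pairing described above can be sketched roughly as follows. This is a minimal illustration, not the fastp code: `NextSeqTable`, `publish()`, `highest()` and `highestNextSeq()` are hypothetical names, the round-robin sequence numbering is an assumption, and the sketch uses an acquire load per slot rather than the fence-plus-relaxed-load form the commit describes.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for the per-worker mNextSeq array; names are illustrative.
class NextSeqTable {
public:
    explicit NextSeqTable(std::size_t workers) : mNextSeq(workers) {
        for (auto& slot : mNextSeq) slot.store(0, std::memory_order_relaxed);
    }

    // Worker side: publish the next sequence number after finishing a pack.
    void publish(std::size_t worker, std::size_t nextSeq) {
        assert(worker < mNextSeq.size());
        mNextSeq[worker].store(nextSeq, std::memory_order_release);
    }

    // Completion side: each acquire load pairs with a worker's release store.
    std::size_t highest() const {
        std::size_t best = 0;
        for (const auto& slot : mNextSeq) {
            const std::size_t v = slot.load(std::memory_order_acquire);
            if (v > best) best = v;
        }
        return best;
    }

private:
    std::vector<std::atomic<std::size_t>> mNextSeq;
};

// Runs `workers` threads; worker w handles packs w, w + workers, w + 2*workers, ...
// (assumed numbering) and publishes the sequence number it would take next.
std::size_t highestNextSeq(std::size_t workers, std::size_t packsPerWorker) {
    NextSeqTable table(workers);
    std::vector<std::thread> pool;
    for (std::size_t w = 0; w < workers; ++w) {
        pool.emplace_back([&table, w, workers, packsPerWorker] {
            for (std::size_t i = 1; i <= packsPerWorker; ++i)
                table.publish(w, w + i * workers);
        });
    }
    for (auto& t : pool) t.join();
    return table.highest();
}
```

With 4 workers and 3 packs each, the last worker ends at 3 + 3*4, so `highestNextSeq(4, 3)` yields 15.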
The `head` pointer in SingleProducerSingleConsumerList was a plain
(non-atomic) pointer despite being written by the producer thread
(produce(), first-item branch) and read concurrently by the consumer
thread (canBeConsumed(), consume()). ThreadSanitizer reported 15 data
races at singleproducersingleconsumerlist.h:100.
Fixes applied:
- `head` declared as `std::atomic<LockFreeListItem<T>*>` (tail stays
  non-atomic: producer-private after the first item is published)
- Constructor: `head.store(NULL, relaxed)`
- produce() first-item branch:
    set tail = item first (producer-private write),
    then `head.store(item, release)` to publish atomically to the consumer,
    then `item->nextItemReady.store(true, release)` to signal readiness
- canBeConsumed():
    `head.load(acquire)` for the NULL check (syncs with the produce() release),
    `head.load(relaxed)` for the nextItemReady dereference (covered by
    the preceding acquire)
- consume():
    `head.load(acquire)` to read the current head,
    `head.store(h->nextItem, release)` to advance; this establishes
    happens-before with the next canBeConsumed() acquire on head
Also changes the else-branch nextItemReady assignment to an explicit
`memory_order_release` store (it was an implicit seq_cst store, which already
orders the preceding `tail->nextItem = item` write before it; release is the
weakest ordering that still guarantees this).
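The core of the `head` fix is the publish pattern: a producer-private write followed by a release store, matched by an acquire load on the consumer side. The sketch below shows only that pattern, not the full list; `Item` and `publishAndConsume()` are hypothetical names, and the consumer polls with `yield()` purely to keep the example short.

```cpp
#include <atomic>
#include <thread>

// Simplified payload; the real list item also carries nextItem/nextItemReady.
struct Item {
    int value;
};

// Publishes one item from the calling thread (the producer) and returns the
// value the consumer thread observed through the published pointer.
int publishAndConsume(int payload) {
    std::atomic<Item*> head(nullptr);
    Item item{0};
    int seen = -1;

    std::thread consumer([&] {
        Item* h = nullptr;
        // The acquire load pairs with the producer's release store below.
        while ((h = head.load(std::memory_order_acquire)) == nullptr)
            std::this_thread::yield();
        seen = h->value;  // the producer's write to item.value is visible here
    });

    item.value = payload;                          // producer-private write
    head.store(&item, std::memory_order_release);  // publish to the consumer
    consumer.join();
    return seen;
}
```

Without the release/acquire pair (for example with a plain pointer), the consumer could observe a non-null `head` before the write to `item.value` became visible, which is the kind of race ThreadSanitizer flags.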
ThreadSanitizer reported data races in ReadPool and SPSC when multiple
worker threads called ReadPool::input() concurrently:
readpool.cpp:23 — mIsFull read vs. updateFullStatus() write
readpool.cpp:27 — mProduced++ (non-atomic RMW) by multiple threads
readpool.cpp:53 — mIsFull write vs. concurrent reads
spsc.h:90 — size(): produced (producer-written) vs. consumed
(consumer-written) read without synchronization
Fixes in readpool.h:
- mIsFull : bool → std::atomic<bool>
- mProduced : size_t → std::atomic<size_t>
(atomic::operator++ and atomic::operator= are sufficient;
no changes to readpool.cpp required)
Fixes in singleproducersingleconsumerlist.h:
- produced, consumed : unsigned long → std::atomic<unsigned long>
- size(): load both with memory_order_relaxed (approximate count used
only as a soft back-pressure threshold)
- produce(): produced.fetch_add(1, relaxed)
- consume(): consumed.fetch_add(1, relaxed) with local snapshot for
the (consumed & 0xFFF) recycle check
- makeItem(): produced.load(relaxed) snapshot before >> and & ops
- recycle(): consumed.load(relaxed) before >> op
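As a rough illustration of why the counter changes matter, the sketch below increments a shared counter from several threads. `PoolCounters` and `countConcurrently()` are hypothetical names, not the fastp classes; the point is only that `std::atomic`'s `operator++` is an indivisible read-modify-write, so no increment is lost, and that relaxed loads are enough for an approximate size.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical miniature of the counters named above; not the fastp classes.
struct PoolCounters {
    std::atomic<std::size_t> produced{0};
    std::atomic<std::size_t> consumed{0};
    std::atomic<bool> isFull{false};

    // Approximate occupancy: relaxed loads suffice for a soft threshold.
    std::size_t size() const {
        return produced.load(std::memory_order_relaxed) -
               consumed.load(std::memory_order_relaxed);
    }
    void updateFullStatus(std::size_t limit) { isFull = size() >= limit; }
};

// Several threads bump `produced` concurrently. Returns the final size if the
// pool reports full at the given limit, otherwise 0.
std::size_t countConcurrently(std::size_t threads, std::size_t perThread) {
    PoolCounters c;
    std::vector<std::thread> pool;
    for (std::size_t t = 0; t < threads; ++t) {
        pool.emplace_back([&c, perThread] {
            for (std::size_t i = 0; i < perThread; ++i)
                ++c.produced;  // atomic::operator++, an indivisible RMW
        });
    }
    for (auto& th : pool) th.join();
    c.updateFullStatus(threads * perThread);
    return c.isFull ? c.size() : 0;
}
```

With a plain `size_t` in place of the atomic, the same loop would be a data race and could lose increments.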
After all four commits (mNextSeq, SPSC head, ReadPool/SPSC atomics),
ThreadSanitizer reports zero data races on a 5k-read, PE-mode, 8-thread
workload.
- Use /home/kimy/build-env g++ (GCC 15.2.0, conda-forge)
- Upgrade -std=c++11 -> -std=c++23
- Default INCLUDE_DIRS and LIBRARY_DIRS to build-env paths
Enables: std::jthread, std::latch, std::counting_semaphore,
atomic::wait/notify, std::println
…emove manual join/delete
…re in writer output loop
…quire mismatch with N-producer round-robin
Replace all std::this_thread::yield() busy-spin loops with hwy::BlockUntilDifferent/WakeAll from Highway's futex.h polyfill. This provides cross-platform kernel-level thread blocking (Linux futex, macOS __ulock, FreeBSD NanoSleep fallback) instead of CPU-burning spins.

Changes:
- writerthread: output() waits on mBufferLength via BlockUntilDifferent, input() wakes writer via WakeAll after produce
- writerthread.h: add waitForBufferBelow() using BlockUntilDifferent loop
- peprocessor: replace 6 yield() sites with atomic wait/notify on mPackProducedCounter and mPackProcessedCounter
- seprocessor: same pattern as peprocessor for SE pipeline
- Change counter types from atomic_long to atomic<uint32_t> for Highway futex compatibility (uint32_t required by BlockUntilDifferent)

Benchmark (5M PE reads, gz→gz, -w 3):
  master:         56.5s wall,  8.0s sys, 2680K page-faults
  yield (before): 79.4s wall, 26.8s sys, 3278K page-faults
  futex (after):  47.8s wall,  1.4s sys,  120K page-faults
  wall -15%, sys -82%, page-faults -95% vs master

Output md5 matches master (correctness verified)
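The wait/notify shape this commit describes might look like the sketch below. Highway is not used here so the example stays dependency-free: `blockUntilDifferent()` and `wakeAll()` are placeholders that only mimic the contract stated in this PR (return once the value differs from `prev`), and `processPacks()` is a hypothetical driver. The placeholder polls with `yield()`, whereas the real function sleeps in the kernel.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>

// Portable placeholder with the contract described for hwy::BlockUntilDifferent:
// return (with the new value) only once `current` differs from `prev`.
// The real function sleeps in the kernel; this stand-in merely yields.
inline std::uint32_t blockUntilDifferent(std::uint32_t prev,
                                         const std::atomic<std::uint32_t>& current) {
    for (;;) {
        const std::uint32_t now = current.load(std::memory_order_acquire);
        if (now != prev) return now;
        std::this_thread::yield();
    }
}

// Placeholder for hwy::WakeAll: a polling stand-in has no sleepers to wake.
inline void wakeAll(std::atomic<std::uint32_t>&) {}

// A producer publishes `packs` packs; the worker waits on the produced counter
// and returns how many packs it observed in total.
std::uint32_t processPacks(std::uint32_t packs) {
    std::atomic<std::uint32_t> packProduced(0);
    std::uint32_t processed = 0;

    std::thread worker([&] {
        while (processed < packs) {
            // Returns at once if new packs already exist, otherwise waits.
            processed = blockUntilDifferent(processed, packProduced);
        }
    });

    for (std::uint32_t i = 0; i < packs; ++i) {
        packProduced.fetch_add(1, std::memory_order_release);  // change the value first
        wakeAll(packProduced);                                 // then wake waiters
    }
    worker.join();
    return processed;
}
```

The ordering in the producer matters: the counter must change before the wake, because a waiter that re-checks an unchanged value simply goes back to sleep.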
439cdc0 to 2f33c85
- std::jthread → std::thread + explicit join()
- std::latch → atomic<uint32_t> + hwy futex wait/wake
- std::println → cerr <<
- Remove #include <print>, <latch>; use <iostream>, <atomic>
- Makefile: -std=c++23 → -std=c++11

Preserves Highway futex performance (sys ~1.7s, page-faults ~150K). Apple Clang on macOS CI does not support jthread/latch/println.
BlockUntilDifferent(prev, atom) only returns when atom != prev. When mBufferLength stays 0, WakeAll cannot break the loop.

Fix: use separate mWriterNotify counter for writer thread blocking.
- input(): increments mWriterNotify + WakeAll to wake writer
- setInputCompleted(): increments mWriterNotify + WakeAll to wake writer
- output(): blocks on mWriterNotify instead of mBufferLength

Also move setInputCompleted() back to last worker thread (not main thread), matching the original master pattern. This avoids a race where main thread waits on latch while writer is already blocked.

Verified: SE smoke ✅, PE smoke ✅, PE benchmark wall -10%, sys -71%.
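The difference between waiting on the buffer length and waiting on a dedicated notify counter can be modelled without any blocking at all. In the sketch below, `WriterSignals`, `waiterCanReturn()` and `eofOnEmptyBuffer()` are hypothetical names; `waiterCanReturn()` simply encodes the "returns only when atom != prev" rule from the commit message.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical miniature of the writer's signalling state; not the fastp class.
struct WriterSignals {
    std::atomic<std::uint32_t> bufferLength{0};
    std::atomic<std::uint32_t> writerNotify{0};
    std::atomic<bool> inputCompleted{false};

    void input() {  // new data arrived
        bufferLength.fetch_add(1, std::memory_order_release);
        writerNotify.fetch_add(1, std::memory_order_release);  // WakeAll would follow
    }
    void setInputCompleted() {  // EOF: the buffer length does not change
        inputCompleted.store(true, std::memory_order_release);
        writerNotify.fetch_add(1, std::memory_order_release);  // WakeAll would follow
    }
};

// Non-blocking model of the wake condition: a waiter that snapshotted `prev`
// can only return once the atom holds a different value.
inline bool waiterCanReturn(std::uint32_t prev,
                            const std::atomic<std::uint32_t>& atom) {
    return atom.load(std::memory_order_acquire) != prev;
}

struct EofOutcome {
    bool onBufferLength;  // would a waiter on bufferLength wake?
    bool onWriterNotify;  // would a waiter on writerNotify wake?
};

// Snapshot both values, signal EOF on an empty buffer, report which waiter wakes.
inline EofOutcome eofOnEmptyBuffer() {
    WriterSignals s;
    const std::uint32_t len = s.bufferLength.load();
    const std::uint32_t seq = s.writerNotify.load();
    s.setInputCompleted();
    return EofOutcome{waiterCanReturn(len, s.bufferLength),
                      waiterCanReturn(seq, s.writerNotify)};
}
```

A waiter on `bufferLength` stays asleep at EOF because the value is still 0, while a waiter on `writerNotify` wakes because every signal changes the counter.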
The pwrite ring buffer used std::this_thread::sleep_for(1µs) to poll for the previous slot's published_seq. Replace with hwy::BlockUntilDifferent + hwy::WakeAll for precise wakeup.

Changes:
- OffsetSlot::published_seq: atomic<size_t> → atomic<uint32_t> (seq values are small; uint32_t required by Highway futex API)
- Wait loop: sleep_for(1µs) → BlockUntilDifferent(cur, published_seq)
- Publish: store + WakeAll to notify waiting workers
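One way to picture the ring is that each pack learns its file offset from the slot of the pack before it, then publishes its own. The sketch below is built on assumptions: the `OffsetSlot` layout, the sequence numbering starting at 1, the ring size and `writeInOrder()` are all hypothetical, and `blockUntilDifferent()` is a polling placeholder for the Highway call.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Polling placeholder for hwy::BlockUntilDifferent (same contract, no kernel sleep).
inline std::uint32_t blockUntilDifferent(std::uint32_t prev,
                                         const std::atomic<std::uint32_t>& current) {
    std::uint32_t now;
    while ((now = current.load(std::memory_order_acquire)) == prev)
        std::this_thread::yield();
    return now;
}

// Hypothetical slot layout: the sequence number last published here and the
// file offset at which that pack ends. Sequence numbers start at 1; 0 = empty.
struct OffsetSlot {
    std::atomic<std::uint32_t> publishedSeq{0};
    std::uint64_t endOffset = 0;
};

// Every pack has length `len`. Pack `seq` reads its start offset from the slot
// of pack `seq - 1`, then publishes its own end offset. Returns the final offset.
std::uint64_t writeInOrder(std::uint32_t workers, std::uint32_t packs, std::uint64_t len) {
    assert(workers > 0);
    std::vector<OffsetSlot> ring(2 * workers);  // ring size is an arbitrary choice
    auto slotOf = [&ring](std::uint32_t seq) -> OffsetSlot& {
        return ring[seq % ring.size()];
    };

    std::vector<std::thread> pool;
    for (std::uint32_t w = 0; w < workers; ++w) {
        pool.emplace_back([&, w] {
            for (std::uint32_t seq = w + 1; seq <= packs; seq += workers) {
                std::uint64_t start = 0;
                if (seq > 1) {
                    OffsetSlot& prev = slotOf(seq - 1);
                    std::uint32_t cur = prev.publishedSeq.load(std::memory_order_acquire);
                    while (cur != seq - 1) cur = blockUntilDifferent(cur, prev.publishedSeq);
                    start = prev.endOffset;
                }
                OffsetSlot& mine = slotOf(seq);
                mine.endOffset = start + len;  // a real worker would pwrite() at `start`
                mine.publishedSeq.store(seq, std::memory_order_release);  // + WakeAll
            }
        });
    }
    for (auto& t : pool) t.join();
    return packs == 0 ? 0 : slotOf(packs).endOffset;
}
```

Because a slot is only overwritten after the following pack has published, the single reader of each slot has always finished with it before it is reused.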
Replace all std::mutex + std::condition_variable synchronization in BgzfMtReader with lock-free Highway futex primitives:
- Per-slot atomic<uint32_t> state: BlockUntilDifferent waits for state transitions (FREE→COMPRESSED→DECOMPRESSING→READY→FREE)
- Global mSlotNotify counter: incremented on every state transition, used by decompressor threads to block when no COMPRESSED slots are available (replaces condvar broadcast)
- Remove <mutex>/<condition_variable> includes from pe/seprocessor.h and writerthread.h (no longer used anywhere in the pipeline)

This eliminates all kernel mutex contention in the BGZF decompression pipeline, which is the hot path for .gz input files.
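The per-slot state machine can be sketched as a sequence of guarded transitions. Everything here is an assumption for illustration: the numeric state values, the `Slot` struct, and the use of compare-exchange to claim a slot are not confirmed by the commit, which only names the states and the notify counter.

```cpp
#include <atomic>
#include <cstdint>

// State names follow the commit message; the numeric values are arbitrary.
enum SlotState : std::uint32_t { FREE = 0, COMPRESSED = 1, DECOMPRESSING = 2, READY = 3 };

// Hypothetical miniature of one reader slot.
struct Slot {
    std::atomic<std::uint32_t> state{FREE};
};

// Attempt one transition. Only one thread can win the compare-exchange, so a
// COMPRESSED slot is claimed by exactly one decompressor. Every successful
// transition bumps `slotNotify`; a WakeAll on it would follow in real code.
inline bool transition(Slot& slot, std::atomic<std::uint32_t>& slotNotify,
                       SlotState from, SlotState to) {
    std::uint32_t expected = from;
    if (!slot.state.compare_exchange_strong(expected, to,
                                            std::memory_order_acq_rel,
                                            std::memory_order_acquire))
        return false;
    slotNotify.fetch_add(1, std::memory_order_release);
    return true;
}

// Walks one slot through a full cycle and returns the number of notifications.
inline std::uint32_t fullCycleNotifications() {
    Slot slot;
    std::atomic<std::uint32_t> slotNotify(0);
    transition(slot, slotNotify, FREE, COMPRESSED);           // reader filled the slot
    transition(slot, slotNotify, COMPRESSED, DECOMPRESSING);  // a decompressor claims it
    transition(slot, slotNotify, COMPRESSED, DECOMPRESSING);  // a second claim fails
    transition(slot, slotNotify, DECOMPRESSING, READY);       // output available
    transition(slot, slotNotify, READY, FREE);                // consumer releases it
    return slotNotify.load();
}
```

Four of the five attempts succeed, so the notify counter ends at 4; the failed duplicate claim leaves both the slot and the counter untouched.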
Member
I tested this PR, but it caused a deadlock when the input is a gz file, and it showed no performance improvement over v1.3.2. Tested on a MacBook (M3 chip).
Member
Author
@sfchen this PR is not for perf but for a deadlock fix; let me continue to triage the deadlock.
Make mInputCompleted atomic with acquire/release ordering to fix a race between producer and writer threads. Replace pwrite ring's published_seq with a monotonic generation counter to prevent ABA on slot reuse. Wake producers after buffer-length decrement so they unblock promptly.

🐘 Generated with Crush
Co-Authored-By: Crush
…up races

Three connected thread-synchronization bugs caused fastp to hang under -w>=23 + plain (non-gz) output + --adapter_fasta.

1. Mid-flight deadlock: reader gated on mLeftWriter->waitForBufferBelow, writer drained mBufferLists in strict round-robin. When one worker ran slightly behind, its per-worker slot stayed empty while other slots piled up, pushing mBufferLength above the limit. The reader then halted at waitForBufferBelow, so the slow worker never received more input, its slot never filled, the writer stayed blocked, and every thread deadlocked. Confirmed by stack sample: 24 workers in peprocessor.cpp:1003, 2 readers in peprocessor.cpp:807, 2 writers in writerthread.cpp:110. Removed the writer-buffer backpressure: the pack-level backpressure (mLeftPackReadCounter - mPackProcessedCounter) already bounds in-flight memory without creating the cycle.

2. Reader-shutdown lost wakeup: readerTask/interleavedReaderTask/SE readerTask called setProducerFinished() without bumping mPackProducedCounter. A worker that had just snapshotted the counter in BlockUntilDifferent would miss the completion signal and sleep forever. Added a counter bump + WakeAll after setProducerFinished.

3. Writer-shutdown lost wakeup: WriterThread::output() checked mInputCompleted before snapshotting mWriterNotify. If setInputCompleted ran between the check and BlockUntilDifferent, cur captured the post-bump value and the writer blocked forever. Swapped the order so the snapshot is taken first; any subsequent bump is then guaranteed to make cur != current and return immediately.

Verified on macOS ARM64 with 10M simulated pairs, -w 24, plain fq, --adapter_fasta: previously hung indefinitely, now completes in 38s.

🤖 Generated with Claude Code
Co-Authored-By: Claude
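The writer-shutdown lost wakeup (item 3) comes down to the order of two loads. The sketch below replays the worst-case interleaving on a single thread so the outcome is deterministic; `WriterState` and `writerWouldHang()` are hypothetical names, and the final comparison stands in for what `BlockUntilDifferent(cur, mWriterNotify)` would see.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical miniature of the writer's shutdown signalling.
struct WriterState {
    std::atomic<std::uint32_t> writerNotify{0};
    std::atomic<bool> inputCompleted{false};

    void setInputCompleted() {
        inputCompleted.store(true, std::memory_order_release);
        writerNotify.fetch_add(1, std::memory_order_release);  // WakeAll would follow
    }
};

// Replays the worst-case interleaving on one thread: setInputCompleted() lands
// right after the writer's completion check. Returns true if the writer would
// then wait on an unchanged value, i.e. sleep with nobody left to wake it.
inline bool writerWouldHang(bool snapshotBeforeCheck) {
    WriterState w;
    std::uint32_t cur = 0;
    bool sawCompleted = false;

    if (snapshotBeforeCheck) {
        cur = w.writerNotify.load(std::memory_order_acquire);             // 1. snapshot
        sawCompleted = w.inputCompleted.load(std::memory_order_acquire);  // 2. check
        w.setInputCompleted();                                            // racing EOF
    } else {
        sawCompleted = w.inputCompleted.load(std::memory_order_acquire);  // 1. check
        w.setInputCompleted();                                            // racing EOF
        cur = w.writerNotify.load(std::memory_order_acquire);             // 2. snapshot
    }
    if (sawCompleted) return false;  // writer exits without blocking
    return w.writerNotify.load(std::memory_order_acquire) == cur;
}
```

With check-then-snapshot the snapshot already contains the EOF bump, so the wait never ends; with snapshot-then-check the bump always makes the counter differ from `cur`.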
Summary
This PR eliminates all data races found by ThreadSanitizer, then replaces every mutex, condition variable, `usleep()`, and `sleep_for()` busy-wait in fastp's threading pipeline with lock-free `hwy::BlockUntilDifferent`/`hwy::WakeAll` from Highway's cross-platform futex polyfill.
The result is a zero-mutex, zero-condvar threading architecture that is both faster and more correct than master.

Data Race Fixes (TSan verified)
- cf64a66: `mNextSeq` in pwrite path
- b32ea3a: `mHead` pointer atomic
- 6316345: `ReadPool` counters (`mProduced`/`mConsumed`) atomic

Synchronization Replacements
- `yield()` spin → `BlockUntilDifferent` on pack counters
- `yield()` spin → `BlockUntilDifferent` on buffer length
- `output()` wait for data: `usleep(100)` → `BlockUntilDifferent` on `mWriterNotify` counter
- `input()` notify: `fetch_add` + `WakeAll` on `mWriterNotify`
- `sleep_for(1µs)` spin → `BlockUntilDifferent` on `published_seq`
- `mutex` + `condition_variable` → `atomic<uint32_t>` state + `mSlotNotify` counter
- `setInputCompleted()` bumps `mWriterNotify` + `WakeAll`

Counter types: `atomic_long`/`atomic<size_t>` → `atomic<uint32_t>` (required by Highway futex API).
All `<mutex>` and `<condition_variable>` includes removed from `peprocessor.h`, `seprocessor.h`, `writerthread.h`.

Key Design: `mWriterNotify` Pattern
Highway's `BlockUntilDifferent(prev, atom)` only returns when `atom != prev`; `WakeAll` alone cannot break the loop if the value is unchanged. This caused a deadlock when `mBufferLength` stayed 0 at completion time.
Fix: a separate `mWriterNotify` monotonic counter that gets incremented by both `input()` (new data) and `setInputCompleted()` (EOF). The value always changes, so `BlockUntilDifferent` always returns.

Platform Support
Highway's `futex.h` provides native kernel-level blocking on all target platforms:
- Linux: `SYS_futex` (kernel 2.6.22+)
- macOS: `__ulock_wait` (10.12+)
- FreeBSD: `NanoSleep` fallback

Compatible with C++11 and above. No new dependencies: fastp already links Highway.

Benchmark (5M PE reads, 150bp, gz→gz, `-w 3`)

Correctness
Output MD5 matches master exactly for both R1 and R2 (gz→gz, PE mode). ✅
Supersedes #683 (all 3 race-condition fixes included).

Commits
- cf64a66
- b32ea3a
- 6316345
- fe5d994→04452e5
- 5481356
- 11c0249
- f5f4bca
- 2f348b3