
Hottier fix #1645

Open — parmesant wants to merge 17 commits into parseablehq:main from parmesant:hottier-fix

Hottier fix#1645
parmesant wants to merge 17 commits into
parseablehq:mainfrom
parmesant:hottier-fix

Conversation

@parmesant (Contributor) commented May 8, 2026

This PR introduces several changes to the hot-tier flow:

  • Split historic data sync and latest data sync into two tasks
  • Each stream gets its own hot-tier tasks
  • File downloads are parallel and chunked

New environment variables:

  • P_HOT_TIER_DOWNLOAD_CHUNK_SIZE
  • P_HOT_TIER_DOWNLOAD_CONCURRENCY
  • P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY
  • P_HOT_TIER_LATEST_MINUTES
  • P_HOT_TIER_HISTORIC_SYNC_MINUTES
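As a rough illustration of how such tuning variables behave (this is a std-only sketch, not Parseable's actual parsing code, which goes through clap; the defaults shown are illustrative, not the PR's):

```rust
use std::env;

/// Read a numeric tuning variable, falling back to a default when the
/// variable is unset or unparsable. Defaults here are illustrative only.
fn env_u64_or(name: &str, default: u64) -> u64 {
    env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

/// Example: resolve the download chunk size with a hypothetical 8 MiB default.
fn download_chunk_size() -> u64 {
    env_u64_or("P_HOT_TIER_DOWNLOAD_CHUNK_SIZE", 8 * 1024 * 1024)
}
```

Unset or malformed values fall back to the default rather than failing, which is one common convention; the actual CLI may instead reject bad input at parse time.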

Description


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Additional CLI options to tune hot-tier downloads (chunk size, download concurrency, per-stream file concurrency, latest/historic windows).
    • Parallel ranged downloads with partial-file staging added across storage backends.
    • New billing and in-flight storage metrics for partial-file/chunk scans and request concurrency.
    • Hot-tier updates now trigger background sync tasks automatically.
  • Improvements

    • Hot-tier sync refactored to per-stream, phase-aware background loops with safer eviction, disk budgeting, and state reconciliation.
    • Bounded-concurrency downloads with improved failure handling and tracing instrumentation.

Review Change Stack

@coderabbitai Bot (Contributor) commented May 8, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Per-stream hot-tier sync now runs Latest/Historic background loops; ObjectStorage adds an async buffered_write and a partial-path helper. S3/GCS/Azure implement staged parallel ranged downloads; LocalFS supports atomic partial writes. New partial-file scan metrics and CLI tuning options were added; the HTTP handler spawns stream tasks after config changes.

Changes

Hot-Tier Concurrent Download Architecture

Layer / File(s) — Summary

  • CLI Configuration (src/cli.rs): Five new hot-tier tuning parameters: chunk size, download concurrency, per-stream file concurrency, Latest time window (minutes), and historic sync interval (minutes).
  • Partial Scan Metrics (src/metrics/mod.rs): New IntCounterVec PARTIAL_FILE_SCANS_IN_OBJECT_STORE_CALLS_BY_DATE, registered in custom_metrics, with an increment helper.
  • Storage Trait & Utilities (src/storage/object_storage.rs, src/storage/mod.rs): ObjectStorage gains buffered_write; a cross-platform positional write helper write_all_at and a partial_path helper compute .partial sibling filenames for atomic staged writes.
  • LocalFS Buffered Write (src/storage/localfs.rs): LocalFS::buffered_write copies to .partial then renames to the final path; cleans up the partial on error.
  • S3 Parallel Download (src/storage/s3.rs): Adds staged _parallel_download/_parallel_download_inner: HEAD sizing, a pre-allocated partial file, semaphore-limited ranged GETs, writes at offsets via write_all_at, fsync, rename; buffered_write delegates to it.
  • GCS Parallel Download (src/storage/gcs.rs): Implements parallel ranged-download helpers and buffered_write with HEAD sizing, range-GET concurrency, offset writes, sync, and rename; partial-scan metrics incremented.
  • Azure Blob Parallel Download (src/storage/azure_blob.rs): Wraps the client in Arc and adds _parallel_download/inner for ranged concurrent downloads, offset writes, metrics, and buffered_write.
  • Hot-Tier Manager Types & Caching (src/hottier.rs): Adds per-(tenant, stream) StreamSyncState with an async mutex, a SyncPhase enum, and state_cache with get_or_load_state/invalidate_state for cached reconciliation; imports adjusted for async primitives.
  • Reconcile, Boot & Phase Processing (src/hottier.rs): reconcile_stream removes .partial orphans and reconciles manifests; download_from_s3 discovers streams and spawns Latest/Historic loops; process_stream/process_manifest/process_parquet_file_concurrent implement phase-aware work lists and a reservation/commit flow; eviction tracks freed bytes and skips newer candidates; delete_hot_tier aborts running stream tasks before cleanup.
  • HTTP Handler Integration (src/handlers/http/logstream.rs): put_stream_hot_tier calls spawn_stream_tasks after persisting the hot-tier config to start background loops.
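The staged-write pattern (download into a .partial sibling, then atomically rename into place) relies on a helper that derives the staging name. A minimal sketch of what such a partial_path helper might look like — the body is an assumption about the helper described for src/storage/mod.rs, not the actual implementation:

```rust
use std::path::{Path, PathBuf};

/// Compute the `.partial` sibling of a final download path, e.g.
/// `/hot/date=2026-05-08/file.parquet` -> `/hot/date=2026-05-08/file.parquet.partial`.
/// Writers stage bytes here and rename to the final path once complete,
/// so readers never observe a half-written file.
fn partial_path(final_path: &Path) -> PathBuf {
    let mut name = final_path.file_name().unwrap_or_default().to_os_string();
    name.push(".partial");
    final_path.with_file_name(name)
}
```

Because rename within one filesystem is atomic, any file still carrying the .partial suffix after a crash is by definition incomplete and safe to delete during reconciliation.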

Sequence Diagram(s)

```mermaid
sequenceDiagram
  participant Client
  participant Handler
  participant HotTierManager
  participant ObjectStorage
  Client->>Handler: PUT hot-tier config
  Handler->>HotTierManager: persist config
  Handler->>HotTierManager: spawn_stream_tasks(stream, tenant)
  HotTierManager->>ObjectStorage: buffered_write(file path)
  ObjectStorage->>ObjectStorage: parallel ranged GETs -> write .partial -> fsync -> rename
```
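The "parallel ranged GETs" step first sizes the object (via a HEAD request) and splits it into fixed-size byte ranges, one per GET. A std-only sketch of that chunking step; the real helpers in s3.rs/gcs.rs/azure_blob.rs may differ in detail:

```rust
/// Split an object of `size` bytes into half-open byte ranges of at most
/// `chunk` bytes each; each range becomes one ranged GET whose bytes are
/// written at `range.start` in the pre-allocated .partial file.
fn chunk_ranges(size: u64, chunk: u64) -> Vec<std::ops::Range<u64>> {
    assert!(chunk > 0, "chunk size must be positive");
    (0..size)
        .step_by(chunk as usize)
        .map(|start| start..(start + chunk).min(size))
        .collect()
}
```

For example, a 20-byte object with an 8-byte chunk size yields ranges 0..8, 8..16, and 16..20; a semaphore then bounds how many of these GETs run at once.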

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable
  • nitisht

Poem

🐰 I tunneled through manifests and streams,
chunked the bytes into tidy beams.
I reserved the space, fetched every part,
synced and renamed — a tiny art.
Background loops hum; hot-tier dreams.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 inconclusive)

  • Title check — ❓ Inconclusive. The title 'Hottier fix' is vague and generic; it does not convey specific information about the substantial changes made to the hot-tier architecture and the parallel download implementation. Resolution: revise the title to describe the main changes, such as 'Refactor hot-tier sync to per-stream tasks with parallel chunked downloads'.

✅ Passed checks (4 passed)

  • Description check — ✅ Passed. The description covers the main objectives (per-stream tasks, split sync phases, parallel chunked downloads) and lists the new environment variables; the template checklist items remain unchecked as expected for incomplete work.
  • Docstring Coverage — ✅ Passed. Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.
  • Linked Issues check — ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check — ✅ Passed. Check skipped because no linked issues were found for this pull request.


@coderabbitai Bot (Contributor) left a comment

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/handlers/http/logstream.rs (1)

455-474: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Start the background sync only after the hot-tier config is durably persisted.

spawn_stream_tasks() runs before put_stream_json(). If the metastore write fails, this request returns an error even though the in-memory hot-tier state and background tasks are already live, so the feature is partially enabled.

Suggested fix
     hot_tier_manager
         .put_hot_tier(&stream_name, &mut hottier, &tenant_id)
         .await?;
-    hot_tier_manager
-        .spawn_stream_tasks(stream_name.clone(), tenant_id.clone())
-        .await;

     let mut stream_metadata: ObjectStoreFormat = serde_json::from_slice(
         &PARSEABLE
             .metastore
             .get_stream_json(&stream_name, false, &tenant_id)
@@
     PARSEABLE
         .metastore
         .put_stream_json(&stream_metadata, &stream_name, &tenant_id)
         .await?;
+
+    hot_tier_manager
+        .spawn_stream_tasks(stream_name.clone(), tenant_id.clone())
+        .await;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/handlers/http/logstream.rs` around lines 455 - 474, The code starts
background tasks via hot_tier_manager.spawn_stream_tasks(...) before persisting
the updated stream metadata; reorder operations so you first update
stream_metadata.hot_tier_enabled and hot_tier, call
PARSEABLE.metastore.put_stream_json(&stream_metadata, &stream_name,
&tenant_id).await? to durably persist the change, and only after that call
hot_tier_manager.spawn_stream_tasks(stream_name.clone(),
tenant_id.clone()).await; keep the existing hot_tier_manager.put_hot_tier(...)
call but ensure the metastore write succeeds before spawning tasks to avoid
partially enabled state.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/cli.rs`:
- Around line 341-355: Make these clap args enforce valid ranges at parse time:
for the field hot_tier_latest_minutes (i64) add a value_parser that requires >=
1 (e.g. value_parser = clap::value_parser!(i64).range(1..)) so negatives/zero
are rejected, and for hot_tier_historic_sync_minutes (u32) add a value_parser
that requires >= 1 (e.g. value_parser = clap::value_parser!(u32).range(1..)) so
zero is rejected; update the #[arg(...)] attributes for those two fields in the
CLI struct accordingly.
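The range check the comment asks for can also be shown without clap; a minimal std-only sketch (parse_minutes is a hypothetical helper — with clap the equivalent is `value_parser = clap::value_parser!(i64).range(1..)`):

```rust
/// Parse-time validation for the minute-window options: reject zero and
/// negative values before any background task is spawned, instead of
/// discovering the bad value at runtime.
fn parse_minutes(raw: &str) -> Result<i64, String> {
    match raw.parse::<i64>() {
        Ok(v) if v >= 1 => Ok(v),
        Ok(v) => Err(format!("value must be >= 1, got {v}")),
        Err(e) => Err(e.to_string()),
    }
}
```

Rejecting at parse time keeps the failure visible to the operator who set the flag, rather than silently clamping later.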

In `@src/hottier.rs`:
- Around line 620-621: The cutoff boundary is recomputed separately causing race
classification and duplicate commits; instead compute a single stable cutoff
once per tick and use that same value in both phases (replace each recomputation
of latest_minutes/cutoff such as where PARSEABLE.options.hot_tier_latest_minutes
and chrono::Utc::now() are used at lines around the listed ranges) or implement
an in-flight claim keyed by file_path that both phases consult before
reserving/committing a parquet (i.e., add a shared claim
set/claim_file(file_path) check before parquet_path.exists() reservation and
before commit to prevent double-reserve/commit and duplicate entries in
hottier.manifest.json).
- Around line 109-127: The cached StreamSyncState returned by get_or_load_state
can become stale; change get_or_load_state so that when a state exists in
state_cache you still reconcile and refresh it before returning: call
reconcile_stream(stream, tenant_id).await to obtain the latest sht, create a new
Arc<StreamSyncState> (with AsyncMutex::new(sht)) and replace the existing entry
in state_cache (cache.insert(key, state.clone())) so the returned state reflects
updated .hot_tier.json limits; keep the existing fast-path only if you add an
explicit no-refresh flag, otherwise always refresh the cached entry before
returning from get_or_load_state (references: get_or_load_state, state_cache,
StreamSyncState, reconcile_stream, spawn_stream_tasks).
- Around line 762-820: The current logic treats an exact-size file as "no space"
because both comparisons use <=; change the two comparisons that check space to
strict < so exact-fit reservations are allowed: modify the condition using
self.is_disk_available(parquet_file.file_size).await? || sht.available_size <=
parquet_file.file_size to use sht.available_size < parquet_file.file_size (and
ensure the later if sht.available_size <= parquet_file.file_size becomes if
sht.available_size < parquet_file.file_size); keep the eviction flow
(cleanup_hot_tier_old_data, SyncPhase::Latest/ Historic) and the subsequent
sht.available_size -= parquet_file.file_size and self.put_hot_tier(...) behavior
unchanged.

In `@src/storage/azure_blob.rs`:
- Around line 271-296: The current loop spawns one Tokio task per range
(creating many idle tasks) even though concurrency is limited by semaphore;
instead, drive ranges through futures::stream::iter(ranges).map(|r| { let client
= client.clone(); let src = src.clone(); let std_file = std_file.clone(); let
semaphore = semaphore.clone(); async move { /* acquire permit, call
client.get_range(&src, r.clone()).await, write with spawn_blocking +
std_file.write_all_at(...), map errors to ObjectStorageError as before */ }
}).buffer_unordered(concurrency).collect::<Result<Vec<_>, _>>().await to bound
both task creation and execution; preserve existing error mapping
(ObjectStorageError::Custom with messages like "semaphore closed", "join", "join
error") and keep using r.start for the file offset and client.get_range(&src,
r.clone()) for reads.

In `@src/storage/s3.rs`:
- Around line 342-359: In _parallel_download (and similarly in other
buffered-write backends) ensure the temporary partial file produced by
partial_path(write_path) is removed if the final tokio::fs::rename(&partial,
&write_path).await fails: after calling _parallel_download_inner and before
returning Ok(()) call rename, and if rename returns Err attempt
tokio::fs::remove_file(&partial).await (ignoring its error) and then return the
rename error; also apply the same cleanup logic where write/rename is performed
so orphaned .partial files are always removed on failure.
- Around line 385-389: The code is clamping parsed values with .max(...) which
forces minimums and prevents lowering P_HOT_TIER_DOWNLOAD_CHUNK_SIZE and
P_HOT_TIER_DOWNLOAD_CONCURRENCY; remove the .max(...) calls on
PARSEABLE.options.hot_tier_download_chunk_size and
PARSEABLE.options.hot_tier_download_concurrency so the parsed values are
honored, and if you require hard minimums enforce/validate them at parse time in
the CLI parsing/validation logic that handles the P_HOT_TIER_DOWNLOAD_* options
(rather than silently overriding here).
- Around line 404-416: The parallel download path in _parallel_download_inner()
uses std::os::unix::fs::FileExt and std_file.write_all_at unconditionally (see
the tokio::spawn block that calls client.get_range, then spawn_blocking using
FileExt), which breaks Windows builds; either gate that entire offset-write
branch with #[cfg(unix)] or replace the offset-write logic with a
platform-independent approach (seek to offset on a duplicated/mutex-protected
std::fs::File or use a cross-platform write_at helper) in the S3, GCS, and Azure
Blob implementations (s3.rs, gcs.rs, azure_blob.rs) so the semaphore +
client.get_range + spawn_blocking flow compiles on non-Unix platforms.
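A cross-platform positional write along the lines this comment suggests might look like the sketch below. write_all_at is the name used in the PR, but this body is an assumption, not the actual code: it gates the Unix pwrite path and falls back to seek_write on Windows:

```rust
use std::fs::File;
use std::io;

/// Write `buf` at `offset` without moving any shared cursor.
/// On Unix this is a single pwrite-style call; on Windows, seek_write
/// may write partially, so loop until the buffer is fully written.
fn write_all_at(file: &File, buf: &[u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        file.write_all_at(buf, offset)
    }
    #[cfg(windows)]
    {
        use std::os::windows::fs::FileExt;
        let mut written = 0;
        while written < buf.len() {
            written += file.seek_write(&buf[written..], offset + written as u64)?;
        }
        Ok(())
    }
}
```

Because neither branch touches a shared file cursor, concurrent chunk writers can share one `File` handle safely.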

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c525c75e-27b3-4de4-a45f-270c9e53c4c5

📥 Commits

Reviewing files that changed from the base of the PR and between 090feb9 and 2b2fe61.

📒 Files selected for processing (10)
  • src/cli.rs
  • src/handlers/http/logstream.rs
  • src/hottier.rs
  • src/metrics/mod.rs
  • src/storage/azure_blob.rs
  • src/storage/gcs.rs
  • src/storage/localfs.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
  • src/storage/s3.rs

@coderabbitai Bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/storage/s3.rs (1)

401-426: 💤 Low value

All chunk tasks are spawned upfront; consider bounding task creation.

Unlike the Azure implementation which uses buffer_unordered(concurrency), this S3 implementation spawns all ranges.len() tasks immediately. For large files (e.g., 1GB with 8MB chunks = 128 tasks), this creates many idle tasks in the scheduler. The semaphore limits concurrent I/O but not task handle allocation.

Consider using futures::stream::iter(ranges).map(...).buffer_unordered(concurrency) as done in azure_blob.rs for consistent behavior and lower memory overhead.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/s3.rs` around lines 401 - 426, The current loop in the S3
implementation spawns one tokio task per range (creating many idle tasks) which
wastes memory; change the logic to process ranges via a bounded async stream
like futures::stream::iter(ranges).map(...).buffer_unordered(concurrency) (as in
azure_blob.rs) so only up to `concurrency` fetch/write futures are in-flight.
Move the per-range closure that clones `client`, `src`, `std_file`, acquires the
`semaphore`, calls `client.get_range(&src, r.clone()).await`, and writes with
`std_file.write_all_at(&bytes, offset)` into the stream’s map closure, drive it
to completion with .for_each_concurrent or
.buffer_unordered(concurrency).collect/for_each, and remove the Vec of `handles`
and separate join loop.
src/storage/azure_blob.rs (1)

258-262: 💤 Low value

Inconsistent concurrency defaults between storage backends.

Azure uses .max(6) for concurrency while S3 uses .max(16). This inconsistency may confuse users and lead to different performance characteristics. Consider aligning the defaults or documenting the rationale for different values.

Additionally, the .max(...) clamping silently overrides user configuration (same issue as S3).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/azure_blob.rs` around lines 258 - 262, The Azure code uses
PARSEABLE.options.hot_tier_download_concurrency.max(6) which is inconsistent
with S3's .max(16) and also silently overrides user settings; update this by
aligning the concurrency ceiling with S3 (use the same max value or a shared
constant, e.g., HOT_TIER_DOWNLOAD_CONCURRENCY_MAX = 16) and replace the inline
.max(...) usage with explicit validation/clamping at config parse time (read
PARSEABLE.options.hot_tier_download_concurrency, clamp with
.min(HOT_TIER_DOWNLOAD_CONCURRENCY_MAX) or validate and log/warn/error if out of
range) so users are not silently overridden; apply the same pattern for
hot_tier_download_chunk_size if needed to avoid silent overrides.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/storage/azure_blob.rs`:
- Around line 226-228: The current success arm performs
tokio::fs::rename(&partial, &write_path).await? but doesn't remove the partial
file if the rename fails; update the Ok(()) match arm so the rename is done with
explicit error handling: call tokio::fs::rename(&partial, &write_path).await,
and on Err(e) attempt to remove the partial file via
tokio::fs::remove_file(&partial).await (ignore/remove any error from that
cleanup) and then return the original rename error; use the same identifiers
partial, write_path, tokio::fs::rename and tokio::fs::remove_file to locate and
implement the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f59df567-5986-4f32-bec1-3415e208fcff

📥 Commits

Reviewing files that changed from the base of the PR and between 2b2fe61 and 43e437c.

📒 Files selected for processing (4)
  • src/handlers/http/logstream.rs
  • src/hottier.rs
  • src/storage/azure_blob.rs
  • src/storage/s3.rs
✅ Files skipped from review due to trivial changes (1)
  • src/hottier.rs

@coderabbitai Bot (Contributor) left a comment

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/hottier.rs`:
- Around line 860-870: The code currently mutates the cached StreamHotTier (sht)
before persisting, causing in-memory/state divergence if self.put_hot_tier
fails; instead create an updated StreamHotTier copy (clone or new struct)
reflecting the deducted/refunded/committed bytes, call self.put_hot_tier(stream,
&mut updated_sht, tenant_id).await?, and only after that succeeds swap or assign
the updated_sht back into sht; apply the same pattern for the other two spots
mentioned (the blocks around lines 909-913 and 935-937) so all mutations are
persisted before updating the in-memory cache.
- Around line 216-249: The reconciliation currently only reads the immediate
date_dir entries in drop_partials, so it misses nested hour=/minute=
subdirectories and uses file_name() which collides; change drop_partials to walk
the date_dir recursively (e.g., async recursive traversal) so you inspect files
in all subdirs, detect and remove any file ending with ".partial" anywhere under
date_dir, skip ".manifest.json" files, and insert the file's path relative to
date_dir (not file_name()) into on_disk (use a relative path string like
relative_path.to_string_lossy().into_owned()); make the same
recursive/relative-path change in the other similar block around lines 277-280
that builds the on_disk set.
- Around line 263-266: The code currently uses
serde_json::from_slice(&bytes).unwrap_or_default(), which hides JSON parse
errors and can lead to destructive cleanup; change this to propagate the
deserialization error instead (e.g. let manifest: Manifest =
serde_json::from_slice(&bytes)? or serde_json::from_slice(&bytes).map_err(|e| /*
add context */ e)?), removing unwrap_or_default so a corrupt manifest returns an
Err from the surrounding function (or, alternatively, detect the Err and
explicitly skip destructive cleanup for that date); refer to manifest_path,
Manifest, bytes, fs::read, and serde_json::from_slice in your change.
- Around line 1220-1226: After successfully writing the manifest and deleting
the minute (the block calling fs::write(...),
fs::remove_dir_all(minute_to_delete) and delete_empty_directory_hot_tier(...)),
set delete_successful = true so the function returns success and later calls
put_hot_tier; also change the eviction condition using
stream_hot_tier.available_size from <= parquet_file_size to < parquet_file_size
so exact-fit (available_size == parquet_file_size) stops evicting instead of
continuing—i.e., after the delete sequence, if stream_hot_tier.available_size <
parquet_file_size continue; else set delete_successful = true and break
'loop_dates.
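The recursive reconciliation walk described above can be sketched synchronously with std::fs. scan_partials is a hypothetical stand-in for the async drop_partials logic: it visits all hour=/minute= subdirectories, deletes orphaned .partial files wherever they sit, and records surviving files by their path relative to the date directory (avoiding the file_name() collisions the comment flags):

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Recursively walk `dir` under `root`: delete any `.partial` leftovers,
/// and collect every other file's root-relative path into `on_disk` so
/// reconciliation can compare against the manifest without name collisions.
fn scan_partials(root: &Path, dir: &Path, on_disk: &mut Vec<String>) -> io::Result<()> {
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_dir() {
            scan_partials(root, &path, on_disk)?;
        } else if path.extension().is_some_and(|e| e == "partial") {
            fs::remove_file(&path)?; // orphaned staging file from an interrupted download
        } else {
            let rel = path.strip_prefix(root).unwrap_or(&path);
            on_disk.push(rel.to_string_lossy().into_owned());
        }
    }
    Ok(())
}
```

The real code would additionally skip manifest files and run on tokio::fs, but the traversal and relative-path bookkeeping are the essential fixes.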
ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: fcfd13c9-8755-4c28-9615-7dc10ccb77bf

📥 Commits

Reviewing files that changed from the base of the PR and between 43e437c and e00e8a3.

📒 Files selected for processing (1)
  • src/hottier.rs

@coderabbitai Bot (Contributor) left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/hottier.rs (1)

993-1000: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

unwrap() on date parsing can panic on malformed directory names.

If a directory exists under the stream path that doesn't match the date=YYYY-MM-DD format (e.g., created manually or by another process), this unwrap() will panic and crash the hot-tier sync loop for all streams.

🐛 Proposed fix: skip malformed entries
-            let date = NaiveDate::parse_from_str(
+            let parsed = NaiveDate::parse_from_str(
                 date.file_name()
                     .to_string_lossy()
                     .trim_start_matches("date="),
                 "%Y-%m-%d",
-            )
-            .unwrap();
-            date_list.push(date);
+            );
+            match parsed {
+                Ok(d) => date_list.push(d),
+                Err(_) => {
+                    tracing::warn!(
+                        path = %date.path().display(),
+                        "skipping directory with invalid date format"
+                    );
+                    continue;
+                }
+            }
         }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hottier.rs` around lines 993 - 1000, The current use of
NaiveDate::parse_from_str(...).unwrap() can panic for malformed directory names;
replace the unwrap with error-handling to skip entries that fail to parse (e.g.,
use match or if let Ok(parsed) = NaiveDate::parse_from_str(...) and only call
date_list.push(parsed) on success), and optionally log a debug/warn including
the original file_name() when parsing fails; update the code around
NaiveDate::parse_from_str, the date variable, and date_list.push to implement
this non-panicking behavior.
♻️ Duplicate comments (2)
src/hottier.rs (2)

1247-1251: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Exact-fit still triggers unnecessary eviction iteration.

The condition stream_hot_tier.available_size <= parquet_file_size at line 1247 causes one extra eviction loop when space exactly matches. The past review flagged this (claimed addressed in commit 5ee0454), but it's still <=. Change to < so exact-fit stops evicting:

🐛 Proposed fix
-                    if stream_hot_tier.available_size <= parquet_file_size {
+                    if stream_hot_tier.available_size < parquet_file_size {
                         continue;
                     } else {
                         break 'loop_dates;
                     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hottier.rs` around lines 1247 - 1251, The eviction loop uses the
condition stream_hot_tier.available_size <= parquet_file_size which causes one
extra eviction when available_size exactly equals parquet_file_size; update that
comparison to stream_hot_tier.available_size < parquet_file_size so an exact fit
will break out of 'loop_dates instead of performing another eviction. Locate the
check inside the eviction logic where parquet_file_size is compared to
stream_hot_tier.available_size and replace the <= with <, keeping the
surrounding control flow (the continue and break 'loop_dates) unchanged.

263-268: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Corrupt manifest still treated as empty despite claimed fix.

The past review indicated this was addressed, but unwrap_or_default() is still present. A JSON parse failure will produce an empty Manifest, and the subsequent orphan-cleanup pass (lines 187-197) can delete every parquet file under that date. Propagate the error instead:

🐛 Proposed fix
         let mut manifest: Manifest = if manifest_path.exists() {
             let bytes = fs::read(&manifest_path).await?;
-            serde_json::from_slice(&bytes).unwrap_or_default()
+            serde_json::from_slice(&bytes)?
         } else {
             Manifest::default()
         };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hottier.rs` around lines 263 - 268, The manifest JSON parse is being
swallowed by unwrap_or_default() which can convert a corrupt manifest into an
empty Manifest and cause catastrophic orphan cleanup; change the read/parse
logic for manifest_path so that serde_json::from_slice(&bytes) returns its error
instead of defaulting—i.e. replace the unwrap_or_default() on the
serde_json::from_slice call with proper error propagation (use the ? operator or
map_err to attach context) so that failures reading/parsing the manifest (in the
block that assigns to manifest) surface to the caller.
🧹 Nitpick comments (1)
src/hottier.rs (1)

720-723: 💤 Low value

.max(4) enforces a minimum, not a maximum concurrency.

The option hot_tier_files_per_stream_concurrency suggests the user can limit concurrency, but .max(4) means the actual concurrency is max(user_value, 4). If an operator sets this to 2 to reduce load, they still get 4 concurrent downloads. This is likely inverted logic—consider .min(MAX_CAP).max(1) or just trust the configured value.

♻️ Suggested fix
         let concurrency = PARSEABLE
             .options
             .hot_tier_files_per_stream_concurrency
-            .max(4);
+            .max(1); // Ensure at least 1; trust the configured upper limit
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hottier.rs` around lines 720 - 723, The current expression sets
concurrency = PARSEABLE.options.hot_tier_files_per_stream_concurrency.max(4)
which enforces a minimum of 4 rather than an upper bound; change this logic so
the configured value is respected and bounded above (and optionally below)
instead — replace the .max(4) usage with bounding logic such as .min(4).max(1)
(or .min(MAX_CAP).max(1) if you have a named cap) so
hot_tier_files_per_stream_concurrency provides the intended limit, and ensure
the variable name concurrency remains assigned from
PARSEABLE.options.hot_tier_files_per_stream_concurrency after applying the
correct bounds.
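The intended bounding can be expressed with `clamp`. A sketch, assuming a hypothetical cap of 64 (the real code reads PARSEABLE.options.hot_tier_files_per_stream_concurrency):

```rust
/// Bound a user-supplied concurrency setting: at least 1 so work can
/// proceed, at most MAX_CAP so an operator typo cannot overload the node.
fn effective_concurrency(configured: usize) -> usize {
    const MAX_CAP: usize = 64; // hypothetical upper bound
    configured.clamp(1, MAX_CAP)
}
```

Unlike `.max(4)`, this respects an operator who deliberately sets the value to 2.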
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@src/hottier.rs`:
- Around line 993-1000: The current use of
NaiveDate::parse_from_str(...).unwrap() can panic for malformed directory names;
replace the unwrap with error-handling to skip entries that fail to parse (e.g.,
use match or if let Ok(parsed) = NaiveDate::parse_from_str(...) and only call
date_list.push(parsed) on success), and optionally log a debug/warn including
the original file_name() when parsing fails; update the code around
NaiveDate::parse_from_str, the date variable, and date_list.push to implement
this non-panicking behavior.
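The skip-on-parse-failure behavior can be modeled without chrono. A std-only stand-in that parses `date=` directory names and drops malformed ones (the real code uses NaiveDate::parse_from_str and should also log the skipped name):

```rust
/// Parse a hot-tier date directory name like "date=2026-05-08".
/// Returns None for malformed names instead of panicking.
fn parse_date_dir(name: &str) -> Option<(i32, u32, u32)> {
    let raw = name.strip_prefix("date=")?;
    let mut parts = raw.splitn(3, '-');
    let year = parts.next()?.parse().ok()?;
    let month = parts.next()?.parse().ok()?;
    let day = parts.next()?.parse().ok()?;
    Some((year, month, day))
}

/// Collect only the entries that parse, mirroring the suggested
/// `if let Ok(parsed) = ...` shape instead of `.unwrap()`.
fn collect_dates(entries: &[&str]) -> Vec<(i32, u32, u32)> {
    entries.iter().filter_map(|e| parse_date_dir(e)).collect()
}
```

A stray `.tmp` file or truncated directory name is simply skipped rather than taking down the sync task.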

---

Duplicate comments:
In `@src/hottier.rs`:
- Around line 1247-1251: The eviction loop uses the condition
stream_hot_tier.available_size <= parquet_file_size which causes one extra
eviction when available_size exactly equals parquet_file_size; update that
comparison to stream_hot_tier.available_size < parquet_file_size so an exact fit
will break out of 'loop_dates instead of performing another eviction. Locate the
check inside the eviction logic where parquet_file_size is compared to
stream_hot_tier.available_size and replace the <= with <, keeping the
surrounding control flow (the continue and break 'loop_dates) unchanged.
- Around line 263-268: The manifest JSON parse is being swallowed by
unwrap_or_default() which can convert a corrupt manifest into an empty Manifest
and cause catastrophic orphan cleanup; change the read/parse logic for
manifest_path so that serde_json::from_slice(&bytes) returns its error instead
of defaulting—i.e. replace the unwrap_or_default() on the serde_json::from_slice
call with proper error propagation (use the ? operator or map_err to attach
context) so that failures reading/parsing the manifest (in the block that
assigns to manifest) surface to the caller.

---

Nitpick comments:
In `@src/hottier.rs`:
- Around line 720-723: The current expression sets concurrency =
PARSEABLE.options.hot_tier_files_per_stream_concurrency.max(4) which enforces a
minimum of 4 rather than an upper bound; change this logic so the configured
value is respected and bounded above (and optionally below) instead — replace
the .max(4) usage with bounding logic such as .min(4).max(1) (or
.min(MAX_CAP).max(1) if you have a named cap) so
hot_tier_files_per_stream_concurrency provides the intended limit, and ensure
the variable name concurrency remains assigned from
PARSEABLE.options.hot_tier_files_per_stream_concurrency after applying the
correct bounds.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b7f512ab-7b6d-40e2-8822-61826d3cbc26

📥 Commits

Reviewing files that changed from the base of the PR and between e00e8a3 and b185b38.

📒 Files selected for processing (1)
  • src/hottier.rs

@coderabbitai coderabbitai Bot left a comment
Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/storage/s3.rs`:
- Around line 388-396: Validate and reject zero values for
PARSEABLE.options.hot_tier_download_chunk_size and
PARSEABLE.options.hot_tier_download_concurrency at CLI parsing time (ensure both
> 0) so users cannot set them to 0 via env/flags; additionally add a defensive
runtime check in the S3 download path before using the values (the block that
computes ranges with step_by and creates tokio::sync::Semaphore) to return an
error if either value is zero, referencing
PARSEABLE.options.hot_tier_download_chunk_size,
PARSEABLE.options.hot_tier_download_concurrency, the ranges calculation and
semaphore creation to locate the code to change.
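The defensive runtime check amounts to rejecting a zero chunk size before it reaches `step_by`. A std-only sketch of the range computation (hypothetical helper; the real path would also guard the semaphore permit count):

```rust
/// Split an object of `len` bytes into [start, end) byte ranges of
/// `chunk` bytes. A zero chunk would make `step_by(0)` panic, so fail fast.
fn chunk_ranges(len: u64, chunk: u64) -> Result<Vec<(u64, u64)>, String> {
    if chunk == 0 {
        return Err("hot tier download chunk size must be > 0".to_string());
    }
    Ok((0..len)
        .step_by(chunk as usize)
        .map(|start| (start, (start + chunk).min(len)))
        .collect())
}
```

CLI-level validation stops users from ever setting 0, and this check keeps the download path safe even if the options struct is constructed some other way.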

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 55b8c22f-d985-43a3-8d56-ea02c81594d8

📥 Commits

Reviewing files that changed from the base of the PR and between b185b38 and f6f9c50.

📒 Files selected for processing (6)
  • src/cli.rs
  • src/hottier.rs
  • src/storage/azure_blob.rs
  • src/storage/gcs.rs
  • src/storage/mod.rs
  • src/storage/s3.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/cli.rs
  • src/storage/gcs.rs
  • src/storage/azure_blob.rs
  • src/hottier.rs

Comment thread src/storage/s3.rs Outdated
@parmesant parmesant force-pushed the hottier-fix branch 2 times, most recently from 783ce6a to 8479ace on May 10, 2026 14:09
coderabbitai Bot previously approved these changes May 10, 2026
@coderabbitai coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/storage/object_storage.rs (2)

878-918: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make the new timestamp lookup tenant-safe across all backends.

This helper now treats list_dates/list_hours/list_minutes as tenant-aware, but the current GCS and Azure implementations still build prefixes from stream_name only. On non-default tenants, hot-tier bound discovery will read the wrong namespace or return no timestamps at all.

Please align those backend methods with the S3 prefixing behavior before relying on this default implementation for hot-tier sync.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/object_storage.rs` around lines 878 - 918, The tenant-aware
timestamp lookup assumes list_dates/list_hours/list_minutes are already
tenant-scoped, but GCS and Azure backends still build prefixes using only
stream_name, causing wrong/empty results for non-default tenants; update the GCS
and Azure implementations of list_dates, list_hours, and list_minutes to
construct prefixes the same way the S3 backend does (include tenant_id/namespace
when tenant_id.is_some(), matching S3's prefixing logic) so
get_first_and_latest_event_from_storage, extract_timestamp_for_date, and related
hot-tier discovery methods operate on the correct tenant namespace.

1127-1183: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Don't return on the first upload failure from JoinSet.

The early return Err(...) paths bypass the cleanup/finalization block below, so other paths already inserted into ACTIVE_OBJECT_STORE_SYNC_FILES stay pinned, and any uploads that already succeeded never reach update_snapshot_with_manifests. That leaves staged/object-store state inconsistent until a later retry.

Please drain the set first and then either commit the successful manifests or explicitly roll those uploads back.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/object_storage.rs` around lines 1127 - 1183, The code currently
returns immediately on the first upload failure inside the join_set loop, which
skips draining the rest of the tasks and the final cleanup that removes entries
from ACTIVE_OBJECT_STORE_SYNC_FILES and commits manifests; change this to record
the first error (e.g. let mut first_error: Option<ObjectStorageError> = None)
instead of returning in the match arms for Ok(Err((path,e))) and Err(e),
continue draining join_set.join_next() to completion, and always run the cleanup
block that removes paths from ACTIVE_OBJECT_STORE_SYNC_FILES and processes
uploaded_files; after the loop, if first_error.is_some() then roll back or
remove any partial uploads/staged files (use remove_file on staged paths and
remove entries from ACTIVE_OBJECT_STORE_SYNC_FILES) and return that error,
otherwise proceed to build manifest_files and return Ok(manifest_files). Ensure
you reference and update the logic around ACTIVE_OBJECT_STORE_SYNC_FILES,
uploaded_files, join_set.join_next(), remove_file, and the error handling
branches.
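The drain-then-decide shape is independent of tokio and can be sketched over any sequence of task results (illustrative only; the real loop awaits join_set.join_next() and also unpins ACTIVE_OBJECT_STORE_SYNC_FILES entries per path):

```rust
/// Drain every task result, remembering only the first error, so that
/// successful uploads are still accounted for and cleanup always runs.
fn drain_uploads(
    results: Vec<Result<String, String>>,
) -> Result<Vec<String>, String> {
    let mut first_error: Option<String> = None;
    let mut uploaded = Vec::new();
    for result in results {
        match result {
            Ok(path) => uploaded.push(path),
            // Record the first failure but keep draining instead of returning.
            Err(e) => {
                if first_error.is_none() {
                    first_error = Some(e);
                }
            }
        }
    }
    // Per-path cleanup would run here, before deciding the overall outcome.
    match first_error {
        Some(e) => Err(e),
        None => Ok(uploaded),
    }
}
```

Returning early from inside the loop is what strands the remaining tasks; deferring the error decision until after the drain is the whole fix.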
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@src/storage/object_storage.rs`:
- Around line 878-918: The tenant-aware timestamp lookup assumes
list_dates/list_hours/list_minutes are already tenant-scoped, but GCS and Azure
backends still build prefixes using only stream_name, causing wrong/empty
results for non-default tenants; update the GCS and Azure implementations of
list_dates, list_hours, and list_minutes to construct prefixes the same way the
S3 backend does (include tenant_id/namespace when tenant_id.is_some(), matching
S3's prefixing logic) so get_first_and_latest_event_from_storage,
extract_timestamp_for_date, and related hot-tier discovery methods operate on
the correct tenant namespace.
- Around line 1127-1183: The code currently returns immediately on the first
upload failure inside the join_set loop, which skips draining the rest of the
tasks and the final cleanup that removes entries from
ACTIVE_OBJECT_STORE_SYNC_FILES and commits manifests; change this to record the
first error (e.g. let mut first_error: Option<ObjectStorageError> = None)
instead of returning in the match arms for Ok(Err((path,e))) and Err(e),
continue draining join_set.join_next() to completion, and always run the cleanup
block that removes paths from ACTIVE_OBJECT_STORE_SYNC_FILES and processes
uploaded_files; after the loop, if first_error.is_some() then roll back or
remove any partial uploads/staged files (use remove_file on staged paths and
remove entries from ACTIVE_OBJECT_STORE_SYNC_FILES) and return that error,
otherwise proceed to build manifest_files and return Ok(manifest_files). Ensure
you reference and update the logic around ACTIVE_OBJECT_STORE_SYNC_FILES,
uploaded_files, join_set.join_next(), remove_file, and the error handling
branches.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 8f222005-adee-48a4-b4bd-5db689fbb113

📥 Commits

Reviewing files that changed from the base of the PR and between b185b38 and 8479ace.

📒 Files selected for processing (8)
  • src/cli.rs
  • src/hottier.rs
  • src/storage/azure_blob.rs
  • src/storage/gcs.rs
  • src/storage/localfs.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
  • src/storage/s3.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/storage/localfs.rs
  • src/cli.rs
  • src/storage/mod.rs
  • src/hottier.rs

@parmesant
Contributor Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented May 11, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/storage/metrics_layer.rs (1)

207-231: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Stream failures are currently recorded with status 200 latency labels.

Line 211 and Line 230 initialize status to "200", but stream errors never update it before the final observe, so failed LIST/LIST_OFFSET flows are mislabeled in STORAGE_REQUEST_RESPONSE_TIME.

Suggested fix
-struct StreamMetricWrapper<'a, T> {
+struct StreamMetricWrapper<'a> {
     time: time::Instant,
     provider: String,
     method: &'static str,
     status: &'static str,
     _guard: InflightGuard,
-    inner: BoxStream<'a, T>,
+    inner: BoxStream<'a, ObjectStoreResult<ObjectMeta>>,
 }

-impl<T> Stream for StreamMetricWrapper<'_, T> {
-    type Item = T;
+impl Stream for StreamMetricWrapper<'_> {
+    type Item = ObjectStoreResult<ObjectMeta>;

     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        match self.inner.poll_next_unpin(cx) {
+        let polled = self.inner.poll_next_unpin(cx);
+        if let Poll::Ready(Some(Err(err))) = &polled {
+            self.status = error_to_status_code(err);
+        }
+
+        match polled {
             t @ Poll::Ready(None) => {
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&[&self.provider, self.method, self.status])
                     .observe(self.time.elapsed().as_secs_f64());
                 t
             }
             t => t,
         }
     }
 }

Also applies to: 300-306

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/metrics_layer.rs` around lines 207 - 231, The StreamMetricWrapper
currently hardcodes status = "200" for list/list_with_offset so any stream error
keeps the success label; update StreamMetricWrapper's streaming wrapper (used by
list and list_with_offset) so that when the inner stream yields an
Err(ObjectStoreError) you set/replace the wrapper's status to the appropriate
error label (e.g., error code or "5xx"/"4xx" string) before returning the error,
and ensure the final observe of STORAGE_REQUEST_RESPONSE_TIME uses that updated
status; apply the same change to the other streaming wrappers referenced around
lines 300-306 so all streaming flows update status on Err in the Stream impl
rather than leaving it as "200".
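The fix pattern (flip the status label the moment an error passes through, then observe at end of stream) can be modeled with a plain iterator. A std-only stand-in for StreamMetricWrapper, with illustrative names:

```rust
/// Wraps an iterator of results and flips the status label the moment an
/// error item passes through, so the final metric observation is recorded
/// with the failure label instead of the initial "200".
struct LabeledIter<I> {
    status: &'static str,
    inner: I,
}

impl<I> Iterator for LabeledIter<I>
where
    I: Iterator<Item = Result<u32, &'static str>>,
{
    type Item = Result<u32, &'static str>;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next();
        if let Some(Err(_)) = &item {
            // Update before yielding, mirroring poll_next in the suggested fix.
            self.status = "500";
        }
        item
    }
}
```

The async version does the same in `poll_next`: inspect the polled item, update `self.status` on `Err`, and let the end-of-stream branch observe with whatever label is current.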
♻️ Duplicate comments (4)
src/storage/s3.rs (1)

1018-1029: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove the extra #[tracing::instrument] on check().

The function is instrumented twice with the same span name, so every call creates nested duplicate spans. Keep only the second attribute that records ok.

Suggested fix
-    #[tracing::instrument(
-        name = "s3.check",
-        skip(self),
-        fields(tenant = ?tenant_id),
-        err
-    )]
     #[tracing::instrument(
         name = "s3.check",
         skip(self),
         fields(tenant = ?tenant_id, ok = tracing::field::Empty),
         err
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/s3.rs` around lines 1018 - 1029, The check() function has
duplicate #[tracing::instrument] attributes creating nested spans; remove the
first #[tracing::instrument(name = "s3.check", skip(self), fields(tenant =
?tenant_id), err)] and keep the second attribute that adds fields(tenant =
?tenant_id, ok = tracing::field::Empty) so the span records the ok field
correctly; edit the attributes above the check() definition (referencing the
tracing::instrument annotations on check) to leave only the second attribute.
src/hottier.rs (3)

316-327: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Propagate manifest parse failures instead of treating them as empty.

unwrap_or_default() makes a corrupt hottier.manifest.json look like “no files”, and the cleanup pass can then delete every parquet under that date as an orphan.

♻️ Minimal fix
         let mut manifest: Manifest = if manifest_path.exists() {
             let bytes = fs::read(&manifest_path).await.map_err(|e| {
                 error!(
                     stream = %stream,
                     tenant = ?tenant_id,
                     manifest_path = ?manifest_path,
                     error = ?e
                 );
                 e
             })?;
-            serde_json::from_slice(&bytes).unwrap_or_default()
+            serde_json::from_slice(&bytes)?
         } else {
             Manifest::default()
         };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hottier.rs` around lines 316 - 327, The current code silently treats JSON
deserialization failures as an empty Manifest by calling
serde_json::from_slice(&bytes).unwrap_or_default(), which can cause accidental
deletion of files; change it to propagate the parse error instead of defaulting:
replace the unwrap_or_default usage in the manifest deserialization (the block
that reads bytes from manifest_path into bytes and then calls
serde_json::from_slice) with a fallible parse that maps or returns the
serde_json::Error (e.g., using serde_json::from_slice(&bytes).map_err(|e| { log
the error with stream/tenant/manifest_path context and return e })? ) so the
function returns an error on corrupt JSON rather than treating it as an empty
Manifest.

856-890: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Keep the Latest/Historic boundary stable across the paired loops.

cutoff is still computed independently inside each phase. A file near that boundary can be classified as Latest by one loop and Historic by the other before either creates the local parquet, so both phases can reserve/download/commit the same file.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hottier.rs` around lines 856 - 890, The cutoff used to decide Latest vs
Historic must be computed once and reused across both SyncPhase loops to avoid
racy classification; move the computation of cutoff (currently computed from
PARSEABLE.options.hot_tier_latest_minutes and chrono::Utc::now()) into a single
variable named cutoff before iterating the phases, and remove any other
per-phase recomputation so that the is_latest check (dt.map(|d| d >= cutoff)) in
the work-building code for SyncPhase::Latest and SyncPhase::Historic uses the
same cutoff value.
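Computed once and passed into both phases, the boundary cannot disagree. A std-only sketch using SystemTime instead of chrono (the function is hypothetical; the real code derives the cutoff from hot_tier_latest_minutes):

```rust
use std::time::SystemTime;

/// Partition files into (latest, historic) against ONE shared cutoff,
/// so both sync phases see the same classification of boundary files.
fn classify(
    files: Vec<(String, SystemTime)>,
    cutoff: SystemTime,
) -> (Vec<String>, Vec<String>) {
    let mut latest = Vec::new();
    let mut historic = Vec::new();
    for (name, ts) in files {
        if ts >= cutoff {
            latest.push(name);
        } else {
            historic.push(name);
        }
    }
    (latest, historic)
}
```

The cutoff would be computed once, before iterating the phases, and then handed to both; recomputing it inside each loop is what lets a boundary file land in both partitions.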

1066-1080: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Persist counter changes before mutating the cached hot-tier state.

These paths update sht/stream_hot_tier first and only then persist metadata or finish the related filesystem mutation. If any later write/delete fails, in-memory accounting is ahead of .hot_tier.json and later ticks keep using poisoned used_size / available_size.

Also applies to: 1119-1123, 1145-1161, 1479-1501

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hottier.rs` around lines 1066 - 1080, The in-memory hot-tier counters
(e.g., sht.available_size / stream_hot_tier used_size) are being mutated before
the change is persisted; change the sequence so you persist the updated counters
first (use put_hot_tier / equivalent persistence call with the new values) and
only after a successful await mutate the cached sht/stream_hot_tier fields.
Locate the blocks that update sht.available_size and used_size and replace them
with: compute the new value, call self.put_hot_tier(stream, &mut
temp_sht_or_copy, tenant_id).await? to persist, then assign the persisted value
into the in-memory sht/stream_hot_tier. Mirror this change at the other
occurrences called out (the blocks around the other specified ranges).
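The ordering the comment asks for (compute the new value, persist it, and only then touch the cached state) can be sketched with a fallible persist stub. Illustrative only; the real call is put_hot_tier writing .hot_tier.json:

```rust
/// In-memory accounting only changes after persistence succeeds, so a
/// failed write can never leave the cache ahead of .hot_tier.json.
fn record_download(
    cached_used: &mut u64,
    file_size: u64,
    persist: impl Fn(u64) -> Result<(), String>, // stand-in for put_hot_tier
) -> Result<(), String> {
    let new_used = *cached_used + file_size;
    persist(new_used)?; // persist FIRST; `?` leaves the cache untouched on error
    *cached_used = new_used; // only now mutate the cached state
    Ok(())
}
```

With the original order (mutate, then persist), a failed write would leave used_size/available_size poisoned for every later tick.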
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/cli.rs`:
- Around line 335-341: The CLI accepts 0 for
hot_tier_files_per_stream_concurrency which is invalid; update the argument
validation for hot_tier_files_per_stream_concurrency so values < 1 are rejected
(e.g., use a clap value parser or custom validator to enforce range >= 1) and
provide a clear error message explaining that the concurrency must be at least
1; adjust any parsing logic that reads hot_tier_files_per_stream_concurrency to
assume it will never be 0.
- Around line 317-333: hot_tier_download_chunk_size and
hot_tier_download_concurrency are u64 but later get cast to usize for
ranged-downloads; on 32-bit targets values larger than usize::MAX wrap/truncate
and can produce zero-step or zero-permit semaphores. Fix by capping these u64
CLI values to usize::MAX before any cast (e.g., when converting to usize in
S3/GCS/Azure ranged-download code or immediately after parsing), so use
value.min(usize::MAX as u64) (or equivalent clamp) prior to usize::try_from()/as
to ensure safe non-zero values for step_by and semaphore creation; reference the
hot_tier_download_chunk_size and hot_tier_download_concurrency symbols when
applying the cap.

In `@src/hottier.rs`:
- Around line 116-132: Race occurs when both Latest and Historic miss
self.state_cache and each calls reconcile_stream producing distinct
Arc<StreamSyncState>; fix by ensuring only one Arc is stored and returned: after
creating state via reconcile_stream and before returning, take the write lock on
self.state_cache and use the map entry API (or check for an occupied entry) to
insert the newly created Arc only if absent, otherwise discard the newly created
Arc and return the existing Arc from the cache; update the block around
self.state_cache.write().await, cache.insert(...), and the returned value so
get_or_insert semantics are used and the same Arc<StreamSyncState> is returned
to callers.

---

Outside diff comments:
In `@src/storage/metrics_layer.rs`:
- Around line 207-231: The StreamMetricWrapper currently hardcodes status =
"200" for list/list_with_offset so any stream error keeps the success label;
update StreamMetricWrapper's streaming wrapper (used by list and
list_with_offset) so that when the inner stream yields an Err(ObjectStoreError)
you set/replace the wrapper's status to the appropriate error label (e.g., error
code or "5xx"/"4xx" string) before returning the error, and ensure the final
observe of STORAGE_REQUEST_RESPONSE_TIME uses that updated status; apply the
same change to the other streaming wrappers referenced around lines 300-306 so
all streaming flows update status on Err in the Stream impl rather than leaving
it as "200".

---

Duplicate comments:
In `@src/hottier.rs`:
- Around line 316-327: The current code silently treats JSON deserialization
failures as an empty Manifest by calling
serde_json::from_slice(&bytes).unwrap_or_default(), which can cause accidental
deletion of files; change it to propagate the parse error instead of defaulting:
replace the unwrap_or_default usage in the manifest deserialization (the block
that reads bytes from manifest_path into bytes and then calls
serde_json::from_slice) with a fallible parse that maps or returns the
serde_json::Error (e.g., using serde_json::from_slice(&bytes).map_err(|e| { log
the error with stream/tenant/manifest_path context and return e })? ) so the
function returns an error on corrupt JSON rather than treating it as an empty
Manifest.
- Around line 856-890: The cutoff used to decide Latest vs Historic must be
computed once and reused across both SyncPhase loops to avoid racy
classification; move the computation of cutoff (currently computed from
PARSEABLE.options.hot_tier_latest_minutes and chrono::Utc::now()) into a single
variable named cutoff before iterating the phases, and remove any other
per-phase recomputation so that the is_latest check (dt.map(|d| d >= cutoff)) in
the work-building code for SyncPhase::Latest and SyncPhase::Historic uses the
same cutoff value.
- Around line 1066-1080: The in-memory hot-tier counters (e.g.,
sht.available_size / stream_hot_tier used_size) are being mutated before the
change is persisted; change the sequence so you persist the updated counters
first (use put_hot_tier / equivalent persistence call with the new values) and
only after a successful await mutate the cached sht/stream_hot_tier fields.
Locate the blocks that update sht.available_size and used_size and replace them
with: compute the new value, call self.put_hot_tier(stream, &mut
temp_sht_or_copy, tenant_id).await? to persist, then assign the persisted value
into the in-memory sht/stream_hot_tier. Mirror this change at the other
occurrences called out (the blocks around the other specified ranges).

In `@src/storage/s3.rs`:
- Around line 1018-1029: The check() function has duplicate
#[tracing::instrument] attributes creating nested spans; remove the first
#[tracing::instrument(name = "s3.check", skip(self), fields(tenant =
?tenant_id), err)] and keep the second attribute that adds fields(tenant =
?tenant_id, ok = tracing::field::Empty) so the span records the ok field
correctly; edit the attributes above the check() definition (referencing the
tracing::instrument annotations on check) to leave only the second attribute.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: d6259f29-8863-440f-b3cc-8df52b4b2f42

📥 Commits

Reviewing files that changed from the base of the PR and between b185b38 and 8af9f0f.

📒 Files selected for processing (14)
  • src/cli.rs
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/mod.rs
  • src/hottier.rs
  • src/metrics/mod.rs
  • src/storage/azure_blob.rs
  • src/storage/gcs.rs
  • src/storage/localfs.rs
  • src/storage/metrics_layer.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
  • src/storage/s3.rs
  • src/sync.rs
  • src/telemetry.rs
✅ Files skipped from review due to trivial changes (1)
  • src/handlers/http/modal/mod.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/metrics/mod.rs
  • src/storage/mod.rs

Comment thread src/cli.rs
Comment on lines +317 to +333
    #[arg(
        long = "hot-tier-download-chunk-size",
        env = "P_HOT_TIER_DOWNLOAD_CHUNK_SIZE",
        value_parser = clap::value_parser!(u64).range(5242880..),
        default_value = "8388608",
        help = "Chunk size in bytes for parallel hot tier downloads (default 8 MiB)"
    )]
    pub hot_tier_download_chunk_size: u64,

    #[arg(
        long = "hot-tier-download-concurrency",
        env = "P_HOT_TIER_DOWNLOAD_CONCURRENCY",
        value_parser = clap::value_parser!(u64).range(1..),
        default_value = "16",
        help = "Number of concurrent range requests per hot tier download"
    )]
    pub hot_tier_download_concurrency: u64,

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the new option types and the downstream `as usize` casts.
sed -n '317,359p' src/cli.rs
sed -n '394,405p' src/storage/s3.rs
sed -n '233,245p' src/storage/gcs.rs
sed -n '267,279p' src/storage/azure_blob.rs

Repository: parseablehq/parseable

Length of output: 3499


Cap these download knobs to usize::MAX for 32-bit builds.

These values are cast to usize in the S3/GCS/Azure ranged-download paths. On 32-bit targets, values above usize::MAX are truncated by the cast and can end up as zero, which makes step_by(0) panic and creates a zero-permit semaphore that deadlocks.

Suggested fix
     #[arg(
         long = "hot-tier-download-chunk-size",
         env = "P_HOT_TIER_DOWNLOAD_CHUNK_SIZE",
-        value_parser = clap::value_parser!(u64).range(5242880..),
+        value_parser = clap::value_parser!(u64).range(5242880..=(usize::MAX as u64)),
         default_value = "8388608",
         help = "Chunk size in bytes for parallel hot tier downloads (default 8 MiB)"
     )]
     pub hot_tier_download_chunk_size: u64,

     #[arg(
         long = "hot-tier-download-concurrency",
         env = "P_HOT_TIER_DOWNLOAD_CONCURRENCY",
-        value_parser = clap::value_parser!(u64).range(1..),
+        value_parser = clap::value_parser!(u64).range(1..=(usize::MAX as u64)),
         default_value = "16",
         help = "Number of concurrent range requests per hot tier download"
     )]
     pub hot_tier_download_concurrency: u64,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cli.rs` around lines 317 - 333, hot_tier_download_chunk_size and
hot_tier_download_concurrency are u64 but later get cast to usize for
ranged-downloads; on 32-bit targets values larger than usize::MAX wrap/truncate
and can produce zero-step or zero-permit semaphores. Fix by capping these u64
CLI values to usize::MAX before any cast (e.g., when converting to usize in
S3/GCS/Azure ranged-download code or immediately after parsing), so use
value.min(usize::MAX as u64) (or equivalent clamp) prior to usize::try_from()/as
to ensure safe non-zero values for step_by and semaphore creation; reference the
hot_tier_download_chunk_size and hot_tier_download_concurrency symbols when
applying the cap.
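
The clamp the prompt describes can be sketched as a small standalone helper. The `to_safe_usize` name is illustrative, not project code; the point is capping a u64 CLI value at usize::MAX before any cast, so a 32-bit target cannot wrap to a zero chunk size or a zero-permit semaphore.

```rust
// Hypothetical helper for the suggested clamp: cap the u64 before casting
// so the result is never wrapped/truncated on 32-bit targets.
fn to_safe_usize(value: u64) -> usize {
    value.min(usize::MAX as u64) as usize
}

fn main() {
    // The default 8 MiB chunk size passes through unchanged.
    assert_eq!(to_safe_usize(8_388_608), 8_388_608);
    // An oversized value clamps to usize::MAX instead of wrapping toward 0.
    assert_eq!(to_safe_usize(u64::MAX), usize::MAX);
    println!("clamp ok");
}
```

Applying this once, immediately after parsing, keeps every later `step_by` and `Semaphore::new` call safe without scattering `try_from` checks through the S3/GCS/Azure download paths.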

Comment thread src/cli.rs
Comment on lines +335 to +341
    #[arg(
        long = "hot-tier-files-per-stream-concurrency",
        env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY",
        default_value = "4",
        help = "Number of concurrent parquet file downloads per stream during hot tier sync"
    )]
    pub hot_tier_files_per_stream_concurrency: usize,
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject 0 for hot_tier_files_per_stream_concurrency.

This is the only new hot-tier concurrency knob that still accepts 0 from env/CLI. 0 is not a valid concurrency setting and will leave downstream hot-tier work with an invalid limit.

Suggested fix
     #[arg(
         long = "hot-tier-files-per-stream-concurrency",
         env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY",
+        value_parser = clap::value_parser!(usize).range(1..),
         default_value = "4",
         help = "Number of concurrent parquet file downloads per stream during hot tier sync"
     )]
     pub hot_tier_files_per_stream_concurrency: usize,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
     #[arg(
         long = "hot-tier-files-per-stream-concurrency",
         env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY",
+        value_parser = clap::value_parser!(usize).range(1..),
         default_value = "4",
         help = "Number of concurrent parquet file downloads per stream during hot tier sync"
     )]
     pub hot_tier_files_per_stream_concurrency: usize,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cli.rs` around lines 335 - 341, The CLI accepts 0 for
hot_tier_files_per_stream_concurrency which is invalid; update the argument
validation for hot_tier_files_per_stream_concurrency so values < 1 are rejected
(e.g., use a clap value parser or custom validator to enforce range >= 1) and
provide a clear error message explaining that the concurrency must be at least
1; adjust any parsing logic that reads hot_tier_files_per_stream_concurrency to
assume it will never be 0.
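
The same ≥ 1 rule can be sketched without pulling in clap. The `parse_concurrency` function below is an illustrative stand-in for the suggested `value_parser!(usize).range(1..)`, showing the behavior a validator must have: reject 0 with a clear message rather than letting it reach the download loop.

```rust
// Illustrative stand-in for the clap range validator: parse a raw CLI/env
// string and reject anything below 1 with an explanatory error.
fn parse_concurrency(raw: &str) -> Result<usize, String> {
    let value: usize = raw
        .parse()
        .map_err(|e| format!("invalid concurrency value: {e}"))?;
    if value < 1 {
        return Err("concurrency must be at least 1".to_string());
    }
    Ok(value)
}

fn main() {
    assert_eq!(parse_concurrency("4"), Ok(4));
    assert!(parse_concurrency("0").is_err());
    assert!(parse_concurrency("abc").is_err());
    println!("validation ok");
}
```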

Comment thread src/hottier.rs
Comment on lines +116 to +132
if let Some(state) = self.state_cache.read().await.get(&key).cloned() {
    return Ok(state);
}
// key not present, reconcile
let sht = self.reconcile_stream(stream, tenant_id).await?;
let state = Arc::new(StreamSyncState {
    sht: AsyncMutex::new(sht),
});

let mut cache = self.state_cache.write().await;
if cache.insert(key, state.clone()).is_some() {
    tracing::warn!(
        "Key- {:?} was absent during read lock but already exists after reconcile!",
        (tenant_id, stream),
    );
};
Ok(state)
⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Do not replace a freshly created StreamSyncState on concurrent cache misses.

On a cold stream, the Latest and Historic loops can both miss state_cache, both reconcile, and then return different Arc<StreamSyncState> values. That breaks the per-stream mutex guarantee on the first tick, so reservations/commits can race and overbook capacity.

♻️ Minimal fix
         let mut cache = self.state_cache.write().await;
-        if cache.insert(key, state.clone()).is_some() {
-            tracing::warn!(
-                "Key- {:?} was absent during read lock but already exists after reconcile!",
-                (tenant_id, stream),
-            );
-        };
-        Ok(state)
+        if let Some(existing) = cache.get(&key).cloned() {
+            return Ok(existing);
+        }
+        cache.insert(key, state.clone());
+        Ok(state)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/hottier.rs` around lines 116 - 132, Race occurs when both Latest and
Historic miss self.state_cache and each calls reconcile_stream producing
distinct Arc<StreamSyncState>; fix by ensuring only one Arc is stored and
returned: after creating state via reconcile_stream and before returning, take
the write lock on self.state_cache and use the map entry API (or check for an
occupied entry) to insert the newly created Arc only if absent, otherwise
discard the newly created Arc and return the existing Arc from the cache; update
the block around self.state_cache.write().await, cache.insert(...), and the
returned value so get_or_insert semantics are used and the same
Arc<StreamSyncState> is returned to callers.
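
The get-or-insert semantics the prompt asks for can be sketched with the standard `entry` API. This uses a plain `HashMap` in place of the async `RwLock`-guarded cache, and a stubbed `StreamSyncState`; the essential property is that a second racer receives the Arc already in the cache, so both loops share one per-stream mutex.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stub of the real state type; only identity matters for this sketch.
#[derive(Debug)]
struct StreamSyncState {
    id: u32,
}

// entry() keeps whichever Arc was stored first; a freshly reconciled
// duplicate is simply dropped instead of replacing the cached one.
fn get_or_insert(
    cache: &mut HashMap<String, Arc<StreamSyncState>>,
    key: String,
    fresh: Arc<StreamSyncState>,
) -> Arc<StreamSyncState> {
    cache.entry(key).or_insert(fresh).clone()
}

fn main() {
    let mut cache = HashMap::new();
    let first = get_or_insert(&mut cache, "s1".into(), Arc::new(StreamSyncState { id: 1 }));
    // Simulates the losing racer: its fresh state is discarded.
    let second = get_or_insert(&mut cache, "s1".into(), Arc::new(StreamSyncState { id: 2 }));
    assert!(Arc::ptr_eq(&first, &second)); // same Arc, mutex guarantee holds
    assert_eq!(second.id, 1);
    println!("shared state ok");
}
```

Under the real write lock the same shape applies: re-check for an occupied entry after acquiring the lock, return the existing Arc if present, and insert only when absent.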
