Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions node/derivation/batch_info.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package derivation

import (
"bytes"
"encoding/binary"
"fmt"
"math/big"
Expand Down Expand Up @@ -254,6 +255,46 @@ func encodeTransactions(txs []*eth.Transaction) [][]byte {
return enc
}

// blockContentMatches reports whether a local L2 block carries the same
// content the batch committed for that height: timestamp, gas limit, base
// fee and the ordered tx list (L1 messages then L2 txs, by binary encoding).
// The batch has no parent hashes, so this is content-only; deriveForce pairs
// it with a canonical anchor to conclude a block is canonical.
func blockContentMatches(local *eth.Block, sd *catalyst.SafeL2Data) bool {
h := local.Header()
if h.Time != sd.Timestamp {
return false
}
if h.GasLimit != sd.GasLimit {
return false
}
if !baseFeeEqual(h.BaseFee, sd.BaseFee) {
return false
}
txs := local.Transactions()
if len(txs) != len(sd.Transactions) {
return false
}
for i, tx := range txs {
enc, err := tx.MarshalBinary()
if err != nil || !bytes.Equal(enc, sd.Transactions[i]) {
return false
}
}
return true
}

// baseFeeEqual treats nil and zero as equal — ParseBatch normalises a zero
// base fee to nil, while a local header may carry an explicit zero.
func baseFeeEqual(a, b *big.Int) bool {
az := a == nil || a.Sign() == 0
bz := b == nil || b.Sign() == 0
if az || bz {
return az && bz
}
return a.Cmp(b) == 0
}

type txQueue struct {
txs eth.Transactions
pointer int
Expand Down
125 changes: 50 additions & 75 deletions node/derivation/derivation.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,12 @@ func (d *Derivation) derivationBlock(ctx context.Context) {
"l2Latest", currentL2Latest)
return
}
// Scenario C: sequencer stopped → L1 blob fill-gap.
d.logger.Info("local verify: lastBlock missing and L2 head flat across polls; fallback to L1 blob fill-gap (scenario C)",
// Scenario C: batch tip missing locally and L2 head flat across
// polls (sequencer stopped, or node stuck on a fork below the
// batch end). Pull the real batch and let deriveForce reconcile
// in one pass — verify present blocks, append the missing tail,
// then advance.
d.logger.Info("local verify: lastBlock missing and L2 head flat across polls; reconciling batch from L1 (scenario C)",
"batchIndex", batchInfo.batchIndex,
"lastBlockNumber", batchInfo.lastBlockNumber,
"l2Latest", currentL2Latest)
Expand All @@ -375,7 +379,7 @@ func (d *Derivation) derivationBlock(ctx context.Context) {
if errors.Is(fetchErr, types.ErrNotCommitBatchTx) {
continue
}
d.logger.Error("local verify fill-gap: fetch real batch failed",
d.logger.Error("local verify reconcile: fetch real batch failed",
"batchIndex", batchInfo.batchIndex, "error", fetchErr)
return
}
Expand All @@ -386,16 +390,12 @@ func (d *Derivation) derivationBlock(ctx context.Context) {
// reactors stopped indefinitely (Stop is idempotent on
// retry, but Start is never reached).
err = d.withReactorsQuiesced(ctx, batchInfo.batchIndex, func() error {
localLatest, err := d.l2Client.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("read local latest: %w", err)
}
var derErr error
lastHeader, derErr = d.deriveForce(batchInfoFull, localLatest)
lastHeader, derErr = d.deriveForce(batchInfoFull)
Comment on lines 392 to +394

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Thread the caller context through deriveForce.

deriveForce can perform many L2 reads/writes while reactors are quiesced, but it uses d.ctx and context.Background() instead of the derivationBlock context. Cancellation won’t stop an in-flight reconcile promptly, and each rewritten block can wait up to 60s.

Proposed fix
- lastHeader, derErr = d.deriveForce(batchInfoFull)
+ lastHeader, derErr = d.deriveForce(ctx, batchInfoFull)
- lastHeader, derErr = d.deriveForce(batchInfoFull)
+ lastHeader, derErr = d.deriveForce(ctx, batchInfoFull)
-func (d *Derivation) deriveForce(rollupData *BatchInfo) (*eth.Header, error) {
+func (d *Derivation) deriveForce(ctx context.Context, rollupData *BatchInfo) (*eth.Header, error) {
@@
- anchor, err := d.l2Client.HeaderByNumber(d.ctx, big.NewInt(int64(anchorNum)))
+ anchor, err := d.l2Client.HeaderByNumber(ctx, big.NewInt(int64(anchorNum)))
@@
- local, lErr := d.l2Client.BlockByNumber(d.ctx, big.NewInt(int64(num)))
+ local, lErr := d.l2Client.BlockByNumber(ctx, big.NewInt(int64(num)))
@@
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60)*time.Second)
+ callCtx, cancel := context.WithTimeout(ctx, time.Duration(60)*time.Second)
  defer cancel()
- return d.l2Client.NewSafeL2Block(ctx, &safeData)
+ return d.l2Client.NewSafeL2Block(callCtx, &safeData)

Also applies to: 442-444, 918-980

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@node/derivation/derivation.go` around lines 398 - 400, `deriveForce` is still
using `d.ctx` and `context.Background()` instead of the caller’s
`derivationBlock` context, so cancellation and deadlines are not propagated
during quiesced work. Update the `deriveForce` call sites in `derivation.go` to
pass the active context from `withReactorsQuiesced`/`derivationBlock`, and
thread that context through `deriveForce` itself and the related helpers it
calls so all L2 reads/writes respect cancellation promptly.

return derErr
})
if err != nil {
d.logger.Error("local verify fill-gap: derive failed",
d.logger.Error("local verify reconcile: derive failed",
"batchIndex", batchInfo.batchIndex, "error", err)
return
}
Expand Down Expand Up @@ -435,7 +435,7 @@ func (d *Derivation) derivationBlock(ctx context.Context) {
// reactors stopped indefinitely.
err = d.withReactorsQuiesced(ctx, batchInfo.batchIndex, func() error {
var derErr error
lastHeader, derErr = d.deriveForce(batchInfoFull, 0)
lastHeader, derErr = d.deriveForce(batchInfoFull)
return derErr
})
if err != nil {
Expand Down Expand Up @@ -636,12 +636,7 @@ func (d *Derivation) fetchRollupDataByTxHash(txHash common.Hash, blockNumber uin
}
}

// Get L2 height
l2Height, err := d.l2Client.BlockNumber(d.ctx)
if err != nil {
return nil, fmt.Errorf("query l2 block number error:%v", err)
}
rollupData, err := d.parseBatch(batch, l2Height)
rollupData, err := d.parseBatch(batch)
if err != nil {
d.logger.Error("parse batch failed", "txNonce", tx.Nonce(), "txHash", txHash,
"l1BlockNumber", blockNumber)
Expand Down Expand Up @@ -751,25 +746,27 @@ func (d *Derivation) UnPackData(data []byte) (geth.RPCRollupBatch, error) {
return batch, nil
}

func (d *Derivation) parseBatch(batch geth.RPCRollupBatch, l2Height uint64) (*BatchInfo, error) {
func (d *Derivation) parseBatch(batch geth.RPCRollupBatch) (*BatchInfo, error) {
batchInfo := new(BatchInfo)
if err := batchInfo.ParseBatch(batch); err != nil {
return nil, fmt.Errorf("parse batch error:%v", err)
}
if err := d.handleL1Message(batchInfo, batchInfo.parentTotalL1MessagePopped, l2Height); err != nil {
if err := d.handleL1Message(batchInfo, batchInfo.parentTotalL1MessagePopped); err != nil {
return nil, fmt.Errorf("handle l1 message error:%v", err)
}
return batchInfo, nil
}

func (d *Derivation) handleL1Message(rollupData *BatchInfo, parentTotalL1MessagePopped, l2Height uint64) error {
// handleL1Message populates each block's SafeL2Data.Transactions with its L1
// messages (prepended before the L2 txs). It runs for EVERY block in the batch,
// including ones already present locally: deriveForce content-compares and may
// rewrite present blocks, so it needs the complete tx list — a partial list
// mis-compares and would write L1-message-less blocks. We rely on snapshots
// shipping the full DB so historical L1 messages are always local; if that ever
// breaks, the count check below fails loudly instead of corrupting the chain.
func (d *Derivation) handleL1Message(rollupData *BatchInfo, parentTotalL1MessagePopped uint64) error {
totalL1MessagePopped := parentTotalL1MessagePopped
for bIndex, block := range rollupData.blockContexts {
// This may happen to nodes started from snapshot, in which case we will no longer handle L1Msg
if block.Number <= l2Height {
totalL1MessagePopped += uint64(block.l1MsgNum)
continue
}
var l1Transactions []*eth.Transaction
l1Messages, err := d.getL1Message(totalL1MessagePopped, uint64(block.l1MsgNum))
if err != nil {
Expand Down Expand Up @@ -832,20 +829,6 @@ func (d *Derivation) derive(rollupData *BatchInfo) (*eth.Header, error) {
return lastHeader, nil
}

// deriveForce writes the batch's blocks via NewSafeL2Block.
//
// skipNumber lets one implementation serve two SPEC-005 §4.3 Path B scenarios:
//
// - skipNumber == 0 (scenario B, self-heal): every block is written; EL
// SetCanonical reorgs the local fork onto the L1-canonical chain.
// - skipNumber > 0 (scenario C, fill-gap): blocks with Number ≤ skipNumber
// are skipped (already present locally, presumed valid via P2P), only
// the missing tail is appended; no reorg of existing blocks.
//
// In both cases the parent of the first block we actually write must exist
// locally. For scenario B that's batch.firstBlockNumber-1 (above safe head).
// For scenario C with skipNumber == localLatestL2 that's localLatestL2 itself
// (necessarily ≥ firstBlockNumber-1 once skipNumber covers everything below).
// withReactorsQuiesced runs body while consensus reactors (blocksync /
// broadcast) are paused, guaranteeing StartReactorsAfterReorg runs even if
// body returns an error. The restart height comes from the L2 EL latest
Expand Down Expand Up @@ -897,45 +880,25 @@ func (d *Derivation) withReactorsQuiesced(ctx context.Context, batchIndex uint64
return body()
}

func (d *Derivation) deriveForce(rollupData *BatchInfo, skipNumber uint64) (*eth.Header, error) {
// deriveForce reconciles the local chain with the batch's canonical content
// over [firstBlockNumber, lastBlockNumber] and returns the tip header.
//
// Walking from the canonical anchor (firstBlockNumber-1), each height is
// kept while its local content matches the batch, then rewritten via
// NewSafeL2Block from the first divergent or missing height onward. A kept
// block is canonical by induction: matching content on a canonical parent
// reproduces the canonical block (the batch has no parent hashes, so content
// is the only signal — and it suffices given the anchor). This replaces the
// old skipNumber fill-gap, which blindly trusted local blocks and could grow
// a permanent shadow chain.
func (d *Derivation) deriveForce(rollupData *BatchInfo) (*eth.Header, error) {
firstNum := rollupData.firstBlockNumber
if firstNum == 0 {
return nil, fmt.Errorf("invalid firstBlockNumber 0 for batch %d", rollupData.batchIndex)
}

// Race short-circuit: scenario C dispatch is decided before reactors are
// quiesced (HeaderByNumber check at derivationBlock vs StopReactors inside
// withReactorsQuiesced), so blocksync can backfill past lastBlockNumber in
// that small window. When that happens, skipNumber (= localLatest read
// after Stop) ends up >= the batch tip. Without this guard the loop below
// would `continue` on every block, return header(skipNumber) — a block
// past the batch — and then verifyBatchRoots / advanceSafe upstream would
// run against the wrong header (false stateException + safe head pushed
// past the batch). Returning header(lastBlockNumber) collapses this case
// to the same outcome scenario A would have produced if the dispatch had
// caught the now-present batch tip.
if skipNumber >= rollupData.lastBlockNumber {
lastHeader, err := d.l2Client.HeaderByNumber(d.ctx, big.NewInt(int64(rollupData.lastBlockNumber)))
if err != nil {
return nil, fmt.Errorf("read batch tip at %d: %w", rollupData.lastBlockNumber, err)
}
if lastHeader == nil {
return nil, fmt.Errorf("batch tip at %d missing", rollupData.lastBlockNumber)
}
d.logger.Info("deriveForce: P2P caught up past batch tip during scenario-C dispatch window; no-op write",
"batchIndex", rollupData.batchIndex,
"lastBlockNumber", rollupData.lastBlockNumber,
"skipNumber", skipNumber)
return lastHeader, nil
}

// Anchor: parent of the first block we will WRITE must exist locally.
// scenario B (skipNumber==0): firstNum-1.
// scenario C: max(firstNum-1, skipNumber).
// Anchor: parent of the batch's first block must exist locally.
parentNum := firstNum - 1
if skipNumber > parentNum {
parentNum = skipNumber
}
lastHeader, err := d.l2Client.HeaderByNumber(d.ctx, big.NewInt(int64(parentNum)))
if err != nil {
return nil, fmt.Errorf("read parent header at %d: %w", parentNum, err)
Expand All @@ -944,11 +907,23 @@ func (d *Derivation) deriveForce(rollupData *BatchInfo, skipNumber uint64) (*eth
return nil, fmt.Errorf("parent header at %d missing", parentNum)
}

rewriting := false
for _, blockData := range rollupData.blockContexts {
// Skip blocks already present locally (scenario C). For scenario B
// skipNumber == 0 means this branch is never taken.
if blockData.SafeL2Data.Number <= skipNumber {
continue
// Keep the local block while its content still matches the batch; at
// the first divergent or missing height switch to rewrite for the
// rest of the range (canonical by induction from the anchor).
if !rewriting {
local, lErr := d.l2Client.BlockByNumber(d.ctx, big.NewInt(int64(blockData.SafeL2Data.Number)))
if lErr != nil && !errors.Is(lErr, ethereum.NotFound) {
return nil, fmt.Errorf("read local block %d: %w", blockData.SafeL2Data.Number, lErr)
}
if local != nil && blockContentMatches(local, blockData.SafeL2Data) {
lastHeader = local.Header()
continue
}
rewriting = true
d.logger.Info("deriveForce: local fork/gap; rewriting batch tail onto canonical",
"batchIndex", rollupData.batchIndex, "fromBlock", blockData.SafeL2Data.Number)
}

// Pin the parent so SetCanonical reorgs from the local fork to the
Expand Down
Loading