feat: Event-Driven DA Follower with WebSocket Subscriptions #3131

Open

alpe wants to merge 10 commits into main from alex/2803_best_2worlds
Conversation

alpe (Contributor) commented Mar 3, 2026

Summary

Replaces the Syncer's polling-based DA worker with an event-driven DAFollower that subscribes to DA header and data events in real time. This eliminates unnecessary polling latency in follow mode and brings zero-latency block processing when the node is caught up with the DA layer.

Changes

Core: Event-Driven DAFollower (block/internal/syncing/)

  • Introduces DAFollower, a new component that subscribes to both the header and data namespaces of the DA layer and processes them inline, achieving zero-latency follow mode.
  • Adds an inline blob processing path so blobs arriving with their header are handled immediately without a separate retrieval round-trip.
  • Adds a subscription watchdog that detects stalled DA subscriptions and triggers recovery.
  • Syncer refactored to delegate event-driven follow logic to DAFollower; the old polling worker is removed.
  • Extends the DARetriever interface and its tracing/mock implementations to support the new subscription flow.

DA Client: WebSocket-Based JSON-RPC (pkg/da/, block/internal/da/)

  • Updates the JSON-RPC DA client to use WebSockets for subscriptions, replacing the previous HTTP-polling approach.
  • Adds a proper WebSocket constructor to the client so callers can opt into streaming.
  • Extends DA types with subscription-relevant fields.
  • Security hardening on the DA subscription path.
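The walkthrough below mentions that NewWSClient derives the WebSocket URL from the configured HTTP(S) address via an httpToWS helper. A sketch of what such a helper might look like (the actual signature and behavior in pkg/da/jsonrpc may differ):

```go
package main

import (
	"fmt"
	"strings"
)

// httpToWS maps an http(s):// DA address to the matching ws(s)://
// endpoint so the JSON-RPC client can open a WebSocket subscription.
// Hypothetical sketch of the helper named in the PR walkthrough.
func httpToWS(addr string) string {
	switch {
	case strings.HasPrefix(addr, "https://"):
		return "wss://" + strings.TrimPrefix(addr, "https://")
	case strings.HasPrefix(addr, "http://"):
		return "ws://" + strings.TrimPrefix(addr, "http://")
	default:
		return addr // already ws(s):// or a bare host; leave untouched
	}
}

func main() {
	fmt.Println(httpToWS("http://localhost:26658"))  // ws://localhost:26658
	fmt.Println(httpToWS("https://da.example.com"))  // wss://da.example.com
}
```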

Local DA Tooling (tools/local-da/)

  • Adds blob subscription support to the local dummy DA, enabling full E2E testing of the event-driven path without a live Celestia node.
  • Extends local.go with subscription broadcasting and rpc.go with the corresponding RPC endpoints.

Test Infrastructure

  • New da_retriever_mock.go and updates to test/mocks/da.go and test/testda/dummy.go to cover subscription interfaces.
  • syncer_backoff_test.go and syncer_test.go updated and significantly extended to cover the new event-driven flows.
  • E2E tests updated for new evnode flags and P2P address retrieval patterns.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added WebSocket-based blob client connectivity for improved real-time communication
    • Introduced blob subscription mechanism enabling event-driven DA synchronization
    • Added DA follower component for continuous monitoring and processing of finalized blobs
  • Bug Fixes

    • Fixed configuration flag naming for proper full node setup
  • Tests

    • Enhanced test coverage for catchup logic and inline blob processing

alpe added 3 commits March 3, 2026 13:02
…ollower and introduce DA client subscription.
…w mode

When the DA subscription delivers blobs at the current local DA height,
the followLoop now processes them inline via ProcessBlobs — avoiding
a round-trip re-fetch from the DA layer.

Architecture:
- followLoop: processes subscription blobs inline when caught up (fast path),
  falls through to catchupLoop when behind (slow path).
- catchupLoop: unchanged — sequential RetrieveFromDA() for bulk sync.

Changes:
- Add Blobs field to SubscriptionEvent for carrying raw blob data
- Add extractBlobData() to DA client Subscribe adapter
- Export ProcessBlobs on DARetriever interface
- Add handleSubscriptionEvent() to DAFollower with inline fast path
- Add TestDAFollower_InlineProcessing with 3 sub-tests
When header and data use different DA namespaces, the DAFollower now
subscribes to both and merges events via a fan-in goroutine. This ensures
inline blob processing works correctly for split-namespace configurations.

Changes:
- Add DataNamespace to DAFollowerConfig and daFollower
- Subscribe to both namespaces in runSubscription with mergeSubscriptions fan-in
- Guard handleSubscriptionEvent to only advance localDAHeight when
  ProcessBlobs returns at least one complete event (header+data matched)
- Pass DataNamespace from syncer.go
- Implement Subscribe on DummyDA test helper with subscriber notification
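The fan-in described in this commit (merging header-namespace and data-namespace subscriptions into one stream) is a standard channel-merge pattern. A self-contained sketch, with the function name borrowed from the commit message and the event type simplified:

```go
package main

import (
	"fmt"
	"sync"
)

type SubscriptionEvent struct {
	Height uint64
	Blobs  [][]byte
}

// mergeSubscriptions fans two subscription channels into one output
// channel, closing the output only after both inputs are closed.
func mergeSubscriptions(a, b <-chan SubscriptionEvent) <-chan SubscriptionEvent {
	out := make(chan SubscriptionEvent)
	var wg sync.WaitGroup
	forward := func(ch <-chan SubscriptionEvent) {
		defer wg.Done()
		for ev := range ch {
			out <- ev
		}
	}
	wg.Add(2)
	go forward(a)
	go forward(b)
	go func() {
		wg.Wait()
		close(out) // both inputs drained; consumers see a clean close
	}()
	return out
}

func main() {
	hdr := make(chan SubscriptionEvent, 1)
	data := make(chan SubscriptionEvent, 1)
	hdr <- SubscriptionEvent{Height: 10}
	data <- SubscriptionEvent{Height: 10}
	close(hdr)
	close(data)
	n := 0
	for range mergeSubscriptions(hdr, data) {
		n++
	}
	fmt.Println(n) // 2
}
```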
github-actions bot (Contributor) commented Mar 3, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed · Format: ⏩ skipped · Lint: ✅ passed · Breaking: ✅ passed · Updated (UTC): Mar 4, 2026, 4:09 PM

coderabbitai bot (Contributor) commented Mar 3, 2026

📝 Walkthrough

This PR introduces WebSocket-based DA subscription with a new DAFollower component that manages blob synchronization through concurrent follow and catch-up loops. Blob client constructors are updated to use NewWSClient across multiple applications. Subscribe methods are added to DA interfaces and implementations, alongside ProcessBlobs for inline blob processing. The Syncer is refactored to delegate DA synchronization to the DAFollower instead of polling.

Changes

Cohort / File(s) → Summary

  • Blob Client Constructor Updates (apps/evm/cmd/run.go, apps/grpc/cmd/run.go, apps/testapp/cmd/run.go, pkg/cmd/run_node.go)
    Replaced NewClient with NewWSClient for blob client initialization, enabling WebSocket-based subscriptions.
  • DA Interface & Client Implementation (block/internal/da/interface.go, block/internal/da/client.go, block/internal/da/tracing.go)
    Added Subscribe method to the Client interface; implemented Subscribe in the da client with namespace validation, blob subscription, and event streaming with data filtering.
  • DAFollower Component (block/internal/syncing/da_follower.go)
    New component with concurrent followLoop and catchupLoop for DA synchronization; manages subscriptions, fast-path inline processing, backoff, and state tracking (localDAHeight, highestSeenDAHeight, headReached).
  • DARetriever Extensions (block/internal/syncing/da_retriever.go, block/internal/syncing/da_retriever_tracing.go)
    Added public ProcessBlobs method to the DARetriever interface for inline blob processing; implemented wrappers in the retriever and tracing client.
  • Syncer Refactoring (block/internal/syncing/syncer.go)
    Replaced daHeadReached and daWorkerLoop with a daFollower field; initialize and start DAFollower in Start; delegate HasReachedDAHead to the follower; removed polling-based DA catch-up logic.
  • Syncer Test Updates (block/internal/syncing/syncer_backoff_test.go, block/internal/syncing/syncer_benchmark_test.go, block/internal/syncing/syncer_test.go)
    Migrated tests to DAFollower; added TestDAFollower_BackoffOnCatchupError, TestDAFollower_BackoffResetOnSuccess, TestDAFollower_CatchupThenReachHead, and TestDAFollower_InlineProcessing; updated BenchmarkSyncerIO to use follower-driven DA retrieval.
  • Test Infrastructure & Mocks (apps/evm/server/force_inclusion_test.go, block/internal/da/tracing_test.go, block/internal/syncing/da_retriever_mock.go, block/internal/syncing/da_retriever_tracing_test.go, test/mocks/da.go, test/testda/dummy.go, tools/local-da/local.go, tools/local-da/rpc.go)
    Added Subscribe to all mock and test DA implementations; added ProcessBlobs to the mock retriever; implemented subscriber tracking and notifications in DummyDA and LocalDA with namespace filtering.
  • DA Types & WebSocket Client (pkg/da/types/types.go, pkg/da/jsonrpc/client.go)
    Added SubscriptionEvent type with Height and Blobs fields; implemented the NewWSClient constructor that derives the WebSocket URL from the HTTP(S) address via the httpToWS helper.
  • E2E Test Updates (test/e2e/evm_force_inclusion_e2e_test.go, test/e2e/evm_test_common.go)
    Updated the full node's P2P connection to use the sequencer's specific P2P address; replaced rollkit.* flags with evnode.* equivalents (da.block_time, da.address, da.namespace, da.batching_strategy, rpc.address, p2p.listen_address, p2p.peers).

Sequence Diagram

sequenceDiagram
    participant Syncer
    participant DAFollower
    participant FollowLoop as followLoop
    participant CatchupLoop as catchupLoop
    participant DA as DA Client
    participant Retriever
    participant Pipeline as Event Pipeline

    Syncer->>DAFollower: Start(ctx)
    activate DAFollower
    DAFollower->>FollowLoop: spawn concurrent loop
    DAFollower->>CatchupLoop: spawn concurrent loop
    activate FollowLoop
    activate CatchupLoop

    FollowLoop->>DA: Subscribe(ctx, namespace)
    activate DA
    DA-->>FollowLoop: event channel
    deactivate DA

    loop Subscription Events
        FollowLoop->>FollowLoop: handleSubscriptionEvent(event)
        FollowLoop->>FollowLoop: updateHighest(highestSeenDAHeight)
        alt Inline Processing (fast-path)
            FollowLoop->>Retriever: ProcessBlobs(ctx, blobs, height)
            Retriever-->>FollowLoop: DAHeightEvent[]
            FollowLoop->>Pipeline: pipe event
        else Backlog Processing
            FollowLoop->>CatchupLoop: signal catchup needed
        end
    end

    loop Catchup Sequence
        CatchupLoop->>CatchupLoop: waitOnCatchupSignal()
        CatchupLoop->>CatchupLoop: runCatchup(ctx)
        loop Heights: localDAHeight → highestSeenDAHeight
            CatchupLoop->>Retriever: RetrieveFromDA(ctx, height)
            Retriever-->>CatchupLoop: blobs/error
            CatchupLoop->>Retriever: ProcessBlobs(ctx, blobs, height)
            Retriever-->>CatchupLoop: DAHeightEvent[]
            CatchupLoop->>Pipeline: pipe event
        end
    end

    CatchupLoop->>CatchupLoop: hasReachedHead = true
    deactivate FollowLoop
    deactivate CatchupLoop
    deactivate DAFollower

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • feat(sequencer): catchup from base #3057: Directly related through shared DA subscription, catch-up, and forced-inclusion code paths; both PRs modify DA synchronization infrastructure and syncer behavior.

Suggested labels

T:testing

Suggested reviewers

  • julienrbrt
  • tac0turtle
  • tzdybal
  • tuxcanfly

Poem

🐰 WebSockets dance, blobs now flow in streams,
FollowerLoops chase heights through syncing dreams,
Catch-up cascades with backoff in the night,
Subscribe and process—DA's shining bright! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 inconclusive)

  • Description check: ❓ Inconclusive. The PR description comprehensively covers the changes with clear sections (Core DAFollower, DA Client, Local DA Tooling, Test Infrastructure) and explains the motivation, but does not follow the provided template, which requires an 'Overview' section with context, background, goal, and rationale. Resolution: restructure the description to match the template by adding an 'Overview' section with the broader context, goal, and rationale, then organizing the 'Changes' section accordingly.

✅ Passed checks (2 passed)

  • Title check: ✅ Passed. The PR title accurately and clearly describes the main change: introducing an event-driven DA Follower component with WebSocket subscriptions, replacing the previous polling-based approach.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 81.82%, which meets the required threshold of 80.00%.


claude bot (Contributor) commented Mar 3, 2026

Claude finished @alpe's task in 2m 56s
Code Review: Adding Sync Mode


Overview

This PR replaces the old polling-based DA sync with an event-driven subscription model via the new DAFollower component. The architecture is well thought out — two goroutines (followLoop + catchupLoop) communicate through an atomic localDAHeight / highestSeenDAHeight pair with a CAS-based fast path to process subscription blobs inline. Overall a solid improvement.


Bugs / Correctness

1. Race on shared mutable state in processBlobs across different heights

File: block/internal/syncing/da_retriever.go:202 and block/internal/syncing/da_follower.go:260

The comment on ProcessBlobs says:

NOT thread-safe: the caller (DAFollower) must ensure exclusive access via CAS on localDAHeight before calling this method.

But the CAS at da_follower.go:260 only prevents two goroutines from processing the same height simultaneously. pendingHeaders and pendingData are maps that accumulate across heights: if catchupLoop is processing height N (inside fetchAndPipeHeight → RetrieveFromDA → processBlobs) while followLoop concurrently processes the subscription event for height N+1 (via ProcessBlobs), both goroutines mutate the same maps. This is a data race.


2. Partial-pipe rollback duplicates already-delivered events

File: block/internal/syncing/da_follower.go:260-278

When the fast-path processes blobs inline:

events := f.retriever.ProcessBlobs(ctx, ev.Blobs, ev.Height)
for _, event := range events {
    if err := f.pipeEvent(ctx, event); err != nil {
        // Roll back so catchupLoop can retry this height.
        f.localDAHeight.Store(ev.Height)
        ...
        return
    }
}

If pipeEvent succeeds for the first event but fails on the second, localDAHeight is rolled back and catchupLoop will re-fetch and re-pipe all events at that height — including the one already successfully delivered. The heightInCh / cache deduplication may absorb this (IsHeaderSeen in processHeightEvent), but the comment says "retry this height" without acknowledging the duplicate.

Fix this →

3. errCaughtUp is defined but never returned — dead code

File: block/internal/syncing/da_follower.go:398

errCaughtUp is defined and its guard in waitOnCatchupError is checked, but nothing in the catchup path ever returns this error. The "reached head" case in fetchAndPipeHeight returns datypes.ErrHeightFromFuture, not errCaughtUp. The detection in waitOnCatchupError is effectively dead.

Fix this →

4. Double-close panic risk in DummyDA.Reset()

File: test/testda/dummy.go:329-347

Reset() closes all subscriber channels while holding d.mu. Each Subscribe call spawns a goroutine that, on ctx.Done(), acquires d.mu and then calls close(ch) on the same channel. If Reset() runs first and closes the channels, the goroutine's subsequent close(ch) will panic with "close of closed channel".

// In Subscribe goroutine:
<-ctx.Done()
d.mu.Lock()
...
close(ch)   // panics if Reset already closed ch
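A common remedy (sketched here under assumed field names; DummyDA's real structure may differ) is ownership-based closing: only the goroutine that actually removes a subscriber from the list closes its channel, so Reset() and the ctx.Done() path can never both close the same channel.

```go
package main

import (
	"fmt"
	"sync"
)

type dummyDA struct {
	mu          sync.Mutex
	subscribers []chan int
}

// removeSubscriber deletes ch from the list and reports whether this
// caller owned the removal; only the owner may close the channel.
func (d *dummyDA) removeSubscriber(ch chan int) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	for i, s := range d.subscribers {
		if s == ch {
			d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
			return true
		}
	}
	return false
}

// reset closes and drops every remaining subscriber; channels already
// removed by their own goroutine are no longer in the list.
func (d *dummyDA) reset() {
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, ch := range d.subscribers {
		close(ch)
	}
	d.subscribers = nil
}

func main() {
	d := &dummyDA{}
	ch := make(chan int)
	d.subscribers = append(d.subscribers, ch)
	d.reset() // closes ch and clears the list
	// Equivalent of the ctx.Done() goroutine's cleanup:
	if d.removeSubscriber(ch) {
		close(ch) // not reached: Reset already removed it
	}
	fmt.Println("no double-close panic")
}
```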



Architecture / Design

5. LocalDA.Submit/SubmitWithOptions don't populate blobData

File: tools/local-da/local.go:236-263

The new blobData map was added for subscription support, but the direct Submit and SubmitWithOptions methods on LocalDA only populate the old data map. The notifySubscribers call at the end sends the height event but with nil blobs, since d.blobData[height] is empty. This means subscribers get zero blobs via the direct API, while the RPC path (blobServer.Submit) correctly populates blobData. This inconsistency could confuse test authors using LocalDA directly.


6. startSyncWorkers uses wg.Add/go inconsistently

File: block/internal/syncing/syncer.go:381-386

func (s *Syncer) startSyncWorkers(ctx context.Context) {
    s.wg.Add(2)
    go s.pendingWorkerLoop(ctx)
    go s.p2pWorkerLoop(ctx)
}

Start() uses s.wg.Go(...) (line 198), but startSyncWorkers uses the older Add+go pattern. Minor inconsistency — should align with the codebase style.

7. Watchdog may fire under normal slow-DA conditions

File: block/internal/syncing/da_follower.go:432-437

The watchdog timeout is 3 × daBlockTime. For DA layers with block times > 10s and occasional gaps, the watchdog may trigger reconnects during normal operation. The 3× multiplier might be too tight; consider making it configurable or using a longer minimum.


Performance

8. Benchmark regression flagged by CI

The CI benchmark alert shows:

Benchmark             Ratio
Batch.Put             1.93× slower
Batch.SaveBlockData   1.50× slower
Store.Height          2.02× slower

These regressions are significant. Store.Height being 2× slower warrants investigation — the p2pWorkerLoop and pendingWorkerLoop both call store.Height frequently (10ms and 50ms ticks respectively), and the event-driven subscription path may now produce blocks faster than before, increasing contention on the store.


Minor / Style

9. SplitID now returns an error in pkg/da/types/types.go but not in pkg/da/jsonrpc

File: pkg/da/types/types.go:78

The new datypes.SplitID signature is (uint64, []byte, error), but blobrpc.SplitID (used in block/internal/da/client.go:434) still returns (uint64, []byte). This is two different functions — confusing but not a bug. Consider whether they should be unified.

10. Coverage drop

Codecov reports 28.4% patch coverage, with da_follower.go at 38.34% (112 missing lines) and the new da/client.go at 0%. The da_follower.go catchup/follow loop logic is the most critical new code and has the most complex error paths — it deserves unit tests for:

  • The reconnection / watchdog firing
  • The priority-height drain in runCatchup
  • Inline fast-path partial-failure rollback

Positive Highlights

  • Clean separation of concerns: DAFollower neatly encapsulates subscription + catchup, removing that responsibility from Syncer.
  • Lock-free design: Using atomic.Uint64 + CAS for localDAHeight / highestSeenDAHeight avoids contention between followLoop and catchupLoop.
  • Watchdog pattern: Re-using the watchdog pattern from prior DA subscription hardening is good defensive coding.
  • Split namespace support: mergeSubscriptions correctly fan-ins two channels when header and data namespaces differ.
  • testda/dummy.go: Well-structured test DA with subscription support, height ticker, and reset capability.

If no subscription events arrive within 3× the DA block time (default
30s), the watchdog triggers and returns an error. The followLoop then
reconnects the subscription with the standard backoff. This prevents
the node from silently stopping sync when the DA subscription stalls
(e.g., network partition, DA node freeze).
codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 28.40909% with 189 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.96%. Comparing base (2c75e9e) to head (dd7e0cd).

Files with missing lines Patch % Lines
block/internal/syncing/da_follower.go 38.34% 112 Missing and 7 partials ⚠️
block/internal/da/client.go 0.00% 36 Missing ⚠️
block/internal/syncing/syncer.go 5.55% 16 Missing and 1 partial ⚠️
pkg/da/jsonrpc/client.go 0.00% 6 Missing ⚠️
pkg/cmd/run_node.go 0.00% 5 Missing ⚠️
block/internal/da/tracing.go 0.00% 2 Missing ⚠️
block/internal/syncing/da_retriever.go 0.00% 2 Missing ⚠️
block/internal/syncing/da_retriever_tracing.go 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3131      +/-   ##
==========================================
- Coverage   60.81%   59.96%   -0.86%     
==========================================
  Files         113      114       +1     
  Lines       11692    11891     +199     
==========================================
+ Hits         7111     7130      +19     
- Misses       3777     3953     +176     
- Partials      804      808       +4     
Flag Coverage Δ
combined 59.96% <28.40%> (-0.86%) ⬇️


alpe added 2 commits March 3, 2026 15:24
…ient to use WebSockets, along with E2E test updates for new `evnode` flags and P2P address retrieval.
github-actions bot (Contributor) left a comment

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Spamoor Trace Benchmarks'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite                            Current (dd7e0cd)   Previous (2c75e9e)   Ratio
SpamoorSmoke - Batch.SetHeight (avg)       15.02 us            8.82 us              1.70
SpamoorSmoke - Store.GetBlockData (avg)    13.59 us            7.78 us              1.75

This comment was automatically generated by workflow using github-action-benchmark.

alpe added 4 commits March 4, 2026 14:13
* main:
  chore: add stricter linting (#3132)
  feat(benchmarking): adding ERC20 benchmarking test (#3114)
  feat: ensure p2p DAHint within limits (#3128)
alpe changed the title from "[WIP] adding sync mode" to "feat: Event-Driven DA Follower with WebSocket Subscriptions" Mar 4, 2026
alpe marked this pull request as ready for review March 4, 2026 17:23
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 7

🧹 Nitpick comments (4)
block/internal/da/interface.go (1)

20-23: Clarify the post-cancellation drain contract in the comment.

“Callers MUST drain the channel after cancellation” can be interpreted differently; consider explicitly stating the expected sequence (cancel context, keep receiving until channel closes).

block/internal/syncing/da_retriever_tracing.go (1)

67-69: Consider instrumenting ProcessBlobs in the tracing wrapper.

This new hot path currently emits no span/attributes, which limits visibility into inline blob processing latency and output volume.

Suggested refactor
 func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
-	return t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	ctx, span := t.tracer.Start(ctx, "DARetriever.ProcessBlobs",
+		trace.WithAttributes(
+			attribute.Int64("da.height", int64(daHeight)),
+			attribute.Int("blob.count", len(blobs)),
+		),
+	)
+	defer span.End()
+
+	events := t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	span.SetAttributes(attribute.Int("event.count", len(events)))
+	return events
 }
apps/evm/server/force_inclusion_test.go (1)

53-58: Prefer a mockery-generated DA mock over extending the hand-written stub.

This local Subscribe stub works, but it increases interface-drift maintenance as DA APIs evolve. Using the generated DA mock keeps contract changes centralized.

As per coding guidelines, "Mock external dependencies using mockery".

block/internal/da/tracing.go (1)

148-150: Consider tracing subscription setup for consistency.

Subscribe currently bypasses the tracing pattern used by the rest of this wrapper, so subscribe failures won’t be visible in DA spans.

Suggested refactor
 func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
-	return t.inner.Subscribe(ctx, namespace)
+	ctx, span := t.tracer.Start(ctx, "DA.Subscribe",
+		trace.WithAttributes(
+			attribute.Int("ns.length", len(namespace)),
+			attribute.String("da.namespace", hex.EncodeToString(namespace)),
+		),
+	)
+	defer span.End()
+
+	ch, err := t.inner.Subscribe(ctx, namespace)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return nil, err
+	}
+	return ch, nil
 }
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/evm/cmd/run.go`:
- Around line 63-66: The WS dial is using context.Background() when calling
blobrpc.NewWSClient which prevents CLI cancellation from propagating; replace
the background context with the command-scoped context (use cmd.Context() or a
derived context) when calling blobrpc.NewWSClient so that the DA WebSocket
handshake is canceled on CLI shutdown and respects timeouts; update the call
site where blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is invoked to pass cmd.Context() (or ctx :=
cmd.Context() / ctx, cancel := context.WithTimeout(cmd.Context(), ...) if you
need a timeout) instead.

In `@block/internal/syncing/da_follower.go`:
- Around line 265-267: The unconditional f.localDAHeight.Store(ev.Height) can
regress localDAHeight if runCatchup advanced it concurrently; change the
rollback to a CAS so we only set it back when the current value equals the value
we expect to overwrite. Replace the Store(ev.Height) rollback with a
compare-and-swap using the atomic type/method (e.g.,
f.localDAHeight.CompareAndSwap(expectedPrev, ev.Height) or
atomic.CompareAndSwapUint64) where expectedPrev is the value read before
attempting the rollback; apply the same CAS pattern in both places noted around
the runCatchup logic to avoid moving localDAHeight backwards.
- Around line 271-273: The code sets f.headReached.Store(true) whenever inline
processing yields events, which can be incorrect if highestSeenDAHeight is still
ahead; modify the condition around the len(events) branch in da_follower.go (the
block using ev.Height and f.headReached) to only set f.headReached when the
event height indicates we are actually caught up (e.g., ev.Height >=
f.highestSeenDAHeight or when highestSeenDAHeight is unset/zero), otherwise do
not flip headReached; update the conditional that logs with f.logger.Debug() so
the headReached.Store(true) call is guarded by this explicit comparison to
highestSeenDAHeight.

In `@block/internal/syncing/syncer_benchmark_test.go`:
- Around line 46-61: The test starts goroutines with b.Context() which doesn't
get cancelled by the fixture teardown (fixt.s.cancel()), causing potential
goroutine leaks; change all uses of b.Context() when launching long-lived worker
goroutines to use the fixture's cancelable context (fixt.s.ctx) instead —
specifically pass fixt.s.ctx into fixt.s.processLoop, follower.runCatchup (and
the NewDAFollower call site that spawns it), and fixt.s.startSyncWorkers; apply
the same replacement for the other occurrences mentioned (lines near 64-69 and
78-79) so teardown via fixt.s.cancel() and fixt.s.wg.Wait() correctly shuts down
workers.

In `@block/internal/syncing/syncer.go`:
- Around line 198-213: processLoop is started with s.wg.Go before starting
s.daFollower, so if s.daFollower.Start(ctx) fails the goroutine can leak; wrap
the work with a cancellable child context (e.g., childCtx, cancel :=
context.WithCancel(ctx)) and pass childCtx to s.processLoop and to
daFollower.Start, then on Start failure call cancel() and wait for in-flight
goroutines to finish (s.wg.Wait or equivalent) before returning the error;
update usages of s.wg.Go(func() { s.processLoop(ctx) }) and
s.daFollower.Start(ctx) to use childCtx and ensure cancel + wait are performed
on the error path.

In `@test/testda/dummy.go`:
- Around line 66-77: The goroutine unconditionally closes ch on ctx.Done(),
causing a double-close if Reset() already closed it; modify the removal logic in
the ctx.Done() goroutine (which locks d.mu and iterates d.subscribers comparing
s to sub) to only close ch when this goroutine actually removed the subscriber
(e.g., set a removed bool when s==sub, perform d.subscribers = append(...),
break, and then close ch only if removed is true), so Reset() and this goroutine
cannot both close the same channel; reference d.mu, d.subscribers, sub, ch and
Reset() when making the change.

In `@tools/local-da/local.go`:
- Around line 230-231: The subscription events lose blob payloads because
notifySubscribers reads from d.blobData[height] while SubmitWithOptions and
other submit paths only write to d.data and never populate d.blobData; update
the submit code paths (e.g., SubmitWithOptions and the other submit handlers
around the noted regions) to also populate d.blobData[height] with the
corresponding blob list when they set d.data[height] (or move notifySubscribers
to read from d.data if that is the intended single source); ensure you update
the same symbol names (d.data, d.blobData) so notifySubscribers(d.height) will
see the blobs and subscribers receive non-empty blob lists.

optionally total_blob_bytes (sum of len for each blob), call
t.inner.ProcessBlobs(ctx, blobs, daHeight), record the resulting events count as
an attribute (e.g., da_events_count) and any error/status if applicable, then
end the span; keep the wrapper behavior identical except for adding the span and
attributes to improve visibility into latency and output volume.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 24171a7e-5193-4ec4-9f32-030b0acc2d20

📥 Commits

Reviewing files that changed from the base of the PR and between 2c75e9e and dd7e0cd.

📒 Files selected for processing (26)
  • apps/evm/cmd/run.go
  • apps/evm/server/force_inclusion_test.go
  • apps/grpc/cmd/run.go
  • apps/testapp/cmd/run.go
  • block/internal/da/client.go
  • block/internal/da/interface.go
  • block/internal/da/tracing.go
  • block/internal/da/tracing_test.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/da_retriever_mock.go
  • block/internal/syncing/da_retriever_tracing.go
  • block/internal/syncing/da_retriever_tracing_test.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/syncer_benchmark_test.go
  • block/internal/syncing/syncer_test.go
  • pkg/cmd/run_node.go
  • pkg/da/jsonrpc/client.go
  • pkg/da/types/types.go
  • test/e2e/evm_force_inclusion_e2e_test.go
  • test/e2e/evm_test_common.go
  • test/mocks/da.go
  • test/testda/dummy.go
  • tools/local-da/local.go
  • tools/local-da/rpc.go

Comment on lines +63 to 66
blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
return fmt.Errorf("failed to create blob client: %w", err)
}

⚠️ Potential issue | 🟠 Major

Use command-scoped context for DA WS dialing.

Line 63 uses context.Background(), which disconnects the WS dial from CLI cancellation/shutdown flow. Pass cmd.Context() (or a derived context) instead.

Suggested fix
-		blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
+		blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")

As per coding guidelines, "Use context.Context for cancellation".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/evm/cmd/run.go` around lines 63 - 66, The WS dial is using
context.Background() when calling blobrpc.NewWSClient which prevents CLI
cancellation from propagating; replace the background context with the
command-scoped context (use cmd.Context() or a derived context) when calling
blobrpc.NewWSClient so that the DA WebSocket handshake is canceled on CLI
shutdown and respects timeouts; update the call site where
blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is invoked to pass cmd.Context() (or ctx :=
cmd.Context() / ctx, cancel := context.WithTimeout(cmd.Context(), ...) if you
need a timeout) instead.

Comment on lines +265 to +267
f.localDAHeight.Store(ev.Height)
f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
	Msg("failed to pipe inline event, catchup will retry")

⚠️ Potential issue | 🔴 Critical

Use CAS-based rollback to avoid regressing localDAHeight

At Line 265 and Line 277, unconditional rollback via Store(ev.Height) can move localDAHeight backward if runCatchup has already advanced it, which can cause duplicate/out-of-order processing.

🐛 Proposed fix
-				f.localDAHeight.Store(ev.Height)
+				if !f.localDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+					f.logger.Debug().
+						Uint64("da_height", ev.Height).
+						Uint64("local_da_height", f.localDAHeight.Load()).
+						Msg("skip rollback: local DA height already advanced")
+				}
@@
-			f.localDAHeight.Store(ev.Height)
+			if !f.localDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+				f.logger.Debug().
+					Uint64("da_height", ev.Height).
+					Uint64("local_da_height", f.localDAHeight.Load()).
+					Msg("skip rollback: local DA height already advanced")
+			}

Also applies to: 276-278

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 265 - 267, The
unconditional f.localDAHeight.Store(ev.Height) can regress localDAHeight if
runCatchup advanced it concurrently; change the rollback to a CAS so we only set
it back when the current value equals the value we expect to overwrite. Replace
the Store(ev.Height) rollback with a compare-and-swap using the atomic
type/method (e.g., f.localDAHeight.CompareAndSwap(expectedPrev, ev.Height) or
atomic.CompareAndSwapUint64) where expectedPrev is the value read before
attempting the rollback; apply the same CAS pattern in both places noted around
the runCatchup logic to avoid moving localDAHeight backwards.

Comment on lines +271 to +273
if len(events) != 0 {
	f.headReached.Store(true)
	f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).

⚠️ Potential issue | 🟠 Major

Only mark headReached when actually caught up

Line 272 sets headReached to true whenever inline processing yields events. If highestSeenDAHeight is still ahead, this reports a false “caught up” state.

✅ Proposed guard
-		if len(events) != 0 {
-			f.headReached.Store(true)
+		if len(events) != 0 {
+			if f.localDAHeight.Load() > f.highestSeenDAHeight.Load() {
+				f.headReached.Store(true)
+			}
 			f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
 				Msg("processed subscription blobs inline (fast path)")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 271 - 273, The code sets
f.headReached.Store(true) whenever inline processing yields events, which can be
incorrect if highestSeenDAHeight is still ahead; modify the condition around the
len(events) branch in da_follower.go (the block using ev.Height and
f.headReached) to only set f.headReached when the event height indicates we are
actually caught up (e.g., ev.Height >= f.highestSeenDAHeight or when
highestSeenDAHeight is unset/zero), otherwise do not flip headReached; update
the conditional that logs with f.logger.Debug() so the headReached.Store(true)
call is guarded by this explicit comparison to highestSeenDAHeight.

Comment on lines +46 to +61
ctx := b.Context()
go fixt.s.processLoop(ctx)

// Create a DAFollower to drive DA retrieval.
follower := NewDAFollower(DAFollowerConfig{
	Retriever:     fixt.s.daRetriever,
	Logger:        zerolog.Nop(),
	PipeEvent:     fixt.s.pipeEvent,
	Namespace:     []byte("ns"),
	StartDAHeight: fixt.s.daRetrieverHeight.Load(),
	DABlockTime:   0,
}).(*daFollower)
follower.highestSeenDAHeight.Store(spec.heights + daHeightOffset)
go follower.runCatchup(ctx)

fixt.s.startSyncWorkers(ctx)

⚠️ Potential issue | 🟠 Major

Use the fixture’s cancelable context for worker lifecycles.

Line 46 moves worker/follower goroutines onto b.Context(), but teardown still calls fixt.s.cancel() (which cancels fixt.s.ctx). That mismatch can leave workers running and block on fixt.s.wg.Wait().

🛠️ Proposed fix
-				ctx := b.Context()
-				go fixt.s.processLoop(ctx)
+				runCtx := fixt.s.ctx
+				queryCtx := b.Context()
+				go fixt.s.processLoop(runCtx)
@@
-				go follower.runCatchup(ctx)
+				go follower.runCatchup(runCtx)
@@
-				fixt.s.startSyncWorkers(ctx)
+				fixt.s.startSyncWorkers(runCtx)
@@
-					processedHeight, _ := fixt.s.store.Height(ctx)
+					processedHeight, _ := fixt.s.store.Height(queryCtx)
@@
-				gotStoreHeight, err := fixt.s.store.Height(ctx)
+				gotStoreHeight, err := fixt.s.store.Height(queryCtx)
As per coding guidelines: "Be mindful of goroutine leaks".

Also applies to: 64-69, 78-79

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_benchmark_test.go` around lines 46 - 61, The
test starts goroutines with b.Context() which doesn't get cancelled by the
fixture teardown (fixt.s.cancel()), causing potential goroutine leaks; change
all uses of b.Context() when launching long-lived worker goroutines to use the
fixture's cancelable context (fixt.s.ctx) instead — specifically pass fixt.s.ctx
into fixt.s.processLoop, follower.runCatchup (and the NewDAFollower call site
that spawns it), and fixt.s.startSyncWorkers; apply the same replacement for the
other occurrences mentioned (lines near 64-69 and 78-79) so teardown via
fixt.s.cancel() and fixt.s.wg.Wait() correctly shuts down workers.

Comment on lines 198 to +213
s.wg.Go(func() { s.processLoop(ctx) })

// Start dedicated workers for DA, and pending processing
// Start the DA follower (subscribe + catchup) and other workers
s.daFollower = NewDAFollower(DAFollowerConfig{
	Client:        s.daClient,
	Retriever:     s.daRetriever,
	Logger:        s.logger,
	PipeEvent:     s.pipeEvent,
	Namespace:     s.daClient.GetHeaderNamespace(),
	DataNamespace: s.daClient.GetDataNamespace(),
	StartDAHeight: s.daRetrieverHeight.Load(),
	DABlockTime:   s.config.DA.BlockTime.Duration,
})
if err := s.daFollower.Start(ctx); err != nil {
	return fmt.Errorf("failed to start DA follower: %w", err)
}

⚠️ Potential issue | 🔴 Critical

Clean up started goroutines when DA follower startup fails.

Line 198 starts processLoop before follower startup. If s.daFollower.Start(ctx) fails on Line 211, Start() returns without canceling/waiting, leaving goroutines running after a failed startup.

🛠️ Proposed fix
 	// Start main processing loop
 	s.wg.Go(func() { s.processLoop(ctx) })
@@
 	if err := s.daFollower.Start(ctx); err != nil {
+		s.cancel()
+		s.wg.Wait()
 		return fmt.Errorf("failed to start DA follower: %w", err)
 	}
As per coding guidelines: "Be mindful of goroutine leaks".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer.go` around lines 198 - 213, processLoop is
started with s.wg.Go before starting s.daFollower, so if s.daFollower.Start(ctx)
fails the goroutine can leak; wrap the work with a cancellable child context
(e.g., childCtx, cancel := context.WithCancel(ctx)) and pass childCtx to
s.processLoop and to daFollower.Start, then on Start failure call cancel() and
wait for in-flight goroutines to finish (s.wg.Wait or equivalent) before
returning the error; update usages of s.wg.Go(func() { s.processLoop(ctx) }) and
s.daFollower.Start(ctx) to use childCtx and ensure cancel + wait are performed
on the error path.

Comment on lines +66 to +77
go func() {
	<-ctx.Done()
	d.mu.Lock()
	defer d.mu.Unlock()
	for i, s := range d.subscribers {
		if s == sub {
			d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
			break
		}
	}
	close(ch)
}()

⚠️ Potential issue | 🔴 Critical

Prevent double-close panic on subscriber channels.

Line 76 always closes ch on ctx.Done(). If Reset() already closed that channel (Line 337), this goroutine will panic with "close of closed channel".

🛠️ Proposed fix
 	go func() {
 		<-ctx.Done()
 		d.mu.Lock()
 		defer d.mu.Unlock()
 		for i, s := range d.subscribers {
 			if s == sub {
 				d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
-				break
+				close(ch)
+				return
 			}
 		}
-		close(ch)
 	}()
As per coding guidelines: "Be careful with concurrent access to shared state".

Also applies to: 335-339

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/testda/dummy.go` around lines 66 - 77, The goroutine unconditionally
closes ch on ctx.Done(), causing a double-close if Reset() already closed it;
modify the removal logic in the ctx.Done() goroutine (which locks d.mu and
iterates d.subscribers comparing s to sub) to only close ch when this goroutine
actually removed the subscriber (e.g., set a removed bool when s==sub, perform
d.subscribers = append(...), break, and then close ch only if removed is true),
so Reset() and this goroutine cannot both close the same channel; reference
d.mu, d.subscribers, sub, ch and Reset() when making the change.
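The remover-owns-the-close pattern can be shown in isolation; the `hub` type and `remove` method below are hypothetical stand-ins for the dummy DA's subscriber list, not the actual code:

```go
package main

import "fmt"

// hub holds subscriber channels; whichever path removes a subscriber from
// the slice is the only one allowed to close its channel.
type hub struct {
	subs []chan int
}

// remove closes ch only when this call actually removed it, so a concurrent
// Reset-style cleanup and a ctx.Done cleanup can never both close the channel.
func (h *hub) remove(ch chan int) bool {
	for i, s := range h.subs {
		if s == ch {
			h.subs = append(h.subs[:i], h.subs[i+1:]...)
			close(ch) // closed exactly once, by the remover
			return true
		}
	}
	return false // already removed and closed elsewhere; do nothing
}

func main() {
	ch := make(chan int)
	h := &hub{subs: []chan int{ch}}
	fmt.Println(h.remove(ch)) // true: removed and closed
	fmt.Println(h.remove(ch)) // false: no double close, no panic
}
```

Tying the close to the successful removal makes "who closes the channel" unambiguous without extra synchronization beyond the existing mutex.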

Comment on lines +230 to 231
d.notifySubscribers(d.height)
d.logger.Info().Uint64("newHeight", d.height).Int("count", len(ids)).Msg("SubmitWithOptions successful")

⚠️ Potential issue | 🔴 Critical

Subscription events currently lose blob payloads.

notifySubscribers publishes from d.blobData[height], but submit paths write only to d.data and never populate d.blobData. Result: subscribers can receive height events with empty blob lists even when blobs were submitted.

Also applies to: 261-262, 399-413

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tools/local-da/local.go` around lines 230 - 231, The subscription events lose
blob payloads because notifySubscribers reads from d.blobData[height] while
SubmitWithOptions and other submit paths only write to d.data and never populate
d.blobData; update the submit code paths (e.g., SubmitWithOptions and the other
submit handlers around the noted regions) to also populate d.blobData[height]
with the corresponding blob list when they set d.data[height] (or move
notifySubscribers to read from d.data if that is the intended single source);
ensure you update the same symbol names (d.data, d.blobData) so
notifySubscribers(d.height) will see the blobs and subscribers receive non-empty
blob lists.

@julienrbrt julienrbrt self-requested a review March 4, 2026 17:50

Are we able to use that in the async block retriever?

