feat: Event-Driven DA Follower with WebSocket Subscriptions #3131
Conversation
…ollower and introduce DA client subscription.
…w mode

When the DA subscription delivers blobs at the current local DA height, the followLoop now processes them inline via ProcessBlobs, avoiding a round-trip re-fetch from the DA layer.

Architecture:
- followLoop: processes subscription blobs inline when caught up (fast path), falls through to catchupLoop when behind (slow path).
- catchupLoop: unchanged — sequential RetrieveFromDA() for bulk sync.

Changes:
- Add Blobs field to SubscriptionEvent for carrying raw blob data
- Add extractBlobData() to the DA client Subscribe adapter
- Export ProcessBlobs on the DARetriever interface
- Add handleSubscriptionEvent() to DAFollower with the inline fast path
- Add TestDAFollower_InlineProcessing with 3 sub-tests
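The inline fast path described in this commit can be sketched as follows; `handleEvent`, `daEvent`, and the callback shapes are illustrative assumptions, not the PR's exact API:

```go
package main

// daEvent is a simplified stand-in for the PR's SubscriptionEvent carrying raw blobs.
type daEvent struct {
	Height uint64
	Blobs  [][]byte
}

// handleEvent sketches the follow-loop decision: an event at exactly the next
// expected local DA height is processed inline (fast path); anything else is
// handed to the sequential catch-up loop (slow path). The local height only
// advances when processing yields at least one event.
func handleEvent(localHeight *uint64, ev daEvent, process func([][]byte, uint64) int, signalCatchup func()) {
	if ev.Height == *localHeight {
		if n := process(ev.Blobs, ev.Height); n > 0 {
			*localHeight = ev.Height + 1 // advance only after successful inline processing
		}
		return
	}
	signalCatchup() // not at the expected height: re-fetch via the catch-up loop
}

func main() {
	local := uint64(7)
	processed := 0
	caughtUp := false

	handleEvent(&local, daEvent{Height: 7, Blobs: [][]byte{{0x01}}},
		func(blobs [][]byte, h uint64) int { processed += len(blobs); return len(blobs) },
		func() { caughtUp = true })

	// Fast path taken: local height advanced to 8, one blob processed, no catch-up signal.
	println(local, processed, caughtUp)
}
```

This mirrors the split the commit describes: the subscription path never touches RetrieveFromDA when caught up, and only the slow path does bulk fetching.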
When header and data use different DA namespaces, the DAFollower now subscribes to both and merges events via a fan-in goroutine. This ensures inline blob processing works correctly for split-namespace configurations.

Changes:
- Add DataNamespace to DAFollowerConfig and daFollower
- Subscribe to both namespaces in runSubscription with a mergeSubscriptions fan-in
- Guard handleSubscriptionEvent to only advance localDAHeight when ProcessBlobs returns at least one complete event (header+data matched)
- Pass DataNamespace from syncer.go
- Implement Subscribe on the DummyDA test helper with subscriber notification
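The fan-in merge described in this commit can be sketched as follows; `event` and `merge` are simplified stand-ins for the PR's SubscriptionEvent and mergeSubscriptions:

```go
package main

import "sync"

// event is a simplified stand-in for the subscription event type.
type event struct {
	Height uint64
	Blobs  [][]byte
}

// merge fans in events from several subscription channels into a single
// output channel, closing the output once every input has closed.
func merge(chans ...<-chan event) <-chan event {
	out := make(chan event)
	var wg sync.WaitGroup
	for _, ch := range chans {
		wg.Add(1)
		go func(c <-chan event) {
			defer wg.Done()
			for ev := range c { // forward until this input closes
				out <- ev
			}
		}(ch)
	}
	go func() {
		wg.Wait() // all inputs drained and closed
		close(out)
	}()
	return out
}

func main() {
	header := make(chan event, 1)
	data := make(chan event, 1)
	header <- event{Height: 10}
	data <- event{Height: 10}
	close(header)
	close(data)

	n := 0
	for range merge(header, data) {
		n++
	}
	println(n) // prints 2
}
```

The WaitGroup plus a single closer goroutine is what makes it safe to close the merged channel exactly once, which matters for the split-namespace case where two subscriptions terminate independently.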
📝 Walkthrough

This PR introduces WebSocket-based DA subscription with a new DAFollower component that manages blob synchronization through concurrent follow and catch-up loops. Blob client constructors are updated to use NewWSClient across multiple applications. Subscribe methods are added to DA interfaces and implementations, alongside ProcessBlobs for inline blob processing. The Syncer is refactored to delegate DA synchronization to the DAFollower instead of polling.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Syncer
    participant DAFollower
    participant FollowLoop as followLoop
    participant CatchupLoop as catchupLoop
    participant DA as DA Client
    participant Retriever
    participant Pipeline as Event Pipeline
    Syncer->>DAFollower: Start(ctx)
    activate DAFollower
    DAFollower->>FollowLoop: spawn concurrent loop
    DAFollower->>CatchupLoop: spawn concurrent loop
    activate FollowLoop
    activate CatchupLoop
    FollowLoop->>DA: Subscribe(ctx, namespace)
    activate DA
    DA-->>FollowLoop: event channel
    deactivate DA
    loop Subscription Events
        FollowLoop->>FollowLoop: handleSubscriptionEvent(event)
        FollowLoop->>FollowLoop: updateHighest(highestSeenDAHeight)
        alt Inline Processing (fast-path)
            FollowLoop->>Retriever: ProcessBlobs(ctx, blobs, height)
            Retriever-->>FollowLoop: DAHeightEvent[]
            FollowLoop->>Pipeline: pipe event
        else Backlog Processing
            FollowLoop->>CatchupLoop: signal catchup needed
        end
    end
    loop Catchup Sequence
        CatchupLoop->>CatchupLoop: waitOnCatchupSignal()
        CatchupLoop->>CatchupLoop: runCatchup(ctx)
        loop Heights: localDAHeight → highestSeenDAHeight
            CatchupLoop->>Retriever: RetrieveFromDA(ctx, height)
            Retriever-->>CatchupLoop: blobs/error
            CatchupLoop->>Retriever: ProcessBlobs(ctx, blobs, height)
            Retriever-->>CatchupLoop: DAHeightEvent[]
            CatchupLoop->>Pipeline: pipe event
        end
    end
    CatchupLoop->>CatchupLoop: hasReachedHead = true
    deactivate FollowLoop
    deactivate CatchupLoop
    deactivate DAFollower
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 inconclusive.
Claude finished @alpe's task in 2m 56s — Code Review: Adding Sync Mode
| Benchmark | Ratio |
|---|---|
| Batch.Put | 1.93× slower |
| Batch.SaveBlockData | 1.50× slower |
| Store.Height | 2.02× slower |
These regressions are significant. Store.Height being 2× slower warrants investigation — the p2pWorkerLoop and pendingWorkerLoop both call store.Height frequently (10ms and 50ms ticks respectively), and the event-driven subscription path may now produce blocks faster than before, increasing contention on the store.
Minor / Style
9. SplitID now returns an error in pkg/da/types/types.go but not in pkg/da/jsonrpc
File: pkg/da/types/types.go:78
The new datypes.SplitID signature is (uint64, []byte, error), but blobrpc.SplitID (used in block/internal/da/client.go:434) still returns (uint64, []byte). This is two different functions — confusing but not a bug. Consider whether they should be unified.
10. Coverage drop
Codecov reports 28.4% patch coverage, with da_follower.go at 38.34% (112 missing lines) and the new da/client.go at 0%. The da_follower.go catchup/follow loop logic is the most critical new code and has the most complex error paths — it deserves unit tests for:
- The reconnection / watchdog firing
- The priority-height drain in runCatchup
- Inline fast-path partial-failure rollback
Positive Highlights
- Clean separation of concerns: `DAFollower` neatly encapsulates subscription + catchup, removing that responsibility from `Syncer`.
- Lock-free design: Using `atomic.Uint64` + CAS for `localDAHeight`/`highestSeenDAHeight` avoids contention between `followLoop` and `catchupLoop`.
- Watchdog pattern: Re-using the watchdog pattern from prior DA subscription hardening is good defensive coding.
- Split namespace support: `mergeSubscriptions` correctly fans in two channels when header and data namespaces differ.
- `testda/dummy.go`: Well-structured test DA with subscription support, height ticker, and reset capability.
If no subscription events arrive within 3× the DA block time (default 30s), the watchdog triggers and returns an error. The followLoop then reconnects the subscription with the standard backoff. This prevents the node from silently stopping sync when the DA subscription stalls (e.g., network partition, DA node freeze).
Codecov Report

❌ Patch coverage is 28.4%. Additional details and impacted files:

@@ Coverage Diff @@
## main #3131 +/- ##
==========================================
- Coverage 60.81% 59.96% -0.86%
==========================================
Files 113 114 +1
Lines 11692 11891 +199
==========================================
+ Hits 7111 7130 +19
- Misses 3777 3953 +176
- Partials 804 808 +4
Flags with carried forward coverage won't be shown.
|
…ient to use WebSockets, along with E2E test updates for new `evnode` flags and P2P address retrieval.
⚠️ Performance Alert ⚠️
Possible performance regression was detected for benchmark 'Spamoor Trace Benchmarks'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.
| Benchmark suite | Current: dd7e0cd | Previous: 2c75e9e | Ratio |
|---|---|---|---|
| SpamoorSmoke - Batch.SetHeight (avg) | 15.020408163265307 us | 8.824561403508772 us | 1.70 |
| SpamoorSmoke - Store.GetBlockData (avg) | 13.59433962264151 us | 7.780701754385965 us | 1.75 |
This comment was automatically generated by workflow using github-action-benchmark.
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (4)
block/internal/da/interface.go (1)
20-23: Clarify the post-cancellation drain contract in the comment. "Callers MUST drain the channel after cancellation" can be interpreted differently; consider explicitly stating the expected sequence (cancel the context, then keep receiving until the channel closes).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `block/internal/da/interface.go` around lines 20-23: update the comment for Subscribe to explicitly describe the post-cancellation drain sequence. State that callers should cancel the provided ctx, then continue receiving from the returned <-chan datypes.SubscriptionEvent (e.g., range over the channel) until it is closed by the implementation; clarify that the channel will be closed once cancellation is observed and that draining (receiving until close) prevents goroutine/resource leaks. Reference the Subscribe(ctx context.Context, namespace []byte) signature and datypes.SubscriptionEvent in the comment.

block/internal/syncing/da_retriever_tracing.go (1)
67-69: Consider instrumenting `ProcessBlobs` in the tracing wrapper. This new hot path currently emits no span/attributes, which limits visibility into inline blob processing latency and output volume.
Suggested refactor
```diff
 func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
-	return t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	ctx, span := t.tracer.Start(ctx, "DARetriever.ProcessBlobs",
+		trace.WithAttributes(
+			attribute.Int64("da.height", int64(daHeight)),
+			attribute.Int("blob.count", len(blobs)),
+		),
+	)
+	defer span.End()
+
+	events := t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	span.SetAttributes(attribute.Int("event.count", len(events)))
+	return events
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `block/internal/syncing/da_retriever_tracing.go` around lines 67-69: the tracing wrapper tracedDARetriever.ProcessBlobs does not create any span or attributes, so add instrumentation around the call to t.inner.ProcessBlobs: start a span (using the same tracer used elsewhere in this file), set attributes for blob_count (len(blobs)), da_height (daHeight) and optionally total_blob_bytes (sum of len for each blob), call t.inner.ProcessBlobs(ctx, blobs, daHeight), record the resulting events count as an attribute (e.g., da_events_count), then end the span; keep the wrapper behavior identical except for adding the span and attributes.

apps/evm/server/force_inclusion_test.go (1)
53-58: Prefer a mockery-generated DA mock over extending the hand-written stub. This local `Subscribe` stub works, but it increases interface-drift maintenance as DA APIs evolve. Using the generated DA mock keeps contract changes centralized. As per coding guidelines, "Mock external dependencies using mockery".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `apps/evm/server/force_inclusion_test.go` around lines 53-58: replace the hand-written Subscribe implementation on mockDA with the mockery-generated DA mock; remove the local mockDA.Subscribe stub, instantiate the mockery-created mock (e.g., MockDA), and configure its Subscribe return behavior via EXPECT/On (for example returning a closed <-chan da.SubscriptionEvent) so the external DA contract is maintained centrally.

block/internal/da/tracing.go (1)
148-150: Consider tracing subscription setup for consistency.
`Subscribe` currently bypasses the tracing pattern used by the rest of this wrapper, so subscribe failures won't be visible in DA spans.

Suggested refactor
```diff
 func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
-	return t.inner.Subscribe(ctx, namespace)
+	ctx, span := t.tracer.Start(ctx, "DA.Subscribe",
+		trace.WithAttributes(
+			attribute.Int("ns.length", len(namespace)),
+			attribute.String("da.namespace", hex.EncodeToString(namespace)),
+		),
+	)
+	defer span.End()
+
+	ch, err := t.inner.Subscribe(ctx, namespace)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return nil, err
+	}
+	return ch, nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/tracing.go` around lines 148 - 150, tracedClient.Subscribe bypasses the wrapper's tracing; update the method to create a DA span (matching the tracing pattern used elsewhere in this file), start the span with the incoming ctx, add relevant attributes (e.g., namespace), call t.inner.Subscribe(ctx, namespace), record any returned error on the span, and end the span before returning the channel and error so subscription setup failures are captured in DA spans; reference the tracedClient.Subscribe method and t.inner.Subscribe call when making this change.
📒 Files selected for processing (26)
- apps/evm/cmd/run.go
- apps/evm/server/force_inclusion_test.go
- apps/grpc/cmd/run.go
- apps/testapp/cmd/run.go
- block/internal/da/client.go
- block/internal/da/interface.go
- block/internal/da/tracing.go
- block/internal/da/tracing_test.go
- block/internal/syncing/da_follower.go
- block/internal/syncing/da_retriever.go
- block/internal/syncing/da_retriever_mock.go
- block/internal/syncing/da_retriever_tracing.go
- block/internal/syncing/da_retriever_tracing_test.go
- block/internal/syncing/syncer.go
- block/internal/syncing/syncer_backoff_test.go
- block/internal/syncing/syncer_benchmark_test.go
- block/internal/syncing/syncer_test.go
- pkg/cmd/run_node.go
- pkg/da/jsonrpc/client.go
- pkg/da/types/types.go
- test/e2e/evm_force_inclusion_e2e_test.go
- test/e2e/evm_test_common.go
- test/mocks/da.go
- test/testda/dummy.go
- tools/local-da/local.go
- tools/local-da/rpc.go
```go
blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
	return fmt.Errorf("failed to create blob client: %w", err)
}
```
Use command-scoped context for DA WS dialing.
Line 63 uses context.Background(), which disconnects the WS dial from CLI cancellation/shutdown flow. Pass cmd.Context() (or a derived context) instead.
Suggested fix

```diff
-	blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
+	blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
```

As per coding guidelines, "Use context.Context for cancellation".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/evm/cmd/run.go` around lines 63 - 66, The WS dial is using
context.Background() when calling blobrpc.NewWSClient which prevents CLI
cancellation from propagating; replace the background context with the
command-scoped context (use cmd.Context() or a derived context) when calling
blobrpc.NewWSClient so that the DA WebSocket handshake is canceled on CLI
shutdown and respects timeouts; update the call site where
blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is invoked to pass cmd.Context() (or ctx :=
cmd.Context() / ctx, cancel := context.WithTimeout(cmd.Context(), ...) if you
need a timeout) instead.
```go
f.localDAHeight.Store(ev.Height)
f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
	Msg("failed to pipe inline event, catchup will retry")
```
Use CAS-based rollback to avoid regressing localDAHeight
At Line 265 and Line 277, unconditional rollback via Store(ev.Height) can move localDAHeight backward if runCatchup has already advanced it, which can cause duplicate/out-of-order processing.
🐛 Proposed fix

```diff
-	f.localDAHeight.Store(ev.Height)
+	if !f.localDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+		f.logger.Debug().
+			Uint64("da_height", ev.Height).
+			Uint64("local_da_height", f.localDAHeight.Load()).
+			Msg("skip rollback: local DA height already advanced")
+	}
@@
-	f.localDAHeight.Store(ev.Height)
+	if !f.localDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+		f.logger.Debug().
+			Uint64("da_height", ev.Height).
+			Uint64("local_da_height", f.localDAHeight.Load()).
+			Msg("skip rollback: local DA height already advanced")
+	}
```

Also applies to: 276-278
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/syncing/da_follower.go` around lines 265 - 267, The
unconditional f.localDAHeight.Store(ev.Height) can regress localDAHeight if
runCatchup advanced it concurrently; change the rollback to a CAS so we only set
it back when the current value equals the value we expect to overwrite. Replace
the Store(ev.Height) rollback with a compare-and-swap using the atomic
type/method (e.g., f.localDAHeight.CompareAndSwap(expectedPrev, ev.Height) or
atomic.CompareAndSwapUint64) where expectedPrev is the value read before
attempting the rollback; apply the same CAS pattern in both places noted around
the runCatchup logic to avoid moving localDAHeight backwards.
```go
if len(events) != 0 {
	f.headReached.Store(true)
	f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
```
Only mark headReached when actually caught up
Line 272 sets headReached to true whenever inline processing yields events. If highestSeenDAHeight is still ahead, this reports a false “caught up” state.
✅ Proposed guard

```diff
 	if len(events) != 0 {
-		f.headReached.Store(true)
+		if f.localDAHeight.Load() > f.highestSeenDAHeight.Load() {
+			f.headReached.Store(true)
+		}
 		f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
 			Msg("processed subscription blobs inline (fast path)")
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/syncing/da_follower.go` around lines 271 - 273, The code sets
f.headReached.Store(true) whenever inline processing yields events, which can be
incorrect if highestSeenDAHeight is still ahead; modify the condition around the
len(events) branch in da_follower.go (the block using ev.Height and
f.headReached) to only set f.headReached when the event height indicates we are
actually caught up (e.g., ev.Height >= f.highestSeenDAHeight or when
highestSeenDAHeight is unset/zero), otherwise do not flip headReached; update
the conditional that logs with f.logger.Debug() so the headReached.Store(true)
call is guarded by this explicit comparison to highestSeenDAHeight.
```go
ctx := b.Context()
go fixt.s.processLoop(ctx)

// Create a DAFollower to drive DA retrieval.
follower := NewDAFollower(DAFollowerConfig{
	Retriever:     fixt.s.daRetriever,
	Logger:        zerolog.Nop(),
	PipeEvent:     fixt.s.pipeEvent,
	Namespace:     []byte("ns"),
	StartDAHeight: fixt.s.daRetrieverHeight.Load(),
	DABlockTime:   0,
}).(*daFollower)
follower.highestSeenDAHeight.Store(spec.heights + daHeightOffset)
go follower.runCatchup(ctx)

fixt.s.startSyncWorkers(ctx)
```
Use the fixture’s cancelable context for worker lifecycles.
Line 46 moves worker/follower goroutines onto b.Context(), but teardown still calls fixt.s.cancel() (which cancels fixt.s.ctx). That mismatch can leave workers running and block on fixt.s.wg.Wait().
🛠️ Proposed fix

```diff
-	ctx := b.Context()
-	go fixt.s.processLoop(ctx)
+	runCtx := fixt.s.ctx
+	queryCtx := b.Context()
+	go fixt.s.processLoop(runCtx)
@@
-	go follower.runCatchup(ctx)
+	go follower.runCatchup(runCtx)
@@
-	fixt.s.startSyncWorkers(ctx)
+	fixt.s.startSyncWorkers(runCtx)
@@
-	processedHeight, _ := fixt.s.store.Height(ctx)
+	processedHeight, _ := fixt.s.store.Height(queryCtx)
@@
-	gotStoreHeight, err := fixt.s.store.Height(ctx)
+	gotStoreHeight, err := fixt.s.store.Height(queryCtx)
```

Also applies to: 64-69, 78-79
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/syncing/syncer_benchmark_test.go` around lines 46 - 61, The
test starts goroutines with b.Context() which doesn't get cancelled by the
fixture teardown (fixt.s.cancel()), causing potential goroutine leaks; change
all uses of b.Context() when launching long-lived worker goroutines to use the
fixture's cancelable context (fixt.s.ctx) instead — specifically pass fixt.s.ctx
into fixt.s.processLoop, follower.runCatchup (and the NewDAFollower call site
that spawns it), and fixt.s.startSyncWorkers; apply the same replacement for the
other occurrences mentioned (lines near 64-69 and 78-79) so teardown via
fixt.s.cancel() and fixt.s.wg.Wait() correctly shuts down workers.
```go
s.wg.Go(func() { s.processLoop(ctx) })

// Start the DA follower (subscribe + catchup) and other workers
s.daFollower = NewDAFollower(DAFollowerConfig{
	Client:        s.daClient,
	Retriever:     s.daRetriever,
	Logger:        s.logger,
	PipeEvent:     s.pipeEvent,
	Namespace:     s.daClient.GetHeaderNamespace(),
	DataNamespace: s.daClient.GetDataNamespace(),
	StartDAHeight: s.daRetrieverHeight.Load(),
	DABlockTime:   s.config.DA.BlockTime.Duration,
})
if err := s.daFollower.Start(ctx); err != nil {
	return fmt.Errorf("failed to start DA follower: %w", err)
}
```
Clean up started goroutines when DA follower startup fails.
Line 198 starts processLoop before follower startup. If s.daFollower.Start(ctx) fails on Line 211, Start() returns without canceling/waiting, leaving goroutines running after a failed startup.
🛠️ Proposed fix

```diff
 	// Start main processing loop
 	s.wg.Go(func() { s.processLoop(ctx) })
 @@
 	if err := s.daFollower.Start(ctx); err != nil {
+		s.cancel()
+		s.wg.Wait()
 		return fmt.Errorf("failed to start DA follower: %w", err)
 	}
```
Verify each finding against the current code and only fix it if needed.
In `@block/internal/syncing/syncer.go` around lines 198 - 213, processLoop is
started with s.wg.Go before starting s.daFollower, so if s.daFollower.Start(ctx)
fails the goroutine can leak; wrap the work with a cancellable child context
(e.g., childCtx, cancel := context.WithCancel(ctx)) and pass childCtx to
s.processLoop and to daFollower.Start, then on Start failure call cancel() and
wait for in-flight goroutines to finish (s.wg.Wait or equivalent) before
returning the error; update usages of s.wg.Go(func() { s.processLoop(ctx) }) and
s.daFollower.Start(ctx) to use childCtx and ensure cancel + wait are performed
on the error path.
```go
go func() {
	<-ctx.Done()
	d.mu.Lock()
	defer d.mu.Unlock()
	for i, s := range d.subscribers {
		if s == sub {
			d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
			break
		}
	}
	close(ch)
}()
```
Prevent double-close panic on subscriber channels.
Line 76 always closes ch on ctx.Done(). If Reset() already closed that channel (Line 337), this goroutine will panic with close of closed channel.
🛠️ Proposed fix
go func() {
<-ctx.Done()
d.mu.Lock()
defer d.mu.Unlock()
for i, s := range d.subscribers {
if s == sub {
d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
- break
+ close(ch)
+ return
}
}
- close(ch)
}()Also applies to: 335-339
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@test/testda/dummy.go` around lines 66 - 77, The goroutine unconditionally
closes ch on ctx.Done(), causing a double-close if Reset() already closed it;
modify the removal logic in the ctx.Done() goroutine (which locks d.mu and
iterates d.subscribers comparing s to sub) to only close ch when this goroutine
actually removed the subscriber (e.g., set a removed bool when s==sub, perform
d.subscribers = append(...), break, and then close ch only if removed is true),
so Reset() and this goroutine cannot both close the same channel; reference
d.mu, d.subscribers, sub, ch and Reset() when making the change.
```go
d.notifySubscribers(d.height)
d.logger.Info().Uint64("newHeight", d.height).Int("count", len(ids)).Msg("SubmitWithOptions successful")
```
Subscription events currently lose blob payloads.
notifySubscribers publishes from d.blobData[height], but submit paths write only to d.data and never populate d.blobData. Result: subscribers can receive height events with empty blob lists even when blobs were submitted.
Also applies to: 261-262, 399-413
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/local-da/local.go` around lines 230 - 231, The subscription events lose
blob payloads because notifySubscribers reads from d.blobData[height] while
SubmitWithOptions and other submit paths only write to d.data and never populate
d.blobData; update the submit code paths (e.g., SubmitWithOptions and the other
submit handlers around the noted regions) to also populate d.blobData[height]
with the corresponding blob list when they set d.data[height] (or move
notifySubscribers to read from d.data if that is the intended single source);
ensure you update the same symbol names (d.data, d.blobData) so
notifySubscribers(d.height) will see the blobs and subscribers receive non-empty
blob lists.
Are we able to use that in the async block retriever?

Summary
Replaces the Syncer's polling-based DA worker with an event-driven `DAFollower` that subscribes to DA header and data events in real time. This eliminates unnecessary polling latency in follow mode and brings zero-latency block processing when the node is caught up with the DA layer.

Changes

Core: Event-Driven `DAFollower` (`block/internal/syncing/`)
- `DAFollower`, a new component that subscribes to both the header and data namespaces of the DA layer and processes blobs inline, achieving zero-latency follow mode.
- `Syncer` refactored to delegate event-driven follow logic to `DAFollower`; the old polling worker is removed.
- `DARetriever` interface and its tracing/mock implementations extended to support the new subscription flow.

DA Client: WebSocket-Based JSON-RPC (`pkg/da/`, `block/internal/da/`)

Local DA Tooling (`tools/local-da/`)
- `local.go` extended with subscription broadcasting, and `rpc.go` with the corresponding RPC endpoints.

Test Infrastructure
- `da_retriever_mock.go` added, with updates to `test/mocks/da.go` and `test/testda/dummy.go` to cover the subscription interfaces.
- `syncer_backoff_test.go` and `syncer_test.go` updated and significantly extended to cover the new event-driven flows.
- E2E tests updated for the new `evnode` flags and P2P address retrieval patterns.