
topsql: add TopRU state and subscription plumbing (tipb upgrade)#66642

Open
zimulala wants to merge 6 commits into pingcap:master from zimulala:codex/topru-pr1

Conversation

@zimulala
Contributor

@zimulala zimulala commented Mar 2, 2026

What problem does this PR solve?

Issue Number: close #66625

Problem Summary:
TopRU requires shared state/interval handling and subscription transport plumbing. These primitives are missing, which blocks the later RU aggregation/reporting pipeline and makes review harder.

What changed and how does it work?

  • Added TopRU state/interval management with ref-counted enablement and default reset behavior.
  • Added subscription plumbing (datasink/pubsub/single_target/server) to gate TopRU delivery paths.
  • Added datamodel/metrics scaffolding for RU-related reporting.
  • Updated tipb and deps plus minimal BUILD wiring for new sources/tests.

Scope

  • Includes: state.go/state_test.go for TopRU enable/interval handling.
  • Includes: subscription plumbing (datasink.go, pubsub.go, single_target.go, server.go) and associated tests.
  • Includes: datamodel/metrics scaffolding (datamodel.go, metrics.go, rustats.go).
  • Includes: tipb upgrade / deps (DEPS.bzl, go.mod, go.sum) and minimal BUILD updates.
  • Excludes: execution-time collection (stmtstats/executor) and RU window aggregator/reporting main pipeline (reporter ru_*).

Behavior / Compatibility

  • Default behavior is unchanged unless TopRU is explicitly enabled via subscription.
  • TopRU enablement is ref-counted; item interval is last-write-wins while enabled and resets on last deregister.
  • Invalid item intervals are normalized in state handling; pubsub keeps raw values until state normalization.
  • tipb adds TopRU fields; older components should ignore unknown fields, and unimplemented TopRU RPCs are tolerated.
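The ref-count and interval semantics above can be pictured with a small sketch. This is a hypothetical illustration with invented names and types; the real implementation lives in pkg/util/topsql/state/state.go and differs in detail:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// defaultItemIntervalSeconds is an assumed default; the real constant
// in pkg/util/topsql/state may use a different value and name.
const defaultItemIntervalSeconds int64 = 30

type topRUState struct {
	consumers atomic.Int64 // ref count of active subscribers
	interval  atomic.Int64 // item interval in seconds
}

func newTopRUState() *topRUState {
	s := &topRUState{}
	s.interval.Store(defaultItemIntervalSeconds)
	return s
}

// Enable registers one subscriber; the interval is last-write-wins
// while enabled, and invalid (non-positive) intervals are ignored.
func (s *topRUState) Enable(intervalSeconds int64) {
	s.consumers.Add(1)
	if intervalSeconds > 0 {
		s.interval.Store(intervalSeconds)
	}
}

// Disable drops one subscriber; the last deregister resets the interval
// back to the default.
func (s *topRUState) Disable() {
	if s.consumers.Add(-1) <= 0 {
		s.consumers.Store(0)
		s.interval.Store(defaultItemIntervalSeconds)
	}
}

func (s *topRUState) Enabled() bool   { return s.consumers.Load() > 0 }
func (s *topRUState) Interval() int64 { return s.interval.Load() }

func main() {
	s := newTopRUState()
	s.Enable(10)
	s.Enable(60) // later subscription overrides the interval
	fmt.Println(s.Enabled(), s.Interval())
	s.Disable()
	s.Disable() // last deregister resets to the default
	fmt.Println(s.Enabled(), s.Interval())
}
```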

Follow-ups

  • PR2 (depends on PR1): collect RU deltas for TopRU
  • PR3 (depends on PR1/PR2): TopRU aggregation + reporting

Reviewer Notes

  • BUILD.bazel is staged minimally to include only newly added sources/tests; no ru_* targets yet.
  • Quick verification: git diff --name-only master...HEAD should match the PR1 file list only.
  • Commit range: 6a74e74-4db51ae4e4e5db24898e2f738df65bdc4f14d08b

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No need to test
    • I checked and no code files have been changed.

Test Results:

  • Unit tests ran (commit f664f2e987e8cc4d469b24eb305ecd4fa7231fe2): TestPubSubDataSinkDoSendOrderIsStable

Side effects

  • Performance regression: Consumes more CPU
  • Performance regression: Consumes more Memory
  • Breaking backward compatibility

Documentation

  • Affects user behaviors
  • Contains syntax changes
  • Contains variable changes
  • Contains experimental features
  • Changes MySQL compatibility

Release note

None

Summary by CodeRabbit

  • New Features

    • Added TopRU (resource-usage) reporting alongside TopSQL: independent enable/disable, configurable item intervals, streaming delivery, and end-to-end reporting paths.
  • Metrics

    • Added RU-specific counters and histograms to observe RU record counts and reporting durations.
  • Tests

    • Extensive tests for TopRU/TopSQL parsing, subscriptions, streaming, concurrency, error handling, and lifecycle.
  • Chores

    • Updated dependency versions and build/test shard/configuration adjustments.

@ti-chi-bot ti-chi-bot bot added the release-note-none label (denotes a PR that doesn't merit a release note) Mar 2, 2026
@pantheon-ai

pantheon-ai bot commented Mar 2, 2026

Review Complete

Findings: 5 issues
Posted: 2
Duplicates/Skipped: 3

@ti-chi-bot ti-chi-bot bot added the size/XXL label (denotes a PR that changes 1000+ lines, ignoring generated files) Mar 2, 2026
@ti-chi-bot

ti-chi-bot bot commented Mar 2, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign cfzjywxk for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@tiprow

tiprow bot commented Mar 2, 2026

Hi @zimulala. Thanks for your PR.

PRs from untrusted users cannot be marked as trusted with /ok-to-test in this repo, meaning untrusted PR authors can never trigger tests themselves. Collaborators can still trigger tests on the PR using /test all.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@coderabbitai

coderabbitai bot commented Mar 2, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds TopRU (resource-usage) support across TopSQL: state ref‑counting and interval validation, pub/sub and single-target reporting for RU records, RU metrics, stmtstats RU types, build/dependency bumps, BUILD/test tweaks, and extensive tests across state, subscription, reporting, and sinks.

Changes

Cohort / File(s) Summary
Deps / Module
DEPS.bzl, go.mod
Bump github.com/pingcap/tipb pseudo-version; update sha256, strip_prefix, and download URLs; update go.mod pseudo-version.
TopRU State
pkg/util/topsql/state/...
pkg/util/topsql/state/state.go, pkg/util/topsql/state/BUILD.bazel, pkg/util/topsql/state/state_test.go
Add TopRU ref-count, item-interval validation/constants, public APIs (Enable/Disable/TopRUEnabled/Set/Get/Reset interval), logging/errors, and unit tests; BUILD adds test target and deps.
Reporter registration / datasink
pkg/util/topsql/reporter/datasink.go, pkg/util/topsql/reporter/datasink_test.go, pkg/util/topsql/reporter/BUILD.bazel
Track RURecords in ReportData; per-sink flags (enableTopSQL, enableTopRU); topSQLSinkCount ref-count; idempotent register/deregister behavior; tests updated; BUILD adds gRPC deps and increases shard_count.
PubSub reporter
pkg/util/topsql/reporter/pubsub.go, pkg/util/topsql/reporter/pubsub_test.go
Subscribe now accepts request; parse TopSQL/TopRU subscriptions and item interval; pubSubDataSink stores enable flags and interval; doSend sends RU records when enabled; added RU metrics and extensive tests for parsing, gating, isolation, and errors.
Single-target reporter
pkg/util/topsql/reporter/single_target.go, pkg/util/topsql/reporter/single_target_test.go
Add streaming send for TopRU (sendBatchTopRURecord/sendTopRURecords), handle Unimplemented responses gracefully, RU metrics instrumentation, and tests for enablement and stream behavior.
Metrics & Mock Agent
pkg/util/topsql/reporter/metrics/metrics.go, pkg/util/topsql/reporter/mock/server.go
Add RU Prometheus counters/histograms and duration metrics; extend mock agent with ruRecords, ReportTopRURecords RPC, and accessors for tests.
Statement stats (stmtstats)
pkg/util/topsql/stmtstats/BUILD.bazel, pkg/util/topsql/stmtstats/rustats.go, pkg/util/topsql/topsql_test.go
Add rustats.go types (RUKey, ExecutionContext, RUIncrement, RUIncrementMap) for RU aggregation; update BUILD deps; add TopRU-related test.
Tests / BUILD tweaks
pkg/util/topsql/BUILD.bazel, pkg/util/topsql/reporter/BUILD.bazel
Adjust test shard counts and add gRPC codes/status deps to reporter BUILD; add state test target and other test scaffolding updates.
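The rustats.go scaffolding summarized in the table above can be sketched as follows. Only the type names (RUKey, ExecutionContext, RUIncrement, RUIncrementMap) and the merged fields (TotalRU, ExecCount, ExecDuration) come from this page; the key fields, field types, and the map-level Merge helper are illustrative assumptions:

```go
package main

import (
	"fmt"
	"time"
)

// RUKey identifies a statement for RU aggregation; the digest fields
// here are assumed, not taken from the PR.
type RUKey struct {
	SQLDigest  string
	PlanDigest string
}

// RUIncrement accumulates resource-usage deltas for one key.
type RUIncrement struct {
	TotalRU      float64
	ExecCount    uint64
	ExecDuration time.Duration
}

// Merge folds another increment into r; the nil-guard follows the
// review suggestion on rustats.go.
func (r *RUIncrement) Merge(other *RUIncrement) {
	if other == nil {
		return
	}
	r.TotalRU += other.TotalRU
	r.ExecCount += other.ExecCount
	r.ExecDuration += other.ExecDuration
}

// RUIncrementMap aggregates increments per statement/plan key.
type RUIncrementMap map[RUKey]*RUIncrement

// Merge folds another map into m, creating entries for unseen keys.
func (m RUIncrementMap) Merge(other RUIncrementMap) {
	for k, inc := range other {
		if cur, ok := m[k]; ok {
			cur.Merge(inc)
		} else {
			cp := *inc
			m[k] = &cp
		}
	}
}

func main() {
	a := RUIncrementMap{RUKey{SQLDigest: "s1"}: &RUIncrement{TotalRU: 1.5, ExecCount: 1}}
	b := RUIncrementMap{RUKey{SQLDigest: "s1"}: &RUIncrement{TotalRU: 2.5, ExecCount: 2}}
	a.Merge(b)
	k := RUKey{SQLDigest: "s1"}
	fmt.Printf("total RU for s1: %.1f over %d executions\n", a[k].TotalRU, a[k].ExecCount)
}
```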

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant PubSub as TopSQL PubSub
    participant DataSink as pubSubDataSink
    participant State as topsql.State
    participant Agent as Remote Agent
    participant Metrics

    Client->>PubSub: Subscribe(req with TopRU/TopSQL)
    PubSub->>DataSink: newPubSubDataSink(req, stream, registerer)
    DataSink->>State: EnableTopSQL()/EnableTopRU() (based on req)
    State-->>DataSink: ref-count updated
    DataSink->>DataSink: run() - periodic doSend loop

    loop periodic doSend
        DataSink->>State: TopSQLEnabled() / TopRUEnabled()
        alt TopSQL enabled
            DataSink->>Agent: sendTopSQLRecords()
            Agent-->>DataSink: ack
            DataSink->>Metrics: observe TopSQL metrics
        end
        alt TopRU enabled
            DataSink->>Agent: sendTopRURecords()
            Agent-->>DataSink: ack
            DataSink->>Metrics: observe RU metrics
        end
    end

    Client->>PubSub: disconnect
    DataSink->>State: DisableTopSQL()/DisableTopRU()
    State-->>DataSink: ref-count updated (may reset interval when zero)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

component/statistics, size/XL

Suggested reviewers

  • nolouch
  • yudongusa
  • yibin87

Poem

🐰 I counted hops for RU and SQL so bright,
enable flags and intervals tucked tight,
streams and metrics hum into the night,
tests ensure each sink behaves just right,
a rabbit twirls — telemetry takes flight.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 68.29%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Title check — ✅ Passed: the title clearly summarizes the main change: adding TopRU state and subscription plumbing with a tipb upgrade.
  • Description check — ✅ Passed: the description comprehensively covers the problem, changes, scope, and behavior, and includes the required linked issue (#66625) and test information.
  • Linked Issues check — ✅ Passed: the PR implements all objectives from issue #66625: TopRU state/interval management, subscription transport plumbing, datamodel/metrics scaffolding, and a tipb upgrade with backward compatibility.
  • Out of Scope Changes check — ✅ Passed: all changes are in scope for linked issue #66625; execution-time collection and RU aggregation/reporting are properly deferred to follow-up PRs as noted.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (5)
pkg/util/topsql/stmtstats/rustats.go (1)

57-61: Consider making RUIncrement.Merge tolerant of nil input.

A lightweight guard on other avoids panic paths when upstream pointer values are sparse/unset.

Suggested diff
 func (r *RUIncrement) Merge(other *RUIncrement) {
+	if other == nil {
+		return
+	}
 	r.TotalRU += other.TotalRU
 	r.ExecCount += other.ExecCount
 	r.ExecDuration += other.ExecDuration
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/stmtstats/rustats.go` around lines 57 - 61, Add a nil-check
at the start of the RUIncrement.Merge method so it safely returns if the
incoming pointer is nil; specifically inside func (r *RUIncrement) Merge(other
*RUIncrement) guard with if other == nil { return } before accessing
other.TotalRU, other.ExecCount, and other.ExecDuration to prevent panics when
upstream pointers are absent.
pkg/util/topsql/state/BUILD.bazel (1)

21-21: Consider investigating flaky test root cause.

Marking a test as flaky = True allows it to pass intermittently, which can mask real issues. If the flakiness is due to timing/concurrency in the TopRU state management, consider adding synchronization or deterministic test helpers rather than accepting flakiness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/state/BUILD.bazel` at line 21, The test target was marked
flaky; instead inspect and fix the root cause in the TopRU state tests: remove
flaky = True, reproduce the intermittent failure locally, and add deterministic
synchronization in the TopRU/TopRUState code and its tests (e.g., use explicit
wait groups, channels, clocks, or test hooks) to eliminate race/timing
assumptions; update the test(s) (the TopRU-related test functions) to use these
deterministic helpers and re-run CI before leaving flaky = True removed.
pkg/util/topsql/reporter/pubsub_test.go (1)

215-218: Consider extracting a shared TopRU state reset helper for tests.

The repeated global-state setup/cleanup blocks are correct but duplicated; a helper would reduce drift and simplify future test additions.

♻️ Possible helper extraction
+func resetTopRUState(t *testing.T) {
+	for topsqlstate.TopRUEnabled() {
+		topsqlstate.DisableTopRU()
+	}
+	topsqlstate.DisableTopSQL()
+	topsqlstate.ResetTopRUItemInterval()
+	t.Cleanup(func() {
+		for topsqlstate.TopRUEnabled() {
+			topsqlstate.DisableTopRU()
+		}
+		topsqlstate.DisableTopSQL()
+		topsqlstate.ResetTopRUItemInterval()
+	})
+}

Also applies to: 269-271, 283-294, 439-450, 468-478

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/reporter/pubsub_test.go` around lines 215 - 218, Extract a
test helper (e.g., resetTopRUState or ensureTopRUReset) to encapsulate the
repeated teardown/setup: call topsqlstate.TopRUEnabled(),
topsqlstate.DisableTopRU() as needed, and topsqlstate.ResetTopRUItemInterval()
inside it, and replace the duplicated blocks at the locations referenced (the
blocks that call TopRUEnabled, DisableTopRU, ResetTopRUItemInterval) with a
single call to that helper from each test; ensure the helper is reachable by
tests (put it in the same _test.go or a testutils_test.go) and document its
expected post-condition.
pkg/util/topsql/state/state.go (1)

158-164: Consider suppressing override logs when the interval does not change.

Guarding the info log behind current_interval_seconds != new_interval_seconds can reduce noisy subscription logs.

♻️ Minimal logging guard
-	logutil.BgLogger().Info(
-		"[top-sql] top ru item interval overridden by later subscription",
-		zap.Int64("current_interval_seconds", current),
-		zap.Int64("new_interval_seconds", intervalSeconds),
-		zap.Int64("active_subscribers", GlobalState.ruConsumerCount.Load()),
-	)
+	if current != intervalSeconds {
+		logutil.BgLogger().Info(
+			"[top-sql] top ru item interval overridden by later subscription",
+			zap.Int64("current_interval_seconds", current),
+			zap.Int64("new_interval_seconds", intervalSeconds),
+			zap.Int64("active_subscribers", GlobalState.ruConsumerCount.Load()),
+		)
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/state/state.go` around lines 158 - 164, The info log is noisy
when the interval doesn't change; modify the block around
logutil.BgLogger().Info so it only logs when current != intervalSeconds (compare
the existing current value and the new intervalSeconds) while still calling
GlobalState.TopRUItemIntervalSeconds.Store(intervalSeconds) as before; use the
same fields (current_interval_seconds, new_interval_seconds, active_subscribers
via GlobalState.ruConsumerCount.Load()) inside the guarded log to keep
diagnostics when an actual override occurs.
pkg/util/topsql/reporter/pubsub.go (1)

237-275: Consider extracting a shared emit helper for the four send loops.

sendTopSQLRecords, sendTopRURecords, sendSQLMeta, and sendPlanMeta now duplicate send/count/cancel patterns; a common helper would keep behavior and future fixes consistent.

Also applies to: 277-321, 323-358, 360-395

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/reporter/pubsub.go` around lines 237 - 275, The four methods
sendTopSQLRecords, sendTopRURecords, sendSQLMeta, and sendPlanMeta duplicate the
same send/count/cancel/metrics pattern; refactor by extracting a shared helper
(e.g., emitWithMetrics or pubSubDataSink.emit) that accepts context, a prebuilt
message value or a function that sets the message payload, and the reporter
metrics to update; the helper should perform the loop calling
ds.stream.Send(...), increment sentCount, respect ctx.Done (setting err =
ctx.Err()), and run the existing defer logic to observe sentCount and
success/failure duration so the original functions simply prepare messages and
call this helper to send them.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a171feb and 4db51ae.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (17)
  • DEPS.bzl
  • go.mod
  • pkg/util/topsql/reporter/BUILD.bazel
  • pkg/util/topsql/reporter/datasink.go
  • pkg/util/topsql/reporter/datasink_test.go
  • pkg/util/topsql/reporter/metrics/metrics.go
  • pkg/util/topsql/reporter/mock/server.go
  • pkg/util/topsql/reporter/pubsub.go
  • pkg/util/topsql/reporter/pubsub_test.go
  • pkg/util/topsql/reporter/single_target.go
  • pkg/util/topsql/reporter/single_target_test.go
  • pkg/util/topsql/state/BUILD.bazel
  • pkg/util/topsql/state/state.go
  • pkg/util/topsql/state/state_test.go
  • pkg/util/topsql/stmtstats/BUILD.bazel
  • pkg/util/topsql/stmtstats/rustats.go
  • pkg/util/topsql/topsql_test.go

Comment on lines 121 to 140
select {
case <-r.ctx.Done():
default:
	if _, ok := r.dataSinks[dataSink]; !ok {
		return
	}

	delete(r.dataSinks, dataSink)
	if len(r.dataSinks) == 0 {
		topsqlstate.DisableTopSQL()
	}
	if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopSQL {
		if r.topSQLSinkCount > 0 {
			r.topSQLSinkCount--
		}
		if r.topSQLSinkCount == 0 {
			topsqlstate.DisableTopSQL()
		}
	}

	if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
		topsqlstate.DisableTopRU()
	}
}

⚠️ Potential issue | 🟠 Major

Always execute Deregister cleanup, even after context cancellation.

On Line 121, the ctx.Done() branch skips sink removal and counter rollback. If cancellation happens before deregistration, TopSQL/TopRU can stay enabled and leak global state.

💡 Suggested fix
 func (r *DefaultDataSinkRegisterer) Deregister(dataSink DataSink) {
 	r.Lock()
 	defer r.Unlock()

-	select {
-	case <-r.ctx.Done():
-	default:
-		if _, ok := r.dataSinks[dataSink]; !ok {
-			return
-		}
-
-		delete(r.dataSinks, dataSink)
-		if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopSQL {
-			if r.topSQLSinkCount > 0 {
-				r.topSQLSinkCount--
-			}
-			if r.topSQLSinkCount == 0 {
-				topsqlstate.DisableTopSQL()
-			}
-		}
-
-		if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
-			topsqlstate.DisableTopRU()
-		}
-	}
+	if _, ok := r.dataSinks[dataSink]; !ok {
+		return
+	}
+
+	delete(r.dataSinks, dataSink)
+	if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopSQL {
+		if r.topSQLSinkCount > 0 {
+			r.topSQLSinkCount--
+		}
+		if r.topSQLSinkCount == 0 {
+			topsqlstate.DisableTopSQL()
+		}
+	}
+
+	if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
+		topsqlstate.DisableTopRU()
+	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/reporter/datasink.go` around lines 121 - 140, The Deregister
logic currently returns early when r.ctx.Done() is signaled, skipping removal
and counter rollback; instead always perform the sink cleanup: ensure the code
removes dataSink from r.dataSinks and updates r.topSQLSinkCount and calls
topsqlstate.DisableTopSQL()/topsqlstate.DisableTopRU even if r.ctx.Done() is
closed. Modify the select around r.ctx.Done() so it does not return early (e.g.,
remove the early return or move the ctx check after performing deletion), and
keep the existing checks against dataSink, pubSubDataSink, r.topSQLSinkCount,
and the calls to DisableTopSQL/DisableTopRU to guarantee global state is fixed
up.

@tiprow

tiprow bot commented Mar 3, 2026

@zimulala: PRs from untrusted users cannot be marked as trusted with /ok-to-test in this repo, meaning untrusted PR authors can never trigger tests themselves. Collaborators can still trigger tests on the PR using /test.

Details

In response to this:

/retest

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
pkg/util/topsql/reporter/BUILD.bazel (1)

50-50: shard_count reaches the recommended upper boundary.

Increasing from 37 to 50 is fine for now, but the test suite is at the practical limit (≤50 tests per package). If more tests are added in follow-up PRs, consider splitting the test package to maintain build/test performance.

Based on learnings: "Unit test suite size in one package SHOULD stay around 50 or fewer as a practical target."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/reporter/BUILD.bazel` at line 50, The BUILD rule's
shard_count was raised to 50 (shard_count = 50), which hits the recommended
upper boundary for tests in one package; revert shard_count to the previous
safer value (e.g., shard_count = 37) to avoid hitting the practical limit, and
if more tests are needed in follow-ups split the test package (or add another
test target) to keep per-package tests ≤50—update the shard_count attribute
and/or create a new test package/target accordingly.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4db51ae and 3956e57.

📒 Files selected for processing (2)
  • pkg/util/topsql/BUILD.bazel
  • pkg/util/topsql/reporter/BUILD.bazel

@zimulala
Contributor Author

zimulala commented Mar 3, 2026

/retest

@tiprow

tiprow bot commented Mar 3, 2026

@zimulala: PRs from untrusted users cannot be marked as trusted with /ok-to-test in this repo, meaning untrusted PR authors can never trigger tests themselves. Collaborators can still trigger tests on the PR using /test.

Details

In response to this:

/retest

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@tiprow

tiprow bot commented Mar 3, 2026

@zimulala: PRs from untrusted users cannot be marked as trusted with /ok-to-test in this repo, meaning untrusted PR authors can never trigger tests themselves. Collaborators can still trigger tests on the PR using /test.

Details

In response to this:

/retest

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@pingcap pingcap deleted a comment from tiprow bot Mar 3, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
pkg/util/topsql/reporter/pubsub_test.go (2)

176-183: Replace fixed time.Sleep with condition-based waiting.

The fixed 1s sleep makes this test slower and potentially flaky under load. Prefer polling assertions (require.Eventually) on stream counts.

Proposed patch
-	time.Sleep(1 * time.Second)
-
-	mockStream.Lock()
-	assert.Len(t, mockStream.records, 1)
-	assert.Len(t, mockStream.ruRecords, 0)
-	assert.Len(t, mockStream.sqlMetas, 1)
-	assert.Len(t, mockStream.planMetas, 1)
-	mockStream.Unlock()
+	require.Eventually(t, func() bool {
+		mockStream.Lock()
+		defer mockStream.Unlock()
+		return len(mockStream.records) == 1 &&
+			len(mockStream.ruRecords) == 0 &&
+			len(mockStream.sqlMetas) == 1 &&
+			len(mockStream.planMetas) == 1
+	}, time.Second, 10*time.Millisecond)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/reporter/pubsub_test.go` around lines 176 - 183, Replace the
fixed time.Sleep(1 * time.Second) with condition-based polling using
require.Eventually: repeatedly check under mockStream.Lock() that
len(mockStream.records)==1, len(mockStream.ruRecords)==0,
len(mockStream.sqlMetas)==1 and len(mockStream.planMetas)==1 (unlock after
reading) until the condition is true or a timeout elapses; use
require.Eventually from the test package with a reasonable timeout and interval
to avoid flakiness and remove the static sleep. Ensure you reference the
mockStream object and its records/ruRecords/sqlMetas/planMetas slices in the
condition closure so the assertions reflect the guarded state.

150-187: Use t.Cleanup to guarantee failpoint teardown and sink shutdown.

failpoint.Disable and ds.OnReporterClosing() currently rely on reaching the end of the test body. Move both into cleanup to avoid cross-test contamination on early exits/panics.

Proposed patch
 func TestPubSubDataSink(t *testing.T) {
 	mockStream := &mockPubSubDataSinkStream{}
 	// Create a subscription request (TopSQL only).
 	req := &tipb.TopSQLSubRequest{}
 	ds := newPubSubDataSink(req, mockStream, &mockPubSubDataSinkRegisterer{})
 	go func() {
 		_ = ds.run()
 	}()

 	panicPath := "github.com/pingcap/tidb/pkg/util/topsql/reporter/mockGrpcLogPanic"
 	require.NoError(t, failpoint.Enable(panicPath, "panic"))
+	t.Cleanup(func() {
+		ds.OnReporterClosing()
+		assert.NoError(t, failpoint.Disable(panicPath))
+	})
 	err := ds.TrySend(&ReportData{
 		...
 	}, time.Now().Add(10*time.Second))
 	assert.NoError(t, err)
@@
-	ds.OnReporterClosing()
-	require.NoError(t, failpoint.Disable(panicPath))
 }

As per coding guidelines, "**/*_test.go: For unit tests in packages that use failpoints, MUST enable failpoints before tests and disable afterward".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/reporter/pubsub_test.go` around lines 150 - 187, The test
enables a failpoint and calls ds.OnReporterClosing() at the end which may not
run on early return/panic, risking cross-test contamination; move the teardown
into t.Cleanup by registering a cleanup that disables the failpoint
(failpoint.Disable(panicPath)) and calls ds.OnReporterClosing(), and ensure the
failpoint is enabled before registering cleanup so the cleanup always runs even
if the test fails or panics; reference the panicPath variable,
failpoint.Enable/Disable, ds.OnReporterClosing(), and use t.Cleanup to register
the teardown.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/util/topsql/reporter/pubsub_test.go`:
- Around line 214-258: This test mutates global TopRU state without guaranteed
cleanup on failure; capture the original global state at the start (e.g.
originalEnabled := topsqlstate.TopRUEnabled(); originalInterval :=
topsqlstate.GetTopRUItemInterval()), then register a t.Cleanup closure that
restores it (call topsqlstate.DisableTopRU() or an enable function depending on
originalEnabled, and topsqlstate.ResetTopRUItemInterval() or set the interval
back to originalInterval using available APIs/constants like
topsqlstate.DefTiDBTopRUItemIntervalSeconds), so that
NewTopSQLPubSubService.Subscribe tests (and functions referenced:
TestPubSubMultiSubscriberIsolation, topsqlstate.TopRUEnabled,
topsqlstate.DisableTopRU, topsqlstate.ResetTopRUItemInterval,
topsqlstate.GetTopRUItemInterval, topsqlstate.DefTiDBTopRUItemIntervalSeconds)
always restore the global TopRU state even if the test fails.


ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 56c17f7 and 26caba1.

📒 Files selected for processing (4)
  • pkg/util/topsql/BUILD.bazel
  • pkg/util/topsql/reporter/BUILD.bazel
  • pkg/util/topsql/reporter/metrics/metrics.go
  • pkg/util/topsql/reporter/pubsub_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/util/topsql/BUILD.bazel
  • pkg/util/topsql/reporter/BUILD.bazel

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
pkg/util/topsql/reporter/pubsub_test.go (1)

392-402: Consider adding cleanup for consistency with other subtests.

This subtest clears global TopRU state at the start but lacks a t.Cleanup to restore state on failure. While the risk is minimal (since it only disables), adding cleanup would align with the pattern used in other subtests and improve test isolation.

Suggested cleanup
 	t.Run("register fail does not enable topru", func(t *testing.T) {
 		for topsqlstate.TopRUEnabled() {
 			topsqlstate.DisableTopRU()
 		}
+		t.Cleanup(func() {
+			for topsqlstate.TopRUEnabled() {
+				topsqlstate.DisableTopRU()
+			}
+		})
 
 		req := mockTopRUSubRequest(tipb.ItemInterval_ITEM_INTERVAL_15S)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/reporter/pubsub_test.go` around lines 392 - 402, The subtest
"register fail does not enable topru" disables global TopRU state but doesn't
restore it on test failure; capture the original state via
topsqlstate.TopRUEnabled() at the start of the test and register a t.Cleanup
that restores it (call topsqlstate.DisableTopRU() or re-enable as needed) so the
global TopRU state is returned to its prior value after the test; update the
anonymous test function containing topsqlstate.TopRUEnabled(),
topsqlstate.DisableTopRU(), and the Subscribe call on NewTopSQLPubSubService to
include this t.Cleanup restoration.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 26caba1 and 9d19a1f.

📒 Files selected for processing (1)
  • pkg/util/topsql/reporter/pubsub_test.go

@codecov

codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 93.43629% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.1198%. Comparing base (5302a5b) to head (9414b6e).
⚠️ Report is 55 commits behind head on master.

Additional details and impacted files
@@               Coverage Diff                @@
##             master     #66642        +/-   ##
================================================
- Coverage   77.6781%   77.1198%   -0.5584%     
================================================
  Files          2006       1931        -75     
  Lines        548248     547081      -1167     
================================================
- Hits         425869     421908      -3961     
- Misses       120719     125137      +4418     
+ Partials       1660         36      -1624     
Flag Coverage Δ
integration 41.3413% <4.4642%> (-6.8468%) ⬇️
unit 76.0876% <93.4362%> (-0.2398%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Components Coverage Δ
dumpling 56.7974% <ø> (ø)
parser ∅ <ø> (∅)
br 48.5831% <ø> (-12.2858%) ⬇️

@zimulala zimulala requested review from XuHuaiyu, nolouch and yibin87 March 3, 2026 09:41
@pantheon-ai

pantheon-ai bot commented Mar 3, 2026

Review Complete

Findings: 2 issues
Posted: 1
Duplicates/Skipped: 1


return err
}

logutil.BgLogger().Info(

[P2] Info log fires on every TopRU interval set, including the first subscriber and same-value updates

Why: SetTopRUItemInterval unconditionally emits an Info log at state.go:158 on every valid call, with the message "top ru item interval overridden by later subscription". This fires even on the first subscriber (no "override" occurred) and when the new interval equals the current one. In deployments with frequent subscribe/reconnect cycles this generates operational noise and the misleading wording can confuse operators.

Evidence:

  • pkg/util/topsql/state/state.go:158 — logutil.BgLogger().Info("[top-sql] top ru item interval overridden by later subscription", ...) called unconditionally for any valid interval
  • pkg/util/topsql/reporter/datasink.go:83 — SetTopRUItemInterval called on every *pubSubDataSink registration with enableTopRU==true
  • pkg/util/topsql/reporter/pubsub.go:49 — a new sink is created and registered per client stream

Suggestion: Guard the log behind an actual override check (e.g., only log when newInterval != currentInterval) and update the message to distinguish first-set from override, or downgrade to Debug level.

@zimulala
Contributor Author

zimulala commented Mar 3, 2026

/retest

@tiprow

tiprow bot commented Mar 3, 2026

@zimulala: PRs from untrusted users cannot be marked as trusted with /ok-to-test in this repo, meaning untrusted PR authors can never trigger tests themselves. Collaborators can still trigger tests on the PR using /test.

Details

In response to this:

/retest

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot

ti-chi-bot bot commented Mar 3, 2026

@zimulala: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
idc-jenkins-ci-tidb/unit-test 9414b6e link true /test unit-test

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.


Labels

release-note-none — Denotes a PR that doesn't merit a release note.
size/XXL — Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Introduce TopRU state/interval + subscription transport (tipb)

1 participant