topsql: add TopRU state and subscription plumbing (tipb upgrade) #66642

zimulala wants to merge 6 commits into pingcap:master from
Conversation
Review Complete. Findings: 5 issues
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: The full list of commands accepted by this bot can be found here. Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
Hi @zimulala. Thanks for your PR. PRs from untrusted users cannot be marked as trusted. I understand the commands that are listed here. Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the relevant setting. Use the following commands to manage reviews.
📝 Walkthrough

Adds TopRU (resource-usage) support across TopSQL: state ref-counting and interval validation, pub/sub and single-target reporting for RU records, RU metrics, stmtstats RU types, build/dependency bumps, BUILD/test tweaks, and extensive tests across state, subscription, reporting, and sinks.
Sequence Diagram(s)

sequenceDiagram
participant Client
participant PubSub as TopSQL PubSub
participant DataSink as pubSubDataSink
participant State as topsql.State
participant Agent as Remote Agent
participant Metrics
Client->>PubSub: Subscribe(req with TopRU/TopSQL)
PubSub->>DataSink: newPubSubDataSink(req, stream, registerer)
DataSink->>State: EnableTopSQL()/EnableTopRU() (based on req)
State-->>DataSink: ref-count updated
DataSink->>DataSink: run() - periodic doSend loop
loop periodic doSend
DataSink->>State: TopSQLEnabled() / TopRUEnabled()
alt TopSQL enabled
DataSink->>Agent: sendTopSQLRecords()
Agent-->>DataSink: ack
DataSink->>Metrics: observe TopSQL metrics
end
alt TopRU enabled
DataSink->>Agent: sendTopRURecords()
Agent-->>DataSink: ack
DataSink->>Metrics: observe RU metrics
end
end
Client->>PubSub: disconnect
DataSink->>State: DisableTopSQL()/DisableTopRU()
State-->>DataSink: ref-count updated (may reset interval when zero)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 1
🧹 Nitpick comments (5)
pkg/util/topsql/stmtstats/rustats.go (1)
57-61: Consider making RUIncrement.Merge tolerant of nil input.

A lightweight guard on other avoids panic paths when upstream pointer values are sparse/unset.

Suggested diff:

 func (r *RUIncrement) Merge(other *RUIncrement) {
+	if other == nil {
+		return
+	}
 	r.TotalRU += other.TotalRU
 	r.ExecCount += other.ExecCount
 	r.ExecDuration += other.ExecDuration
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/stmtstats/rustats.go` around lines 57 - 61, add a nil-check at the start of the RUIncrement.Merge method so it safely returns if the incoming pointer is nil; specifically, inside func (r *RUIncrement) Merge(other *RUIncrement), guard with if other == nil { return } before accessing other.TotalRU, other.ExecCount, and other.ExecDuration to prevent panics when upstream pointers are absent.

pkg/util/topsql/state/BUILD.bazel (1)
21-21: Consider investigating the flaky test's root cause.

Marking a test as flaky = True allows it to pass intermittently, which can mask real issues. If the flakiness is due to timing/concurrency in the TopRU state management, consider adding synchronization or deterministic test helpers rather than accepting flakiness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/state/BUILD.bazel` at line 21, the test target was marked flaky; instead, inspect and fix the root cause in the TopRU state tests: remove flaky = True, reproduce the intermittent failure locally, and add deterministic synchronization in the TopRU/TopRUState code and its tests (e.g., explicit wait groups, channels, clocks, or test hooks) to eliminate race/timing assumptions; update the TopRU-related test functions to use these deterministic helpers and re-run CI after removing flaky = True.

pkg/util/topsql/reporter/pubsub_test.go (1)

215-218: Consider extracting a shared TopRU state reset helper for tests.

The repeated global-state setup/cleanup blocks are correct but duplicated; a helper would reduce drift and simplify future test additions.
♻️ Possible helper extraction
+func resetTopRUState(t *testing.T) {
+	for topsqlstate.TopRUEnabled() {
+		topsqlstate.DisableTopRU()
+	}
+	topsqlstate.DisableTopSQL()
+	topsqlstate.ResetTopRUItemInterval()
+	t.Cleanup(func() {
+		for topsqlstate.TopRUEnabled() {
+			topsqlstate.DisableTopRU()
+		}
+		topsqlstate.DisableTopSQL()
+		topsqlstate.ResetTopRUItemInterval()
+	})
+}

Also applies to: 269-271, 283-294, 439-450, 468-478
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/pubsub_test.go` around lines 215 - 218, extract a test helper (e.g., resetTopRUState or ensureTopRUReset) to encapsulate the repeated teardown/setup: call topsqlstate.TopRUEnabled(), topsqlstate.DisableTopRU() as needed, and topsqlstate.ResetTopRUItemInterval() inside it, and replace the duplicated blocks at the referenced locations with a single call to that helper from each test; ensure the helper is reachable by tests (put it in the same _test.go or a testutils_test.go) and document its expected post-condition.

pkg/util/topsql/state/state.go (1)

158-164: Consider suppressing override logs when the interval does not change.

Guarding the info log behind current_interval_seconds != new_interval_seconds can reduce noisy subscription logs.

♻️ Minimal logging guard

-	logutil.BgLogger().Info(
-		"[top-sql] top ru item interval overridden by later subscription",
-		zap.Int64("current_interval_seconds", current),
-		zap.Int64("new_interval_seconds", intervalSeconds),
-		zap.Int64("active_subscribers", GlobalState.ruConsumerCount.Load()),
-	)
+	if current != intervalSeconds {
+		logutil.BgLogger().Info(
+			"[top-sql] top ru item interval overridden by later subscription",
+			zap.Int64("current_interval_seconds", current),
+			zap.Int64("new_interval_seconds", intervalSeconds),
+			zap.Int64("active_subscribers", GlobalState.ruConsumerCount.Load()),
+		)
+	}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/state/state.go` around lines 158 - 164, the info log is noisy when the interval doesn't change; modify the block around logutil.BgLogger().Info so it only logs when current != intervalSeconds (compare the existing current value and the new intervalSeconds) while still calling GlobalState.TopRUItemIntervalSeconds.Store(intervalSeconds) as before; use the same fields (current_interval_seconds, new_interval_seconds, active_subscribers via GlobalState.ruConsumerCount.Load()) inside the guarded log to keep diagnostics when an actual override occurs.

pkg/util/topsql/reporter/pubsub.go (1)

237-275: Consider extracting a shared emit helper for the four send loops.

sendTopSQLRecords, sendTopRURecords, sendSQLMeta, and sendPlanMeta now duplicate send/count/cancel patterns; a common helper would keep behavior and future fixes consistent.

Also applies to: 277-321, 323-358, 360-395
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/pubsub.go` around lines 237 - 275, The four methods sendTopSQLRecords, sendTopRURecords, sendSQLMeta, and sendPlanMeta duplicate the same send/count/cancel/metrics pattern; refactor by extracting a shared helper (e.g., emitWithMetrics or pubSubDataSink.emit) that accepts context, a prebuilt message value or a function that sets the message payload, and the reporter metrics to update; the helper should perform the loop calling ds.stream.Send(...), increment sentCount, respect ctx.Done (setting err = ctx.Err()), and run the existing defer logic to observe sentCount and success/failure duration so the original functions simply prepare messages and call this helper to send them.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/util/topsql/reporter/datasink.go`:
- Around line 121-140: The Deregister logic currently returns early when
r.ctx.Done() is signaled, skipping removal and counter rollback; instead always
perform the sink cleanup: ensure the code removes dataSink from r.dataSinks and
updates r.topSQLSinkCount and calls
topsqlstate.DisableTopSQL()/topsqlstate.DisableTopRU even if r.ctx.Done() is
closed. Modify the select around r.ctx.Done() so it does not return early (e.g.,
remove the early return or move the ctx check after performing deletion), and
keep the existing checks against dataSink, pubSubDataSink, r.topSQLSinkCount,
and the calls to DisableTopSQL/DisableTopRU to guarantee global state is fixed
up.
---
Nitpick comments:
In `@pkg/util/topsql/reporter/pubsub_test.go`:
- Around line 215-218: Extract a test helper (e.g., resetTopRUState or
ensureTopRUReset) to encapsulate the repeated teardown/setup: call
topsqlstate.TopRUEnabled(), topsqlstate.DisableTopRU() as needed, and
topsqlstate.ResetTopRUItemInterval() inside it, and replace the duplicated
blocks at the locations referenced (the blocks that call TopRUEnabled,
DisableTopRU, ResetTopRUItemInterval) with a single call to that helper from
each test; ensure the helper is reachable by tests (put it in the same _test.go
or a testutils_test.go) and document its expected post-condition.
In `@pkg/util/topsql/reporter/pubsub.go`:
- Around line 237-275: The four methods sendTopSQLRecords, sendTopRURecords,
sendSQLMeta, and sendPlanMeta duplicate the same send/count/cancel/metrics
pattern; refactor by extracting a shared helper (e.g., emitWithMetrics or
pubSubDataSink.emit) that accepts context, a prebuilt message value or a
function that sets the message payload, and the reporter metrics to update; the
helper should perform the loop calling ds.stream.Send(...), increment sentCount,
respect ctx.Done (setting err = ctx.Err()), and run the existing defer logic to
observe sentCount and success/failure duration so the original functions simply
prepare messages and call this helper to send them.
In `@pkg/util/topsql/state/BUILD.bazel`:
- Line 21: The test target was marked flaky; instead inspect and fix the root
cause in the TopRU state tests: remove flaky = True, reproduce the intermittent
failure locally, and add deterministic synchronization in the TopRU/TopRUState
code and its tests (e.g., use explicit wait groups, channels, clocks, or test
hooks) to eliminate race/timing assumptions; update the test(s) (the
TopRU-related test functions) to use these deterministic helpers and re-run CI
before leaving flaky = True removed.
In `@pkg/util/topsql/state/state.go`:
- Around line 158-164: The info log is noisy when the interval doesn't change;
modify the block around logutil.BgLogger().Info so it only logs when current !=
intervalSeconds (compare the existing current value and the new intervalSeconds)
while still calling GlobalState.TopRUItemIntervalSeconds.Store(intervalSeconds)
as before; use the same fields (current_interval_seconds, new_interval_seconds,
active_subscribers via GlobalState.ruConsumerCount.Load()) inside the guarded
log to keep diagnostics when an actual override occurs.
In `@pkg/util/topsql/stmtstats/rustats.go`:
- Around line 57-61: Add a nil-check at the start of the RUIncrement.Merge
method so it safely returns if the incoming pointer is nil; specifically inside
func (r *RUIncrement) Merge(other *RUIncrement) guard with if other == nil {
return } before accessing other.TotalRU, other.ExecCount, and other.ExecDuration
to prevent panics when upstream pointers are absent.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (17)
DEPS.bzl
go.mod
pkg/util/topsql/reporter/BUILD.bazel
pkg/util/topsql/reporter/datasink.go
pkg/util/topsql/reporter/datasink_test.go
pkg/util/topsql/reporter/metrics/metrics.go
pkg/util/topsql/reporter/mock/server.go
pkg/util/topsql/reporter/pubsub.go
pkg/util/topsql/reporter/pubsub_test.go
pkg/util/topsql/reporter/single_target.go
pkg/util/topsql/reporter/single_target_test.go
pkg/util/topsql/state/BUILD.bazel
pkg/util/topsql/state/state.go
pkg/util/topsql/state/state_test.go
pkg/util/topsql/stmtstats/BUILD.bazel
pkg/util/topsql/stmtstats/rustats.go
pkg/util/topsql/topsql_test.go
 	select {
 	case <-r.ctx.Done():
 	default:
 		if _, ok := r.dataSinks[dataSink]; !ok {
 			return
 		}

 		delete(r.dataSinks, dataSink)
-		if len(r.dataSinks) == 0 {
-			topsqlstate.DisableTopSQL()
-		}
+		if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopSQL {
+			if r.topSQLSinkCount > 0 {
+				r.topSQLSinkCount--
+			}
+			if r.topSQLSinkCount == 0 {
+				topsqlstate.DisableTopSQL()
+			}
+		}

+		if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
+			topsqlstate.DisableTopRU()
+		}
 	}
Always execute Deregister cleanup, even after context cancellation.
On Line 121, the ctx.Done() branch skips sink removal and counter rollback. If cancellation happens before deregistration, TopSQL/TopRU can stay enabled and leak global state.
💡 Suggested fix
func (r *DefaultDataSinkRegisterer) Deregister(dataSink DataSink) {
r.Lock()
defer r.Unlock()
- select {
- case <-r.ctx.Done():
- default:
- if _, ok := r.dataSinks[dataSink]; !ok {
- return
- }
-
- delete(r.dataSinks, dataSink)
- if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopSQL {
- if r.topSQLSinkCount > 0 {
- r.topSQLSinkCount--
- }
- if r.topSQLSinkCount == 0 {
- topsqlstate.DisableTopSQL()
- }
- }
-
- if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
- topsqlstate.DisableTopRU()
- }
- }
+ if _, ok := r.dataSinks[dataSink]; !ok {
+ return
+ }
+
+ delete(r.dataSinks, dataSink)
+ if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopSQL {
+ if r.topSQLSinkCount > 0 {
+ r.topSQLSinkCount--
+ }
+ if r.topSQLSinkCount == 0 {
+ topsqlstate.DisableTopSQL()
+ }
+ }
+
+ if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
+ topsqlstate.DisableTopRU()
+ }
}

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func (r *DefaultDataSinkRegisterer) Deregister(dataSink DataSink) {
	r.Lock()
	defer r.Unlock()
	if _, ok := r.dataSinks[dataSink]; !ok {
		return
	}

	delete(r.dataSinks, dataSink)
	if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopSQL {
		if r.topSQLSinkCount > 0 {
			r.topSQLSinkCount--
		}
		if r.topSQLSinkCount == 0 {
			topsqlstate.DisableTopSQL()
		}
	}

	if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
		topsqlstate.DisableTopRU()
	}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/datasink.go` around lines 121 - 140, The Deregister
logic currently returns early when r.ctx.Done() is signaled, skipping removal
and counter rollback; instead always perform the sink cleanup: ensure the code
removes dataSink from r.dataSinks and updates r.topSQLSinkCount and calls
topsqlstate.DisableTopSQL()/topsqlstate.DisableTopRU even if r.ctx.Done() is
closed. Modify the select around r.ctx.Done() so it does not return early (e.g.,
remove the early return or move the ctx check after performing deletion), and
keep the existing checks against dataSink, pubSubDataSink, r.topSQLSinkCount,
and the calls to DisableTopSQL/DisableTopRU to guarantee global state is fixed
up.
@zimulala: PRs from untrusted users cannot be marked as trusted. Details: In response to this:

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
🧹 Nitpick comments (1)
pkg/util/topsql/reporter/BUILD.bazel (1)
50-50: shard_count reaches the recommended upper boundary.

Increasing from 37 to 50 is fine for now, but the test suite is at the practical limit (≤50 tests per package). If more tests are added in follow-up PRs, consider splitting the test package to maintain build/test performance.
Based on learnings: "Unit test suite size in one package SHOULD stay around 50 or fewer as a practical target."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/BUILD.bazel` at line 50, The BUILD rule's shard_count was raised to 50 (shard_count = 50), which hits the recommended upper boundary for tests in one package; revert shard_count to the previous safer value (e.g., shard_count = 37) to avoid hitting the practical limit, and if more tests are needed in follow-ups split the test package (or add another test target) to keep per-package tests ≤50—update the shard_count attribute and/or create a new test package/target accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@pkg/util/topsql/reporter/BUILD.bazel`:
- Line 50: The BUILD rule's shard_count was raised to 50 (shard_count = 50),
which hits the recommended upper boundary for tests in one package; revert
shard_count to the previous safer value (e.g., shard_count = 37) to avoid
hitting the practical limit, and if more tests are needed in follow-ups split
the test package (or add another test target) to keep per-package tests
≤50—update the shard_count attribute and/or create a new test package/target
accordingly.
/retest
Force-pushed from 3956e57 to 56c17f7 (compare)
Force-pushed from 56c17f7 to 26caba1 (compare)
Actionable comments posted: 1
🧹 Nitpick comments (2)
pkg/util/topsql/reporter/pubsub_test.go (2)
176-183: Replace fixed time.Sleep with condition-based waiting.

The fixed 1s sleep makes this test slower and potentially flaky under load. Prefer polling assertions (require.Eventually) on stream counts.

Proposed patch:

-	time.Sleep(1 * time.Second)
-
-	mockStream.Lock()
-	assert.Len(t, mockStream.records, 1)
-	assert.Len(t, mockStream.ruRecords, 0)
-	assert.Len(t, mockStream.sqlMetas, 1)
-	assert.Len(t, mockStream.planMetas, 1)
-	mockStream.Unlock()
+	require.Eventually(t, func() bool {
+		mockStream.Lock()
+		defer mockStream.Unlock()
+		return len(mockStream.records) == 1 &&
+			len(mockStream.ruRecords) == 0 &&
+			len(mockStream.sqlMetas) == 1 &&
+			len(mockStream.planMetas) == 1
+	}, time.Second, 10*time.Millisecond)
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/pubsub_test.go` around lines 176 - 183, Replace the fixed time.Sleep(1 * time.Second) with condition-based polling using require.Eventually: repeatedly check under mockStream.Lock() that len(mockStream.records)==1, len(mockStream.ruRecords)==0, len(mockStream.sqlMetas)==1 and len(mockStream.planMetas)==1 (unlock after reading) until the condition is true or a timeout elapses; use require.Eventually from the test package with a reasonable timeout and interval to avoid flakiness and remove the static sleep. Ensure you reference the mockStream object and its records/ruRecords/sqlMetas/planMetas slices in the condition closure so the assertions reflect the guarded state.
150-187: Use t.Cleanup to guarantee failpoint teardown and sink shutdown.

failpoint.Disable and ds.OnReporterClosing() currently rely on reaching the end of the test body. Move both into cleanup to avoid cross-test contamination on early exits/panics.

Proposed patch:

 func TestPubSubDataSink(t *testing.T) {
 	mockStream := &mockPubSubDataSinkStream{}
 	// Create a subscription request (TopSQL only).
 	req := &tipb.TopSQLSubRequest{}
 	ds := newPubSubDataSink(req, mockStream, &mockPubSubDataSinkRegisterer{})
 	go func() {
 		_ = ds.run()
 	}()
 	panicPath := "github.com/pingcap/tidb/pkg/util/topsql/reporter/mockGrpcLogPanic"
 	require.NoError(t, failpoint.Enable(panicPath, "panic"))
+	t.Cleanup(func() {
+		ds.OnReporterClosing()
+		assert.NoError(t, failpoint.Disable(panicPath))
+	})
 	err := ds.TrySend(&ReportData{ ... }, time.Now().Add(10*time.Second))
 	assert.NoError(t, err)
@@
-	ds.OnReporterClosing()
-	require.NoError(t, failpoint.Disable(panicPath))
 }

As per coding guidelines, "**/*_test.go: For unit tests in packages that use failpoints, MUST enable failpoints before tests and disable afterward".
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/pubsub_test.go` around lines 150 - 187, The test enables a failpoint and calls ds.OnReporterClosing() at the end which may not run on early return/panic, risking cross-test contamination; move the teardown into t.Cleanup by registering a cleanup that disables the failpoint (failpoint.Disable(panicPath)) and calls ds.OnReporterClosing(), and ensure the failpoint is enabled before registering cleanup so the cleanup always runs even if the test fails or panics; reference the panicPath variable, failpoint.Enable/Disable, ds.OnReporterClosing(), and use t.Cleanup to register the teardown.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/util/topsql/reporter/pubsub_test.go`:
- Around line 214-258: This test mutates global TopRU state without guaranteed
cleanup on failure; capture the original global state at the start (e.g.
originalEnabled := topsqlstate.TopRUEnabled(); originalInterval :=
topsqlstate.GetTopRUItemInterval()), then register a t.Cleanup closure that
restores it (call topsqlstate.DisableTopRU() or an enable function depending on
originalEnabled, and topsqlstate.ResetTopRUItemInterval() or set the interval
back to originalInterval using available APIs/constants like
topsqlstate.DefTiDBTopRUItemIntervalSeconds), so that
NewTopSQLPubSubService.Subscribe tests (and functions referenced:
TestPubSubMultiSubscriberIsolation, topsqlstate.TopRUEnabled,
topsqlstate.DisableTopRU, topsqlstate.ResetTopRUItemInterval,
topsqlstate.GetTopRUItemInterval, topsqlstate.DefTiDBTopRUItemIntervalSeconds)
always restore the global TopRU state even if the test fails.
---
Nitpick comments:
In `@pkg/util/topsql/reporter/pubsub_test.go`:
- Around line 176-183: Replace the fixed time.Sleep(1 * time.Second) with
condition-based polling using require.Eventually: repeatedly check under
mockStream.Lock() that len(mockStream.records)==1, len(mockStream.ruRecords)==0,
len(mockStream.sqlMetas)==1 and len(mockStream.planMetas)==1 (unlock after
reading) until the condition is true or a timeout elapses; use
require.Eventually from the test package with a reasonable timeout and interval
to avoid flakiness and remove the static sleep. Ensure you reference the
mockStream object and its records/ruRecords/sqlMetas/planMetas slices in the
condition closure so the assertions reflect the guarded state.
- Around line 150-187: The test enables a failpoint and calls
ds.OnReporterClosing() at the end which may not run on early return/panic,
risking cross-test contamination; move the teardown into t.Cleanup by
registering a cleanup that disables the failpoint (failpoint.Disable(panicPath))
and calls ds.OnReporterClosing(), and ensure the failpoint is enabled before
registering cleanup so the cleanup always runs even if the test fails or panics;
reference the panicPath variable, failpoint.Enable/Disable,
ds.OnReporterClosing(), and use t.Cleanup to register the teardown.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
pkg/util/topsql/BUILD.bazel
pkg/util/topsql/reporter/BUILD.bazel
pkg/util/topsql/reporter/metrics/metrics.go
pkg/util/topsql/reporter/pubsub_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/util/topsql/BUILD.bazel
- pkg/util/topsql/reporter/BUILD.bazel
Force-pushed from 9d19a1f to 90cb7d1 (compare)
🧹 Nitpick comments (1)
pkg/util/topsql/reporter/pubsub_test.go (1)
392-402: Consider adding cleanup for consistency with other subtests.

This subtest clears global TopRU state at the start but lacks a t.Cleanup to restore state on failure. While the risk is minimal (since it only disables), adding cleanup would align with the pattern used in other subtests and improve test isolation.

Suggested cleanup:

 t.Run("register fail does not enable topru", func(t *testing.T) {
 	for topsqlstate.TopRUEnabled() {
 		topsqlstate.DisableTopRU()
 	}
+	t.Cleanup(func() {
+		for topsqlstate.TopRUEnabled() {
+			topsqlstate.DisableTopRU()
+		}
+	})
 	req := mockTopRUSubRequest(tipb.ItemInterval_ITEM_INTERVAL_15S)
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/pubsub_test.go` around lines 392 - 402, The subtest "register fail does not enable topru" disables global TopRU state but doesn't restore it on test failure; capture the original state via topsqlstate.TopRUEnabled() at the start of the test and register a t.Cleanup that restores it (call topsqlstate.DisableTopRU() or re-enable as needed) so the global TopRU state is returned to its prior value after the test; update the anonymous test function containing topsqlstate.TopRUEnabled(), topsqlstate.DisableTopRU(), and the Subscribe call on NewTopSQLPubSubService to include this t.Cleanup restoration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@pkg/util/topsql/reporter/pubsub_test.go`:
- Around line 392-402: The subtest "register fail does not enable topru"
disables global TopRU state but doesn't restore it on test failure; capture the
original state via topsqlstate.TopRUEnabled() at the start of the test and
register a t.Cleanup that restores it (call topsqlstate.DisableTopRU() or
re-enable as needed) so the global TopRU state is returned to its prior value
after the test; update the anonymous test function containing
topsqlstate.TopRUEnabled(), topsqlstate.DisableTopRU(), and the Subscribe call
on NewTopSQLPubSubService to include this t.Cleanup restoration.
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

@@ Coverage Diff @@
## master #66642 +/- ##
================================================
- Coverage 77.6781% 77.1198% -0.5584%
================================================
Files 2006 1931 -75
Lines 548248 547081 -1167
================================================
- Hits 425869 421908 -3961
- Misses 120719 125137 +4418
+ Partials 1660 36 -1624
Flags with carried forward coverage won't be shown. Click here to find out more.
Review Complete. Findings: 2 issues. ℹ️ Learn more details on Pantheon AI.
		return err
	}

	logutil.BgLogger().Info(
[P2] Info log fires on every TopRU interval set, including the first subscriber and same-value updates
Why: SetTopRUItemInterval unconditionally emits an Info log at state.go:158 on every valid call, with the message "top ru item interval overridden by later subscription". This fires even on the first subscriber (no "override" occurred) and when the new interval equals the current one. In deployments with frequent subscribe/reconnect cycles this generates operational noise and the misleading wording can confuse operators.
Evidence:
- pkg/util/topsql/state/state.go:158 — logutil.BgLogger().Info("[top-sql] top ru item interval overridden by later subscription", ...) called unconditionally for any valid interval
- pkg/util/topsql/reporter/datasink.go:83 — SetTopRUItemInterval called on every *pubSubDataSink registration with enableTopRU == true
- pkg/util/topsql/reporter/pubsub.go:49 — a new sink is created and registered per client stream
Suggestion: Guard the log behind an actual override check (e.g., only log when newInterval != currentInterval) and update the message to distinguish first-set from override, or downgrade to Debug level.
/retest
@zimulala: The following test failed, say

Full PR test history. Your PR dashboard. Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: close #66625
Problem Summary:
TopRU requires shared state/interval handling and subscription transport plumbing. These primitives are missing, which blocks the later RU aggregation/reporting pipeline and makes review harder.
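The shared state primitive this PR introduces can be sketched as a ref-counted enable/disable counter plus a validated, last-writer-wins interval. This is a minimal illustration under assumptions: `topRUState`, `defaultIntervalSeconds`, and the method names are hypothetical stand-ins, not the actual pkg/util/topsql/state API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const defaultIntervalSeconds int64 = 30 // assumed default, for illustration

// topRUState sketches ref-counted enablement: TopRU stays on while at
// least one subscriber holds a reference, and the interval resets to the
// default once the last subscriber goes away.
type topRUState struct {
	consumers       atomic.Int64
	intervalSeconds atomic.Int64
}

func (s *topRUState) Enable() { s.consumers.Add(1) }

func (s *topRUState) Disable() {
	if s.consumers.Add(-1) <= 0 {
		s.consumers.Store(0)
		s.intervalSeconds.Store(defaultIntervalSeconds) // reset at zero refs
	}
}

func (s *topRUState) Enabled() bool { return s.consumers.Load() > 0 }

// SetInterval validates and stores the interval; a later subscription
// overrides an earlier one, as the review thread describes.
func (s *topRUState) SetInterval(sec int64) error {
	if sec <= 0 {
		return fmt.Errorf("invalid top-ru interval: %d", sec)
	}
	s.intervalSeconds.Store(sec)
	return nil
}

func main() {
	var s topRUState
	s.intervalSeconds.Store(defaultIntervalSeconds)
	s.Enable()
	_ = s.SetInterval(15)
	s.Enable()
	_ = s.SetInterval(60) // second subscriber overrides the interval
	s.Disable()
	fmt.Println(s.Enabled(), s.intervalSeconds.Load()) // true 60
	s.Disable()
	fmt.Println(s.Enabled(), s.intervalSeconds.Load()) // false 30
}
```

The ref-count is what lets multiple concurrent subscribers share one global flag safely: disabling one subscription cannot switch TopRU off underneath another.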
What changed and how does it work?
tipb and deps plus minimal BUILD wiring for new sources/tests.

Scope
- state.go / state_test.go for TopRU enable/interval handling.
- Reporter plumbing (datasink.go, pubsub.go, single_target.go, server.go) and associated tests.
- Data model and stats types (datamodel.go, metrics.go, rustats.go).
- tipb upgrade / deps (DEPS.bzl, go.mod, go.sum) and minimal BUILD updates.
- New RU metrics (ru_*).

Behavior / Compatibility
tipb adds TopRU fields; older components should ignore unknown fields, and unimplemented TopRU RPCs are tolerated.

Follow-ups
Reviewer Notes
ru_* targets yet.
git diff --name-only master...HEAD should match the PR1 file list only.
Tests
Test Results:
(f664f2e987e8cc4d469b24eb305ecd4fa7231fe2): TestPubSubDataSinkDoSendOrderIsStable
Documentation
Release note
Summary by CodeRabbit
New Features
Metrics
Tests
Chores