-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17016: Align the behavior of GaugeWrapper and MeterWrapper #16426
base: trunk
Are you sure you want to change the base?
Conversation
ab77222
to
e127c81
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for this patch!
e127c81
to
a1a3f08
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for your patch
} | ||
return gaugeObject | ||
return metric | ||
} | ||
|
||
def close(): Unit = gaugeLock synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems the lock is used to make sure we don't call removeMetric/newGauge repeatedly, so we can do a bit refactor for it. WDYT?
case class GaugeWrapper(metricType: String) {
private final val removed = new AtomicBoolean(false)
// The map to store:
// - per-partition value for topic-level metrics. The key will be the partition number
// - per-topic value for broker-level metrics. The key will be the topic name
private val metricValues = new ConcurrentHashMap[String, Long]()
def setValue(key: String, value: Long): Unit = {
newGaugeIfNeed()
metricValues.put(key, value)
}
def removeKey(key: String): Unit = {
newGaugeIfNeed()
metricValues.remove(key)
}
def close(): Unit = if (removed.compareAndSet(true, false)) {
metricsGroup.removeMetric(metricType, tags)
metricValues.clear()
}
private def newGaugeIfNeed(): Unit = if (removed.compareAndSet(false, true)) {
metricsGroup.newGauge(metricType, () => metricValues.values().stream().mapToLong(v => v).sum(), tags)
}
newGaugeIfNeed()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great suggestion. Updated it.
Signed-off-by: PoAn Yang <[email protected]>
a1a3f08
to
f734843
Compare
When using
MeterWrapper
, it initializesMeter
again after users close it.However, the
GaugeWrapper
behavior is different. We useaggregatedMetric
directly.If
GaugeWrapper
is closed, we don't have chance to initialize it again.Committer Checklist (excluded from commit message)