From 3e2a7d00e8e057b0f1b06be55d6df1d628cea63f Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Fri, 16 Feb 2024 12:19:05 +0800 Subject: [PATCH 1/3] Ensure consumer group's partition status metrics are deleted The `burrow_kafka_topic_partition_status` metric for a deleted consumer group was dangling because we neglected to clear the corresponding gauge. --- core/internal/httpserver/prometheus.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/internal/httpserver/prometheus.go b/core/internal/httpserver/prometheus.go index 97e71146..2e5e4b82 100644 --- a/core/internal/httpserver/prometheus.go +++ b/core/internal/httpserver/prometheus.go @@ -73,6 +73,7 @@ func DeleteConsumerMetrics(cluster, consumer string) { consumerStatusGauge.Delete(labels) consumerPartitionLagGauge.DeletePartialMatch(labels) consumerPartitionCurrentOffset.DeletePartialMatch(labels) + partitionStatusGauge.DeletePartialMatch(labels) } // DeleteTopicMetrics deletes all metrics that are labeled with a topic From 8f8c2ed752b18bf61240ee757eb899303ee71839 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Fri, 16 Feb 2024 12:20:41 +0800 Subject: [PATCH 2/3] Correctly handle deletion of metrics when consumer group topic is deleted Previously, we deleted all of the consumer group's metrics when a group's topic was deleted. With this change we make sure to check for the case where a topic is provided (i.e. user wants to delete only a specific topic of a consumer group) and delete only the metrics that match both consumer group and topic.
--- core/internal/httpserver/prometheus.go | 13 +++++++++++++ core/internal/storage/inmemory.go | 12 +++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/core/internal/httpserver/prometheus.go b/core/internal/httpserver/prometheus.go index 2e5e4b82..e7795b6c 100644 --- a/core/internal/httpserver/prometheus.go +++ b/core/internal/httpserver/prometheus.go @@ -93,6 +93,19 @@ func DeleteTopicMetrics(cluster, topic string) { consumerStatusGauge.DeletePartialMatch(labels) } +// DeleteConsumerTopicMetrics deletes all metrics that are labeled with the provided consumer group AND topic +func DeleteConsumerTopicMetrics(cluster, consumer string, topic string) { + labels := map[string]string{ + "cluster": cluster, + "consumer_group": consumer, + "topic": topic, + } + + partitionStatusGauge.DeletePartialMatch(labels) + consumerPartitionCurrentOffset.DeletePartialMatch(labels) + consumerPartitionLagGauge.DeletePartialMatch(labels) +} + func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc { promHandler := promhttp.Handler() diff --git a/core/internal/storage/inmemory.go b/core/internal/storage/inmemory.go index a4088234..09ddc91f 100644 --- a/core/internal/storage/inmemory.go +++ b/core/internal/storage/inmemory.go @@ -667,16 +667,26 @@ func (module *InMemoryStorage) deleteGroup(request *protocol.StorageRequest, req } clusterMap.consumerLock.Lock() + deleteAllGroupMetrics := true if group, ok := clusterMap.consumer[request.Group]; ok && request.Topic != "" { delete(group.topics, request.Topic) if len(group.topics) == 0 { delete(clusterMap.consumer, request.Group) + } else { + // The consumer group consumes other topics, thus we need to keep its metrics + deleteAllGroupMetrics = false } } else { delete(clusterMap.consumer, request.Group) } clusterMap.consumerLock.Unlock() - httpserver.DeleteConsumerMetrics(request.Cluster, request.Group) + + if deleteAllGroupMetrics { + httpserver.DeleteConsumerMetrics(request.Cluster, request.Group) + } else { + 
// only a specific topic was deleted and the consumer group still exists, thus we delete only a subset of the metrics + httpserver.DeleteConsumerTopicMetrics(request.Cluster, request.Group, request.Topic) + } requestLogger.Debug("ok") } From 63514ff7e39c521315489027786150828629a454 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Mon, 29 Apr 2024 19:25:25 +0800 Subject: [PATCH 3/3] Fixes lint error --- core/internal/httpserver/prometheus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/internal/httpserver/prometheus.go b/core/internal/httpserver/prometheus.go index e7795b6c..1ba1059e 100644 --- a/core/internal/httpserver/prometheus.go +++ b/core/internal/httpserver/prometheus.go @@ -94,7 +94,7 @@ func DeleteTopicMetrics(cluster, topic string) { } // DeleteConsumerTopicMetrics deletes all metrics that are labeled with the provided consumer group AND topic -func DeleteConsumerTopicMetrics(cluster, consumer string, topic string) { +func DeleteConsumerTopicMetrics(cluster, consumer, topic string) { labels := map[string]string{ "cluster": cluster, "consumer_group": consumer,