From e81213728b8ec1ffa5389bef8b2da0e43b2f4e48 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 25 Mar 2025 12:16:39 -0700 Subject: [PATCH] feat: add/correct some kafka-related metrics (#5757) * feat: add/correct some kafka-related metrics Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * fix dumb issues Signed-off-by: Ruihang Xia * per-partition produce latency Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/log-store/src/kafka/producer.rs | 16 ++++++++++++++++ src/log-store/src/metrics.rs | 23 +++++++++++++++++++++++ src/store-api/src/logstore/entry.rs | 8 +++++--- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/log-store/src/kafka/producer.rs b/src/log-store/src/kafka/producer.rs index 80d214ed78..910465ba61 100644 --- a/src/log-store/src/kafka/producer.rs +++ b/src/log-store/src/kafka/producer.rs @@ -24,6 +24,10 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use crate::error::{self, Result}; use crate::kafka::index::IndexCollector; use crate::kafka::worker::{BackgroundProducerWorker, ProduceResultHandle, WorkerRequest}; +use crate::metrics::{ + METRIC_KAFKA_CLIENT_BYTES_TOTAL, METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED, + METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL, +}; pub type OrderedBatchProducerRef = Arc<OrderedBatchProducer>; @@ -106,6 +110,18 @@ impl ProducerClient for PartitionClient { records: Vec<Record>, compression: Compression, ) -> rskafka::client::error::Result<Vec<i64>> { + let total_size = records.iter().map(|r| r.approximate_size()).sum::<usize>(); + let partition = self.partition().to_string(); + METRIC_KAFKA_CLIENT_BYTES_TOTAL + .with_label_values(&[self.topic(), &partition]) + .inc_by(total_size as u64); + METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL + .with_label_values(&[self.topic(), &partition]) + .inc(); + let _timer = METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED + .with_label_values(&[self.topic(), &partition]) + .start_timer(); + self.produce(records, compression).await } diff --git a/src/log-store/src/metrics.rs b/src/log-store/src/metrics.rs 
index 787bcfe3a7..c82be98f66 100644 --- a/src/log-store/src/metrics.rs +++ b/src/log-store/src/metrics.rs @@ -19,6 +19,10 @@ use prometheus::*; pub const LOGSTORE_LABEL: &str = "logstore"; /// Operation type label. pub const OPTYPE_LABEL: &str = "optype"; +/// Kafka topic label. +pub const TOPIC_LABEL: &str = "topic"; +/// Kafka partition label. +pub const PARTITION_LABEL: &str = "partition"; lazy_static! { /// Counters of bytes of each operation on a logstore. @@ -62,4 +66,23 @@ lazy_static! { /// Timer of the append_batch operation on the raft-engine logstore. /// This timer only measures the duration of the read operation, not measures the total duration of replay. pub static ref METRIC_RAFT_ENGINE_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "read"]); + + pub static ref METRIC_KAFKA_CLIENT_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_logstore_kafka_client_bytes_total", + "kafka logstore's bytes traffic total", + &[TOPIC_LABEL, PARTITION_LABEL], + ) + .unwrap(); + pub static ref METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_logstore_kafka_client_traffic_total", + "kafka logstore's request count traffic total", + &[TOPIC_LABEL, PARTITION_LABEL], + ) + .unwrap(); + pub static ref METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_logstore_kafka_client_produce_elapsed", + "kafka logstore produce operation elapsed", + &[TOPIC_LABEL, PARTITION_LABEL], + ) + .unwrap(); } diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 8b7f838be1..f4cfe18b3e 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -57,8 +57,9 @@ pub struct NaiveEntry { } impl NaiveEntry { + /// Estimates the persisted size of the entry. 
fn estimated_size(&self) -> usize { - size_of::<Self>() + self.data.capacity() * size_of::<u8>() + size_of::<Self>() + self.data.len() * size_of::<u8>() } } @@ -84,14 +85,15 @@ impl MultiplePartEntry { && self.headers.contains(&MultiplePartHeader::Last) } + /// Estimates the persisted size of the entry. fn estimated_size(&self) -> usize { size_of::<Self>() + self .parts .iter() - .map(|data| data.capacity() * size_of::<u8>()) + .map(|data| data.len() * size_of::<u8>()) .sum::<usize>() - + self.headers.capacity() * size_of::<MultiplePartHeader>() + + self.headers.len() * size_of::<MultiplePartHeader>() } }