diff --git a/src/log-store/src/kafka/producer.rs b/src/log-store/src/kafka/producer.rs index 80d214ed78..910465ba61 100644 --- a/src/log-store/src/kafka/producer.rs +++ b/src/log-store/src/kafka/producer.rs @@ -24,6 +24,10 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use crate::error::{self, Result}; use crate::kafka::index::IndexCollector; use crate::kafka::worker::{BackgroundProducerWorker, ProduceResultHandle, WorkerRequest}; +use crate::metrics::{ + METRIC_KAFKA_CLIENT_BYTES_TOTAL, METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED, + METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL, +}; pub type OrderedBatchProducerRef = Arc; @@ -106,6 +110,18 @@ impl ProducerClient for PartitionClient { records: Vec, compression: Compression, ) -> rskafka::client::error::Result> { + let total_size = records.iter().map(|r| r.approximate_size()).sum::(); + let partition = self.partition().to_string(); + METRIC_KAFKA_CLIENT_BYTES_TOTAL + .with_label_values(&[self.topic(), &partition]) + .inc_by(total_size as u64); + METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL + .with_label_values(&[self.topic(), &partition]) + .inc(); + let _timer = METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED + .with_label_values(&[self.topic(), &partition]) + .start_timer(); + self.produce(records, compression).await } diff --git a/src/log-store/src/metrics.rs b/src/log-store/src/metrics.rs index 787bcfe3a7..c82be98f66 100644 --- a/src/log-store/src/metrics.rs +++ b/src/log-store/src/metrics.rs @@ -19,6 +19,10 @@ use prometheus::*; pub const LOGSTORE_LABEL: &str = "logstore"; /// Operation type label. pub const OPTYPE_LABEL: &str = "optype"; +/// Kafka topic label. +pub const TOPIC_LABEL: &str = "topic"; +/// Kafka partition label. +pub const PARTITION_LABEL: &str = "partition"; lazy_static! { /// Counters of bytes of each operation on a logstore. @@ -62,4 +66,23 @@ lazy_static! { /// Timer of the append_batch operation on the raft-engine logstore. /// This timer only measures the duration of the read operation, not measures the total duration of replay. pub static ref METRIC_RAFT_ENGINE_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "read"]); + + pub static ref METRIC_KAFKA_CLIENT_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_logstore_kafka_client_bytes_total", + "kafka logstore's bytes traffic total", + &[LOGSTORE_LABEL, PARTITION_LABEL], + ) + .unwrap(); + pub static ref METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_logstore_kafka_client_traffic_total", + "kafka logstore's request count traffic total", + &[LOGSTORE_LABEL, PARTITION_LABEL], + ) + .unwrap(); + pub static ref METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_logstore_kafka_client_produce_elapsed", + "kafka logstore produce operation elapsed", + &[LOGSTORE_LABEL, PARTITION_LABEL], + ) + .unwrap(); } diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 8b7f838be1..f4cfe18b3e 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -57,8 +57,9 @@ pub struct NaiveEntry { } impl NaiveEntry { + /// Estimates the persisted size of the entry. fn estimated_size(&self) -> usize { - size_of::() + self.data.capacity() * size_of::() + size_of::() + self.data.len() * size_of::() } } @@ -84,14 +85,15 @@ impl MultiplePartEntry { && self.headers.contains(&MultiplePartHeader::Last) } + /// Estimates the persisted size of the entry. fn estimated_size(&self) -> usize { size_of::() + self .parts .iter() - .map(|data| data.capacity() * size_of::()) + .map(|data| data.len() * size_of::()) .sum::() - + self.headers.capacity() * size_of::() + + self.headers.len() * size_of::() } }