feat: add/correct some kafka-related metrics (#5757)

* feat: add/correct some kafka-related metrics

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix dumb issues

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* per-partition produce latency

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-03-25 12:16:39 -07:00
committed by GitHub
parent d88482b996
commit e81213728b
3 changed files with 44 additions and 3 deletions

View File

@@ -24,6 +24,10 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use crate::error::{self, Result};
use crate::kafka::index::IndexCollector;
use crate::kafka::worker::{BackgroundProducerWorker, ProduceResultHandle, WorkerRequest};
use crate::metrics::{
METRIC_KAFKA_CLIENT_BYTES_TOTAL, METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED,
METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL,
};
pub type OrderedBatchProducerRef = Arc<OrderedBatchProducer>;
@@ -106,6 +110,18 @@ impl ProducerClient for PartitionClient {
records: Vec<Record>,
compression: Compression,
) -> rskafka::client::error::Result<Vec<i64>> {
let total_size = records.iter().map(|r| r.approximate_size()).sum::<usize>();
let partition = self.partition().to_string();
METRIC_KAFKA_CLIENT_BYTES_TOTAL
.with_label_values(&[self.topic(), &partition])
.inc_by(total_size as u64);
METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL
.with_label_values(&[self.topic(), &partition])
.inc();
let _timer = METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED
.with_label_values(&[self.topic(), &partition])
.start_timer();
self.produce(records, compression).await
}

View File

@@ -19,6 +19,10 @@ use prometheus::*;
pub const LOGSTORE_LABEL: &str = "logstore";
/// Operation type label.
pub const OPTYPE_LABEL: &str = "optype";
/// Label name under which a Kafka topic is attached to a metric.
pub const TOPIC_LABEL: &str = "topic";
/// Label name under which a Kafka partition is attached to a metric.
pub const PARTITION_LABEL: &str = "partition";
lazy_static! {
/// Counters of bytes of each operation on a logstore.
@@ -62,4 +66,23 @@ lazy_static! {
/// Timer of the read operation on the raft-engine logstore.
/// This timer only measures the duration of the read operation; it does not measure the total duration of replay.
pub static ref METRIC_RAFT_ENGINE_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "read"]);
/// Counter of the bytes passed to the Kafka client for production, per topic and partition.
/// Each produce call adds the summed approximate size of its records.
pub static ref METRIC_KAFKA_CLIENT_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
    "greptime_logstore_kafka_client_bytes_total",
    "kafka logstore's bytes traffic total",
    // The producer supplies `[topic, partition]` as label values, so the first
    // label has to be `topic`; `logstore` would export topic names under the wrong key.
    &[TOPIC_LABEL, PARTITION_LABEL],
)
.unwrap();
/// Counter of produce requests issued through the Kafka client, per topic and partition.
/// Incremented once per produce call, regardless of how many records it carries.
pub static ref METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL: IntCounterVec = register_int_counter_vec!(
    "greptime_logstore_kafka_client_traffic_total",
    "kafka logstore's request count traffic total",
    // The producer supplies `[topic, partition]` as label values, so the first
    // label has to be `topic`; `logstore` would export topic names under the wrong key.
    &[TOPIC_LABEL, PARTITION_LABEL],
)
.unwrap();
/// Histogram of the elapsed time of produce calls on the Kafka client, per topic and partition.
pub static ref METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED: HistogramVec = register_histogram_vec!(
    "greptime_logstore_kafka_client_produce_elapsed",
    "kafka logstore produce operation elapsed",
    // The producer supplies `[topic, partition]` as label values, so the first
    // label has to be `topic`; `logstore` would export topic names under the wrong key.
    &[TOPIC_LABEL, PARTITION_LABEL],
)
.unwrap();
}

View File

@@ -57,8 +57,9 @@ pub struct NaiveEntry {
}
impl NaiveEntry {
/// Estimates the persisted size of the entry.
///
/// The estimate is `size_of::<Self>()` plus a count taken from `data`, multiplied by
/// `size_of::<u8>()`.
fn estimated_size(&self) -> usize {
// NOTE(review): two versions of the expression appear here. Presumably the `capacity()`
// line is the pre-change code and the `len()` line replaces it, so that only stored
// bytes are counted rather than allocated-but-unused space. Confirm against the commit.
size_of::<Self>() + self.data.capacity() * size_of::<u8>()
size_of::<Self>() + self.data.len() * size_of::<u8>()
}
}
@@ -84,14 +85,15 @@ impl MultiplePartEntry {
&& self.headers.contains(&MultiplePartHeader::Last)
}
/// Estimates the persisted size of the entry.
///
/// The estimate is `size_of::<Self>()`, plus a per-part count from `parts` multiplied by
/// `size_of::<u8>()`, plus a count from `headers` multiplied by
/// `size_of::<MultiplePartHeader>()`.
fn estimated_size(&self) -> usize {
size_of::<Self>()
+ self
.parts
.iter()
// NOTE(review): both a `capacity()` and a `len()` variant of this closure appear.
// Presumably the `len()` line replaces the `capacity()` line. Confirm against the commit.
.map(|data| data.capacity() * size_of::<u8>())
.map(|data| data.len() * size_of::<u8>())
.sum::<usize>()
// NOTE(review): same pairing for the headers term; presumably `len()` supersedes
// `capacity()`. Confirm against the commit.
+ self.headers.capacity() * size_of::<MultiplePartHeader>()
+ self.headers.len() * size_of::<MultiplePartHeader>()
}
}