feat: introduce wal benchmarker (#3446)
* feat: introduce wal benchmarker
* chore: add log store metrics
* chore: add some comments to wal benchmarker
* fix: ci
* chore: add more metrics for kafka logstore
* chore: add more timers for kafka logstore
* chore: add more configs
* chore: move humantime to common dependencies
* refactor: refactor wal benchmarker
* fix: apply suggestions from code review
* doc: add a simple README for wal benchmarker
* fix: Cargo.toml
* fix: clippy
* chore: rename wal.rs to wal_bench.rs
* fix: compile
Cargo.lock (generated, 22 lines changed)
@@ -876,14 +876,34 @@ dependencies = [
name = "benchmarks"
version = "0.7.1"
dependencies = [
 "api",
 "arrow",
 "chrono",
 "clap 4.4.11",
 "client",
 "common-base",
 "common-telemetry",
 "common-wal",
 "dotenv",
 "futures",
 "futures-util",
 "humantime",
 "humantime-serde",
 "indicatif",
 "itertools 0.10.5",
 "lazy_static",
 "log-store",
 "mito2",
 "num_cpus",
 "parquet",
 "prometheus",
 "rand",
 "rskafka",
 "serde",
 "store-api",
 "tokio",
 "toml 0.8.8",
 "uuid",
]

[[package]]
@@ -4845,6 +4865,8 @@ dependencies = [
 "futures",
 "futures-util",
 "itertools 0.10.5",
 "lazy_static",
 "prometheus",
 "protobuf",
 "protobuf-build",
 "raft-engine",

@@ -99,11 +99,13 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "06f6297ff3cab578a1589741b504342fbad70453" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

@@ -8,11 +8,31 @@ license.workspace = true
workspace = true

[dependencies]
api.workspace = true
arrow.workspace = true
chrono.workspace = true
clap.workspace = true
client.workspace = true
common-base.workspace = true
common-telemetry.workspace = true
common-wal.workspace = true
dotenv.workspace = true
futures.workspace = true
futures-util.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
indicatif = "0.17.1"
itertools.workspace = true
lazy_static.workspace = true
log-store.workspace = true
mito2.workspace = true
num_cpus.workspace = true
parquet.workspace = true
prometheus.workspace = true
rand.workspace = true
rskafka.workspace = true
serde.workspace = true
store-api.workspace = true
tokio.workspace = true
toml.workspace = true
uuid.workspace = true

benchmarks/README.md (new file, 11 lines)
@@ -0,0 +1,11 @@
Benchmarkers for GreptimeDB
--------------------------------

## Wal Benchmarker
The wal benchmarker evaluates the performance of GreptimeDB's Write-Ahead Log (WAL) component, measuring the read/write performance of the WAL under the diverse workloads it generates.


### How to use
To compile the benchmarker, navigate to the `greptimedb/benchmarks` directory and run `cargo build --release`. The compiled binary is then located at `greptimedb/target/release/wal_bench`.

Running `./wal_bench -h` lists the many arguments the binary accepts. A notable one is `cfg-file`: by supplying a configuration file in TOML format, you avoid repeatedly specifying cumbersome command-line arguments.
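
For example, assuming a Kafka broker is reachable at `localhost:9092` and the example config shipped with the repository is used (paths below are illustrative), a session might look like:

```shell
cd greptimedb/benchmarks
cargo build --release
../target/release/wal_bench -c config/wal_bench.example.toml
```

Any argument listed by `./wal_bench -h` can instead be set in the TOML file; see `benchmarks/config/wal_bench.example.toml` for the full set of keys.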
benchmarks/config/wal_bench.example.toml (new file, 21 lines)
@@ -0,0 +1,21 @@
# Refer to the docs of `Args` in `benchmarks/src/wal_bench.rs`.
wal_provider = "kafka"
bootstrap_brokers = ["localhost:9092"]
num_workers = 10
num_topics = 32
num_regions = 1000
num_scrapes = 1000
num_rows = 5
col_types = "ifs"
max_batch_size = "512KB"
linger = "1ms"
backoff_init = "10ms"
backoff_max = "1s"
backoff_base = 2
backoff_deadline = "3s"
compression = "zstd"
rng_seed = 42
skip_read = false
skip_write = false
random_topics = true
report_metrics = false
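
Note that `col_types` supports the repeat shorthand described in the `Args` docs of `benchmarks/src/wal_bench.rs`: for instance, `col_types = "ifsx4"` expands to 12 columns, the i64/f64/string triple repeated four times.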
benchmarks/src/bin/wal_bench.rs (new file, 326 lines)
@@ -0,0 +1,326 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(int_roundings)]

use std::fs;
use std::sync::Arc;
use std::time::Instant;

use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
use benchmarks::metrics;
use benchmarks::wal_bench::{Args, Config, Region, WalProvider};
use clap::Parser;
use common_telemetry::info;
use common_wal::config::kafka::common::BackoffConfig;
use common_wal::config::kafka::DatanodeKafkaConfig as KafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::options::{KafkaWalOptions, WalOptions};
use itertools::Itertools;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use mito2::wal::Wal;
use prometheus::{Encoder, TextEncoder};
use rand::distributions::{Alphanumeric, DistString};
use rand::rngs::SmallRng;
use rand::SeedableRng;
use rskafka::client::partition::Compression;
use rskafka::client::ClientBuilder;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

async fn run_benchmarker<S: LogStore>(cfg: &Config, topics: &[String], wal: Arc<Wal<S>>) {
    let chunk_size = cfg.num_regions.div_ceil(cfg.num_workers);
    let region_chunks = (0..cfg.num_regions)
        .map(|id| {
            build_region(
                id as u64,
                topics,
                &mut SmallRng::seed_from_u64(cfg.rng_seed),
                cfg,
            )
        })
        .chunks(chunk_size as usize)
        .into_iter()
        .map(|chunk| Arc::new(chunk.collect::<Vec<_>>()))
        .collect::<Vec<_>>();

    let mut write_elapsed = 0;
    let mut read_elapsed = 0;

    if !cfg.skip_write {
        info!("Benchmarking write ...");

        let num_scrapes = cfg.num_scrapes;
        let timer = Instant::now();
        futures::future::join_all((0..cfg.num_workers).map(|i| {
            let wal = wal.clone();
            let regions = region_chunks[i as usize].clone();
            tokio::spawn(async move {
                for _ in 0..num_scrapes {
                    let mut wal_writer = wal.writer();
                    regions
                        .iter()
                        .for_each(|region| region.add_wal_entry(&mut wal_writer));
                    wal_writer.write_to_wal().await.unwrap();
                }
            })
        }))
        .await;
        write_elapsed += timer.elapsed().as_millis();
    }

    if !cfg.skip_read {
        info!("Benchmarking read ...");

        let timer = Instant::now();
        futures::future::join_all((0..cfg.num_workers).map(|i| {
            let wal = wal.clone();
            let regions = region_chunks[i as usize].clone();
            tokio::spawn(async move {
                for region in regions.iter() {
                    region.replay(&wal).await;
                }
            })
        }))
        .await;
        read_elapsed = timer.elapsed().as_millis();
    }

    dump_report(cfg, write_elapsed, read_elapsed);
}

fn build_region(id: u64, topics: &[String], rng: &mut SmallRng, cfg: &Config) -> Region {
    let wal_options = match cfg.wal_provider {
        WalProvider::Kafka => {
            assert!(!topics.is_empty());
            WalOptions::Kafka(KafkaWalOptions {
                topic: topics.get(id as usize % topics.len()).cloned().unwrap(),
            })
        }
        WalProvider::RaftEngine => WalOptions::RaftEngine,
    };
    Region::new(
        RegionId::from_u64(id),
        build_schema(&parse_col_types(&cfg.col_types), rng),
        wal_options,
        cfg.num_rows,
        cfg.rng_seed,
    )
}

fn build_schema(col_types: &[ColumnDataType], mut rng: &mut SmallRng) -> Vec<ColumnSchema> {
    col_types
        .iter()
        .map(|col_type| ColumnSchema {
            column_name: Alphanumeric.sample_string(&mut rng, 5),
            datatype: *col_type as i32,
            semantic_type: SemanticType::Field as i32,
            datatype_extension: None,
        })
        .chain(vec![ColumnSchema {
            column_name: "ts".to_string(),
            datatype: ColumnDataType::TimestampMillisecond as i32,
            semantic_type: SemanticType::Tag as i32,
            datatype_extension: None,
        }])
        .collect()
}

fn dump_report(cfg: &Config, write_elapsed: u128, read_elapsed: u128) {
    let cost_report = format!(
        "write costs: {} ms, read costs: {} ms",
        write_elapsed, read_elapsed,
    );

    let total_written_bytes = metrics::METRIC_WAL_WRITE_BYTES_TOTAL.get() as u128;
    let write_throughput = if write_elapsed > 0 {
        (total_written_bytes * 1000).div_floor(write_elapsed)
    } else {
        0
    };
    let total_read_bytes = metrics::METRIC_WAL_READ_BYTES_TOTAL.get() as u128;
    let read_throughput = if read_elapsed > 0 {
        (total_read_bytes * 1000).div_floor(read_elapsed)
    } else {
        0
    };

    let throughput_report = format!(
        "total written bytes: {} bytes, total read bytes: {} bytes, write throughput: {} bytes/s ({} MiB/s), read throughput: {} bytes/s ({} MiB/s)",
        total_written_bytes,
        total_read_bytes,
        write_throughput,
        write_throughput.div_floor(1 << 20),
        read_throughput,
        read_throughput.div_floor(1 << 20),
    );

    let metrics_report = if cfg.report_metrics {
        let mut buffer = Vec::new();
        let encoder = TextEncoder::new();
        let metrics = prometheus::gather();
        encoder.encode(&metrics, &mut buffer).unwrap();
        String::from_utf8(buffer).unwrap()
    } else {
        String::new()
    };

    info!(
        r#"
Benchmark config:
{cfg:?}

Benchmark report:
{cost_report}
{throughput_report}
{metrics_report}"#
    );
}

async fn create_topics(cfg: &Config) -> Vec<String> {
    // Creates topics.
    let client = ClientBuilder::new(cfg.bootstrap_brokers.clone())
        .build()
        .await
        .unwrap();
    let ctrl_client = client.controller_client().unwrap();
    let (topics, tasks): (Vec<_>, Vec<_>) = (0..cfg.num_topics)
        .map(|i| {
            let topic = if cfg.random_topics {
                format!(
                    "greptime_wal_bench_topic_{}_{}",
                    uuid::Uuid::new_v4().as_u128(),
                    i
                )
            } else {
                format!("greptime_wal_bench_topic_{}", i)
            };
            let task = ctrl_client.create_topic(
                topic.clone(),
                1,
                cfg.bootstrap_brokers.len() as i16,
                2000,
            );
            (topic, task)
        })
        .unzip();
    // Errors must be ignored since topics may have been created by a previous run.
    let _ = futures::future::try_join_all(tasks).await;

    topics
}

fn parse_compression(comp: &str) -> Compression {
    match comp {
        "no" => Compression::NoCompression,
        "gzip" => Compression::Gzip,
        "lz4" => Compression::Lz4,
        "snappy" => Compression::Snappy,
        "zstd" => Compression::Zstd,
        other => unreachable!("Unrecognized compression {other}"),
    }
}

fn parse_col_types(col_types: &str) -> Vec<ColumnDataType> {
    let parts = col_types.split('x').collect::<Vec<_>>();
    assert!(parts.len() <= 2);

    let pattern = parts[0];
    let repeat = parts
        .get(1)
        .map(|r| r.parse::<usize>().unwrap())
        .unwrap_or(1);

    pattern
        .chars()
        .map(|c| match c {
            'i' | 'I' => ColumnDataType::Int64,
            'f' | 'F' => ColumnDataType::Float64,
            's' | 'S' => ColumnDataType::String,
            other => unreachable!("Cannot parse {other} as a column data type"),
        })
        .cycle()
        .take(pattern.len() * repeat)
        .collect()
}

fn main() {
    // Sets the global log level to INFO and suppresses rskafka logs below ERROR.
    std::env::set_var("UNITTEST_LOG_LEVEL", "info,rskafka=error");
    common_telemetry::init_default_ut_logging();

    let args = Args::parse();
    let cfg = if !args.cfg_file.is_empty() {
        toml::from_str(&fs::read_to_string(&args.cfg_file).unwrap()).unwrap()
    } else {
        Config::from(args)
    };

    // Validates arguments.
    if cfg.num_regions < cfg.num_workers {
        panic!("num_regions must be greater than or equal to num_workers");
    }
    if cfg
        .num_workers
        .min(cfg.num_topics)
        .min(cfg.num_regions)
        .min(cfg.num_scrapes)
        .min(cfg.max_batch_size.as_bytes() as u32)
        .min(cfg.bootstrap_brokers.len() as u32)
        == 0
    {
        panic!("Invalid arguments");
    }

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            match cfg.wal_provider {
                WalProvider::Kafka => {
                    let topics = create_topics(&cfg).await;
                    let kafka_cfg = KafkaConfig {
                        broker_endpoints: cfg.bootstrap_brokers.clone(),
                        max_batch_size: cfg.max_batch_size,
                        linger: cfg.linger,
                        backoff: BackoffConfig {
                            init: cfg.backoff_init,
                            max: cfg.backoff_max,
                            base: cfg.backoff_base,
                            deadline: Some(cfg.backoff_deadline),
                        },
                        compression: parse_compression(&cfg.compression),
                        ..Default::default()
                    };
                    let store = Arc::new(KafkaLogStore::try_new(&kafka_cfg).await.unwrap());
                    let wal = Arc::new(Wal::new(store));
                    run_benchmarker(&cfg, &topics, wal).await;
                }
                WalProvider::RaftEngine => {
                    // The benchmarker assumes the raft engine directory exists.
                    let store = RaftEngineLogStore::try_new(
                        "/tmp/greptimedb/raft-engine-wal".to_string(),
                        RaftEngineConfig::default(),
                    )
                    .await
                    .map(Arc::new)
                    .unwrap();
                    let wal = Arc::new(Wal::new(store));
                    run_benchmarker(&cfg, &[], wal).await;
                }
            }
        });
}
benchmarks/src/lib.rs (new file, 16 lines)
@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod metrics;
pub mod wal_bench;
benchmarks/src/metrics.rs (new file, 39 lines)
@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use lazy_static::lazy_static;
use prometheus::*;

/// Logstore label.
pub const LOGSTORE_LABEL: &str = "logstore";
/// Operation type label.
pub const OPTYPE_LABEL: &str = "optype";

lazy_static! {
    /// Counters of bytes of each operation on a logstore.
    pub static ref METRIC_WAL_OP_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
        "greptime_bench_wal_op_bytes_total",
        "wal operation bytes total",
        &[OPTYPE_LABEL],
    )
    .unwrap();
    /// Counter of bytes of the append_batch operation.
    pub static ref METRIC_WAL_WRITE_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values(
        &["write"],
    );
    /// Counter of bytes of the read operation.
    pub static ref METRIC_WAL_READ_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values(
        &["read"],
    );
}
benchmarks/src/wal_bench.rs (new file, 361 lines)
@@ -0,0 +1,361 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem::size_of;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, Value, WalEntry};
use clap::{Parser, ValueEnum};
use common_base::readable_size::ReadableSize;
use common_wal::options::WalOptions;
use futures::StreamExt;
use mito2::wal::{Wal, WalWriter};
use rand::distributions::{Alphanumeric, DistString, Uniform};
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use serde::{Deserialize, Serialize};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::metrics;

/// The wal provider.
#[derive(Clone, ValueEnum, Default, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalProvider {
    #[default]
    RaftEngine,
    Kafka,
}

#[derive(Parser)]
pub struct Args {
    /// The provided configuration file.
    /// The example configuration file can be found at `greptimedb/benchmarks/config/wal_bench.example.toml`.
    #[clap(long, short = 'c')]
    pub cfg_file: String,

    /// The wal provider.
    #[clap(long, value_enum, default_value_t = WalProvider::default())]
    pub wal_provider: WalProvider,

    /// The advertised addresses of the kafka brokers.
    /// If there are multiple bootstrap brokers, their addresses should be separated by commas, e.g. "localhost:9092,localhost:9093".
    #[clap(long, short = 'b', default_value = "localhost:9092")]
    pub bootstrap_brokers: String,

    /// The number of workers, each running in a dedicated thread.
    #[clap(long, default_value_t = num_cpus::get() as u32)]
    pub num_workers: u32,

    /// The number of kafka topics to be created.
    #[clap(long, default_value_t = 32)]
    pub num_topics: u32,

    /// The number of regions.
    #[clap(long, default_value_t = 1000)]
    pub num_regions: u32,

    /// The number of times each region is scraped.
    #[clap(long, default_value_t = 1000)]
    pub num_scrapes: u32,

    /// The number of rows in each wal entry.
    /// Each time a region is scraped, a wal entry containing this many rows will be produced.
    #[clap(long, default_value_t = 5)]
    pub num_rows: u32,

    /// The column types of the schema for each region.
    /// Currently, three column types are supported:
    /// - i = ColumnDataType::Int64
    /// - f = ColumnDataType::Float64
    /// - s = ColumnDataType::String
    /// E.g., "ifs" will be parsed as three columns: i64, f64, and string.
    ///
    /// Additionally, an "x" sign can be provided to repeat the column types a given number of times.
    /// E.g., "iix2" will be parsed as 4 columns: i64, i64, i64, and i64.
    /// This feature is useful if you want to specify many columns.
    #[clap(long, default_value = "ifs")]
    pub col_types: String,

    /// The maximum size of a batch of kafka records.
    /// The default value is 512KB.
    #[clap(long, default_value = "512KB")]
    pub max_batch_size: ReadableSize,

    /// The maximum amount of time the kafka client waits before issuing a batch of kafka records.
    /// However, a batch of kafka records is issued immediately if a record cannot fit into the batch.
    #[clap(long, default_value = "1ms")]
    pub linger: String,

    /// The initial backoff delay of the kafka consumer.
    #[clap(long, default_value = "10ms")]
    pub backoff_init: String,

    /// The maximum backoff delay of the kafka consumer.
    #[clap(long, default_value = "1s")]
    pub backoff_max: String,

    /// The exponential backoff rate of the kafka consumer. The next backoff = base * the current backoff.
    #[clap(long, default_value_t = 2)]
    pub backoff_base: u32,

    /// The deadline of backoff. The backoff ends if the total backoff delay reaches the deadline.
    #[clap(long, default_value = "3s")]
    pub backoff_deadline: String,

    /// The client-side compression algorithm for kafka records.
    #[clap(long, default_value = "zstd")]
    pub compression: String,

    /// The seed of random number generators.
    #[clap(long, default_value_t = 42)]
    pub rng_seed: u64,

    /// Skips the read phase, a.k.a. region replay, if set to true.
    #[clap(long, default_value_t = false)]
    pub skip_read: bool,

    /// Skips the write phase if set to true.
    #[clap(long, default_value_t = false)]
    pub skip_write: bool,

    /// Randomly generates topic names if set to true.
    /// Useful when you want to run the benchmarker without worrying about topics created by previous runs.
    #[clap(long, default_value_t = false)]
    pub random_topics: bool,

    /// Logs out the gathered prometheus metrics when the benchmarker ends.
    #[clap(long, default_value_t = false)]
    pub report_metrics: bool,
}

/// Benchmarker config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    pub wal_provider: WalProvider,
    pub bootstrap_brokers: Vec<String>,
    pub num_workers: u32,
    pub num_topics: u32,
    pub num_regions: u32,
    pub num_scrapes: u32,
    pub num_rows: u32,
    pub col_types: String,
    pub max_batch_size: ReadableSize,
    #[serde(with = "humantime_serde")]
    pub linger: Duration,
    #[serde(with = "humantime_serde")]
    pub backoff_init: Duration,
    #[serde(with = "humantime_serde")]
    pub backoff_max: Duration,
    pub backoff_base: u32,
    #[serde(with = "humantime_serde")]
    pub backoff_deadline: Duration,
    pub compression: String,
    pub rng_seed: u64,
    pub skip_read: bool,
    pub skip_write: bool,
    pub random_topics: bool,
    pub report_metrics: bool,
}

impl From<Args> for Config {
    fn from(args: Args) -> Self {
        Self {
            wal_provider: args.wal_provider,
            bootstrap_brokers: args
                .bootstrap_brokers
                .split(',')
                .map(ToString::to_string)
                .collect::<Vec<_>>(),
            num_workers: args.num_workers.min(num_cpus::get() as u32),
            num_topics: args.num_topics,
            num_regions: args.num_regions,
            num_scrapes: args.num_scrapes,
            num_rows: args.num_rows,
            col_types: args.col_types,
            max_batch_size: args.max_batch_size,
            linger: humantime::parse_duration(&args.linger).unwrap(),
            backoff_init: humantime::parse_duration(&args.backoff_init).unwrap(),
            backoff_max: humantime::parse_duration(&args.backoff_max).unwrap(),
            backoff_base: args.backoff_base,
            backoff_deadline: humantime::parse_duration(&args.backoff_deadline).unwrap(),
            compression: args.compression,
            rng_seed: args.rng_seed,
            skip_read: args.skip_read,
            skip_write: args.skip_write,
            random_topics: args.random_topics,
            report_metrics: args.report_metrics,
        }
    }
}

/// The region used by the wal benchmarker.
pub struct Region {
    id: RegionId,
    schema: Vec<ColumnSchema>,
    wal_options: WalOptions,
    next_sequence: AtomicU64,
    next_entry_id: AtomicU64,
    next_timestamp: AtomicI64,
    rng: Mutex<Option<SmallRng>>,
    num_rows: u32,
}

impl Region {
    /// Creates a new region.
    pub fn new(
        id: RegionId,
        schema: Vec<ColumnSchema>,
        wal_options: WalOptions,
        num_rows: u32,
        rng_seed: u64,
    ) -> Self {
        Self {
            id,
            schema,
            wal_options,
            next_sequence: AtomicU64::new(1),
            next_entry_id: AtomicU64::new(1),
            next_timestamp: AtomicI64::new(1655276557000),
            rng: Mutex::new(Some(SmallRng::seed_from_u64(rng_seed))),
            num_rows,
        }
    }

    /// Scrapes the region and adds the generated entry to wal.
    pub fn add_wal_entry<S: LogStore>(&self, wal_writer: &mut WalWriter<S>) {
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: self
                .next_sequence
                .fetch_add(self.num_rows as u64, Ordering::Relaxed),
            rows: Some(self.build_rows()),
        };
        let entry = WalEntry {
            mutations: vec![mutation],
        };
        metrics::METRIC_WAL_WRITE_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64);

        wal_writer
            .add_entry(
                self.id,
                self.next_entry_id.fetch_add(1, Ordering::Relaxed),
                &entry,
                &self.wal_options,
            )
            .unwrap();
    }

    /// Replays the region.
    pub async fn replay<S: LogStore>(&self, wal: &Arc<Wal<S>>) {
        let mut wal_stream = wal.scan(self.id, 0, &self.wal_options).unwrap();
        while let Some(res) = wal_stream.next().await {
            let (_, entry) = res.unwrap();
            metrics::METRIC_WAL_READ_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64);
        }
    }

    /// Computes the estimated size in bytes of the entry.
    pub fn entry_estimated_size(entry: &WalEntry) -> usize {
        let wrapper_size = size_of::<WalEntry>()
            + entry.mutations.capacity() * size_of::<Mutation>()
            + size_of::<Rows>();

        let rows = entry.mutations[0].rows.as_ref().unwrap();

        let schema_size = rows.schema.capacity() * size_of::<ColumnSchema>()
            + rows
                .schema
                .iter()
                .map(|s| s.column_name.capacity())
                .sum::<usize>();
        let values_size = (rows.rows.capacity() * size_of::<Row>())
            + rows
                .rows
                .iter()
                .map(|r| r.values.capacity() * size_of::<Value>())
                .sum::<usize>();

        wrapper_size + schema_size + values_size
    }

    fn build_rows(&self) -> Rows {
        let cols = self
            .schema
            .iter()
            .map(|col_schema| {
                let col_data_type = ColumnDataType::try_from(col_schema.datatype).unwrap();
                self.build_col(&col_data_type, self.num_rows)
            })
            .collect::<Vec<_>>();

        let rows = (0..self.num_rows)
            .map(|i| {
                let values = cols.iter().map(|col| col[i as usize].clone()).collect();
                Row { values }
            })
            .collect();

        Rows {
            schema: self.schema.clone(),
            rows,
        }
    }

    fn build_col(&self, col_data_type: &ColumnDataType, num_rows: u32) -> Vec<Value> {
        let mut rng_guard = self.rng.lock().unwrap();
        let rng = rng_guard.as_mut().unwrap();
        match col_data_type {
            ColumnDataType::TimestampMillisecond => (0..num_rows)
                .map(|_| {
                    let ts = self.next_timestamp.fetch_add(1000, Ordering::Relaxed);
                    Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(ts)),
                    }
                })
                .collect(),
            ColumnDataType::Int64 => (0..num_rows)
                .map(|_| {
                    let v = rng.sample(Uniform::new(0, 10_000));
                    Value {
                        value_data: Some(ValueData::I64Value(v)),
                    }
                })
                .collect(),
            ColumnDataType::Float64 => (0..num_rows)
                .map(|_| {
                    let v = rng.sample(Uniform::new(0.0, 5000.0));
                    Value {
                        value_data: Some(ValueData::F64Value(v)),
                    }
                })
                .collect(),
            ColumnDataType::String => (0..num_rows)
                .map(|_| {
                    let v = Alphanumeric.sample_string(rng, 10);
                    Value {
                        value_data: Some(ValueData::StringValue(v)),
                    }
                })
                .collect(),
            _ => unreachable!(),
        }
    }
}
@@ -26,6 +26,8 @@ common-time.workspace = true
common-wal.workspace = true
futures.workspace = true
futures-util.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine.workspace = true
rskafka.workspace = true

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem::size_of;
pub(crate) mod client_manager;
pub mod log_store;
pub(crate) mod util;
@@ -69,6 +70,10 @@ impl Entry for EntryImpl {
    fn namespace(&self) -> Self::Namespace {
        self.ns.clone()
    }

    fn estimated_size(&self) -> usize {
        size_of::<Self>() + self.data.capacity() * size_of::<u8>() + self.ns.topic.capacity()
    }
}

impl Display for EntryImpl {
@@ -82,3 +87,27 @@ impl Display for EntryImpl {
        )
    }
}

#[cfg(test)]
mod tests {
    use std::mem::size_of;

    use store_api::logstore::entry::Entry;

    use crate::kafka::{EntryImpl, NamespaceImpl};

    #[test]
    fn test_estimated_size() {
        let entry = EntryImpl {
            data: Vec::with_capacity(100),
            id: 0,
            ns: NamespaceImpl {
                region_id: 0,
                topic: String::with_capacity(10),
            },
        };
        let expected = size_of::<EntryImpl>() + 100 * size_of::<u8>() + 10;
        let got = entry.estimated_size();
        assert_eq!(expected, got);
    }
}

@@ -22,7 +22,7 @@ use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
use snafu::ResultExt;
use store_api::logstore::entry::Id as EntryId;
use store_api::logstore::entry::{Entry as EntryTrait, Id as EntryId};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
@@ -32,6 +32,7 @@ use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::util::offset::Offset;
use crate::kafka::util::record::{maybe_emit_entry, Record, RecordProducer};
use crate::kafka::{EntryImpl, NamespaceImpl};
use crate::metrics;

/// A log store backed by Kafka.
#[derive(Debug)]
@@ -86,6 +87,15 @@ impl LogStore for KafkaLogStore {
    /// Appends a batch of entries and returns a response containing a map where the key is a region id
    /// while the value is the id of the last successfully written entry of the region.
    async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
        metrics::METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL.inc();
        metrics::METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL.inc_by(
            entries
                .iter()
                .map(EntryTrait::estimated_size)
                .sum::<usize>() as u64,
        );
        let _timer = metrics::METRIC_KAFKA_APPEND_BATCH_ELAPSED.start_timer();

        if entries.is_empty() {
            return Ok(AppendBatchResponse::default());
        }
@@ -124,6 +134,9 @@
        ns: &Self::Namespace,
        entry_id: EntryId,
    ) -> Result<SendableEntryStream<Self::Entry, Self::Error>> {
        metrics::METRIC_KAFKA_READ_CALLS_TOTAL.inc();
        let _timer = metrics::METRIC_KAFKA_READ_ELAPSED.start_timer();

        // Gets the client associated with the topic.
        let client = self
            .client_manager
@@ -183,6 +196,9 @@
                })?;
                let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset);

                metrics::METRIC_KAFKA_READ_RECORD_BYTES_TOTAL
                    .inc_by(kafka_record.approximate_size() as u64);

                debug!(
                    "Read a record at offset {} for ns {}, high watermark: {}",
                    offset, ns_clone, high_watermark

@@ -25,6 +25,7 @@ use crate::error::{
use crate::kafka::client_manager::ClientManagerRef;
use crate::kafka::util::offset::Offset;
use crate::kafka::{EntryId, EntryImpl, NamespaceImpl};
use crate::metrics;

/// The current version of Record.
pub(crate) const VERSION: u32 = 0;
@@ -97,6 +98,7 @@
    }
}

// TODO(niebayes): improve the performance of decoding kafka record.
impl TryFrom<KafkaRecord> for Record {
    type Error = crate::error::Error;

@@ -150,6 +152,7 @@ impl RecordProducer {

    /// Produces the buffered entries to the Kafka server. Those entries may span several Kafka records.
    /// Returns the offset of the last successfully produced record.
    // TODO(niebayes): maybe requires more fine-grained metrics to measure stages of writing to kafka.
    pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result<Offset> {
        ensure!(!self.entries.is_empty(), EmptyEntriesSnafu);

@@ -173,6 +176,11 @@
        for entry in self.entries {
            for record in build_records(entry, max_record_size) {
                let kafka_record = KafkaRecord::try_from(record)?;

                metrics::METRIC_KAFKA_PRODUCE_RECORD_COUNTS.inc();
                metrics::METRIC_KAFKA_PRODUCE_RECORD_BYTES_TOTAL
                    .inc_by(kafka_record.approximate_size() as u64);

                // Records of a certain region cannot be produced in parallel since their order must be fixed.
                let offset = producer
                    .produce(kafka_record.clone())

@@ -17,6 +17,7 @@

pub mod error;
pub mod kafka;
pub mod metrics;
mod noop;
pub mod raft_engine;
pub mod test_util;

src/log-store/src/metrics.rs (new file, 107 lines)
@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use lazy_static::lazy_static;
use prometheus::*;

/// Logstore label.
pub const LOGSTORE_LABEL: &str = "logstore";
/// Operation type label.
pub const OPTYPE_LABEL: &str = "optype";

lazy_static! {
    /// Counters of bytes of each operation on a logstore.
    pub static ref METRIC_LOGSTORE_OP_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
        "greptime_logstore_op_bytes_total",
        "logstore operation bytes total",
        &[LOGSTORE_LABEL, OPTYPE_LABEL],
    )
    .unwrap();
    /// Counter of bytes of the append_batch operation on the kafka logstore.
    pub static ref METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values(
        &["kafka", "append_batch"],
    );
    /// Counter of bytes of the read operation on the kafka logstore.
    pub static ref METRIC_KAFKA_READ_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values(
        &["kafka", "read"],
    );
    /// Counter of bytes of the append_batch operation on the raft-engine logstore.
    pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values(
        &["raft-engine", "append_batch"],
    );
    /// Counter of bytes of the read operation on the raft-engine logstore.
    pub static ref METRIC_RAFT_ENGINE_READ_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values(
        &["raft-engine", "read"],
    );

    /// Counter of bytes of the records read by the kafka logstore.
    pub static ref METRIC_KAFKA_READ_RECORD_BYTES_TOTAL: IntCounter = register_int_counter!(
        "greptime_kafka_read_record_bytes_total",
        "kafka read record bytes total"
    ).unwrap();

    /// Counter of the number of records produced by the kafka logstore.
    pub static ref METRIC_KAFKA_PRODUCE_RECORD_COUNTS: IntCounter = register_int_counter!(
        "greptime_kafka_produce_record_counts",
        "kafka produce record counts",
    ).unwrap();

    /// Counter of bytes of the records produced by the kafka logstore.
    pub static ref METRIC_KAFKA_PRODUCE_RECORD_BYTES_TOTAL: IntCounter = register_int_counter!(
        "greptime_kafka_produce_record_bytes_total",
        "kafka produce record bytes total"
    ).unwrap();

    /// Counters of calls of each operation on a logstore.
    pub static ref METRIC_LOGSTORE_OP_CALLS_TOTAL: IntCounterVec = register_int_counter_vec!(
        "greptime_logstore_op_calls_total",
        "logstore operation calls total",
        &[LOGSTORE_LABEL, OPTYPE_LABEL],
    )
    .unwrap();
    /// Counter of calls of the append_batch operation on the kafka logstore.
    pub static ref METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values(
        &["kafka", "append_batch"],
    );
    /// Counter of calls of the read operation on the kafka logstore.
    pub static ref METRIC_KAFKA_READ_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values(
        &["kafka", "read"],
    );
    /// Counter of calls of the append_batch operation on the raft-engine logstore.
    pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values(
        &["raft-engine", "append_batch"],
    );
    /// Counter of calls of the read operation on the raft-engine logstore.
    pub static ref METRIC_RAFT_ENGINE_READ_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values(
        &["raft-engine", "read"],
    );

    /// Timer of operations on a logstore.
    pub static ref METRIC_LOGSTORE_OP_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_logstore_op_elapsed",
        "logstore operation elapsed",
        &[LOGSTORE_LABEL, OPTYPE_LABEL],
    )
    .unwrap();
    /// Timer of the append_batch operation on the kafka logstore.
    pub static ref METRIC_KAFKA_APPEND_BATCH_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["kafka", "append_batch"]);
    /// Timer of the read operation on the kafka logstore.
    /// This timer only measures the duration of a single read operation, not the total duration of replay.
    pub static ref METRIC_KAFKA_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["kafka", "read"]);
    /// Timer of the append_batch operation on the raft-engine logstore.
    pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "append_batch"]);
    /// Timer of the read operation on the raft-engine logstore.
    /// This timer only measures the duration of a single read operation, not the total duration of replay.
    pub static ref METRIC_RAFT_ENGINE_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "read"]);
}
@@ -50,6 +50,10 @@ impl Entry for EntryImpl {
    fn namespace(&self) -> Self::Namespace {
        Default::default()
    }

    fn estimated_size(&self) -> usize {
        0
    }
}

#[async_trait::async_trait]

@@ -13,6 +13,7 @@
// limitations under the License.

use std::hash::{Hash, Hasher};
use std::mem::size_of;

use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
@@ -83,4 +84,25 @@ impl Entry for EntryImpl {
            ..Default::default()
        }
    }

    fn estimated_size(&self) -> usize {
        size_of::<Self>() + self.data.capacity() * size_of::<u8>()
    }
}

#[cfg(test)]
mod tests {
    use std::mem::size_of;

    use store_api::logstore::entry::Entry;

    use crate::raft_engine::protos::logstore::EntryImpl;

    #[test]
    fn test_estimated_size() {
        let entry = EntryImpl::create(1, 1, Vec::with_capacity(100));
        let expected = size_of::<EntryImpl>() + 100;
        let got = entry.estimated_size();
        assert_eq!(expected, got);
    }
}

@@ -25,7 +25,7 @@ use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::options::WalOptions;
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::Id as EntryId;
use store_api::logstore::entry::{Entry as EntryTrait, Id as EntryId};
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
@@ -35,6 +35,7 @@ use crate::error::{
    IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result,
    StartGcTaskSnafu, StopGcTaskSnafu,
};
use crate::metrics;
use crate::raft_engine::backend::SYSTEM_NAMESPACE;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace};

@@ -248,6 +249,15 @@ impl LogStore for RaftEngineLogStore {
    /// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of
    /// batch append.
    async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
        metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_CALLS_TOTAL.inc();
        metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL.inc_by(
            entries
                .iter()
                .map(EntryTrait::estimated_size)
                .sum::<usize>() as u64,
        );
        let _timer = metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED.start_timer();

        ensure!(self.started(), IllegalStateSnafu);
        if entries.is_empty() {
            return Ok(AppendBatchResponse::default());
@@ -280,6 +290,9 @@
        ns: &Self::Namespace,
        entry_id: EntryId,
    ) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
        metrics::METRIC_RAFT_ENGINE_READ_CALLS_TOTAL.inc();
        let _timer = metrics::METRIC_RAFT_ENGINE_READ_ELAPSED.start_timer();

        ensure!(self.started(), IllegalStateSnafu);
        let engine = self.engine.clone();

@@ -33,7 +33,7 @@ etcd-client.workspace = true
futures.workspace = true
h2 = "0.3"
http-body = "0.4"
humantime = "2.1"
humantime.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true

@@ -40,7 +40,7 @@ datatypes.workspace = true
futures = "0.3"
futures-util.workspace = true
greptime-proto.workspace = true
humantime = "2.1"
humantime.workspace = true
lazy_static.workspace = true
meter-core.workspace = true
meter-macros.workspace = true

@@ -35,4 +35,7 @@ pub trait Entry: Send + Sync {

    /// Returns the namespace of the entry.
    fn namespace(&self) -> Self::Namespace;

    /// Computes the estimated size in bytes of the entry.
    fn estimated_size(&self) -> usize;
}

@@ -31,6 +31,7 @@ pub type SendableEntryStream<'a, I, E> = Pin<Box<dyn Stream<Item = Result<Vec<I>
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::mem::size_of;
    use std::task::{Context, Poll};

    use common_error::ext::StackError;
@@ -87,6 +88,10 @@ mod tests {
        fn namespace(&self) -> Self::Namespace {
            Namespace {}
        }

        fn estimated_size(&self) -> usize {
            self.data.capacity() * size_of::<u8>()
        }
    }

    impl SimpleEntry {

@@ -29,7 +29,7 @@ datafusion-physical-expr.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
futures.workspace = true
humantime = "2.1"
humantime.workspace = true
humantime-serde.workspace = true
paste = "1.0"
serde.workspace = true

@@ -40,6 +40,7 @@ sqlx = { version = "0.6", features = [
] }

[dev-dependencies]
dotenv.workspace = true
tokio = { workspace = true }

[[bin]]

@@ -36,7 +36,7 @@ common-test-util.workspace = true
common-wal.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv = "0.15"
dotenv.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
meta-client.workspace = true