feat: introduce PersistStatsHandler (#6777)

* feat: add `Inserter` trait and impl

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: import items

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: introduce `PersistStatsHandler`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: disable persisting stats in sqlness

Signed-off-by: WenyXu <wenymedia@gmail.com>

* reset channel manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: avoid to collect

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: remove insert options

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: use `write_bytes` instead of `write_bytes_per_sec`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: compute write bytes delta

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* Update src/meta-srv/src/handler/persist_stats_handler.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Weny Xu
2025-08-21 17:34:58 +08:00
committed by GitHub
parent 5eec3485fe
commit 896d72191e
41 changed files with 1024 additions and 411 deletions

3
Cargo.lock generated
View File

@@ -1837,6 +1837,7 @@ dependencies = [
"enum_dispatch",
"futures",
"futures-util",
"humantime",
"lazy_static",
"moka",
"parking_lot 0.12.4",
@@ -1846,6 +1847,7 @@ dependencies = [
"rand 0.9.1",
"serde_json",
"snafu 0.8.6",
"store-api",
"substrait 0.17.0",
"substrait 0.37.3",
"tokio",
@@ -8622,6 +8624,7 @@ dependencies = [
"file-engine",
"futures",
"futures-util",
"humantime",
"jsonb",
"lazy_static",
"meta-client",

View File

@@ -387,6 +387,9 @@
| `wal.create_topic_timeout` | String | `30s` | The timeout for creating a Kafka topic.<br/>**It's only used when the provider is `kafka`**. |
| `event_recorder` | -- | -- | Configuration options for the event recorder. |
| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
| `stats_persistence` | -- | -- | Configuration options for the stats persistence. |
| `stats_persistence.ttl` | String | `30d` | TTL for the stats table that will be used to store the stats. Default is `30d`.<br/>Set to `0s` to disable stats persistence. |
| `stats_persistence.interval` | String | `60s` | The interval to persist the stats. Default is `60s`.<br/>The minimum value is `60s`, if the value is less than `60s`, it will be overridden to `60s`. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |

View File

@@ -256,6 +256,15 @@ create_topic_timeout = "30s"
## TTL for the events table that will be used to store the events. Default is `90d`.
ttl = "90d"
## Configuration options for the stats persistence.
[stats_persistence]
## TTL for the stats table that will be used to store the stats. Default is `30d`.
## Set to `0s` to disable stats persistence.
ttl = "30d"
## The interval to persist the stats. Default is `60s`.
## The minimum value is `60s`, if the value is less than `60s`, it will be overridden to `60s`.
interval = "60s"
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.

View File

@@ -29,6 +29,7 @@ datatypes.workspace = true
enum_dispatch = "0.3"
futures.workspace = true
futures-util.workspace = true
humantime.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot.workspace = true
@@ -38,6 +39,7 @@ query.workspace = true
rand.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
tokio.workspace = true
tokio-stream = { workspace = true, features = ["net"] }

View File

@@ -133,6 +133,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -154,6 +161,7 @@ impl ErrorExt for Error {
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
Error::ConvertSchema { source, .. } => source.status_code(),
Error::External { source, .. } => source.status_code(),
}
}

View File

@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use api::v1::RowInsertRequests;
use humantime::format_duration;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use crate::error::Result;
/// Context holds the catalog and schema information.
pub struct Context<'a> {
/// The catalog name.
pub catalog: &'a str,
/// The schema name.
pub schema: &'a str,
}
/// Options for insert operations.
#[derive(Debug, Clone, Copy)]
pub struct InsertOptions {
/// Time-to-live for the inserted data.
pub ttl: Duration,
/// Whether to use append mode for the insert.
pub append_mode: bool,
}
impl InsertOptions {
/// Converts the insert options to a list of key-value string hints.
pub fn to_hints(&self) -> Vec<(&'static str, String)> {
vec![
(TTL_KEY, format_duration(self.ttl).to_string()),
(APPEND_MODE_KEY, self.append_mode.to_string()),
]
}
}
/// [`Inserter`] allows different components to share a unified mechanism for inserting data.
///
/// An implementation may perform the insert locally (e.g., via a direct procedure call) or
/// delegate/forward it to another node for processing (e.g., MetaSrv forwarding to an
/// available Frontend).
#[async_trait::async_trait]
pub trait Inserter: Send + Sync {
async fn insert_rows(&self, context: &Context<'_>, requests: RowInsertRequests) -> Result<()>;
fn set_options(&mut self, options: &InsertOptions);
}

View File

@@ -19,6 +19,7 @@ pub mod client_manager;
pub mod database;
pub mod error;
pub mod flow;
pub mod inserter;
pub mod load_balance;
mod metrics;
pub mod region;

View File

@@ -834,6 +834,7 @@ impl InformationExtension for StandaloneInformationExtension {
region_manifest: region_stat.manifest.into(),
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
write_bytes: 0,
}
})
.collect::<Vec<_>>();

View File

@@ -22,12 +22,6 @@ use snafu::{Location, Snafu};
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("No available frontend"))]
NoAvailableFrontend {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Mismatched schema, expected: {:?}, actual: {:?}", expected, actual))]
MismatchedSchema {
#[snafu(implicit)]
@@ -69,9 +63,7 @@ impl ErrorExt for Error {
Error::MismatchedSchema { .. } | Error::SerializeEvent { .. } => {
StatusCode::InvalidArguments
}
Error::NoAvailableFrontend { .. }
| Error::InsertEvents { .. }
| Error::KvBackend { .. } => StatusCode::Internal,
Error::InsertEvents { .. } | Error::KvBackend { .. } => StatusCode::Internal,
}
}

View File

@@ -236,11 +236,6 @@ pub trait EventHandler: Send + Sync + 'static {
/// Processes and handles incoming events. The [DefaultEventHandlerImpl] implementation forwards events to frontend instances for persistence.
/// We use `&[Box<dyn Event>]` to avoid consuming the events, so the caller can buffer the events and retry if the handler fails.
async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
/// Returns the handler options for the event type. We can use different options for different event types.
fn options(&self, _event_type: &str) -> EventHandlerOptions {
EventHandlerOptions::default()
}
}
/// Configuration options for the event recorder.

View File

@@ -102,6 +102,8 @@ pub struct RegionStat {
pub index_size: u64,
/// The manifest infoof the region.
pub region_manifest: RegionManifestInfo,
/// The write bytes.
pub write_bytes: u64,
/// The latest entry id of topic used by data.
/// **Only used by remote WAL prune.**
pub data_topic_latest_entry_id: u64,
@@ -304,6 +306,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
sst_num: region_stat.sst_num,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
write_bytes: region_stat.write_bytes,
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
}

View File

@@ -15,68 +15,76 @@
use std::time::Duration;
use async_trait::async_trait;
use client::inserter::{Context, InsertOptions, Inserter};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_event_recorder::error::{InsertEventsSnafu, Result};
use common_event_recorder::{
build_row_inserts_request, group_events_by_type, Event, EventHandler, EventHandlerOptions,
};
use common_event_recorder::{build_row_inserts_request, group_events_by_type, Event, EventHandler};
use common_frontend::slow_query_event::SLOW_QUERY_EVENT_TYPE;
use humantime::format_duration;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use session::context::QueryContextBuilder;
use datafusion::common::HashMap;
use operator::statement::{InserterImpl, StatementExecutorRef};
use snafu::ResultExt;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
/// EventHandlerImpl is the default event handler implementation in frontend.
pub struct EventHandlerImpl {
inserter: InserterRef,
statement_executor: StatementExecutorRef,
slow_query_ttl: Duration,
global_ttl: Duration,
default_inserter: Box<dyn Inserter>,
/// The inserters for the event types.
inserters: HashMap<String, Box<dyn Inserter>>,
}
impl EventHandlerImpl {
/// Create a new EventHandlerImpl.
pub fn new(
inserter: InserterRef,
statement_executor: StatementExecutorRef,
slow_query_ttl: Duration,
global_ttl: Duration,
) -> Self {
Self {
inserter,
statement_executor,
slow_query_ttl,
global_ttl,
inserters: HashMap::from([(
SLOW_QUERY_EVENT_TYPE.to_string(),
Box::new(InserterImpl::new(
statement_executor.clone(),
Some(InsertOptions {
ttl: slow_query_ttl,
append_mode: true,
}),
)) as _,
)]),
default_inserter: Box::new(InserterImpl::new(
statement_executor.clone(),
Some(InsertOptions {
ttl: global_ttl,
append_mode: true,
}),
)),
}
}
fn inserter(&self, event_type: &str) -> &dyn Inserter {
let Some(inserter) = self.inserters.get(event_type) else {
return self.default_inserter.as_ref();
};
inserter.as_ref()
}
}
const DEFAULT_CONTEXT: Context = Context {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_PRIVATE_SCHEMA_NAME,
};
#[async_trait]
impl EventHandler for EventHandlerImpl {
async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
let event_groups = group_events_by_type(events);
for (event_type, events) in event_groups {
let opts = self.options(event_type);
let query_ctx = QueryContextBuilder::default()
.current_catalog(DEFAULT_CATALOG_NAME.to_string())
.current_schema(DEFAULT_PRIVATE_SCHEMA_NAME.to_string())
.set_extension(TTL_KEY.to_string(), format_duration(opts.ttl).to_string())
.set_extension(APPEND_MODE_KEY.to_string(), opts.append_mode.to_string())
.build()
.into();
let requests = build_row_inserts_request(&events)?;
let inserter = self.inserter(event_type);
self.inserter
.handle_row_inserts(
build_row_inserts_request(&events)?,
query_ctx,
&self.statement_executor,
false,
false,
)
inserter
.insert_rows(&DEFAULT_CONTEXT, requests)
.await
.map_err(BoxedError::new)
.context(InsertEventsSnafu)?;
@@ -84,17 +92,4 @@ impl EventHandler for EventHandlerImpl {
Ok(())
}
fn options(&self, event_type: &str) -> EventHandlerOptions {
match event_type {
SLOW_QUERY_EVENT_TYPE => EventHandlerOptions {
ttl: self.slow_query_ttl,
append_mode: true,
},
_ => EventHandlerOptions {
ttl: self.global_ttl,
append_mode: true,
},
}
}
}

View File

@@ -197,7 +197,6 @@ impl FrontendBuilder {
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
inserter.clone(),
statement_executor.clone(),
self.options.slow_query.ttl,
self.options.event_recorder.ttl,

View File

@@ -54,6 +54,19 @@ pub enum Error {
peer_id: u64,
},
#[snafu(display("Failed to lookup frontends"))]
LookupFrontends {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("No available frontend"))]
NoAvailableFrontend {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
#[snafu(implicit)]
@@ -1062,6 +1075,8 @@ impl ErrorExt for Error {
| Error::UnexpectedLogicalRouteTable { source, .. }
| Error::UpdateTopicNameValue { source, .. }
| Error::ParseWalOptions { source, .. } => source.status_code(),
Error::LookupFrontends { source, .. } => source.status_code(),
Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
Error::InitMetadata { source, .. }
| Error::InitDdlManager { source, .. }

View File

@@ -12,43 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use client::{Client, Database};
use client::inserter::{Context, Inserter};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_event_recorder::error::{
InsertEventsSnafu, KvBackendSnafu, NoAvailableFrontendSnafu, Result,
};
use common_event_recorder::{
build_row_inserts_request, group_events_by_type, Event, EventHandler, EventHandlerOptions,
};
use common_grpc::channel_manager::ChannelManager;
use common_meta::peer::PeerLookupServiceRef;
use common_telemetry::debug;
use snafu::{ensure, ResultExt};
use crate::cluster::MetaPeerClientRef;
use crate::lease::MetaPeerLookupService;
use common_event_recorder::error::{InsertEventsSnafu, Result};
use common_event_recorder::{build_row_inserts_request, group_events_by_type, Event, EventHandler};
use snafu::ResultExt;
pub mod region_migration_event;
/// EventHandlerImpl is the default event handler implementation in metasrv. It sends the received events to the frontend instances.
/// EventHandlerImpl is the default event handler implementation in metasrv.
/// It sends the received events to the frontend instances.
pub struct EventHandlerImpl {
peer_lookup_service: PeerLookupServiceRef,
channel_manager: ChannelManager,
ttl: Duration,
inserter: Box<dyn Inserter>,
}
impl EventHandlerImpl {
pub fn new(meta_peer_client: MetaPeerClientRef, ttl: Duration) -> Self {
Self {
peer_lookup_service: Arc::new(MetaPeerLookupService::new(meta_peer_client)),
channel_manager: ChannelManager::new(),
ttl,
}
pub fn new(inserter: Box<dyn Inserter>) -> Self {
Self { inserter }
}
}
@@ -57,18 +39,15 @@ impl EventHandler for EventHandlerImpl {
async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
let event_groups = group_events_by_type(events);
for (event_type, events) in event_groups {
let opts = self.options(event_type);
let hints = opts.to_hints();
self.build_database_client()
.await?
.row_inserts_with_hints(
build_row_inserts_request(&events)?,
&hints
.iter()
.map(|(k, v)| (*k, v.as_str()))
.collect::<Vec<_>>(),
for (_, events) in event_groups {
let requests = build_row_inserts_request(&events)?;
self.inserter
.insert_rows(
&Context {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_PRIVATE_SCHEMA_NAME,
},
requests,
)
.await
.map_err(BoxedError::new)
@@ -77,37 +56,4 @@ impl EventHandler for EventHandlerImpl {
Ok(())
}
fn options(&self, _event_type: &str) -> EventHandlerOptions {
EventHandlerOptions {
ttl: self.ttl,
append_mode: true,
}
}
}
impl EventHandlerImpl {
async fn build_database_client(&self) -> Result<Database> {
let frontends = self
.peer_lookup_service
.active_frontends()
.await
.map_err(BoxedError::new)
.context(KvBackendSnafu)?;
ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);
let urls = frontends
.into_iter()
.map(|peer| peer.addr)
.collect::<Vec<_>>();
debug!("Available frontend addresses: {:?}", urls);
Ok(Database::new(
DEFAULT_CATALOG_NAME,
DEFAULT_PRIVATE_SCHEMA_NAME,
Client::with_manager_and_urls(self.channel_manager.clone(), urls),
))
}
}

View File

@@ -55,6 +55,7 @@ use tokio::sync::{oneshot, watch, Notify, RwLock};
use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
use crate::handler::flow_state_handler::FlowStateHandler;
use crate::handler::persist_stats_handler::PersistStatsHandler;
use crate::metasrv::Context;
use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
use crate::pubsub::PublisherRef;
@@ -74,10 +75,12 @@ pub mod flow_state_handler;
pub mod keep_lease_handler;
pub mod mailbox_handler;
pub mod on_leader_start_handler;
pub mod persist_stats_handler;
pub mod publish_heartbeat_handler;
pub mod region_lease_handler;
pub mod remap_flow_peer_handler;
pub mod response_header_handler;
#[cfg(test)]
pub mod test_utils;
@@ -537,6 +540,9 @@ pub struct HeartbeatHandlerGroupBuilder {
/// A simple handler for flow internal state report
flow_state_handler: Option<FlowStateHandler>,
/// The handler to persist stats.
persist_stats_handler: Option<PersistStatsHandler>,
/// The plugins.
plugins: Option<Plugins>,
@@ -554,6 +560,7 @@ impl HeartbeatHandlerGroupBuilder {
region_lease_handler: None,
flush_stats_factor: None,
flow_state_handler: None,
persist_stats_handler: None,
plugins: None,
pushers,
handlers: vec![],
@@ -582,6 +589,11 @@ impl HeartbeatHandlerGroupBuilder {
self
}
pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
self.persist_stats_handler = handler;
self
}
/// Sets the [`Plugins`].
pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
self.plugins = plugins;
@@ -624,6 +636,11 @@ impl HeartbeatHandlerGroupBuilder {
}
self.add_handler_last(CollectLeaderRegionHandler);
self.add_handler_last(CollectTopicStatsHandler);
// Persist stats handler should be in front of collect stats handler.
// Because collect stats handler will consume the stats from the accumulator.
if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
self.add_handler_last(persist_stats_handler);
}
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
self.add_handler_last(RemapFlowPeerHandler::default());

View File

@@ -87,6 +87,7 @@ mod tests {
index_size: 0,
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
write_bytes: 0,
}
}

View File

@@ -105,6 +105,7 @@ mod tests {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
write_bytes: 0,
}
}
acc.stat = Some(Stat {

View File

@@ -0,0 +1,555 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::{Duration, Instant};
use api::v1::meta::{HeartbeatRequest, Role};
use api::v1::value::ValueData;
use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, Value};
use client::inserter::{Context as InserterContext, Inserter};
use client::DEFAULT_CATALOG_NAME;
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
use common_macro::{Schema, ToRow};
use common_meta::datanode::RegionStat;
use common_meta::DatanodeId;
use common_telemetry::warn;
use dashmap::DashMap;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
/// The handler to persist stats.
pub struct PersistStatsHandler {
inserter: Box<dyn Inserter>,
last_persisted_region_stats: DashMap<RegionId, PersistedRegionStat>,
last_persisted_time: DashMap<DatanodeId, Instant>,
persist_interval: Duration,
}
/// The name of the table to persist region stats.
const META_REGION_STATS_TABLE_NAME: &str = "region_statistics";
/// The default context to persist region stats.
const DEFAULT_CONTEXT: InserterContext = InserterContext {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_PRIVATE_SCHEMA_NAME,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PersistedRegionStat {
region_id: RegionId,
write_bytess: u64,
}
impl From<&RegionStat> for PersistedRegionStat {
fn from(stat: &RegionStat) -> Self {
Self {
region_id: stat.id,
write_bytess: stat.write_bytes,
}
}
}
#[derive(ToRow, Schema)]
struct PersistRegionStat<'a> {
table_id: u32,
region_id: u64,
region_number: u32,
manifest_size: u64,
datanode_id: u64,
#[col(datatype = "string")]
engine: &'a str,
num_rows: u64,
sst_num: u64,
sst_size: u64,
write_bytes_delta: u64,
#[col(
name = "greptime_timestamp",
semantic = "Timestamp",
datatype = "TimestampMillisecond"
)]
timestamp_millis: i64,
}
/// Compute the region stat to persist.
fn compute_persist_region_stat(
region_stat: &RegionStat,
datanode_id: DatanodeId,
timestamp_millis: i64,
persisted_region_stat: Option<PersistedRegionStat>,
) -> PersistRegionStat {
let write_bytes_delta = persisted_region_stat
.and_then(|persisted_region_stat| {
region_stat
.write_bytes
.checked_sub(persisted_region_stat.write_bytess)
})
.unwrap_or_default();
PersistRegionStat {
table_id: region_stat.id.table_id(),
region_id: region_stat.id.as_u64(),
region_number: region_stat.id.region_number(),
manifest_size: region_stat.manifest_size,
datanode_id,
engine: region_stat.engine.as_str(),
num_rows: region_stat.num_rows,
sst_num: region_stat.sst_num,
sst_size: region_stat.sst_size,
write_bytes_delta,
timestamp_millis,
}
}
fn to_persisted_if_leader(
region_stat: &RegionStat,
last_persisted_region_stats: &DashMap<RegionId, PersistedRegionStat>,
datanode_id: DatanodeId,
timestamp_millis: i64,
) -> Option<(Row, PersistedRegionStat)> {
if matches!(region_stat.role, RegionRole::Leader) {
let persisted_region_stat = last_persisted_region_stats.get(&region_stat.id).map(|s| *s);
Some((
compute_persist_region_stat(
region_stat,
datanode_id,
timestamp_millis,
persisted_region_stat,
)
.to_row(),
PersistedRegionStat::from(region_stat),
))
} else {
None
}
}
/// Align the timestamp to the nearest interval.
///
/// # Panics
/// Panics if `interval` as milliseconds is zero.
fn align_ts(ts: i64, interval: Duration) -> i64 {
assert!(
interval.as_millis() != 0,
"interval must be greater than zero"
);
ts / interval.as_millis() as i64 * interval.as_millis() as i64
}
impl PersistStatsHandler {
/// Creates a new [`PersistStatsHandler`].
pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self {
if persist_interval < Duration::from_secs(60) {
warn!("persist_interval is less than 60 seconds, set to 60 seconds");
persist_interval = Duration::from_secs(60);
}
if persist_interval.as_millis() == 0 {
warn!("persist_interval as milliseconds is zero, set to 60 second");
persist_interval = Duration::from_secs(60);
}
Self {
inserter,
last_persisted_region_stats: DashMap::new(),
last_persisted_time: DashMap::new(),
persist_interval,
}
}
fn should_persist(&self, datanode_id: DatanodeId) -> bool {
let Some(last_persisted_time) = self.last_persisted_time.get(&datanode_id) else {
return true;
};
last_persisted_time.elapsed() >= self.persist_interval
}
async fn persist(
&self,
timestamp_millis: i64,
datanode_id: DatanodeId,
region_stats: &[RegionStat],
) {
// Safety: persist_interval is greater than zero.
let aligned_ts = align_ts(timestamp_millis, self.persist_interval);
let (rows, incoming_region_stats): (Vec<_>, Vec<_>) = region_stats
.iter()
.flat_map(|region_stat| {
to_persisted_if_leader(
region_stat,
&self.last_persisted_region_stats,
datanode_id,
aligned_ts,
)
})
.unzip();
if rows.is_empty() {
return;
}
if let Err(err) = self
.inserter
.insert_rows(
&DEFAULT_CONTEXT,
RowInsertRequests {
inserts: vec![RowInsertRequest {
table_name: META_REGION_STATS_TABLE_NAME.to_string(),
rows: Some(Rows {
schema: PersistRegionStat::schema(),
rows,
}),
}],
},
)
.await
{
warn!(
"Failed to persist region stats, datanode_id: {}, error: {:?}",
datanode_id, err
);
return;
}
self.last_persisted_time.insert(datanode_id, Instant::now());
for s in incoming_region_stats {
self.last_persisted_region_stats.insert(s.region_id, s);
}
}
}
#[async_trait::async_trait]
impl HeartbeatHandler for PersistStatsHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
_req: &HeartbeatRequest,
_: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some(current_stat) = acc.stat.as_ref() else {
return Ok(HandleControl::Continue);
};
if !self.should_persist(current_stat.id) {
return Ok(HandleControl::Continue);
}
self.persist(
current_stat.timestamp_millis,
current_stat.id,
&current_stat.region_stats,
)
.await;
Ok(HandleControl::Continue)
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use client::inserter::{Context as InserterContext, InsertOptions};
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use super::*;
use crate::handler::test_utils::TestEnv;
fn create_test_region_stat(
table_id: u32,
region_number: u32,
write_bytes: u64,
engine: &str,
) -> RegionStat {
let region_id = RegionId::new(table_id, region_number);
RegionStat {
id: region_id,
rcus: 100,
wcus: 200,
approximate_bytes: 1024,
engine: engine.to_string(),
role: RegionRole::Leader,
num_rows: 1000,
memtable_size: 512,
manifest_size: 256,
sst_size: 2048,
sst_num: 5,
index_size: 128,
region_manifest: RegionManifestInfo::Mito {
manifest_version: 1,
flushed_entry_id: 100,
},
write_bytes,
data_topic_latest_entry_id: 200,
metadata_topic_latest_entry_id: 200,
}
}
#[test]
fn test_compute_persist_region_stat_with_no_persisted_stat() {
let region_stat = create_test_region_stat(1, 1, 1000, "mito");
let datanode_id = 123;
let timestamp_millis = 1640995200000; // 2022-01-01 00:00:00 UTC
let result = compute_persist_region_stat(&region_stat, datanode_id, timestamp_millis, None);
assert_eq!(result.table_id, 1);
assert_eq!(result.region_id, region_stat.id.as_u64());
assert_eq!(result.region_number, 1);
assert_eq!(result.manifest_size, 256);
assert_eq!(result.datanode_id, datanode_id);
assert_eq!(result.engine, "mito");
assert_eq!(result.num_rows, 1000);
assert_eq!(result.sst_num, 5);
assert_eq!(result.sst_size, 2048);
assert_eq!(result.write_bytes_delta, 0); // No previous stat, so delta is 0
assert_eq!(result.timestamp_millis, timestamp_millis);
}
#[test]
fn test_compute_persist_region_stat_with_persisted_stat_increase() {
let region_stat = create_test_region_stat(2, 3, 1500, "mito");
let datanode_id = 456;
let timestamp_millis = 1640995260000; // 2022-01-01 00:01:00 UTC
let persisted_stat = PersistedRegionStat {
region_id: region_stat.id,
write_bytess: 1000, // Previous write bytes
};
let result = compute_persist_region_stat(
&region_stat,
datanode_id,
timestamp_millis,
Some(persisted_stat),
);
assert_eq!(result.table_id, 2);
assert_eq!(result.region_id, region_stat.id.as_u64());
assert_eq!(result.region_number, 3);
assert_eq!(result.manifest_size, 256);
assert_eq!(result.datanode_id, datanode_id);
assert_eq!(result.engine, "mito");
assert_eq!(result.num_rows, 1000);
assert_eq!(result.sst_num, 5);
assert_eq!(result.sst_size, 2048);
assert_eq!(result.write_bytes_delta, 500); // 1500 - 1000 = 500
assert_eq!(result.timestamp_millis, timestamp_millis);
}
#[test]
fn test_compute_persist_region_stat_with_persisted_stat_decrease() {
let region_stat = create_test_region_stat(3, 5, 800, "mito");
let datanode_id = 789;
let timestamp_millis = 1640995320000; // 2022-01-01 00:02:00 UTC
let persisted_stat = PersistedRegionStat {
region_id: region_stat.id,
write_bytess: 1200, // Previous write bytes (higher than current)
};
let result = compute_persist_region_stat(
&region_stat,
datanode_id,
timestamp_millis,
Some(persisted_stat),
);
assert_eq!(result.table_id, 3);
assert_eq!(result.region_id, region_stat.id.as_u64());
assert_eq!(result.region_number, 5);
assert_eq!(result.manifest_size, 256);
assert_eq!(result.datanode_id, datanode_id);
assert_eq!(result.engine, "mito");
assert_eq!(result.num_rows, 1000);
assert_eq!(result.sst_num, 5);
assert_eq!(result.sst_size, 2048);
assert_eq!(result.write_bytes_delta, 0); // 800 - 1200 would be negative, so 0 due to checked_sub
assert_eq!(result.timestamp_millis, timestamp_millis);
}
#[test]
fn test_compute_persist_region_stat_with_persisted_stat_equal() {
let region_stat = create_test_region_stat(4, 7, 2000, "mito");
let datanode_id = 101;
let timestamp_millis = 1640995380000; // 2022-01-01 00:03:00 UTC
let persisted_stat = PersistedRegionStat {
region_id: region_stat.id,
write_bytess: 2000, // Same as current write bytes
};
let result = compute_persist_region_stat(
&region_stat,
datanode_id,
timestamp_millis,
Some(persisted_stat),
);
assert_eq!(result.table_id, 4);
assert_eq!(result.region_id, region_stat.id.as_u64());
assert_eq!(result.region_number, 7);
assert_eq!(result.manifest_size, 256);
assert_eq!(result.datanode_id, datanode_id);
assert_eq!(result.engine, "mito");
assert_eq!(result.num_rows, 1000);
assert_eq!(result.sst_num, 5);
assert_eq!(result.sst_size, 2048);
assert_eq!(result.write_bytes_delta, 0); // 2000 - 2000 = 0
assert_eq!(result.timestamp_millis, timestamp_millis);
}
#[test]
fn test_compute_persist_region_stat_with_overflow_protection() {
let region_stat = create_test_region_stat(8, 15, 500, "mito");
let datanode_id = 505;
let timestamp_millis = 1640995620000; // 2022-01-01 00:07:00 UTC
let persisted_stat = PersistedRegionStat {
region_id: region_stat.id,
write_bytess: 1000, // Higher than current, would cause underflow
};
let result = compute_persist_region_stat(
&region_stat,
datanode_id,
timestamp_millis,
Some(persisted_stat),
);
assert_eq!(result.table_id, 8);
assert_eq!(result.region_id, region_stat.id.as_u64());
assert_eq!(result.region_number, 15);
assert_eq!(result.manifest_size, 256);
assert_eq!(result.datanode_id, datanode_id);
assert_eq!(result.engine, "mito");
assert_eq!(result.num_rows, 1000);
assert_eq!(result.sst_num, 5);
assert_eq!(result.sst_size, 2048);
assert_eq!(result.write_bytes_delta, 0); // checked_sub returns None, so default to 0
assert_eq!(result.timestamp_millis, timestamp_millis);
}
struct MockInserter {
requests: Arc<Mutex<Vec<api::v1::RowInsertRequest>>>,
}
#[async_trait::async_trait]
impl Inserter for MockInserter {
async fn insert_rows(
&self,
_context: &InserterContext<'_>,
requests: api::v1::RowInsertRequests,
) -> client::error::Result<()> {
self.requests.lock().unwrap().extend(requests.inserts);
Ok(())
}
fn set_options(&mut self, _options: &InsertOptions) {}
}
#[tokio::test]
async fn test_not_persist_region_stats() {
let env = TestEnv::new();
let mut ctx = env.ctx();
let requests = Arc::new(Mutex::new(vec![]));
let inserter = MockInserter {
requests: requests.clone(),
};
let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
id: 1,
timestamp_millis: 1640995200000,
region_stats: vec![create_test_region_stat(1, 1, 1000, "mito")],
..Default::default()
}),
..Default::default()
};
handler.last_persisted_time.insert(1, Instant::now());
// Do not persist
handler
.handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
.await
.unwrap();
assert!(requests.lock().unwrap().is_empty());
}
#[tokio::test]
async fn test_persist_region_stats() {
let env = TestEnv::new();
let mut ctx = env.ctx();
let requests = Arc::new(Mutex::new(vec![]));
let inserter = MockInserter {
requests: requests.clone(),
};
let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));
let region_stat = create_test_region_stat(1, 1, 1000, "mito");
let timestamp_millis = 1640995200000;
let datanode_id = 1;
let region_id = RegionId::new(1, 1);
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
id: datanode_id,
timestamp_millis,
region_stats: vec![region_stat.clone()],
..Default::default()
}),
..Default::default()
};
handler.last_persisted_region_stats.insert(
region_id,
PersistedRegionStat {
region_id,
write_bytess: 500,
},
);
let (expected_row, expected_persisted_region_stat) = to_persisted_if_leader(
&region_stat,
&handler.last_persisted_region_stats,
datanode_id,
timestamp_millis,
)
.unwrap();
let before_insert_time = Instant::now();
// Persist
handler
.handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
.await
.unwrap();
let request = {
let mut requests = requests.lock().unwrap();
assert_eq!(requests.len(), 1);
requests.pop().unwrap()
};
assert_eq!(request.table_name, META_REGION_STATS_TABLE_NAME.to_string());
assert_eq!(request.rows.unwrap().rows, vec![expected_row]);
// Check last persisted time
assert!(handler
.last_persisted_time
.get(&datanode_id)
.unwrap()
.gt(&before_insert_time));
// Check last persisted region stats
assert_eq!(
handler
.last_persisted_region_stats
.get(&region_id)
.unwrap()
.value(),
&expected_persisted_region_stat
);
}
}

View File

@@ -167,6 +167,7 @@ mod test {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
write_bytes: 0,
}
}

View File

@@ -16,6 +16,7 @@
#![feature(assert_matches)]
#![feature(hash_set_entry)]
#![feature(let_chains)]
#![feature(duration_constructors)]
pub mod bootstrap;
pub mod cache_invalidator;

View File

@@ -100,6 +100,26 @@ pub enum BackendImpl {
MysqlStore,
}
/// Configuration options for the stats persistence.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
/// TTL for the stats table that will be used to store the stats.
#[serde(with = "humantime_serde")]
pub ttl: Duration,
/// The interval to persist the stats.
#[serde(with = "humantime_serde")]
pub interval: Duration,
}
impl Default for StatsPersistenceOptions {
fn default() -> Self {
Self {
ttl: Duration::from_days(30),
interval: Duration::from_secs(60),
}
}
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
@@ -184,6 +204,8 @@ pub struct MetasrvOptions {
pub node_max_idle_time: Duration,
/// The event recorder options.
pub event_recorder: EventRecorderOptions,
/// The stats persistence options.
pub stats_persistence: StatsPersistenceOptions,
}
impl fmt::Debug for MetasrvOptions {
@@ -213,7 +235,9 @@ impl fmt::Debug for MetasrvOptions {
.field("max_txn_ops", &self.max_txn_ops)
.field("flush_stats_factor", &self.flush_stats_factor)
.field("tracing", &self.tracing)
.field("backend", &self.backend);
.field("backend", &self.backend)
.field("event_recorder", &self.event_recorder)
.field("stats_persistence", &self.stats_persistence);
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
debug_struct.field("meta_table_name", &self.meta_table_name);
@@ -275,6 +299,7 @@ impl Default for MetasrvOptions {
meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
event_recorder: EventRecorderOptions::default(),
stats_persistence: StatsPersistenceOptions::default(),
}
}
}

View File

@@ -17,6 +17,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};
use client::client_manager::NodeClients;
use client::inserter::InsertOptions;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_event_recorder::{EventRecorderImpl, EventRecorderRef};
@@ -55,6 +56,7 @@ use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::flow_state_handler::FlowStateHandler;
use crate::handler::persist_stats_handler::PersistStatsHandler;
use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
@@ -77,6 +79,7 @@ use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::State;
use crate::table_meta_alloc::MetasrvPeerAllocator;
use crate::utils::insert_forwarder::InsertForwarder;
// TODO(fys): try use derive_builder macro
pub struct MetasrvBuilder {
@@ -195,11 +198,18 @@ impl MetasrvBuilder {
let meta_peer_client = meta_peer_client
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
let event_inserter = Box::new(InsertForwarder::new(
peer_lookup_service.clone(),
Some(InsertOptions {
ttl: options.event_recorder.ttl,
append_mode: true,
}),
));
// Builds the event recorder to record important events and persist them as the system table.
let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
meta_peer_client.clone(),
options.event_recorder.ttl,
event_inserter,
))));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
@@ -294,7 +304,6 @@ impl MetasrvBuilder {
server_addr: options.grpc.server_addr.clone(),
},
));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
if !is_remote_wal && options.enable_region_failover {
ensure!(
@@ -451,6 +460,23 @@ impl MetasrvBuilder {
.as_ref()
.and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
let inserter = Box::new(InsertForwarder::new(
peer_lookup_service.clone(),
Some(InsertOptions {
ttl: options.stats_persistence.ttl,
append_mode: true,
}),
));
Some(PersistStatsHandler::new(
inserter,
options.stats_persistence.interval,
))
} else {
None
};
let handler_group_builder = match handler_group_builder {
Some(handler_group_builder) => handler_group_builder,
None => {
@@ -467,6 +493,7 @@ impl MetasrvBuilder {
.with_region_lease_handler(Some(region_lease_handler))
.with_flush_stats_factor(Some(options.flush_stats_factor))
.with_flow_state_handler(Some(flow_state_handler))
.with_persist_stats_handler(persist_region_stats_handler)
.add_default_handlers()
}
};

View File

@@ -198,6 +198,7 @@ mod tests {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
write_bytes: 0,
}],
..Default::default()
}
@@ -226,6 +227,7 @@ mod tests {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
write_bytes: 0,
}],
..Default::default()
}
@@ -254,6 +256,7 @@ mod tests {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
write_bytes: 0,
}],
..Default::default()
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod insert_forwarder;
#[macro_export]
macro_rules! define_ticker {
(

View File

@@ -0,0 +1,123 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::RowInsertRequests;
use client::error::{ExternalSnafu, Result as ClientResult};
use client::inserter::{Context, InsertOptions, Inserter};
use client::{Client, Database};
use common_error::ext::BoxedError;
use common_meta::peer::PeerLookupServiceRef;
use common_telemetry::{debug, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use crate::error::{LookupFrontendsSnafu, NoAvailableFrontendSnafu};
pub type InsertForwarderRef = Arc<InsertForwarder>;
/// [`InsertForwarder`] is the inserter for the metasrv.
/// It forwards insert requests to available frontend instances.
pub struct InsertForwarder {
peer_lookup_service: PeerLookupServiceRef,
client: RwLock<Option<Client>>,
options: Option<InsertOptions>,
}
impl InsertForwarder {
/// Creates a new InsertForwarder with the given peer lookup service.
pub fn new(peer_lookup_service: PeerLookupServiceRef, options: Option<InsertOptions>) -> Self {
Self {
peer_lookup_service,
client: RwLock::new(None),
options,
}
}
/// Builds a new client.
async fn build_client(&self) -> crate::error::Result<Client> {
let frontends = self
.peer_lookup_service
.active_frontends()
.await
.context(LookupFrontendsSnafu)?;
ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);
let urls = frontends
.into_iter()
.map(|peer| peer.addr)
.collect::<Vec<_>>();
debug!("Available frontend addresses: {:?}", urls);
Ok(Client::with_urls(urls))
}
/// Initializes the client if it hasn't been initialized yet, or returns the cached client.
async fn maybe_init_client(&self) -> Result<Client, BoxedError> {
let mut guard = self.client.write().await;
if guard.is_none() {
let client = self.build_client().await.map_err(BoxedError::new)?;
*guard = Some(client);
}
// Safety: checked above that the client is Some.
Ok(guard.as_ref().unwrap().clone())
}
/// Resets the cached client, forcing a rebuild on the next use.
async fn reset_client(&self) {
warn!("Resetting the client");
let mut guard = self.client.write().await;
guard.take();
}
}
#[async_trait::async_trait]
impl Inserter for InsertForwarder {
async fn insert_rows(
&self,
context: &Context<'_>,
requests: RowInsertRequests,
) -> ClientResult<()> {
let client = self.maybe_init_client().await.context(ExternalSnafu)?;
let database = Database::new(context.catalog, context.schema, client);
let hints = self.options.as_ref().map_or(vec![], |o| o.to_hints());
if let Err(e) = database
.row_inserts_with_hints(
requests,
&hints
.iter()
.map(|(k, v)| (*k, v.as_str()))
.collect::<Vec<_>>(),
)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
{
// Resets the client so it will be rebuilt next time.
self.reset_client().await;
return Err(e);
};
Ok(())
}
fn set_options(&mut self, options: &InsertOptions) {
self.options = Some(*options);
}
}

View File

@@ -62,6 +62,7 @@ pub fn get_region_statistic(mito: &MitoEngine, region_id: RegionId) -> Option<Re
metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(),
metadata_manifest_version: metadata_stat.manifest.data_manifest_version(),
},
write_bytes: metadata_stat.write_bytes + data_stat.write_bytes,
data_topic_latest_entry_id: data_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: metadata_stat.metadata_topic_latest_entry_id,
}),

View File

@@ -15,7 +15,7 @@
//! Basic tests for mito engine.
use std::collections::HashMap;
use std::time::Duration;
use std::sync::atomic::Ordering;
use api::v1::helper::row;
use api::v1::value::ValueData;
@@ -91,11 +91,7 @@ async fn test_write_to_region() {
};
put_rows(&engine, region_id, rows).await;
let region = engine.get_region(region_id).unwrap();
// Update the write bytes rate.
region
.write_bytes_per_sec
.update_rate(Duration::from_secs(1));
assert!(region.write_bytes_per_sec.get_rate() > 0);
assert!(region.write_bytes.load(Ordering::Relaxed) > 0);
}
#[apply(multiple_log_store_factories)]
@@ -168,7 +164,7 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
// The replay won't update the write bytes rate meter.
let region = engine.get_region(region_id).unwrap();
assert_eq!(region.write_bytes_per_sec.get_total(), 0);
assert_eq!(region.write_bytes.load(Ordering::Relaxed), 0);
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();

View File

@@ -1041,14 +1041,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to start repeated task: {}", name))]
StartRepeatedTask {
name: String,
#[snafu(implicit)]
location: Location,
source: common_runtime::error::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1211,8 +1203,6 @@ impl ErrorExt for Error {
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
TooManyFilesToRead { .. } => StatusCode::RateLimited,
StartRepeatedTask { .. } => StatusCode::Internal,
}
}

View File

@@ -38,7 +38,6 @@ pub mod extension;
pub mod flush;
pub mod manifest;
pub mod memtable;
pub mod meter;
mod metrics;
pub mod read;
pub mod region;

View File

@@ -1,15 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod rate_meter;

View File

@@ -1,163 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// `RateMeter` tracks a cumulative value and computes the rate per interval.
#[derive(Default, Debug, Clone)]
pub struct RateMeter {
inner: Arc<RateMeterInner>,
}
#[derive(Default, Debug)]
struct RateMeterInner {
/// Accumulated value since last rate calculation.
value: AtomicU64,
/// The last computed rate (per second).
last_rate: AtomicU64,
/// Optional: total accumulated value, never reset.
total: AtomicU64,
}
impl RateMeter {
/// Creates a new `RateMeter` with an initial value.
pub fn new() -> Self {
Self {
inner: Arc::new(RateMeterInner::default()),
}
}
/// Increments the accumulated value by `v`.
pub fn inc_by(&self, v: u64) {
self.inner.value.fetch_add(v, Ordering::Relaxed);
self.inner.total.fetch_add(v, Ordering::Relaxed);
}
/// Returns the current accumulated value since last rate calculation.
pub fn get_value(&self) -> u64 {
self.inner.value.load(Ordering::Relaxed)
}
/// Returns the total accumulated value since creation.
pub fn get_total(&self) -> u64 {
self.inner.total.load(Ordering::Relaxed)
}
/// Returns the last computed rate (per second).
pub fn get_rate(&self) -> u64 {
self.inner.last_rate.load(Ordering::Relaxed)
}
/// Updates the current rate based on the accumulated value over the given interval.
///
/// `interval` should represent the duration since the last `update_rate` call.
/// This method resets the internal accumulated counter (`value`) to 0.
pub fn update_rate(&self, interval: Duration) {
let current_value = self.inner.value.swap(0, Ordering::Relaxed);
let interval_secs = interval.as_secs();
if interval_secs > 0 {
let rate = current_value / interval_secs;
self.inner.last_rate.store(rate, Ordering::Relaxed);
}
}
/// Resets the meter: clears both current value and last rate.
/// Total accumulated value remains unchanged.
pub fn reset(&self) {
self.inner.value.store(0, Ordering::Relaxed);
self.inner.last_rate.store(0, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[test]
fn test_inc_and_get_value_and_total() {
let meter = RateMeter::new();
assert_eq!(meter.get_value(), 0);
assert_eq!(meter.get_total(), 0);
meter.inc_by(10);
assert_eq!(meter.get_value(), 10);
assert_eq!(meter.get_total(), 10);
meter.inc_by(5);
assert_eq!(meter.get_value(), 15);
assert_eq!(meter.get_total(), 15);
}
#[test]
fn test_update_rate_and_get_rate() {
let meter = RateMeter::new();
meter.inc_by(100);
meter.update_rate(Duration::from_secs(2));
assert_eq!(meter.get_rate(), 50);
// After update, value should be reset
assert_eq!(meter.get_value(), 0);
// If interval is zero, rate should not be updated
meter.inc_by(30);
meter.update_rate(Duration::from_secs(0));
// Should still be 50
assert_eq!(meter.get_rate(), 50);
}
#[test]
fn test_reset() {
let meter = RateMeter::new();
meter.inc_by(100);
meter.update_rate(Duration::from_secs(1));
assert_eq!(meter.get_rate(), 100);
meter.reset();
assert_eq!(meter.get_value(), 0);
assert_eq!(meter.get_rate(), 0);
// Total should remain unchanged
assert_eq!(meter.get_total(), 100);
}
#[test]
fn test_total_accumulates() {
let meter = RateMeter::new();
meter.inc_by(10);
meter.update_rate(Duration::from_secs(1));
meter.inc_by(20);
meter.update_rate(Duration::from_secs(2));
assert_eq!(meter.get_total(), 30);
assert_eq!(meter.get_rate(), 10);
}
#[test]
fn test_clone_and_shared_state() {
let meter = RateMeter::new();
let meter2 = meter.clone();
meter.inc_by(10);
meter2.inc_by(5);
assert_eq!(meter.get_value(), 15);
assert_eq!(meter2.get_value(), 15);
assert_eq!(meter.get_total(), 15);
assert_eq!(meter2.get_total(), 15);
meter.update_rate(Duration::from_secs(1));
assert_eq!(meter2.get_rate(), 15);
}
}

View File

@@ -43,7 +43,6 @@ use crate::error::{
use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::meter::rate_meter::RateMeter;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
@@ -132,9 +131,8 @@ pub struct MitoRegion {
/// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
/// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`.
pub(crate) topic_latest_entry_id: AtomicU64,
/// `write_bytes_per_sec` tracks the memtable write throughput in bytes per second.
/// [RateUpdater](crate::worker::RateUpdater) will update the rate periodically.
pub(crate) write_bytes_per_sec: RateMeter,
/// The total bytes written to the region.
pub(crate) write_bytes: Arc<AtomicU64>,
/// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef,
/// manifest stats
@@ -453,6 +451,7 @@ impl MitoRegion {
let manifest_version = self.stats.manifest_version();
let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
let write_bytes = self.write_bytes.load(Ordering::Relaxed);
RegionStatistic {
num_rows,
@@ -468,6 +467,7 @@ impl MitoRegion {
},
data_topic_latest_entry_id: topic_latest_entry_id,
metadata_topic_latest_entry_id: topic_latest_entry_id,
write_bytes,
}
}
@@ -771,14 +771,6 @@ impl RegionMap {
regions.get(&region_id).cloned()
}
/// Iterates over all regions.
pub(crate) fn for_each_region(&self, f: impl Fn(&MitoRegionRef)) {
let regions = self.regions.read().unwrap();
for (_, region) in regions.iter() {
f(region);
}
}
/// Gets writable region by region id.
///
/// Returns error if the region does not exist or is readonly.
@@ -996,6 +988,7 @@ impl ManifestStats {
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use common_datasource::compression::CompressionType;
@@ -1010,7 +1003,6 @@ mod tests {
use crate::access_layer::AccessLayer;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::meter::rate_meter::RateMeter;
use crate::region::{
ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
@@ -1180,7 +1172,7 @@ mod tests {
last_compaction_millis: Default::default(),
time_provider: Arc::new(StdTimeProvider),
topic_latest_entry_id: Default::default(),
write_bytes_per_sec: RateMeter::default(),
write_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
stats: ManifestStats::default(),
};

View File

@@ -52,7 +52,6 @@ use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::meter::rate_meter::RateMeter;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
@@ -288,7 +287,7 @@ impl RegionOpener {
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0),
memtable_builder,
write_bytes_per_sec: RateMeter::default(),
write_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats,
})
}
@@ -472,7 +471,7 @@ impl RegionOpener {
last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0),
write_bytes_per_sec: RateMeter::default(),
write_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder,
stats: self.stats.clone(),
};

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::mem;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint};
@@ -25,7 +26,6 @@ use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{Error, Result, WriteGroupSnafu};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::KeyValues;
use crate::meter::rate_meter::RateMeter;
use crate::metrics;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
@@ -108,7 +108,7 @@ pub(crate) struct RegionWriteCtx {
/// Rows to delete.
pub(crate) delete_num: usize,
/// Write bytes per second.
pub(crate) write_bytes_per_sec: Option<RateMeter>,
pub(crate) write_bytes: Option<Arc<AtomicU64>>,
}
impl RegionWriteCtx {
@@ -117,7 +117,7 @@ impl RegionWriteCtx {
region_id: RegionId,
version_control: &VersionControlRef,
provider: Provider,
write_bytes_per_sec: Option<RateMeter>,
write_bytes: Option<Arc<AtomicU64>>,
) -> RegionWriteCtx {
let VersionControlData {
version,
@@ -140,7 +140,7 @@ impl RegionWriteCtx {
put_num: 0,
delete_num: 0,
bulk_parts: vec![],
write_bytes_per_sec,
write_bytes,
}
}
@@ -219,7 +219,7 @@ impl RegionWriteCtx {
}
let mutable_memtable = self.version.memtables.mutable.clone();
let prev_memory_usage = if self.write_bytes_per_sec.is_some() {
let prev_memory_usage = if self.write_bytes.is_some() {
Some(mutable_memtable.memory_usage())
} else {
None
@@ -257,11 +257,11 @@ impl RegionWriteCtx {
}
}
if let Some(meter) = &self.write_bytes_per_sec {
if let Some(write_bytes) = &self.write_bytes {
let new_memory_usage = mutable_memtable.memory_usage();
let written_bytes =
new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
meter.inc_by(written_bytes as u64);
write_bytes.fetch_add(written_bytes as u64, Ordering::Relaxed);
}
// Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
// to decrease `next_sequence` and `next_entry_id` by 1.
@@ -298,7 +298,7 @@ impl RegionWriteCtx {
.start_timer();
let mutable_memtable = &self.version.memtables.mutable;
let prev_memory_usage = if self.write_bytes_per_sec.is_some() {
let prev_memory_usage = if self.write_bytes.is_some() {
Some(mutable_memtable.memory_usage())
} else {
None
@@ -333,11 +333,11 @@ impl RegionWriteCtx {
}
}
if let Some(meter) = &self.write_bytes_per_sec {
if let Some(write_bytes) = &self.write_bytes {
let new_memory_usage = mutable_memtable.memory_usage();
let written_bytes =
new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
meter.inc_by(written_bytes as u64);
write_bytes.fetch_add(written_bytes as u64, Ordering::Relaxed);
}
self.version_control
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);

View File

@@ -36,7 +36,7 @@ use std::time::Duration;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::{JoinHandle, RepeatedTask, TaskFunction};
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
@@ -55,9 +55,7 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{
self, CreateDirSnafu, Error, JoinSnafu, Result, StartRepeatedTaskSnafu, WorkerStoppedSnafu,
};
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
@@ -83,8 +81,6 @@ pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Max delay to check region periodical tasks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
/// Interval to update the rate meter for regions.
const RATE_UPDATE_INTERVAL: Duration = Duration::from_secs(30);
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
@@ -483,26 +479,12 @@ impl<S: LogStore> WorkerStarter<S> {
worker_thread.run().await;
});
let rate_update_task = RepeatedTask::new(
RATE_UPDATE_INTERVAL,
Box::new(RateUpdater {
regions: regions.clone(),
interval: RATE_UPDATE_INTERVAL,
}),
);
rate_update_task
.start(common_runtime::global_runtime())
.context(StartRepeatedTaskSnafu {
name: RateUpdater::NAME,
})?;
Ok(RegionWorker {
id: self.id,
regions,
opening_regions,
sender,
handle: Mutex::new(Some(handle)),
rate_update_task,
running,
})
}
@@ -520,8 +502,6 @@ pub(crate) struct RegionWorker {
sender: Sender<WorkerRequestWithTime>,
/// Handle to the worker thread.
handle: Mutex<Option<JoinHandle<()>>>,
/// rate update task.
rate_update_task: RepeatedTask<Error>,
/// Whether to run the worker thread.
running: Arc<AtomicBool>,
}
@@ -564,9 +544,6 @@ impl RegionWorker {
handle.await.context(JoinSnafu)?;
}
if let Err(err) = self.rate_update_task.stop().await {
error!(err; "Failed to stop rate update task");
}
Ok(())
}
@@ -686,29 +663,6 @@ impl StalledRequests {
}
}
struct RateUpdater {
regions: RegionMapRef,
interval: Duration,
}
impl RateUpdater {
const NAME: &str = "RateUpdater";
}
#[async_trait::async_trait]
impl TaskFunction<Error> for RateUpdater {
fn name(&self) -> &str {
Self::NAME
}
async fn call(&mut self) -> Result<()> {
self.regions.for_each_region(|region| {
region.write_bytes_per_sec.update_rate(self.interval);
});
Ok(())
}
}
/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
/// Id of the worker.

View File

@@ -248,7 +248,7 @@ impl<S> RegionWorkerLoop<S> {
region.region_id,
&region.version_control,
region.provider.clone(),
Some(region.write_bytes_per_sec.clone()),
Some(region.write_bytes.clone()),
);
e.insert(region_ctx);
@@ -352,7 +352,7 @@ impl<S> RegionWorkerLoop<S> {
region.region_id,
&region.version_control,
region.provider.clone(),
Some(region.write_bytes_per_sec.clone()),
Some(region.write_bytes.clone()),
);
e.insert(region_ctx);

View File

@@ -45,6 +45,7 @@ datatypes.workspace = true
file-engine.workspace = true
futures.workspace = true
futures-util.workspace = true
humantime.workspace = true
jsonb.workspace = true
lazy_static.workspace = true
meta-client.workspace = true

View File

@@ -29,9 +29,12 @@ mod tql;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::RowInsertRequests;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
use client::inserter::{InsertOptions, Inserter};
use client::RecordBatches;
use common_error::ext::BoxedError;
use common_meta::cache::TableRouteCacheRef;
@@ -48,10 +51,11 @@ use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use humantime::format_duration;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::QueryEngineRef;
use session::context::{Channel, QueryContextRef};
use session::context::{Channel, QueryContextBuilder, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use set::{set_query_timeout, set_read_preference};
use snafu::{ensure, OptionExt, ResultExt};
@@ -65,6 +69,7 @@ use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sql::util::format_raw_object_name;
use sqlparser::ast::ObjectName;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
use table::table_name::TableName;
use table::table_reference::TableReference;
@@ -614,6 +619,11 @@ impl StatementExecutor {
}
Ok(time_ranges)
}
/// Returns the inserter for the statement executor.
pub(crate) fn inserter(&self) -> &InserterRef {
&self.inserter
}
}
fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
@@ -749,6 +759,61 @@ fn idents_to_full_database_name(
}
}
/// The [`Inserter`] implementation for the statement executor.
pub struct InserterImpl {
statement_executor: StatementExecutorRef,
options: Option<InsertOptions>,
}
impl InserterImpl {
pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
Self {
statement_executor,
options,
}
}
}
#[async_trait::async_trait]
impl Inserter for InserterImpl {
async fn insert_rows(
&self,
context: &client::inserter::Context<'_>,
requests: RowInsertRequests,
) -> ClientResult<()> {
let mut ctx_builder = QueryContextBuilder::default()
.current_catalog(context.catalog.to_string())
.current_schema(context.schema.to_string());
if let Some(options) = self.options.as_ref() {
ctx_builder = ctx_builder
.set_extension(
TTL_KEY.to_string(),
format_duration(options.ttl).to_string(),
)
.set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
}
let query_ctx = ctx_builder.build().into();
self.statement_executor
.inserter()
.handle_row_inserts(
requests,
query_ctx,
self.statement_executor.as_ref(),
false,
false,
)
.await
.map_err(BoxedError::new)
.context(ClientExternalSnafu)
.map(|_| ())
}
fn set_options(&mut self, options: &InsertOptions) {
self.options = Some(*options);
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

View File

@@ -479,6 +479,9 @@ pub struct RegionStatistic {
/// The details of the region.
#[serde(default)]
pub manifest: RegionManifestInfo,
/// The write throughput of the region.
#[serde(default)]
pub write_bytes: u64,
/// The latest entry id of the region's remote WAL since last flush.
/// For metric engine, there're two latest entry ids, one for data and one for metadata.
/// TODO(weny): remove this two fields and use single instead.

View File

@@ -21,3 +21,6 @@ topic_name_prefix = "distributed_test_greptimedb_wal_topic"
auto_prune_interval = "30s"
trigger_flush_threshold = 100
{{ endif }}
[stats_persistence]
ttl = "0s"