From 896d72191eb491169313a30fe13c030fc11ac387 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 21 Aug 2025 17:34:58 +0800 Subject: [PATCH] feat: introduce `PersistStatsHandler` (#6777) * feat: add `Inserter` trait and impl Signed-off-by: WenyXu * chore: import items Signed-off-by: WenyXu * feat: introduce `PersistStatsHandler` Signed-off-by: WenyXu * chore: disable persisting stats in sqlness Signed-off-by: WenyXu * reset channel manager Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * chore: avoid to collect Signed-off-by: WenyXu * refactor: remove insert options Signed-off-by: WenyXu * refactor: use `write_bytes` instead of `write_bytes_per_sec` Signed-off-by: WenyXu * refactor: compute write bytes delta Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * test: add unit tests Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * Update src/meta-srv/src/handler/persist_stats_handler.rs Co-authored-by: Yingwen --------- Signed-off-by: WenyXu Co-authored-by: Yingwen --- Cargo.lock | 3 + config/config.md | 3 + config/metasrv.example.toml | 9 + src/client/Cargo.toml | 2 + src/client/src/error.rs | 8 + src/client/src/inserter.rs | 60 ++ src/client/src/lib.rs | 1 + src/cmd/src/standalone.rs | 1 + src/common/event-recorder/src/error.rs | 10 +- src/common/event-recorder/src/recorder.rs | 5 - src/common/meta/src/datanode.rs | 3 + src/frontend/src/events.rs | 87 ++- src/frontend/src/instance/builder.rs | 1 - src/meta-srv/src/error.rs | 15 + src/meta-srv/src/events.rs | 90 +-- src/meta-srv/src/handler.rs | 17 + .../handler/collect_leader_region_handler.rs | 1 + src/meta-srv/src/handler/failure_handler.rs | 1 + .../src/handler/persist_stats_handler.rs | 555 ++++++++++++++++++ .../src/handler/region_lease_handler.rs | 1 + src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/metasrv.rs | 27 +- src/meta-srv/src/metasrv/builder.rs | 33 +- src/meta-srv/src/selector/weight_compute.rs | 3 + src/meta-srv/src/utils.rs | 2 + src/meta-srv/src/utils/insert_forwarder.rs | 123 ++++ src/metric-engine/src/utils.rs | 1 + src/mito2/src/engine/basic_test.rs | 10 +- src/mito2/src/error.rs | 10 - src/mito2/src/lib.rs | 1 - src/mito2/src/meter.rs | 15 - src/mito2/src/meter/rate_meter.rs | 163 ----- src/mito2/src/region.rs | 20 +- src/mito2/src/region/opener.rs | 5 +- src/mito2/src/region_write_ctx.rs | 20 +- src/mito2/src/worker.rs | 50 +- src/mito2/src/worker/handle_write.rs | 4 +- src/operator/Cargo.toml | 1 + src/operator/src/statement.rs | 67 ++- src/store-api/src/region_engine.rs | 3 + tests/conf/metasrv-test.toml.template | 3 + 41 files changed, 1024 insertions(+), 411 deletions(-) create mode 100644 src/client/src/inserter.rs create mode 100644 src/meta-srv/src/handler/persist_stats_handler.rs create mode 100644 src/meta-srv/src/utils/insert_forwarder.rs delete mode 100644 src/mito2/src/meter.rs delete mode 100644 src/mito2/src/meter/rate_meter.rs diff --git a/Cargo.lock b/Cargo.lock index 42d9f5c731..3df10fd42e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1837,6 +1837,7 @@ dependencies = [ "enum_dispatch", "futures", "futures-util", + "humantime", "lazy_static", "moka", "parking_lot 0.12.4", @@ -1846,6 +1847,7 @@ dependencies = [ "rand 0.9.1", "serde_json", "snafu 0.8.6", + "store-api", "substrait 0.17.0", "substrait 0.37.3", "tokio", @@ -8622,6 +8624,7 @@ dependencies = [ "file-engine", "futures", "futures-util", + "humantime", "jsonb", "lazy_static", "meta-client", diff --git a/config/config.md b/config/config.md index 50a56269cf..14e3e61235 100644 --- 
a/config/config.md +++ b/config/config.md @@ -387,6 +387,9 @@ | `wal.create_topic_timeout` | String | `30s` | The timeout for creating a Kafka topic.
**It's only used when the provider is `kafka`**. | | `event_recorder` | -- | -- | Configuration options for the event recorder. | | `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. | +| `stats_persistence` | -- | -- | Configuration options for the stats persistence. | +| `stats_persistence.ttl` | String | `30d` | TTL for the stats table that will be used to store the stats. Default is `30d`.
Set to `0s` to disable stats persistence. | +| `stats_persistence.interval` | String | `60s` | The interval to persist the stats. Default is `60s`.
The minimum value is `60s`, if the value is less than `60s`, it will be overridden to `60s`. | | `logging` | -- | -- | The logging options. | | `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 82b7dd1e81..8e65152ade 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -256,6 +256,15 @@ create_topic_timeout = "30s" ## TTL for the events table that will be used to store the events. Default is `90d`. ttl = "90d" +## Configuration options for the stats persistence. +[stats_persistence] +## TTL for the stats table that will be used to store the stats. Default is `30d`. +## Set to `0s` to disable stats persistence. +ttl = "30d" +## The interval to persist the stats. Default is `60s`. +## The minimum value is `60s`, if the value is less than `60s`, it will be overridden to `60s`. +interval = "60s" + ## The logging options. [logging] ## The directory to store the log files. If set to empty, logs will not be written to files. diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 27327bbe56..509ea9c96f 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -29,6 +29,7 @@ datatypes.workspace = true enum_dispatch = "0.3" futures.workspace = true futures-util.workspace = true +humantime.workspace = true lazy_static.workspace = true moka = { workspace = true, features = ["future"] } parking_lot.workspace = true @@ -38,6 +39,7 @@ query.workspace = true rand.workspace = true serde_json.workspace = true snafu.workspace = true +store-api.workspace = true substrait.workspace = true tokio.workspace = true tokio-stream = { workspace = true, features = ["net"] } diff --git a/src/client/src/error.rs b/src/client/src/error.rs index eefa04ee4e..e25f66bc9e 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -133,6 +133,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("External error"))] + External { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -154,6 +161,7 @@ impl ErrorExt for Error { Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments, Error::ConvertSchema { source, .. } => source.status_code(), + Error::External { source, .. } => source.status_code(), } } diff --git a/src/client/src/inserter.rs b/src/client/src/inserter.rs new file mode 100644 index 0000000000..38c0590c8e --- /dev/null +++ b/src/client/src/inserter.rs @@ -0,0 +1,60 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::Duration; + +use api::v1::RowInsertRequests; +use humantime::format_duration; +use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; + +use crate::error::Result; + +/// Context holds the catalog and schema information. +pub struct Context<'a> { + /// The catalog name. + pub catalog: &'a str, + /// The schema name. + pub schema: &'a str, +} + +/// Options for insert operations. +#[derive(Debug, Clone, Copy)] +pub struct InsertOptions { + /// Time-to-live for the inserted data. + pub ttl: Duration, + /// Whether to use append mode for the insert. + pub append_mode: bool, +} + +impl InsertOptions { + /// Converts the insert options to a list of key-value string hints. + pub fn to_hints(&self) -> Vec<(&'static str, String)> { + vec![ + (TTL_KEY, format_duration(self.ttl).to_string()), + (APPEND_MODE_KEY, self.append_mode.to_string()), + ] + } +} + +/// [`Inserter`] allows different components to share a unified mechanism for inserting data. +/// +/// An implementation may perform the insert locally (e.g., via a direct procedure call) or +/// delegate/forward it to another node for processing (e.g., MetaSrv forwarding to an +/// available Frontend). +#[async_trait::async_trait] +pub trait Inserter: Send + Sync { + async fn insert_rows(&self, context: &Context<'_>, requests: RowInsertRequests) -> Result<()>; + + fn set_options(&mut self, options: &InsertOptions); +} diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 7078e71795..bf383acff9 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -19,6 +19,7 @@ pub mod client_manager; pub mod database; pub mod error; pub mod flow; +pub mod inserter; pub mod load_balance; mod metrics; pub mod region; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index ccf676004d..a6a09925e3 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -834,6 +834,7 @@ impl InformationExtension for StandaloneInformationExtension { region_manifest: region_stat.manifest.into(), data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, + write_bytes: 0, } }) .collect::>(); diff --git a/src/common/event-recorder/src/error.rs b/src/common/event-recorder/src/error.rs index 01f7327718..dc39943f67 100644 --- a/src/common/event-recorder/src/error.rs +++ b/src/common/event-recorder/src/error.rs @@ -22,12 +22,6 @@ use snafu::{Location, Snafu}; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { - #[snafu(display("No available frontend"))] - NoAvailableFrontend { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Mismatched schema, expected: {:?}, actual: {:?}", expected, actual))] MismatchedSchema { #[snafu(implicit)] @@ -69,9 +63,7 @@ impl ErrorExt for Error { Error::MismatchedSchema { .. } | Error::SerializeEvent { .. } => { StatusCode::InvalidArguments } - Error::NoAvailableFrontend { .. } - | Error::InsertEvents { .. } - | Error::KvBackend { .. } => StatusCode::Internal, + Error::InsertEvents { .. } | Error::KvBackend { .. } => StatusCode::Internal, } } diff --git a/src/common/event-recorder/src/recorder.rs b/src/common/event-recorder/src/recorder.rs index 3aeebb2377..b3575587aa 100644 --- a/src/common/event-recorder/src/recorder.rs +++ b/src/common/event-recorder/src/recorder.rs @@ -236,11 +236,6 @@ pub trait EventHandler: Send + Sync + 'static { /// Processes and handles incoming events. 
The [DefaultEventHandlerImpl] implementation forwards events to frontend instances for persistence. /// We use `&[Box]` to avoid consuming the events, so the caller can buffer the events and retry if the handler fails. async fn handle(&self, events: &[Box]) -> Result<()>; - - /// Returns the handler options for the event type. We can use different options for different event types. - fn options(&self, _event_type: &str) -> EventHandlerOptions { - EventHandlerOptions::default() - } } /// Configuration options for the event recorder. diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index 2ffb722cfd..f1ca6c15d7 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -102,6 +102,8 @@ pub struct RegionStat { pub index_size: u64, /// The manifest infoof the region. pub region_manifest: RegionManifestInfo, + /// The write bytes. + pub write_bytes: u64, /// The latest entry id of topic used by data. /// **Only used by remote WAL prune.** pub data_topic_latest_entry_id: u64, @@ -304,6 +306,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat { sst_num: region_stat.sst_num, index_size: region_stat.index_size, region_manifest: region_stat.manifest.into(), + write_bytes: region_stat.write_bytes, data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, } diff --git a/src/frontend/src/events.rs b/src/frontend/src/events.rs index ade5045f50..23f4f71767 100644 --- a/src/frontend/src/events.rs +++ b/src/frontend/src/events.rs @@ -15,68 +15,76 @@ use std::time::Duration; use async_trait::async_trait; +use client::inserter::{Context, InsertOptions, Inserter}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME}; use common_error::ext::BoxedError; use common_event_recorder::error::{InsertEventsSnafu, Result}; -use common_event_recorder::{ - build_row_inserts_request, group_events_by_type, Event, EventHandler, EventHandlerOptions, -}; +use common_event_recorder::{build_row_inserts_request, group_events_by_type, Event, EventHandler}; use common_frontend::slow_query_event::SLOW_QUERY_EVENT_TYPE; -use humantime::format_duration; -use operator::insert::InserterRef; -use operator::statement::StatementExecutorRef; -use session::context::QueryContextBuilder; +use datafusion::common::HashMap; +use operator::statement::{InserterImpl, StatementExecutorRef}; use snafu::ResultExt; -use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; /// EventHandlerImpl is the default event handler implementation in frontend. pub struct EventHandlerImpl { - inserter: InserterRef, - statement_executor: StatementExecutorRef, - slow_query_ttl: Duration, - global_ttl: Duration, + default_inserter: Box, + /// The inserters for the event types. + inserters: HashMap>, } impl EventHandlerImpl { /// Create a new EventHandlerImpl. 
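+    /// Slow query events are persisted with `slow_query_ttl`; all other event types fall back to the `global_ttl` inserter.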
pub fn new( - inserter: InserterRef, statement_executor: StatementExecutorRef, slow_query_ttl: Duration, global_ttl: Duration, ) -> Self { Self { - inserter, - statement_executor, - slow_query_ttl, - global_ttl, + inserters: HashMap::from([( + SLOW_QUERY_EVENT_TYPE.to_string(), + Box::new(InserterImpl::new( + statement_executor.clone(), + Some(InsertOptions { + ttl: slow_query_ttl, + append_mode: true, + }), + )) as _, + )]), + default_inserter: Box::new(InserterImpl::new( + statement_executor.clone(), + Some(InsertOptions { + ttl: global_ttl, + append_mode: true, + }), + )), } } + + fn inserter(&self, event_type: &str) -> &dyn Inserter { + let Some(inserter) = self.inserters.get(event_type) else { + return self.default_inserter.as_ref(); + }; + + inserter.as_ref() + } } +const DEFAULT_CONTEXT: Context = Context { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_PRIVATE_SCHEMA_NAME, +}; + #[async_trait] impl EventHandler for EventHandlerImpl { async fn handle(&self, events: &[Box]) -> Result<()> { let event_groups = group_events_by_type(events); for (event_type, events) in event_groups { - let opts = self.options(event_type); - let query_ctx = QueryContextBuilder::default() - .current_catalog(DEFAULT_CATALOG_NAME.to_string()) - .current_schema(DEFAULT_PRIVATE_SCHEMA_NAME.to_string()) - .set_extension(TTL_KEY.to_string(), format_duration(opts.ttl).to_string()) - .set_extension(APPEND_MODE_KEY.to_string(), opts.append_mode.to_string()) - .build() - .into(); + let requests = build_row_inserts_request(&events)?; + let inserter = self.inserter(event_type); - self.inserter - .handle_row_inserts( - build_row_inserts_request(&events)?, - query_ctx, - &self.statement_executor, - false, - false, - ) + inserter + .insert_rows(&DEFAULT_CONTEXT, requests) .await .map_err(BoxedError::new) .context(InsertEventsSnafu)?; @@ -84,17 +92,4 @@ impl EventHandler for EventHandlerImpl { Ok(()) } - - fn options(&self, event_type: &str) -> EventHandlerOptions { - match event_type { - SLOW_QUERY_EVENT_TYPE => EventHandlerOptions { - ttl: self.slow_query_ttl, - append_mode: true, - }, - _ => EventHandlerOptions { - ttl: self.global_ttl, - append_mode: true, - }, - } - } } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index bc4aaee4b5..2bc959c403 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -197,7 +197,6 @@ impl FrontendBuilder { plugins.insert::(statement_executor.clone()); let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new( - inserter.clone(), statement_executor.clone(), self.options.slow_query.ttl, self.options.event_recorder.ttl, diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 62e1138875..e389e548ea 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -54,6 +54,19 @@ pub enum Error { peer_id: u64, }, + #[snafu(display("Failed to lookup frontends"))] + LookupFrontends { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("No available frontend"))] + NoAvailableFrontend { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Another migration procedure is running for region: {}", region_id))] MigrationRunning { #[snafu(implicit)] @@ -1062,6 +1075,8 @@ impl ErrorExt for Error { | Error::UnexpectedLogicalRouteTable { source, .. } | Error::UpdateTopicNameValue { source, .. } | Error::ParseWalOptions { source, .. } => source.status_code(), + Error::LookupFrontends { source, .. 
} => source.status_code(), + Error::NoAvailableFrontend { .. } => StatusCode::IllegalState, Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } diff --git a/src/meta-srv/src/events.rs b/src/meta-srv/src/events.rs index cede03f12a..5c2e555baa 100644 --- a/src/meta-srv/src/events.rs +++ b/src/meta-srv/src/events.rs @@ -12,43 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; -use std::time::Duration; - use async_trait::async_trait; -use client::{Client, Database}; +use client::inserter::{Context, Inserter}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME}; use common_error::ext::BoxedError; -use common_event_recorder::error::{ - InsertEventsSnafu, KvBackendSnafu, NoAvailableFrontendSnafu, Result, -}; -use common_event_recorder::{ - build_row_inserts_request, group_events_by_type, Event, EventHandler, EventHandlerOptions, -}; -use common_grpc::channel_manager::ChannelManager; -use common_meta::peer::PeerLookupServiceRef; -use common_telemetry::debug; -use snafu::{ensure, ResultExt}; - -use crate::cluster::MetaPeerClientRef; -use crate::lease::MetaPeerLookupService; +use common_event_recorder::error::{InsertEventsSnafu, Result}; +use common_event_recorder::{build_row_inserts_request, group_events_by_type, Event, EventHandler}; +use snafu::ResultExt; pub mod region_migration_event; -/// EventHandlerImpl is the default event handler implementation in metasrv. It sends the received events to the frontend instances. +/// EventHandlerImpl is the default event handler implementation in metasrv. +/// It sends the received events to the frontend instances. pub struct EventHandlerImpl { - peer_lookup_service: PeerLookupServiceRef, - channel_manager: ChannelManager, - ttl: Duration, + inserter: Box, } impl EventHandlerImpl { - pub fn new(meta_peer_client: MetaPeerClientRef, ttl: Duration) -> Self { - Self { - peer_lookup_service: Arc::new(MetaPeerLookupService::new(meta_peer_client)), - channel_manager: ChannelManager::new(), - ttl, - } + pub fn new(inserter: Box) -> Self { + Self { inserter } } } @@ -57,18 +39,15 @@ impl EventHandler for EventHandlerImpl { async fn handle(&self, events: &[Box]) -> Result<()> { let event_groups = group_events_by_type(events); - for (event_type, events) in event_groups { - let opts = self.options(event_type); - let hints = opts.to_hints(); - - self.build_database_client() - .await? 
- .row_inserts_with_hints( - build_row_inserts_request(&events)?, - &hints - .iter() - .map(|(k, v)| (*k, v.as_str())) - .collect::>(), + for (_, events) in event_groups { + let requests = build_row_inserts_request(&events)?; + self.inserter + .insert_rows( + &Context { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_PRIVATE_SCHEMA_NAME, + }, + requests, ) .await .map_err(BoxedError::new) @@ -77,37 +56,4 @@ impl EventHandler for EventHandlerImpl { Ok(()) } - - fn options(&self, _event_type: &str) -> EventHandlerOptions { - EventHandlerOptions { - ttl: self.ttl, - append_mode: true, - } - } -} - -impl EventHandlerImpl { - async fn build_database_client(&self) -> Result { - let frontends = self - .peer_lookup_service - .active_frontends() - .await - .map_err(BoxedError::new) - .context(KvBackendSnafu)?; - - ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu); - - let urls = frontends - .into_iter() - .map(|peer| peer.addr) - .collect::>(); - - debug!("Available frontend addresses: {:?}", urls); - - Ok(Database::new( - DEFAULT_CATALOG_NAME, - DEFAULT_PRIVATE_SCHEMA_NAME, - Client::with_manager_and_urls(self.channel_manager.clone(), urls), - )) - } } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 0e4bc763f0..8b241ce8ee 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -55,6 +55,7 @@ use tokio::sync::{oneshot, watch, Notify, RwLock}; use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu}; use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler; use crate::handler::flow_state_handler::FlowStateHandler; +use crate::handler::persist_stats_handler::PersistStatsHandler; use crate::metasrv::Context; use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM}; use crate::pubsub::PublisherRef; @@ -74,10 +75,12 @@ pub mod flow_state_handler; pub mod keep_lease_handler; pub mod mailbox_handler; pub mod on_leader_start_handler; +pub mod persist_stats_handler; pub mod publish_heartbeat_handler; pub mod region_lease_handler; pub mod remap_flow_peer_handler; pub mod response_header_handler; + #[cfg(test)] pub mod test_utils; @@ -537,6 +540,9 @@ pub struct HeartbeatHandlerGroupBuilder { /// A simple handler for flow internal state report flow_state_handler: Option, + /// The handler to persist stats. + persist_stats_handler: Option, + /// The plugins. plugins: Option, @@ -554,6 +560,7 @@ impl HeartbeatHandlerGroupBuilder { region_lease_handler: None, flush_stats_factor: None, flow_state_handler: None, + persist_stats_handler: None, plugins: None, pushers, handlers: vec![], @@ -582,6 +589,11 @@ impl HeartbeatHandlerGroupBuilder { self } + pub fn with_persist_stats_handler(mut self, handler: Option) -> Self { + self.persist_stats_handler = handler; + self + } + /// Sets the [`Plugins`]. pub fn with_plugins(mut self, plugins: Option) -> Self { self.plugins = plugins; @@ -624,6 +636,11 @@ impl HeartbeatHandlerGroupBuilder { } self.add_handler_last(CollectLeaderRegionHandler); self.add_handler_last(CollectTopicStatsHandler); + // Persist stats handler should be in front of collect stats handler. + // Because collect stats handler will consume the stats from the accumulator. 
+ if let Some(persist_stats_handler) = self.persist_stats_handler.take() { + self.add_handler_last(persist_stats_handler); + } self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor)); self.add_handler_last(RemapFlowPeerHandler::default()); diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs index 149660c413..ae00a6dc9a 100644 --- a/src/meta-srv/src/handler/collect_leader_region_handler.rs +++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs @@ -87,6 +87,7 @@ mod tests { index_size: 0, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + write_bytes: 0, } } diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 9530d0d128..1a5a838c8c 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -105,6 +105,7 @@ mod tests { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + write_bytes: 0, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs new file mode 100644 index 0000000000..3eba86c659 --- /dev/null +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -0,0 +1,555 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::{Duration, Instant}; + +use api::v1::meta::{HeartbeatRequest, Role}; +use api::v1::value::ValueData; +use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, Value}; +use client::inserter::{Context as InserterContext, Inserter}; +use client::DEFAULT_CATALOG_NAME; +use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME; +use common_macro::{Schema, ToRow}; +use common_meta::datanode::RegionStat; +use common_meta::DatanodeId; +use common_telemetry::warn; +use dashmap::DashMap; +use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; + +use crate::error::Result; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; + +/// The handler to persist stats. +pub struct PersistStatsHandler { + inserter: Box, + last_persisted_region_stats: DashMap, + last_persisted_time: DashMap, + persist_interval: Duration, +} + +/// The name of the table to persist region stats. +const META_REGION_STATS_TABLE_NAME: &str = "region_statistics"; +/// The default context to persist region stats. 
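+/// Stats rows are written to the private schema of the default catalog.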
+const DEFAULT_CONTEXT: InserterContext = InserterContext { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_PRIVATE_SCHEMA_NAME, +}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct PersistedRegionStat { + region_id: RegionId, + write_bytess: u64, +} + +impl From<&RegionStat> for PersistedRegionStat { + fn from(stat: &RegionStat) -> Self { + Self { + region_id: stat.id, + write_bytess: stat.write_bytes, + } + } +} + +#[derive(ToRow, Schema)] +struct PersistRegionStat<'a> { + table_id: u32, + region_id: u64, + region_number: u32, + manifest_size: u64, + datanode_id: u64, + #[col(datatype = "string")] + engine: &'a str, + num_rows: u64, + sst_num: u64, + sst_size: u64, + write_bytes_delta: u64, + #[col( + name = "greptime_timestamp", + semantic = "Timestamp", + datatype = "TimestampMillisecond" + )] + timestamp_millis: i64, +} + +/// Compute the region stat to persist. +fn compute_persist_region_stat( + region_stat: &RegionStat, + datanode_id: DatanodeId, + timestamp_millis: i64, + persisted_region_stat: Option, +) -> PersistRegionStat { + let write_bytes_delta = persisted_region_stat + .and_then(|persisted_region_stat| { + region_stat + .write_bytes + .checked_sub(persisted_region_stat.write_bytess) + }) + .unwrap_or_default(); + + PersistRegionStat { + table_id: region_stat.id.table_id(), + region_id: region_stat.id.as_u64(), + region_number: region_stat.id.region_number(), + manifest_size: region_stat.manifest_size, + datanode_id, + engine: region_stat.engine.as_str(), + num_rows: region_stat.num_rows, + sst_num: region_stat.sst_num, + sst_size: region_stat.sst_size, + write_bytes_delta, + timestamp_millis, + } +} + +fn to_persisted_if_leader( + region_stat: &RegionStat, + last_persisted_region_stats: &DashMap, + datanode_id: DatanodeId, + timestamp_millis: i64, +) -> Option<(Row, PersistedRegionStat)> { + if matches!(region_stat.role, RegionRole::Leader) { + let persisted_region_stat = last_persisted_region_stats.get(®ion_stat.id).map(|s| *s); + Some(( + compute_persist_region_stat( + region_stat, + datanode_id, + timestamp_millis, + persisted_region_stat, + ) + .to_row(), + PersistedRegionStat::from(region_stat), + )) + } else { + None + } +} + +/// Align the timestamp to the nearest interval. +/// +/// # Panics +/// Panics if `interval` as milliseconds is zero. +fn align_ts(ts: i64, interval: Duration) -> i64 { + assert!( + interval.as_millis() != 0, + "interval must be greater than zero" + ); + ts / interval.as_millis() as i64 * interval.as_millis() as i64 +} + +impl PersistStatsHandler { + /// Creates a new [`PersistStatsHandler`]. + pub fn new(inserter: Box, mut persist_interval: Duration) -> Self { + if persist_interval < Duration::from_secs(60) { + warn!("persist_interval is less than 60 seconds, set to 60 seconds"); + persist_interval = Duration::from_secs(60); + } + if persist_interval.as_millis() == 0 { + warn!("persist_interval as milliseconds is zero, set to 60 second"); + persist_interval = Duration::from_secs(60); + } + + Self { + inserter, + last_persisted_region_stats: DashMap::new(), + last_persisted_time: DashMap::new(), + persist_interval, + } + } + + fn should_persist(&self, datanode_id: DatanodeId) -> bool { + let Some(last_persisted_time) = self.last_persisted_time.get(&datanode_id) else { + return true; + }; + + last_persisted_time.elapsed() >= self.persist_interval + } + + async fn persist( + &self, + timestamp_millis: i64, + datanode_id: DatanodeId, + region_stats: &[RegionStat], + ) { + // Safety: persist_interval is greater than zero. 
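+        // E.g. with the default 60s interval, 1640995212345 ms aligns down to 1640995200000 ms (2022-01-01 00:00:00 UTC).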
+ let aligned_ts = align_ts(timestamp_millis, self.persist_interval); + let (rows, incoming_region_stats): (Vec<_>, Vec<_>) = region_stats + .iter() + .flat_map(|region_stat| { + to_persisted_if_leader( + region_stat, + &self.last_persisted_region_stats, + datanode_id, + aligned_ts, + ) + }) + .unzip(); + + if rows.is_empty() { + return; + } + + if let Err(err) = self + .inserter + .insert_rows( + &DEFAULT_CONTEXT, + RowInsertRequests { + inserts: vec![RowInsertRequest { + table_name: META_REGION_STATS_TABLE_NAME.to_string(), + rows: Some(Rows { + schema: PersistRegionStat::schema(), + rows, + }), + }], + }, + ) + .await + { + warn!( + "Failed to persist region stats, datanode_id: {}, error: {:?}", + datanode_id, err + ); + return; + } + + self.last_persisted_time.insert(datanode_id, Instant::now()); + for s in incoming_region_stats { + self.last_persisted_region_stats.insert(s.region_id, s); + } + } +} + +#[async_trait::async_trait] +impl HeartbeatHandler for PersistStatsHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + _req: &HeartbeatRequest, + _: &mut Context, + acc: &mut HeartbeatAccumulator, + ) -> Result { + let Some(current_stat) = acc.stat.as_ref() else { + return Ok(HandleControl::Continue); + }; + + if !self.should_persist(current_stat.id) { + return Ok(HandleControl::Continue); + } + + self.persist( + current_stat.timestamp_millis, + current_stat.id, + ¤t_stat.region_stats, + ) + .await; + + Ok(HandleControl::Continue) + } +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, Mutex}; + + use client::inserter::{Context as InserterContext, InsertOptions}; + use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; + + use super::*; + use crate::handler::test_utils::TestEnv; + + fn create_test_region_stat( + table_id: u32, + region_number: u32, + write_bytes: u64, + engine: &str, + ) -> RegionStat { + let region_id = RegionId::new(table_id, region_number); + RegionStat { + id: region_id, + rcus: 100, + wcus: 200, + approximate_bytes: 1024, + engine: engine.to_string(), + role: RegionRole::Leader, + num_rows: 1000, + memtable_size: 512, + manifest_size: 256, + sst_size: 2048, + sst_num: 5, + index_size: 128, + region_manifest: RegionManifestInfo::Mito { + manifest_version: 1, + flushed_entry_id: 100, + }, + write_bytes, + data_topic_latest_entry_id: 200, + metadata_topic_latest_entry_id: 200, + } + } + + #[test] + fn test_compute_persist_region_stat_with_no_persisted_stat() { + let region_stat = create_test_region_stat(1, 1, 1000, "mito"); + let datanode_id = 123; + let timestamp_millis = 1640995200000; // 2022-01-01 00:00:00 UTC + let result = compute_persist_region_stat(®ion_stat, datanode_id, timestamp_millis, None); + assert_eq!(result.table_id, 1); + assert_eq!(result.region_id, region_stat.id.as_u64()); + assert_eq!(result.region_number, 1); + assert_eq!(result.manifest_size, 256); + assert_eq!(result.datanode_id, datanode_id); + assert_eq!(result.engine, "mito"); + assert_eq!(result.num_rows, 1000); + assert_eq!(result.sst_num, 5); + assert_eq!(result.sst_size, 2048); + assert_eq!(result.write_bytes_delta, 0); // No previous stat, so delta is 0 + assert_eq!(result.timestamp_millis, timestamp_millis); + } + + #[test] + fn test_compute_persist_region_stat_with_persisted_stat_increase() { + let region_stat = create_test_region_stat(2, 3, 1500, "mito"); + let datanode_id = 456; + let timestamp_millis = 1640995260000; // 
2022-01-01 00:01:00 UTC + let persisted_stat = PersistedRegionStat { + region_id: region_stat.id, + write_bytess: 1000, // Previous write bytes + }; + let result = compute_persist_region_stat( + ®ion_stat, + datanode_id, + timestamp_millis, + Some(persisted_stat), + ); + assert_eq!(result.table_id, 2); + assert_eq!(result.region_id, region_stat.id.as_u64()); + assert_eq!(result.region_number, 3); + assert_eq!(result.manifest_size, 256); + assert_eq!(result.datanode_id, datanode_id); + assert_eq!(result.engine, "mito"); + assert_eq!(result.num_rows, 1000); + assert_eq!(result.sst_num, 5); + assert_eq!(result.sst_size, 2048); + assert_eq!(result.write_bytes_delta, 500); // 1500 - 1000 = 500 + assert_eq!(result.timestamp_millis, timestamp_millis); + } + + #[test] + fn test_compute_persist_region_stat_with_persisted_stat_decrease() { + let region_stat = create_test_region_stat(3, 5, 800, "mito"); + let datanode_id = 789; + let timestamp_millis = 1640995320000; // 2022-01-01 00:02:00 UTC + let persisted_stat = PersistedRegionStat { + region_id: region_stat.id, + write_bytess: 1200, // Previous write bytes (higher than current) + }; + let result = compute_persist_region_stat( + ®ion_stat, + datanode_id, + timestamp_millis, + Some(persisted_stat), + ); + assert_eq!(result.table_id, 3); + assert_eq!(result.region_id, region_stat.id.as_u64()); + assert_eq!(result.region_number, 5); + assert_eq!(result.manifest_size, 256); + assert_eq!(result.datanode_id, datanode_id); + assert_eq!(result.engine, "mito"); + assert_eq!(result.num_rows, 1000); + assert_eq!(result.sst_num, 5); + assert_eq!(result.sst_size, 2048); + assert_eq!(result.write_bytes_delta, 0); // 800 - 1200 would be negative, so 0 due to checked_sub + assert_eq!(result.timestamp_millis, timestamp_millis); + } + + #[test] + fn test_compute_persist_region_stat_with_persisted_stat_equal() { + let region_stat = create_test_region_stat(4, 7, 2000, "mito"); + let datanode_id = 101; + let timestamp_millis = 1640995380000; // 2022-01-01 00:03:00 UTC + let persisted_stat = PersistedRegionStat { + region_id: region_stat.id, + write_bytess: 2000, // Same as current write bytes + }; + let result = compute_persist_region_stat( + ®ion_stat, + datanode_id, + timestamp_millis, + Some(persisted_stat), + ); + assert_eq!(result.table_id, 4); + assert_eq!(result.region_id, region_stat.id.as_u64()); + assert_eq!(result.region_number, 7); + assert_eq!(result.manifest_size, 256); + assert_eq!(result.datanode_id, datanode_id); + assert_eq!(result.engine, "mito"); + assert_eq!(result.num_rows, 1000); + assert_eq!(result.sst_num, 5); + assert_eq!(result.sst_size, 2048); + assert_eq!(result.write_bytes_delta, 0); // 2000 - 2000 = 0 + assert_eq!(result.timestamp_millis, timestamp_millis); + } + + #[test] + fn test_compute_persist_region_stat_with_overflow_protection() { + let region_stat = create_test_region_stat(8, 15, 500, "mito"); + let datanode_id = 505; + let timestamp_millis = 1640995620000; // 2022-01-01 00:07:00 UTC + let persisted_stat = PersistedRegionStat { + region_id: region_stat.id, + write_bytess: 1000, // Higher than current, would cause underflow + }; + let result = compute_persist_region_stat( + ®ion_stat, + datanode_id, + timestamp_millis, + Some(persisted_stat), + ); + assert_eq!(result.table_id, 8); + assert_eq!(result.region_id, region_stat.id.as_u64()); + assert_eq!(result.region_number, 15); + assert_eq!(result.manifest_size, 256); + assert_eq!(result.datanode_id, datanode_id); + assert_eq!(result.engine, "mito"); + assert_eq!(result.num_rows, 
1000); + assert_eq!(result.sst_num, 5); + assert_eq!(result.sst_size, 2048); + assert_eq!(result.write_bytes_delta, 0); // checked_sub returns None, so default to 0 + assert_eq!(result.timestamp_millis, timestamp_millis); + } + + struct MockInserter { + requests: Arc>>, + } + + #[async_trait::async_trait] + impl Inserter for MockInserter { + async fn insert_rows( + &self, + _context: &InserterContext<'_>, + requests: api::v1::RowInsertRequests, + ) -> client::error::Result<()> { + self.requests.lock().unwrap().extend(requests.inserts); + + Ok(()) + } + + fn set_options(&mut self, _options: &InsertOptions) {} + } + + #[tokio::test] + async fn test_not_persist_region_stats() { + let env = TestEnv::new(); + let mut ctx = env.ctx(); + + let requests = Arc::new(Mutex::new(vec![])); + let inserter = MockInserter { + requests: requests.clone(), + }; + let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10)); + let mut acc = HeartbeatAccumulator { + stat: Some(Stat { + id: 1, + timestamp_millis: 1640995200000, + region_stats: vec![create_test_region_stat(1, 1, 1000, "mito")], + ..Default::default() + }), + ..Default::default() + }; + handler.last_persisted_time.insert(1, Instant::now()); + // Do not persist + handler + .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc) + .await + .unwrap(); + assert!(requests.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_persist_region_stats() { + let env = TestEnv::new(); + let mut ctx = env.ctx(); + let requests = Arc::new(Mutex::new(vec![])); + let inserter = MockInserter { + requests: requests.clone(), + }; + + let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10)); + + let region_stat = create_test_region_stat(1, 1, 1000, "mito"); + let timestamp_millis = 1640995200000; + let datanode_id = 1; + let region_id = RegionId::new(1, 1); + let mut acc = HeartbeatAccumulator { + stat: Some(Stat { + id: datanode_id, + timestamp_millis, + region_stats: vec![region_stat.clone()], + ..Default::default() + }), + ..Default::default() + }; + + handler.last_persisted_region_stats.insert( + region_id, + PersistedRegionStat { + region_id, + write_bytess: 500, + }, + ); + let (expected_row, expected_persisted_region_stat) = to_persisted_if_leader( + ®ion_stat, + &handler.last_persisted_region_stats, + datanode_id, + timestamp_millis, + ) + .unwrap(); + let before_insert_time = Instant::now(); + // Persist + handler + .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc) + .await + .unwrap(); + let request = { + let mut requests = requests.lock().unwrap(); + assert_eq!(requests.len(), 1); + requests.pop().unwrap() + }; + assert_eq!(request.table_name, META_REGION_STATS_TABLE_NAME.to_string()); + assert_eq!(request.rows.unwrap().rows, vec![expected_row]); + + // Check last persisted time + assert!(handler + .last_persisted_time + .get(&datanode_id) + .unwrap() + .gt(&before_insert_time)); + + // Check last persisted region stats + assert_eq!( + handler + .last_persisted_region_stats + .get(®ion_id) + .unwrap() + .value(), + &expected_persisted_region_stat + ); + } +} diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 311b101103..c37c3d221b 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -167,6 +167,7 @@ mod test { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + write_bytes: 0, } } diff --git a/src/meta-srv/src/lib.rs 
b/src/meta-srv/src/lib.rs index 8b78c2aa89..aef561e1dc 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -16,6 +16,7 @@ #![feature(assert_matches)] #![feature(hash_set_entry)] #![feature(let_chains)] +#![feature(duration_constructors)] pub mod bootstrap; pub mod cache_invalidator; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 9a4d97e1cc..dc3bc6c32b 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -100,6 +100,26 @@ pub enum BackendImpl { MysqlStore, } +/// Configuration options for the stats persistence. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct StatsPersistenceOptions { + /// TTL for the stats table that will be used to store the stats. + #[serde(with = "humantime_serde")] + pub ttl: Duration, + /// The interval to persist the stats. + #[serde(with = "humantime_serde")] + pub interval: Duration, +} + +impl Default for StatsPersistenceOptions { + fn default() -> Self { + Self { + ttl: Duration::from_days(30), + interval: Duration::from_secs(60), + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvOptions { @@ -184,6 +204,8 @@ pub struct MetasrvOptions { pub node_max_idle_time: Duration, /// The event recorder options. pub event_recorder: EventRecorderOptions, + /// The stats persistence options. + pub stats_persistence: StatsPersistenceOptions, } impl fmt::Debug for MetasrvOptions { @@ -213,7 +235,9 @@ impl fmt::Debug for MetasrvOptions { .field("max_txn_ops", &self.max_txn_ops) .field("flush_stats_factor", &self.flush_stats_factor) .field("tracing", &self.tracing) - .field("backend", &self.backend); + .field("backend", &self.backend) + .field("event_recorder", &self.event_recorder) + .field("stats_persistence", &self.stats_persistence); #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] debug_struct.field("meta_table_name", &self.meta_table_name); @@ -275,6 +299,7 @@ impl Default for MetasrvOptions { meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID, node_max_idle_time: Duration::from_secs(24 * 60 * 60), event_recorder: EventRecorderOptions::default(), + stats_persistence: StatsPersistenceOptions::default(), } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 89c064e71c..769dbd883f 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -17,6 +17,7 @@ use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, RwLock}; use client::client_manager::NodeClients; +use client::inserter::InsertOptions; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_event_recorder::{EventRecorderImpl, EventRecorderRef}; @@ -55,6 +56,7 @@ use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::flow_state_handler::FlowStateHandler; +use crate::handler::persist_stats_handler::PersistStatsHandler; use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler}; use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers}; use crate::lease::MetaPeerLookupService; @@ -77,6 +79,7 @@ use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::LeaderCachedKvBackend; use crate::state::State; use crate::table_meta_alloc::MetasrvPeerAllocator; +use 
crate::utils::insert_forwarder::InsertForwarder; // TODO(fys): try use derive_builder macro pub struct MetasrvBuilder { @@ -195,11 +198,18 @@ impl MetasrvBuilder { let meta_peer_client = meta_peer_client .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory)); + let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone())); + let event_inserter = Box::new(InsertForwarder::new( + peer_lookup_service.clone(), + Some(InsertOptions { + ttl: options.event_recorder.ttl, + append_mode: true, + }), + )); // Builds the event recorder to record important events and persist them as the system table. let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new( - meta_peer_client.clone(), - options.event_recorder.ttl, + event_inserter, )))); let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default())); @@ -294,7 +304,6 @@ impl MetasrvBuilder { server_addr: options.grpc.server_addr.clone(), }, )); - let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone())); if !is_remote_wal && options.enable_region_failover { ensure!( @@ -451,6 +460,23 @@ impl MetasrvBuilder { .as_ref() .and_then(|plugins| plugins.get::()); + let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() { + let inserter = Box::new(InsertForwarder::new( + peer_lookup_service.clone(), + Some(InsertOptions { + ttl: options.stats_persistence.ttl, + append_mode: true, + }), + )); + + Some(PersistStatsHandler::new( + inserter, + options.stats_persistence.interval, + )) + } else { + None + }; + let handler_group_builder = match handler_group_builder { Some(handler_group_builder) => handler_group_builder, None => { @@ -467,6 +493,7 @@ impl MetasrvBuilder { .with_region_lease_handler(Some(region_lease_handler)) .with_flush_stats_factor(Some(options.flush_stats_factor)) .with_flow_state_handler(Some(flow_state_handler)) + .with_persist_stats_handler(persist_region_stats_handler) .add_default_handlers() } }; diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index ab16c5f27c..cc15f75562 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -198,6 +198,7 @@ mod tests { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + write_bytes: 0, }], ..Default::default() } @@ -226,6 +227,7 @@ mod tests { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + write_bytes: 0, }], ..Default::default() } @@ -254,6 +256,7 @@ mod tests { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + write_bytes: 0, }], ..Default::default() } diff --git a/src/meta-srv/src/utils.rs b/src/meta-srv/src/utils.rs index 5c3bfd6a48..1641f4eb9a 100644 --- a/src/meta-srv/src/utils.rs +++ b/src/meta-srv/src/utils.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod insert_forwarder; + #[macro_export] macro_rules! define_ticker { ( diff --git a/src/meta-srv/src/utils/insert_forwarder.rs b/src/meta-srv/src/utils/insert_forwarder.rs new file mode 100644 index 0000000000..4c8ff5222d --- /dev/null +++ b/src/meta-srv/src/utils/insert_forwarder.rs @@ -0,0 +1,123 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::RowInsertRequests; +use client::error::{ExternalSnafu, Result as ClientResult}; +use client::inserter::{Context, InsertOptions, Inserter}; +use client::{Client, Database}; +use common_error::ext::BoxedError; +use common_meta::peer::PeerLookupServiceRef; +use common_telemetry::{debug, warn}; +use snafu::{ensure, ResultExt}; +use tokio::sync::RwLock; + +use crate::error::{LookupFrontendsSnafu, NoAvailableFrontendSnafu}; + +pub type InsertForwarderRef = Arc; + +/// [`InsertForwarder`] is the inserter for the metasrv. +/// It forwards insert requests to available frontend instances. +pub struct InsertForwarder { + peer_lookup_service: PeerLookupServiceRef, + client: RwLock>, + options: Option, +} + +impl InsertForwarder { + /// Creates a new InsertForwarder with the given peer lookup service. + pub fn new(peer_lookup_service: PeerLookupServiceRef, options: Option) -> Self { + Self { + peer_lookup_service, + client: RwLock::new(None), + options, + } + } + + /// Builds a new client. + async fn build_client(&self) -> crate::error::Result { + let frontends = self + .peer_lookup_service + .active_frontends() + .await + .context(LookupFrontendsSnafu)?; + + ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu); + + let urls = frontends + .into_iter() + .map(|peer| peer.addr) + .collect::>(); + + debug!("Available frontend addresses: {:?}", urls); + + Ok(Client::with_urls(urls)) + } + + /// Initializes the client if it hasn't been initialized yet, or returns the cached client. + async fn maybe_init_client(&self) -> Result { + let mut guard = self.client.write().await; + if guard.is_none() { + let client = self.build_client().await.map_err(BoxedError::new)?; + *guard = Some(client); + } + + // Safety: checked above that the client is Some. + Ok(guard.as_ref().unwrap().clone()) + } + + /// Resets the cached client, forcing a rebuild on the next use. + async fn reset_client(&self) { + warn!("Resetting the client"); + let mut guard = self.client.write().await; + guard.take(); + } +} + +#[async_trait::async_trait] +impl Inserter for InsertForwarder { + async fn insert_rows( + &self, + context: &Context<'_>, + requests: RowInsertRequests, + ) -> ClientResult<()> { + let client = self.maybe_init_client().await.context(ExternalSnafu)?; + let database = Database::new(context.catalog, context.schema, client); + let hints = self.options.as_ref().map_or(vec![], |o| o.to_hints()); + + if let Err(e) = database + .row_inserts_with_hints( + requests, + &hints + .iter() + .map(|(k, v)| (*k, v.as_str())) + .collect::>(), + ) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu) + { + // Resets the client so it will be rebuilt next time. 
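+            // The next insert_rows call will look up active frontends again and rebuild the connection.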
+ self.reset_client().await; + return Err(e); + }; + + Ok(()) + } + + fn set_options(&mut self, options: &InsertOptions) { + self.options = Some(*options); + } +} diff --git a/src/metric-engine/src/utils.rs b/src/metric-engine/src/utils.rs index 4744d0b49d..446e41d0e7 100644 --- a/src/metric-engine/src/utils.rs +++ b/src/metric-engine/src/utils.rs @@ -62,6 +62,7 @@ pub fn get_region_statistic(mito: &MitoEngine, region_id: RegionId) -> Option 0); + assert!(region.write_bytes.load(Ordering::Relaxed) > 0); } #[apply(multiple_log_store_factories)] @@ -168,7 +164,7 @@ async fn test_region_replay(factory: Option) { // The replay won't update the write bytes rate meter. let region = engine.get_region(region_id).unwrap(); - assert_eq!(region.write_bytes_per_sec.get_total(), 0); + assert_eq!(region.write_bytes.load(Ordering::Relaxed), 0); let request = ScanRequest::default(); let stream = engine.scan_to_stream(region_id, request).await.unwrap(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index cc7dc514c2..aa3d91ff4b 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1041,14 +1041,6 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - - #[snafu(display("Failed to start repeated task: {}", name))] - StartRepeatedTask { - name: String, - #[snafu(implicit)] - location: Location, - source: common_runtime::error::Error, - }, } pub type Result = std::result::Result; @@ -1211,8 +1203,6 @@ impl ErrorExt for Error { InconsistentTimestampLength { .. } => StatusCode::InvalidArguments, TooManyFilesToRead { .. } => StatusCode::RateLimited, - - StartRepeatedTask { .. } => StatusCode::Internal, } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index eba689845f..ad4045c86e 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -38,7 +38,6 @@ pub mod extension; pub mod flush; pub mod manifest; pub mod memtable; -pub mod meter; mod metrics; pub mod read; pub mod region; diff --git a/src/mito2/src/meter.rs b/src/mito2/src/meter.rs deleted file mode 100644 index 7bad3a2962..0000000000 --- a/src/mito2/src/meter.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod rate_meter; diff --git a/src/mito2/src/meter/rate_meter.rs b/src/mito2/src/meter/rate_meter.rs deleted file mode 100644 index 87526627bd..0000000000 --- a/src/mito2/src/meter/rate_meter.rs +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use std::time::Duration; - -/// `RateMeter` tracks a cumulative value and computes the rate per interval. -#[derive(Default, Debug, Clone)] -pub struct RateMeter { - inner: Arc, -} - -#[derive(Default, Debug)] -struct RateMeterInner { - /// Accumulated value since last rate calculation. - value: AtomicU64, - - /// The last computed rate (per second). - last_rate: AtomicU64, - - /// Optional: total accumulated value, never reset. - total: AtomicU64, -} - -impl RateMeter { - /// Creates a new `RateMeter` with an initial value. - pub fn new() -> Self { - Self { - inner: Arc::new(RateMeterInner::default()), - } - } - - /// Increments the accumulated value by `v`. - pub fn inc_by(&self, v: u64) { - self.inner.value.fetch_add(v, Ordering::Relaxed); - self.inner.total.fetch_add(v, Ordering::Relaxed); - } - - /// Returns the current accumulated value since last rate calculation. - pub fn get_value(&self) -> u64 { - self.inner.value.load(Ordering::Relaxed) - } - - /// Returns the total accumulated value since creation. - pub fn get_total(&self) -> u64 { - self.inner.total.load(Ordering::Relaxed) - } - - /// Returns the last computed rate (per second). - pub fn get_rate(&self) -> u64 { - self.inner.last_rate.load(Ordering::Relaxed) - } - - /// Updates the current rate based on the accumulated value over the given interval. - /// - /// `interval` should represent the duration since the last `update_rate` call. - /// This method resets the internal accumulated counter (`value`) to 0. - pub fn update_rate(&self, interval: Duration) { - let current_value = self.inner.value.swap(0, Ordering::Relaxed); - - let interval_secs = interval.as_secs(); - if interval_secs > 0 { - let rate = current_value / interval_secs; - self.inner.last_rate.store(rate, Ordering::Relaxed); - } - } - - /// Resets the meter: clears both current value and last rate. - /// Total accumulated value remains unchanged. 
- pub fn reset(&self) { - self.inner.value.store(0, Ordering::Relaxed); - self.inner.last_rate.store(0, Ordering::Relaxed); - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use super::*; - - #[test] - fn test_inc_and_get_value_and_total() { - let meter = RateMeter::new(); - assert_eq!(meter.get_value(), 0); - assert_eq!(meter.get_total(), 0); - - meter.inc_by(10); - assert_eq!(meter.get_value(), 10); - assert_eq!(meter.get_total(), 10); - - meter.inc_by(5); - assert_eq!(meter.get_value(), 15); - assert_eq!(meter.get_total(), 15); - } - - #[test] - fn test_update_rate_and_get_rate() { - let meter = RateMeter::new(); - meter.inc_by(100); - meter.update_rate(Duration::from_secs(2)); - assert_eq!(meter.get_rate(), 50); - // After update, value should be reset - assert_eq!(meter.get_value(), 0); - - // If interval is zero, rate should not be updated - meter.inc_by(30); - meter.update_rate(Duration::from_secs(0)); - // Should still be 50 - assert_eq!(meter.get_rate(), 50); - } - - #[test] - fn test_reset() { - let meter = RateMeter::new(); - meter.inc_by(100); - meter.update_rate(Duration::from_secs(1)); - assert_eq!(meter.get_rate(), 100); - meter.reset(); - assert_eq!(meter.get_value(), 0); - assert_eq!(meter.get_rate(), 0); - // Total should remain unchanged - assert_eq!(meter.get_total(), 100); - } - - #[test] - fn test_total_accumulates() { - let meter = RateMeter::new(); - meter.inc_by(10); - meter.update_rate(Duration::from_secs(1)); - meter.inc_by(20); - meter.update_rate(Duration::from_secs(2)); - assert_eq!(meter.get_total(), 30); - assert_eq!(meter.get_rate(), 10); - } - - #[test] - fn test_clone_and_shared_state() { - let meter = RateMeter::new(); - let meter2 = meter.clone(); - meter.inc_by(10); - meter2.inc_by(5); - assert_eq!(meter.get_value(), 15); - assert_eq!(meter2.get_value(), 15); - assert_eq!(meter.get_total(), 15); - assert_eq!(meter2.get_total(), 15); - - meter.update_rate(Duration::from_secs(1)); - assert_eq!(meter2.get_rate(), 15); - } -} diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 385253a922..9dedd4af06 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -43,7 +43,6 @@ use crate::error::{ use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::RegionManifestManager; use crate::memtable::MemtableBuilderRef; -use crate::meter::rate_meter::RateMeter; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OnFailure, OptionOutputTx}; use crate::sst::file_purger::FilePurgerRef; @@ -132,9 +131,8 @@ pub struct MitoRegion { /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region, /// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`. pub(crate) topic_latest_entry_id: AtomicU64, - /// `write_bytes_per_sec` tracks the memtable write throughput in bytes per second. - /// [RateUpdater](crate::worker::RateUpdater) will update the rate periodically. - pub(crate) write_bytes_per_sec: RateMeter, + /// The total bytes written to the region. + pub(crate) write_bytes: Arc, /// Memtable builder for the region. 
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index 385253a922..9dedd4af06 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -43,7 +43,6 @@ use crate::error::{
 use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
 use crate::manifest::manager::RegionManifestManager;
 use crate::memtable::MemtableBuilderRef;
-use crate::meter::rate_meter::RateMeter;
 use crate::region::version::{VersionControlRef, VersionRef};
 use crate::request::{OnFailure, OptionOutputTx};
 use crate::sst::file_purger::FilePurgerRef;
@@ -132,9 +131,8 @@ pub struct MitoRegion {
     /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
     /// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`.
     pub(crate) topic_latest_entry_id: AtomicU64,
-    /// `write_bytes_per_sec` tracks the memtable write throughput in bytes per second.
-    /// [RateUpdater](crate::worker::RateUpdater) will update the rate periodically.
-    pub(crate) write_bytes_per_sec: RateMeter,
+    /// The total bytes written to the region.
+    pub(crate) write_bytes: Arc<AtomicU64>,
     /// Memtable builder for the region.
     pub(crate) memtable_builder: MemtableBuilderRef,
     /// manifest stats
@@ -453,6 +451,7 @@ impl MitoRegion {
         let manifest_version = self.stats.manifest_version();
         let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
+        let write_bytes = self.write_bytes.load(Ordering::Relaxed);
 
         RegionStatistic {
             num_rows,
@@ -468,6 +467,7 @@
             },
             data_topic_latest_entry_id: topic_latest_entry_id,
             metadata_topic_latest_entry_id: topic_latest_entry_id,
+            write_bytes,
         }
     }
 
@@ -771,14 +771,6 @@ impl RegionMap {
         regions.get(&region_id).cloned()
     }
 
-    /// Iterates over all regions.
-    pub(crate) fn for_each_region(&self, f: impl Fn(&MitoRegionRef)) {
-        let regions = self.regions.read().unwrap();
-        for (_, region) in regions.iter() {
-            f(region);
-        }
-    }
-
     /// Gets writable region by region id.
     ///
     /// Returns error if the region does not exist or is readonly.
@@ -996,6 +988,7 @@ impl ManifestStats {
 #[cfg(test)]
 mod tests {
+    use std::sync::atomic::AtomicU64;
     use std::sync::Arc;
 
     use common_datasource::compression::CompressionType;
@@ -1010,7 +1003,6 @@ mod tests {
     use crate::access_layer::AccessLayer;
     use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
-    use crate::meter::rate_meter::RateMeter;
     use crate::region::{
         ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
     };
@@ -1180,7 +1172,7 @@ mod tests {
             last_compaction_millis: Default::default(),
             time_provider: Arc::new(StdTimeProvider),
             topic_latest_entry_id: Default::default(),
-            write_bytes_per_sec: RateMeter::default(),
+            write_bytes: Arc::new(AtomicU64::new(0)),
             memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
             stats: ManifestStats::default(),
         };
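With `region_statistic()` now exporting a cumulative `write_bytes` value, a downstream consumer (for instance, a stats handler collecting heartbeats) can turn two successive snapshots into a throughput figure. A rough sketch of that delta computation, using a hypothetical trimmed-down stat struct rather than the real `RegionStatistic`:

```rust
use std::time::Duration;

/// Hypothetical stand-in for a region statistic snapshot; only the field
/// relevant to this example is kept.
#[derive(Clone, Copy)]
struct StatSnapshot {
    /// Cumulative bytes written to the region, as reported by the engine.
    write_bytes: u64,
}

/// Derives bytes/second from two snapshots taken `elapsed` apart.
/// `saturating_sub` guards against counter resets (e.g. after a region reopen).
fn write_rate(prev: StatSnapshot, curr: StatSnapshot, elapsed: Duration) -> u64 {
    let secs = elapsed.as_secs();
    if secs == 0 {
        return 0;
    }
    curr.write_bytes.saturating_sub(prev.write_bytes) / secs
}

fn main() {
    let prev = StatSnapshot { write_bytes: 10_000 };
    let curr = StatSnapshot { write_bytes: 70_000 };
    assert_eq!(write_rate(prev, curr, Duration::from_secs(30)), 2_000);
}
```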
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 54534f3e41..9b4c790c06 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -52,7 +52,6 @@ use crate::manifest::storage::manifest_compress_type;
 use crate::memtable::bulk::part::BulkPart;
 use crate::memtable::time_partition::TimePartitions;
 use crate::memtable::MemtableBuilderProvider;
-use crate::meter::rate_meter::RateMeter;
 use crate::region::options::RegionOptions;
 use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
 use crate::region::{
@@ -288,7 +287,7 @@ impl RegionOpener {
             time_provider: self.time_provider.clone(),
             topic_latest_entry_id: AtomicU64::new(0),
             memtable_builder,
-            write_bytes_per_sec: RateMeter::default(),
+            write_bytes: Arc::new(AtomicU64::new(0)),
             stats: self.stats,
         })
     }
@@ -472,7 +471,7 @@ impl RegionOpener {
             last_compaction_millis: AtomicI64::new(now),
             time_provider: self.time_provider.clone(),
             topic_latest_entry_id: AtomicU64::new(0),
-            write_bytes_per_sec: RateMeter::default(),
+            write_bytes: Arc::new(AtomicU64::new(0)),
             memtable_builder,
             stats: self.stats.clone(),
         };
diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs
index 06a9cccdc1..dcbc851190 100644
--- a/src/mito2/src/region_write_ctx.rs
+++ b/src/mito2/src/region_write_ctx.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::mem;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 
 use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint};
@@ -25,7 +26,6 @@ use store_api::storage::{RegionId, SequenceNumber};
 use crate::error::{Error, Result, WriteGroupSnafu};
 use crate::memtable::bulk::part::BulkPart;
 use crate::memtable::KeyValues;
-use crate::meter::rate_meter::RateMeter;
 use crate::metrics;
 use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
 use crate::request::OptionOutputTx;
@@ -108,7 +108,7 @@ pub(crate) struct RegionWriteCtx {
     /// Rows to delete.
     pub(crate) delete_num: usize,
     /// Write bytes per second.
-    pub(crate) write_bytes_per_sec: Option<RateMeter>,
+    pub(crate) write_bytes: Option<Arc<AtomicU64>>,
 }
 
 impl RegionWriteCtx {
@@ -117,7 +117,7 @@
         region_id: RegionId,
         version_control: &VersionControlRef,
         provider: Provider,
-        write_bytes_per_sec: Option<RateMeter>,
+        write_bytes: Option<Arc<AtomicU64>>,
     ) -> RegionWriteCtx {
         let VersionControlData {
             version,
@@ -140,7 +140,7 @@
             put_num: 0,
             delete_num: 0,
             bulk_parts: vec![],
-            write_bytes_per_sec,
+            write_bytes,
         }
     }
 
@@ -219,7 +219,7 @@
         }
 
         let mutable_memtable = self.version.memtables.mutable.clone();
-        let prev_memory_usage = if self.write_bytes_per_sec.is_some() {
+        let prev_memory_usage = if self.write_bytes.is_some() {
             Some(mutable_memtable.memory_usage())
         } else {
             None
@@ -257,11 +257,11 @@
             }
         }
 
-        if let Some(meter) = &self.write_bytes_per_sec {
+        if let Some(write_bytes) = &self.write_bytes {
             let new_memory_usage = mutable_memtable.memory_usage();
             let written_bytes =
                 new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
-            meter.inc_by(written_bytes as u64);
+            write_bytes.fetch_add(written_bytes as u64, Ordering::Relaxed);
         }
         // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
         // to decrease `next_sequence` and `next_entry_id` by 1.
@@ -298,7 +298,7 @@ impl RegionWriteCtx {
             .start_timer();
 
         let mutable_memtable = &self.version.memtables.mutable;
-        let prev_memory_usage = if self.write_bytes_per_sec.is_some() {
+        let prev_memory_usage = if self.write_bytes.is_some() {
             Some(mutable_memtable.memory_usage())
         } else {
             None
@@ -333,11 +333,11 @@
             }
         }
 
-        if let Some(meter) = &self.write_bytes_per_sec {
+        if let Some(write_bytes) = &self.write_bytes {
             let new_memory_usage = mutable_memtable.memory_usage();
             let written_bytes =
                 new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
-            meter.inc_by(written_bytes as u64);
+            write_bytes.fetch_add(written_bytes as u64, Ordering::Relaxed);
         }
         self.version_control
             .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
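Both write paths in `RegionWriteCtx` account bytes the same way: snapshot the mutable memtable's reported memory usage before applying the batch, snapshot it again afterwards, and add the saturating difference to the shared counter, but only when tracking is enabled. A compact sketch of that pattern, with a toy memtable type standing in for the real one:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Toy memtable: only models the `memory_usage()` accessor used by the pattern.
#[derive(Default)]
struct ToyMemtable {
    bytes: usize,
}

impl ToyMemtable {
    fn memory_usage(&self) -> usize {
        self.bytes
    }

    fn write(&mut self, row_bytes: usize) {
        self.bytes += row_bytes;
    }
}

/// Applies a batch and charges the growth in memtable memory to the counter,
/// mirroring the `Option` in the write context (None disables tracking).
fn write_batch(memtable: &mut ToyMemtable, rows: &[usize], write_bytes: Option<&Arc<AtomicU64>>) {
    // Only pay for the extra memory_usage() call when tracking is enabled.
    let prev = write_bytes.map(|_| memtable.memory_usage());

    for row in rows {
        memtable.write(*row);
    }

    if let Some(counter) = write_bytes {
        let written = memtable
            .memory_usage()
            .saturating_sub(prev.unwrap_or_default());
        counter.fetch_add(written as u64, Ordering::Relaxed);
    }
}

fn main() {
    let counter = Arc::new(AtomicU64::new(0));
    let mut memtable = ToyMemtable::default();
    write_batch(&mut memtable, &[128, 256], Some(&counter));
    assert_eq!(counter.load(Ordering::Relaxed), 384);
}
```

Measuring memtable growth rather than raw request size means the counter reflects what actually landed in memory, which appears to be the intent of the original code as well.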
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 69d1caaffa..4df4832c8f 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -36,7 +36,7 @@ use std::time::Duration;
 use common_base::Plugins;
 use common_error::ext::BoxedError;
 use common_meta::key::SchemaMetadataManagerRef;
-use common_runtime::{JoinHandle, RepeatedTask, TaskFunction};
+use common_runtime::JoinHandle;
 use common_telemetry::{error, info, warn};
 use futures::future::try_join_all;
 use object_store::manager::ObjectStoreManagerRef;
@@ -55,9 +55,7 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef};
 use crate::cache::{CacheManager, CacheManagerRef};
 use crate::compaction::CompactionScheduler;
 use crate::config::MitoConfig;
-use crate::error::{
-    self, CreateDirSnafu, Error, JoinSnafu, Result, StartRepeatedTaskSnafu, WorkerStoppedSnafu,
-};
+use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
 use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
 use crate::memtable::MemtableBuilderProvider;
 use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
@@ -83,8 +81,6 @@ pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";
 pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
 /// Max delay to check region periodical tasks.
 pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
-/// Interval to update the rate meter for regions.
-const RATE_UPDATE_INTERVAL: Duration = Duration::from_secs(30);
 
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// A fixed size group of [RegionWorkers](RegionWorker).
@@ -483,26 +479,12 @@ impl WorkerStarter {
             worker_thread.run().await;
         });
 
-        let rate_update_task = RepeatedTask::new(
-            RATE_UPDATE_INTERVAL,
-            Box::new(RateUpdater {
-                regions: regions.clone(),
-                interval: RATE_UPDATE_INTERVAL,
-            }),
-        );
-        rate_update_task
-            .start(common_runtime::global_runtime())
-            .context(StartRepeatedTaskSnafu {
-                name: RateUpdater::NAME,
-            })?;
-
         Ok(RegionWorker {
             id: self.id,
             regions,
             opening_regions,
             sender,
             handle: Mutex::new(Some(handle)),
-            rate_update_task,
             running,
         })
     }
@@ -520,8 +502,6 @@ pub(crate) struct RegionWorker {
     sender: Sender<WorkerRequest>,
     /// Handle to the worker thread.
     handle: Mutex<Option<JoinHandle<()>>>,
-    /// rate update task.
-    rate_update_task: RepeatedTask<Error>,
     /// Whether to run the worker thread.
     running: Arc<AtomicBool>,
 }
 
@@ -564,9 +544,6 @@ impl RegionWorker {
             handle.await.context(JoinSnafu)?;
         }
 
-        if let Err(err) = self.rate_update_task.stop().await {
-            error!(err; "Failed to stop rate update task");
-        }
         Ok(())
     }
 
@@ -686,29 +663,6 @@ impl StalledRequests {
     }
 }
 
-struct RateUpdater {
-    regions: RegionMapRef,
-    interval: Duration,
-}
-
-impl RateUpdater {
-    const NAME: &str = "RateUpdater";
-}
-
-#[async_trait::async_trait]
-impl TaskFunction<Error> for RateUpdater {
-    fn name(&self) -> &str {
-        Self::NAME
-    }
-
-    async fn call(&mut self) -> Result<()> {
-        self.regions.for_each_region(|region| {
-            region.write_bytes_per_sec.update_rate(self.interval);
-        });
-        Ok(())
-    }
-}
-
 /// Background worker loop to handle requests.
 struct RegionWorkerLoop {
     /// Id of the worker.
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index 32182243b3..b3e2937df5 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -248,7 +248,7 @@ impl RegionWorkerLoop {
                     region.region_id,
                     &region.version_control,
                     region.provider.clone(),
-                    Some(region.write_bytes_per_sec.clone()),
+                    Some(region.write_bytes.clone()),
                 );
 
                 e.insert(region_ctx);
@@ -352,7 +352,7 @@ impl RegionWorkerLoop {
                     region.region_id,
                     &region.version_control,
                     region.provider.clone(),
-                    Some(region.write_bytes_per_sec.clone()),
+                    Some(region.write_bytes.clone()),
                 );
 
                 e.insert(region_ctx);
diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml
index c4c43956d0..d883c15689 100644
--- a/src/operator/Cargo.toml
+++ b/src/operator/Cargo.toml
@@ -45,6 +45,7 @@ datatypes.workspace = true
 file-engine.workspace = true
 futures.workspace = true
 futures-util.workspace = true
+humantime.workspace = true
 jsonb.workspace = true
 lazy_static.workspace = true
 meta-client.workspace = true
diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index 8fb98daa80..f20047f8ad 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -29,9 +29,12 @@ mod tql;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use api::v1::RowInsertRequests;
 use catalog::kvbackend::KvBackendCatalogManager;
 use catalog::process_manager::ProcessManagerRef;
 use catalog::CatalogManagerRef;
+use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
+use client::inserter::{InsertOptions, Inserter};
 use client::RecordBatches;
 use common_error::ext::BoxedError;
 use common_meta::cache::TableRouteCacheRef;
@@ -48,10 +51,11 @@ use common_time::range::TimestampRange;
 use common_time::Timestamp;
 use datafusion_expr::LogicalPlan;
 use datatypes::prelude::ConcreteDataType;
+use humantime::format_duration;
 use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
 use query::parser::QueryStatement;
 use query::QueryEngineRef;
-use session::context::{Channel, QueryContextRef};
+use session::context::{Channel, QueryContextBuilder, QueryContextRef};
 use session::table_name::table_idents_to_full_name;
 use set::{set_query_timeout, set_read_preference};
 use snafu::{ensure, OptionExt, ResultExt};
@@ -65,6 +69,7 @@ use sql::statements::statement::Statement;
 use sql::statements::OptionMap;
 use sql::util::format_raw_object_name;
 use sqlparser::ast::ObjectName;
+use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
 use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
 use table::table_name::TableName;
 use table::table_reference::TableReference;
@@ -614,6 +619,11 @@ impl StatementExecutor {
         }
         Ok(time_ranges)
     }
+
+    /// Returns the inserter for the statement executor.
+    pub(crate) fn inserter(&self) -> &InserterRef {
+        &self.inserter
+    }
 }
 
 fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
@@ -749,6 +759,61 @@ fn idents_to_full_database_name(
     }
 }
 
+/// The [`Inserter`] implementation for the statement executor.
+pub struct InserterImpl {
+    statement_executor: StatementExecutorRef,
+    options: Option<InsertOptions>,
+}
+
+impl InserterImpl {
+    pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
+        Self {
+            statement_executor,
+            options,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl Inserter for InserterImpl {
+    async fn insert_rows(
+        &self,
+        context: &client::inserter::Context<'_>,
+        requests: RowInsertRequests,
+    ) -> ClientResult<()> {
+        let mut ctx_builder = QueryContextBuilder::default()
+            .current_catalog(context.catalog.to_string())
+            .current_schema(context.schema.to_string());
+        if let Some(options) = self.options.as_ref() {
+            ctx_builder = ctx_builder
+                .set_extension(
+                    TTL_KEY.to_string(),
+                    format_duration(options.ttl).to_string(),
+                )
+                .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
+        }
+        let query_ctx = ctx_builder.build().into();
+
+        self.statement_executor
+            .inserter()
+            .handle_row_inserts(
+                requests,
+                query_ctx,
+                self.statement_executor.as_ref(),
+                false,
+                false,
+            )
+            .await
+            .map_err(BoxedError::new)
+            .context(ClientExternalSnafu)
+            .map(|_| ())
+    }
+
+    fn set_options(&mut self, options: &InsertOptions) {
+        self.options = Some(*options);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::assert_matches::assert_matches;
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index e3426bca74..7cf37dbc2d 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -479,6 +479,9 @@ pub struct RegionStatistic {
     /// The details of the region.
     #[serde(default)]
     pub manifest: RegionManifestInfo,
+    /// The write throughput of the region.
+    #[serde(default)]
+    pub write_bytes: u64,
     /// The latest entry id of the region's remote WAL since last flush.
     /// For metric engine, there're two latest entry ids, one for data and one for metadata.
     /// TODO(weny): remove this two fields and use single instead.
diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template
index f4bbbf3ae9..910601d6ff 100644
--- a/tests/conf/metasrv-test.toml.template
+++ b/tests/conf/metasrv-test.toml.template
@@ -21,3 +21,6 @@ topic_name_prefix = "distributed_test_greptimedb_wal_topic"
 auto_prune_interval = "30s"
 trigger_flush_threshold = 100
 {{ endif }}
+
+[stats_persistence]
+ttl = "0s"
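The `RegionStatistic` change above marks the new field `#[serde(default)]`, presumably so that payloads serialized by peers that predate the field still deserialize, with `write_bytes` falling back to 0. A small illustration of that serde behaviour with a hypothetical struct (assumes the `serde` derive feature and the `serde_json` crate):

```rust
use serde::Deserialize;

/// Hypothetical mirror of the relevant part of a statistic payload.
#[derive(Deserialize, Debug)]
struct Stat {
    num_rows: u64,
    /// Missing in payloads from older peers; defaults to 0 thanks to `serde(default)`.
    #[serde(default)]
    write_bytes: u64,
}

fn main() {
    // Old payload: no `write_bytes` key at all.
    let old: Stat = serde_json::from_str(r#"{ "num_rows": 42 }"#).unwrap();
    assert_eq!(old.write_bytes, 0);

    // New payload: the field is carried through as-is.
    let new: Stat = serde_json::from_str(r#"{ "num_rows": 42, "write_bytes": 8192 }"#).unwrap();
    assert_eq!(new.write_bytes, 8192);

    println!("{old:?} / {new:?}");
}
```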