diff --git a/Cargo.lock b/Cargo.lock index 85cbbb10ff..c79900ea15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3030,6 +3030,8 @@ dependencies = [ "itertools", "meta-client", "meta-srv", + "meter-core", + "meter-macros", "mito", "moka", "object-store", @@ -4826,6 +4828,24 @@ dependencies = [ "url", ] +[[package]] +name = "meter-core" +version = "0.1.0" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=f0798c4c648d89f51abe63e870919c75dd463199#f0798c4c648d89f51abe63e870919c75dd463199" +dependencies = [ + "anymap", + "once_cell", + "parking_lot", +] + +[[package]] +name = "meter-macros" +version = "0.1.0" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=f0798c4c648d89f51abe63e870919c75dd463199#f0798c4c648d89f51abe63e870919c75dd463199" +dependencies = [ + "meter-core", +] + [[package]] name = "metrics" version = "0.20.1" diff --git a/Cargo.toml b/Cargo.toml index a082d36022..1f73c124ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,11 @@ tokio-util = { version = "0.7", features = ["io-util"] } tonic = { version = "0.9", features = ["tls"] } uuid = { version = "1", features = ["serde", "v4", "fast-rng"] } metrics = "0.20" +meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "f0798c4c648d89f51abe63e870919c75dd463199" } + +[workspace.dependencies.meter-macros] +git = "https://github.com/GreptimeTeam/greptime-meter.git" +rev = "f0798c4c648d89f51abe63e870919c75dd463199" [profile.release] debug = true diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index f288654a17..da3048ee02 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -36,6 +36,8 @@ futures = "0.3" futures-util.workspace = true itertools = "0.10" meta-client = { path = "../meta-client" } +meter-core.workspace = true +meter-macros.workspace = true mito = { path = "../mito", features = ["test"] } moka = { version = "0.9", features = ["future"] } object-store = { path = "../object-store" } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 391fab4154..27b7363eac 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -75,7 +75,7 @@ use crate::error::{ use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::instance::standalone::StandaloneGrpcQueryHandler; -use crate::metric; +use crate::metrics; use crate::script::ScriptExecutor; use crate::server::{start_server, ServerHandlers, Services}; use crate::statement::StatementExecutor; @@ -451,7 +451,7 @@ impl SqlQueryHandler for Instance { type Error = Error; async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { - let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); + let _timer = timer!(metrics::METRIC_HANDLE_SQL_ELAPSED); let query_interceptor = self.plugins.get::>(); let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) { diff --git a/src/frontend/src/instance/script.rs b/src/frontend/src/instance/script.rs index fc7757a365..d3eb5cb29f 100644 --- a/src/frontend/src/instance/script.rs +++ b/src/frontend/src/instance/script.rs @@ -20,7 +20,7 @@ use common_telemetry::timer; use servers::query_handler::ScriptHandler; use crate::instance::Instance; -use crate::metric; +use crate::metrics; #[async_trait] impl ScriptHandler for Instance { @@ -30,7 +30,7 @@ impl ScriptHandler for Instance { name: &str, script: &str, ) -> servers::error::Result<()> { - let _timer = timer!(metric::METRIC_HANDLE_SCRIPTS_ELAPSED); + let _timer = timer!(metrics::METRIC_HANDLE_SCRIPTS_ELAPSED); self.script_executor .insert_script(schema, name, script) .await @@ -42,7 +42,7 @@ impl ScriptHandler for Instance { name: &str, params: HashMap, ) -> servers::error::Result { - let _timer = timer!(metric::METRIC_RUN_SCRIPT_ELAPSED); + let _timer = timer!(metrics::METRIC_RUN_SCRIPT_ELAPSED); self.script_executor .execute_script(schema, name, params) .await diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 51bc340a65..371b6eadd1 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -23,8 +23,7 @@ pub mod frontend; pub mod grpc; pub mod influxdb; pub mod instance; -pub(crate) mod metric; -mod metrics; +pub(crate) mod metrics; pub mod mysql; pub mod opentsdb; pub mod postgres; diff --git a/src/frontend/src/metric.rs b/src/frontend/src/metric.rs deleted file mode 100644 index 8c54526cfe..0000000000 --- a/src/frontend/src/metric.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub(crate) const METRIC_HANDLE_SQL_ELAPSED: &str = "frontend.handle_sql_elapsed"; -pub(crate) const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "frontend.handle_scripts_elapsed"; -pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"; diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 968d9b1b3c..43c694d425 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! frontend metrics +pub(crate) const METRIC_HANDLE_SQL_ELAPSED: &str = "frontend.handle_sql_elapsed"; +pub(crate) const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "frontend.handle_scripts_elapsed"; +pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"; +/// frontend metrics /// Metrics for creating table in dist mode. pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table"; - pub const DIST_CREATE_TABLE_IN_META: &str = "frontend.dist.create_table.update_meta"; - pub const DIST_CREATE_TABLE_IN_DATANODE: &str = "frontend.dist.create_table.invoke_datanode"; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index ff59e892d4..2531d382e9 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -41,7 +41,7 @@ use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; use table::requests::{AlterTableRequest, InsertRequest}; use table::table::AlterContext; -use table::Table; +use table::{meter_insert_request, Table}; use tokio::sync::RwLock; use crate::datanode::DatanodeClients; @@ -75,6 +75,8 @@ impl Table for DistTable { } async fn insert(&self, request: InsertRequest) -> table::Result { + meter_insert_request!(request); + let splits = self .partition_manager .split_insert_request(&self.table_name, request) @@ -376,9 +378,10 @@ impl PartitionExec { #[cfg(test)] mod test { use std::collections::HashMap; + use std::sync::atomic::{AtomicU32, Ordering}; use api::v1::column::SemanticType; - use api::v1::{column, Column, ColumnDataType, InsertRequest}; + use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; use catalog::error::Result; use catalog::remote::{KvBackend, ValueIter}; use common_query::physical_plan::DfPhysicalPlanAdapter; @@ -399,6 +402,10 @@ mod test { use meta_client::client::MetaClient; use meta_client::rpc::router::RegionRoute; use meta_client::rpc::{Region, Table, TableRoute}; + use meter_core::collect::Collect; + use meter_core::data::{ReadRecord, WriteRecord}; + use meter_core::global::global_registry; + use meter_core::write_calc::WriteCalculator; use partition::columns::RangeColumnsPartitionRule; use partition::manager::PartitionRuleManager; use partition::partition::{PartitionBound, PartitionDef}; @@ -410,7 +417,7 @@ mod test { use sql::statements::statement::Statement; use store_api::storage::RegionNumber; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; - use table::TableRef; + use table::{meter_insert_request, TableRef}; use super::*; use crate::expr_factory; @@ -925,7 +932,7 @@ mod test { ..Default::default() }, ]; - let request = InsertRequest { + let request = GrpcInsertRequest { table_name: table_name.table_name.clone(), columns, row_count, @@ -1057,4 +1064,48 @@ mod test { partition::error::Error::FindRegions { .. } )); } + + #[derive(Default)] + struct MockCollector { + pub write_sum: AtomicU32, + } + + impl Collect for MockCollector { + fn on_write(&self, record: WriteRecord) { + self.write_sum + .fetch_add(record.byte_count, Ordering::Relaxed); + } + + fn on_read(&self, _record: ReadRecord) { + todo!() + } + } + + struct MockCalculator; + + impl WriteCalculator for MockCalculator { + fn calc_byte(&self, _value: &InsertRequest) -> u32 { + 1024 * 10 + } + } + + #[test] + #[ignore] + fn test_meter_insert_request() { + let collector = Arc::new(MockCollector::default()); + global_registry().set_collector(collector.clone()); + global_registry().register_calculator(Arc::new(MockCalculator)); + + let req = InsertRequest { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: "numbers".to_string(), + columns_values: Default::default(), + region_number: 0, + }; + meter_insert_request!(req); + + let re = collector.write_sum.load(Ordering::Relaxed); + assert_eq!(re, 1024 * 10); + } } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 3cebfbe006..d29bd484e3 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -275,6 +275,19 @@ pub struct FlushTableRequest { pub wait: Option, } +#[macro_export] +macro_rules! meter_insert_request { + ($req: expr) => { + meter_macros::write_meter!( + $req.catalog_name.to_string(), + $req.schema_name.to_string(), + $req.table_name.to_string(), + $req.region_number, + $req + ); + }; +} + #[cfg(test)] mod tests { use super::*;