feat: meter write request (#1447)

* chore: add write meter

* chore: update meter macro

* chore: update meter framework url to https
Author: shuiyisong
Date: 2023-04-24 11:42:06 +08:00 (committed by GitHub)
Parent: e64fea3a15
Commit: 7c6754d03e
10 changed files with 105 additions and 31 deletions
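
A minimal sketch of the metering flow this commit wires up, mirroring the test added in the DistTable diff below: register a `Collect` implementation and a `WriteCalculator<InsertRequest>` with meter-core's global registry, then invoke the new `meter_insert_request!` macro on an insert request. `PrintCollector`, `FixedCalculator`, and `meter_example` are illustrative names, not part of either crate; the calling crate is assumed to depend on `meter-core`, `meter-macros`, and `table`.

```rust
use std::sync::Arc;

use meter_core::collect::Collect;
use meter_core::data::{ReadRecord, WriteRecord};
use meter_core::global::global_registry;
use meter_core::write_calc::WriteCalculator;
use table::meter_insert_request;
use table::requests::InsertRequest;

// Illustrative collector: receives one WriteRecord per metered write.
struct PrintCollector;

impl Collect for PrintCollector {
    fn on_write(&self, record: WriteRecord) {
        println!("metered write: {} bytes", record.byte_count);
    }

    fn on_read(&self, _record: ReadRecord) {}
}

// Illustrative calculator: sizes every InsertRequest at a fixed 1 KiB.
struct FixedCalculator;

impl WriteCalculator<InsertRequest> for FixedCalculator {
    fn calc_byte(&self, _value: &InsertRequest) -> u32 {
        1024
    }
}

fn meter_example(req: InsertRequest) {
    global_registry().set_collector(Arc::new(PrintCollector));
    global_registry().register_calculator(Arc::new(FixedCalculator));
    // Same call DistTable::insert now makes before splitting the request.
    meter_insert_request!(req);
}
```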

Cargo.lock (generated)

@@ -3030,6 +3030,8 @@ dependencies = [
"itertools",
"meta-client",
"meta-srv",
"meter-core",
"meter-macros",
"mito",
"moka",
"object-store",
@@ -4826,6 +4828,24 @@ dependencies = [
"url",
]
[[package]]
name = "meter-core"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=f0798c4c648d89f51abe63e870919c75dd463199#f0798c4c648d89f51abe63e870919c75dd463199"
dependencies = [
"anymap",
"once_cell",
"parking_lot",
]
[[package]]
name = "meter-macros"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=f0798c4c648d89f51abe63e870919c75dd463199#f0798c4c648d89f51abe63e870919c75dd463199"
dependencies = [
"meter-core",
]
[[package]]
name = "metrics"
version = "0.20.1"


@@ -84,6 +84,11 @@ tokio-util = { version = "0.7", features = ["io-util"] }
tonic = { version = "0.9", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
metrics = "0.20"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "f0798c4c648d89f51abe63e870919c75dd463199" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "f0798c4c648d89f51abe63e870919c75dd463199"
[profile.release]
debug = true


@@ -36,6 +36,8 @@ futures = "0.3"
futures-util.workspace = true
itertools = "0.10"
meta-client = { path = "../meta-client" }
meter-core.workspace = true
meter-macros.workspace = true
mito = { path = "../mito", features = ["test"] }
moka = { version = "0.9", features = ["future"] }
object-store = { path = "../object-store" }


@@ -75,7 +75,7 @@ use crate::error::{
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::instance::standalone::StandaloneGrpcQueryHandler;
use crate::metric;
use crate::metrics;
use crate::script::ScriptExecutor;
use crate::server::{start_server, ServerHandlers, Services};
use crate::statement::StatementExecutor;
@@ -451,7 +451,7 @@ impl SqlQueryHandler for Instance {
type Error = Error;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_SQL_ELAPSED);
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {


@@ -20,7 +20,7 @@ use common_telemetry::timer;
use servers::query_handler::ScriptHandler;
use crate::instance::Instance;
use crate::metric;
use crate::metrics;
#[async_trait]
impl ScriptHandler for Instance {
@@ -30,7 +30,7 @@ impl ScriptHandler for Instance {
name: &str,
script: &str,
) -> servers::error::Result<()> {
let _timer = timer!(metric::METRIC_HANDLE_SCRIPTS_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_SCRIPTS_ELAPSED);
self.script_executor
.insert_script(schema, name, script)
.await
@@ -42,7 +42,7 @@ impl ScriptHandler for Instance {
name: &str,
params: HashMap<String, String>,
) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_RUN_SCRIPT_ELAPSED);
let _timer = timer!(metrics::METRIC_RUN_SCRIPT_ELAPSED);
self.script_executor
.execute_script(schema, name, params)
.await


@@ -23,8 +23,7 @@ pub mod frontend;
pub mod grpc;
pub mod influxdb;
pub mod instance;
pub(crate) mod metric;
mod metrics;
pub(crate) mod metrics;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;


@@ -1,17 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) const METRIC_HANDLE_SQL_ELAPSED: &str = "frontend.handle_sql_elapsed";
pub(crate) const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "frontend.handle_scripts_elapsed";
pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed";


@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! frontend metrics
pub(crate) const METRIC_HANDLE_SQL_ELAPSED: &str = "frontend.handle_sql_elapsed";
pub(crate) const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "frontend.handle_scripts_elapsed";
pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed";
/// frontend metrics
/// Metrics for creating table in dist mode.
pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table";
pub const DIST_CREATE_TABLE_IN_META: &str = "frontend.dist.create_table.update_meta";
pub const DIST_CREATE_TABLE_IN_DATANODE: &str = "frontend.dist.create_table.invoke_datanode";


@@ -41,7 +41,7 @@ use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef};
use table::requests::{AlterTableRequest, InsertRequest};
use table::table::AlterContext;
use table::Table;
use table::{meter_insert_request, Table};
use tokio::sync::RwLock;
use crate::datanode::DatanodeClients;
@@ -75,6 +75,8 @@ impl Table for DistTable {
}
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
meter_insert_request!(request);
let splits = self
.partition_manager
.split_insert_request(&self.table_name, request)
@@ -376,9 +378,10 @@ impl PartitionExec {
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest};
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use catalog::error::Result;
use catalog::remote::{KvBackend, ValueIter};
use common_query::physical_plan::DfPhysicalPlanAdapter;
@@ -399,6 +402,10 @@ mod test {
use meta_client::client::MetaClient;
use meta_client::rpc::router::RegionRoute;
use meta_client::rpc::{Region, Table, TableRoute};
use meter_core::collect::Collect;
use meter_core::data::{ReadRecord, WriteRecord};
use meter_core::global::global_registry;
use meter_core::write_calc::WriteCalculator;
use partition::columns::RangeColumnsPartitionRule;
use partition::manager::PartitionRuleManager;
use partition::partition::{PartitionBound, PartitionDef};
@@ -410,7 +417,7 @@ mod test {
use sql::statements::statement::Statement;
use store_api::storage::RegionNumber;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::TableRef;
use table::{meter_insert_request, TableRef};
use super::*;
use crate::expr_factory;
@@ -925,7 +932,7 @@ mod test {
..Default::default()
},
];
let request = InsertRequest {
let request = GrpcInsertRequest {
table_name: table_name.table_name.clone(),
columns,
row_count,
@@ -1057,4 +1064,48 @@ mod test {
partition::error::Error::FindRegions { .. }
));
}
#[derive(Default)]
struct MockCollector {
pub write_sum: AtomicU32,
}
impl Collect for MockCollector {
fn on_write(&self, record: WriteRecord) {
self.write_sum
.fetch_add(record.byte_count, Ordering::Relaxed);
}
fn on_read(&self, _record: ReadRecord) {
todo!()
}
}
struct MockCalculator;
impl WriteCalculator<InsertRequest> for MockCalculator {
fn calc_byte(&self, _value: &InsertRequest) -> u32 {
1024 * 10
}
}
#[test]
#[ignore]
fn test_meter_insert_request() {
let collector = Arc::new(MockCollector::default());
global_registry().set_collector(collector.clone());
global_registry().register_calculator(Arc::new(MockCalculator));
let req = InsertRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "numbers".to_string(),
columns_values: Default::default(),
region_number: 0,
};
meter_insert_request!(req);
let re = collector.write_sum.load(Ordering::Relaxed);
assert_eq!(re, 1024 * 10);
}
}


@@ -275,6 +275,19 @@ pub struct FlushTableRequest {
pub wait: Option<bool>,
}
#[macro_export]
macro_rules! meter_insert_request {
($req: expr) => {
meter_macros::write_meter!(
$req.catalog_name.to_string(),
$req.schema_name.to_string(),
$req.table_name.to_string(),
$req.region_number,
$req
);
};
}
#[cfg(test)]
mod tests {
use super::*;
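
For reference, the `meter_insert_request!` wrapper added above is shorthand for calling `meter_macros::write_meter!` with the request's identifying fields. A sketch of the manual equivalent, assuming a hypothetical `meter_manually` helper in a crate that depends on `meter-macros` and `table`:

```rust
use table::requests::InsertRequest;

fn meter_manually(req: InsertRequest) -> InsertRequest {
    meter_macros::write_meter!(
        req.catalog_name.to_string(),
        req.schema_name.to_string(),
        req.table_name.to_string(),
        req.region_number,
        req
    );
    // The macro only borrows `req` (DistTable::insert keeps using the request
    // after metering it), so it can still be handed to the storage layer.
    req
}
```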