From 8b9671f376f7c6ccd786ce437e0c243cadca6662 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Mar 2023 15:49:37 +0800 Subject: [PATCH 1/4] feat: manual flush http API --- src/datanode/src/server/grpc.rs | 12 ++++-- src/datanode/src/sql/flush_table.rs | 35 +++++++++++++----- src/frontend/src/server.rs | 2 + src/mito/src/table.rs | 8 ++-- src/servers/src/error.rs | 25 +++++++++++-- src/servers/src/http.rs | 23 ++++++++++-- src/servers/src/http/admin.rs | 57 +++++++++++++++++++++++++++++ src/servers/src/http/handler.rs | 1 + src/table/src/requests.rs | 2 +- src/table/src/table.rs | 6 ++- tests-integration/src/test_util.rs | 7 +++- 11 files changed, 150 insertions(+), 28 deletions(-) create mode 100644 src/servers/src/http/admin.rs diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 33734ceae1..f15f248f5e 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -84,12 +84,19 @@ impl Instance { } pub(crate) async fn handle_flush_table(&self, expr: FlushTableExpr) -> Result { + let table_name = if expr.table_name.trim().is_empty() { + None + } else { + Some(expr.table_name) + }; + let req = FlushTableRequest { catalog_name: expr.catalog_name, schema_name: expr.schema_name, - table_name: expr.table_name, + table_name, region_number: expr.region_id, }; + println!("req: {:?}\n", req); self.sql_handler() .execute(SqlRequest::FlushTable(req), QueryContext::arc()) .await @@ -148,7 +155,6 @@ mod tests { } #[test] - fn test_create_column_schema() { let column_def = ColumnDef { name: "a".to_string(), @@ -242,7 +248,7 @@ mod tests { ConcreteDataType::timestamp_millisecond_datatype(), false, ) - .with_time_index(true), + .with_time_index(true), ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs index 35ea7361cc..7b12ef36ca 100644 --- a/src/datanode/src/sql/flush_table.rs +++ b/src/datanode/src/sql/flush_table.rs @@ -13,28 +13,43 @@ // limitations under the License. 
use common_query::Output; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table::engine::TableReference; use table::requests::FlushTableRequest; -use crate::error::{self, Result}; +use crate::error::{self, CatalogSnafu, DatabaseNotFoundSnafu, Result}; use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result { + if let Some(table) = &req.table_name { + self.flush_table_inner(&req.catalog_name, &req.schema_name, table, req.region_number).await?; + } else { + let schema = self.catalog_manager.schema(&req.catalog_name, &req.schema_name).context(CatalogSnafu)?.context(DatabaseNotFoundSnafu { + catalog: &req.catalog_name, + schema: &req.schema_name, + })?; + + let all_table_names = schema.table_names().context(CatalogSnafu)?; + futures::future::join_all(all_table_names.iter().map(|table| { + self.flush_table_inner(&req.catalog_name, &req.schema_name,table, req.region_number) + }) + ).await.into_iter().collect::>>()?; + } + Ok(Output::AffectedRows(0)) + } + + async fn flush_table_inner(&self, catalog:&str, schema:&str, table: &str, region: Option) -> Result<()> { let table_ref = TableReference { - catalog: &req.catalog_name, - schema: &req.schema_name, - table: &req.table_name, + catalog, + schema, + table, }; let full_table_name = table_ref.to_string(); - let table = self.get_table(&table_ref)?; - - table.flush(req).await.context(error::FlushTableSnafu { + table.flush(region).await.context(error::FlushTableSnafu { table_name: full_table_name, - })?; - Ok(Output::AffectedRows(0)) + }) } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index d6c03e2931..395a91d979 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -150,8 +150,10 @@ impl Services { if let Some(http_options) = &opts.http_options { let http_addr = parse_addr(&http_options.addr)?; + let mut http_server = HttpServer::new( ServerSqlQueryHandlerAdaptor::arc(instance.clone()), + ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), http_options.clone(), ); if let Some(user_provider) = user_provider.clone() { diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 8bf9a568ec..abb1188a54 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -44,7 +44,7 @@ use table::metadata::{ FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, }; use table::requests::{ - AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest, + AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest, }; use table::table::scan::SimpleTableScan; use table::table::{AlterContext, Table}; @@ -323,9 +323,9 @@ impl Table for MitoTable { Ok(rows_deleted) } - async fn flush(&self, request: FlushTableRequest) -> TableResult<()> { - if let Some(region_id) = request.region_number { - if let Some(region) = self.regions.get(®ion_id) { + async fn flush(&self, region_number: Option) -> TableResult<()> { + if let Some(region_number) = region_number { + if let Some(region) = self.regions.get(®ion_number) { region .flush() .await diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4e6a3b5e5f..8e7c19f76e 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -160,9 +160,9 @@ pub enum Error { }, #[snafu(display( - "Failed to put OpenTSDB data point: {:?}, source: {}", - data_point, - source + "Failed to put OpenTSDB data point: {:?}, source: {}", + data_point, + source ))] PutOpentsdbDataPoint { data_point: String, @@ -263,8 
+263,26 @@ pub enum Error { #[snafu(backtrace)] source: common_mem_prof::error::Error, }, + #[snafu(display("Invalid prepare statement: {}", err_msg))] InvalidPrepareStatement { err_msg: String }, + + #[snafu(display("Invalid flush argument: {}", err_msg))] + InvalidFlushArgument { + err_msg: String, + }, + + // #[snafu(display("Failed to flush table, catalog: {}, schema: {}, table: {}, region: {:?}, source: {}", + // catalog, schema, table, region_id, source + // ))] + // FlushTable { + // catalog: String, + // schema: String, + // table: String, + // region_id: Option, + // #[snafu(backtrace)] + // source: Error, + // }, } pub type Result = std::result::Result; @@ -327,6 +345,7 @@ impl ErrorExt for Error { DatabaseNotFound { .. } => StatusCode::DatabaseNotFound, #[cfg(feature = "mem-prof")] DumpProfileData { source, .. } => source.status_code(), + InvalidFlushArgument { .. } => StatusCode::InvalidArguments, } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 4bdd945dc7..b731eb0542 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -21,6 +21,7 @@ pub mod script; #[cfg(feature = "mem-prof")] pub mod mem_prof; +mod admin; use std::net::SocketAddr; use std::sync::Arc; @@ -56,11 +57,13 @@ use self::authorize::HttpAuth; use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write}; use crate::auth::UserProviderRef; use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; +use crate::http::admin::flush; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef, ScriptHandlerRef, }; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::server::Server; /// create query context from database name information, catalog and schema are @@ -96,6 +99,7 @@ pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"] pub struct HttpServer { sql_handler: ServerSqlQueryHandlerRef, + grpc_handler: ServerGrpcQueryHandlerRef, options: HttpOptions, influxdb_handler: Option, opentsdb_handler: Option, @@ -345,13 +349,17 @@ async fn serve_docs() -> Html { #[derive(Clone)] pub struct ApiState { pub sql_handler: ServerSqlQueryHandlerRef, + pub grpc_handler: ServerGrpcQueryHandlerRef, pub script_handler: Option, } impl HttpServer { - pub fn new(sql_handler: ServerSqlQueryHandlerRef, options: HttpOptions) -> Self { + pub fn new(sql_handler: ServerSqlQueryHandlerRef, + grpc_handler: ServerGrpcQueryHandlerRef, + options: HttpOptions) -> Self { Self { sql_handler, + grpc_handler, options, opentsdb_handler: None, influxdb_handler: None, @@ -420,12 +428,14 @@ impl HttpServer { let sql_router = self .route_sql(ApiState { sql_handler: self.sql_handler.clone(), + grpc_handler: self.grpc_handler.clone(), script_handler: self.script_handler.clone(), }) .finish_api(&mut api) .layer(Extension(api)); let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router); + router = router.nest(&format!("/{HTTP_API_VERSION}/admin"), self.route_admin(self.grpc_handler.clone())); if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { router = router.nest( @@ -517,6 +527,11 @@ impl HttpServer { .route("/api/put", routing::post(opentsdb::put)) .with_state(opentsdb_handler) } + + fn route_admin(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router { + Router::new().route("/flush", routing::post(flush)) + .with_state(grpc_handler) + } } pub const HTTP_SERVER: &str = "HTTP_SERVER"; @@ -637,8 +652,10 @@ mod test 
{ fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); - let instance = ServerSqlQueryHandlerAdaptor::arc(instance); - let server = HttpServer::new(instance, HttpOptions::default()); + let instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); + let server = HttpServer::new(instance.clone(), + instance, + HttpOptions::default()); server.make_app().route( "/test/timeout", get(forever.layer( diff --git a/src/servers/src/http/admin.rs b/src/servers/src/http/admin.rs new file mode 100644 index 0000000000..ea60c7da38 --- /dev/null +++ b/src/servers/src/http/admin.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +use axum::extract::{Query, RawBody, State}; +use axum::http::StatusCode as HttpStatusCode; +use axum::Json; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; +use std::collections::HashMap; +use snafu::OptionExt; +use api::v1::greptime_request::Request; +use api::v1::{DdlRequest, FlushTableExpr}; +use api::v1::ddl_request::Expr; +use session::context::QueryContext; +use crate::error; + +use crate::error::{Result}; + +#[axum_macros::debug_handler] +pub async fn flush( + State(grpc_handler): State, + Query(params): Query>, + RawBody(_): RawBody, +) -> Result<(HttpStatusCode, Json)> { + let catalog_name = params.get("catalog_name").cloned().unwrap_or("greptime".to_string()); + let schema_name = params.get("schema_name").cloned().context(error::InvalidFlushArgumentSnafu { + err_msg: "schema_name is not present", + })?; + + // if table name is not present, flush all tables inside schema + let table_name = params.get("table_name").cloned().unwrap_or_default(); + + let region_id: Option = params.get("region_id").map(|v| v.parse()).transpose().ok().flatten(); + + let request = Request::Ddl(DdlRequest { + expr: Some(Expr::FlushTable(FlushTableExpr { + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + region_id, + })) + }); + + grpc_handler.do_query(request, QueryContext::arc()).await?; + Ok((HttpStatusCode::OK, Json::from("hello, world".to_string()))) +} \ No newline at end of file diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index ac77fc6334..8f7ede1c91 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -43,6 +43,7 @@ pub async fn sql( Form(form_params): Form, ) -> Json { let sql_handler = &state.sql_handler; + let start = Instant::now(); let sql = query_params.sql.or(form_params.sql); let db = query_params.db.or(form_params.db); diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index c2e6d31d9a..76fff8a986 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -213,7 +213,7 @@ pub struct CopyTableFromRequest { pub struct FlushTableRequest { pub catalog_name: String, pub schema_name: String, - pub table_name: String, + pub table_name: Option, pub region_number: Option, } diff --git 
a/src/table/src/table.rs b/src/table/src/table.rs index 223f132dda..ad625df59e 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -23,10 +23,11 @@ use async_trait::async_trait; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use datatypes::schema::SchemaRef; +use store_api::storage::RegionNumber; use crate::error::{Result, UnsupportedSnafu}; use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType}; -use crate::requests::{AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest}; +use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest}; pub type AlterContext = anymap::Map; @@ -95,7 +96,8 @@ pub trait Table: Send + Sync { } /// Flush table. - async fn flush(&self, _request: FlushTableRequest) -> Result<()> { + async fn flush(&self, region_number: Option) -> Result<()> { + let _ = region_number; UnsupportedSnafu { operation: "FLUSH" }.fail()? } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 6013010d77..4b33b02b6e 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -275,7 +275,8 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router .await .unwrap(); let http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance))), + ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance.clone()))), + ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), HttpOptions::default(), ); (http_server.make_app(), guard) @@ -296,8 +297,10 @@ pub async fn setup_test_http_app_with_frontend( ) .await .unwrap(); + let frontend_ref = Arc::new(frontend); let mut http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(Arc::new(frontend)), + ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()), + ServerGrpcQueryHandlerAdaptor::arc(frontend_ref), HttpOptions::default(), ); http_server.set_script_handler(instance.clone()); From d1ba9ca1261ebce264b8bc6bb26f04fa2c5ce407 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Mar 2023 16:11:25 +0800 Subject: [PATCH 2/4] fix: unit tests --- src/datanode/src/server/grpc.rs | 1 - src/mito/src/engine/tests.rs | 8 ++++---- src/servers/src/http.rs | 21 ++++++++++++++++----- src/servers/src/http/admin.rs | 2 +- src/servers/tests/http/http_test.rs | 3 ++- src/servers/tests/http/influxdb_test.rs | 15 ++++++++++++++- src/servers/tests/http/opentsdb_test.rs | 13 ++++++++++++- src/servers/tests/http/prometheus_test.rs | 15 ++++++++++++++- src/servers/tests/mod.rs | 2 +- 9 files changed, 64 insertions(+), 16 deletions(-) diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index f15f248f5e..8170d96f67 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -96,7 +96,6 @@ impl Instance { table_name, region_number: expr.region_id, }; - println!("req: {:?}\n", req); self.sql_handler() .execute(SqlRequest::FlushTable(req), QueryContext::arc()) .await diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 9771cb668e..c252d4f0c1 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -793,10 +793,10 @@ async fn test_flush_table_all_regions() { assert!(!has_parquet_file(®ion_dir)); // Trigger flush all region - table.flush(FlushTableRequest::default()).await.unwrap(); + table.flush(None).await.unwrap(); // Trigger again, wait for the previous task finished - 
table.flush(FlushTableRequest::default()).await.unwrap(); + table.flush(None).await.unwrap(); assert!(has_parquet_file(®ion_dir)); } @@ -832,10 +832,10 @@ async fn test_flush_table_with_region_id() { }; // Trigger flush all region - table.flush(req.clone()).await.unwrap(); + table.flush(req.region_number).await.unwrap(); // Trigger again, wait for the previous task finished - table.flush(req).await.unwrap(); + table.flush(req.region_number).await.unwrap(); assert!(has_parquet_file(®ion_dir)); } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index b731eb0542..0c049fd4eb 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -349,7 +349,6 @@ async fn serve_docs() -> Html { #[derive(Clone)] pub struct ApiState { pub sql_handler: ServerSqlQueryHandlerRef, - pub grpc_handler: ServerGrpcQueryHandlerRef, pub script_handler: Option, } @@ -428,7 +427,6 @@ impl HttpServer { let sql_router = self .route_sql(ApiState { sql_handler: self.sql_handler.clone(), - grpc_handler: self.grpc_handler.clone(), script_handler: self.script_handler.clone(), }) .finish_api(&mut api) @@ -604,15 +602,26 @@ mod test { use query::parser::PromQuery; use session::context::QueryContextRef; use tokio::sync::mpsc; + use api::v1::greptime_request::Request; use super::*; use crate::error::Error; + use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdaptor}; use crate::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler}; struct DummyInstance { _tx: mpsc::Sender<(String, Vec)>, } + #[async_trait] + impl GrpcQueryHandler for DummyInstance { + type Error = Error; + + async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + unimplemented!() + } + } + #[async_trait] impl SqlQueryHandler for DummyInstance { type Error = Error; @@ -652,9 +661,11 @@ mod test { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); - let instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); - let server = HttpServer::new(instance.clone(), - instance, + let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); + let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance); + + let server = HttpServer::new(sql_instance, + grpc_instance, HttpOptions::default()); server.make_app().route( "/test/timeout", diff --git a/src/servers/src/http/admin.rs b/src/servers/src/http/admin.rs index ea60c7da38..466bb313fb 100644 --- a/src/servers/src/http/admin.rs +++ b/src/servers/src/http/admin.rs @@ -41,7 +41,7 @@ pub async fn flush( // if table name is not present, flush all tables inside schema let table_name = params.get("table_name").cloned().unwrap_or_default(); - let region_id: Option = params.get("region_id").map(|v| v.parse()).transpose().ok().flatten(); + let region_id: Option = params.get("region").map(|v| v.parse()).transpose().ok().flatten(); let request = Request::Ddl(DdlRequest { expr: Some(Expr::FlushTable(FlushTableExpr { diff --git a/src/servers/tests/http/http_test.rs b/src/servers/tests/http/http_test.rs index cf4ca38b07..c6398022b0 100644 --- a/src/servers/tests/http/http_test.rs +++ b/src/servers/tests/http/http_test.rs @@ -17,11 +17,12 @@ use axum_test_helper::TestClient; use servers::http::{HttpOptions, HttpServer}; use table::test_util::MemTable; -use crate::create_testing_sql_query_handler; +use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler}; fn make_test_app() -> Router { let server = HttpServer::new( 
create_testing_sql_query_handler(MemTable::default_numbers_table()), + create_testing_grpc_query_handler(MemTable::default_numbers_table()), HttpOptions::default(), ); server.make_app() diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 123d9e0d02..3f1e2ab7fd 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -28,6 +28,8 @@ use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; +use api::v1::greptime_request::Request; +use servers::query_handler::grpc::GrpcQueryHandler; use crate::auth::{DatabaseAuthInfo, MockUserProvider}; @@ -35,6 +37,15 @@ struct DummyInstance { tx: Arc>, } +#[async_trait] +impl GrpcQueryHandler for DummyInstance { + type Error =Error; + + async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl InfluxdbLineProtocolHandler for DummyInstance { async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> { @@ -79,7 +90,9 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: Arc>, db_name: Option<&str>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), + instance.clone(), + HttpOptions::default()); let mut user_provider = MockUserProvider::default(); if let Some(name) = db_name { user_provider.set_authorization_info(DatabaseAuthInfo { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 96e40d8fd0..453cb2f7ee 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -27,11 +27,22 @@ use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; +use api::v1::greptime_request::Request; +use servers::query_handler::grpc::GrpcQueryHandler; struct DummyInstance { tx: mpsc::Sender, } +#[async_trait] +impl GrpcQueryHandler for DummyInstance { + type Error = crate::Error; + + async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl OpentsdbProtocolHandler for DummyInstance { async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { @@ -77,7 +88,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(),instance.clone(), HttpOptions::default()); server.set_opentsdb_handler(instance); server.make_app() } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 2a40af43b0..5677fb6c8d 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -32,11 +32,22 @@ use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use session::context::QueryContextRef; use tokio::sync::mpsc; +use api::v1::greptime_request::Request; +use servers::query_handler::grpc::GrpcQueryHandler; struct DummyInstance { tx: mpsc::Sender<(String, Vec)>, } +#[async_trait] +impl 
GrpcQueryHandler for DummyInstance { + type Error =Error; + + async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl PrometheusProtocolHandler for DummyInstance { async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()> { @@ -102,7 +113,9 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), + instance.clone(), + HttpOptions::default()); server.set_prom_handler(instance); server.make_app() } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 789b952c5b..785614d570 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -47,7 +47,7 @@ mod py_script; const LOCALHOST_WITH_0: &str = "127.0.0.1:0"; -struct DummyInstance { +pub struct DummyInstance { query_engine: QueryEngineRef, py_engine: Arc, scripts: RwLock>>, From c994e0de88cdf62a444cfe81a9fb6f0798ed473f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Mar 2023 16:13:57 +0800 Subject: [PATCH 3/4] fix: format --- src/datanode/src/server/grpc.rs | 2 +- src/datanode/src/sql/flush_table.rs | 41 +++++++++++++++++----- src/frontend/src/server.rs | 1 - src/servers/src/error.rs | 11 +++--- src/servers/src/http.rs | 32 ++++++++++------- src/servers/src/http/admin.rs | 42 +++++++++++++++-------- src/servers/tests/http/influxdb_test.rs | 16 +++++---- src/servers/tests/http/opentsdb_test.rs | 12 ++++--- src/servers/tests/http/prometheus_test.rs | 16 +++++---- 9 files changed, 110 insertions(+), 63 deletions(-) diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 8170d96f67..a38093dc98 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -247,7 +247,7 @@ mod tests { ConcreteDataType::timestamp_millisecond_datatype(), false, ) - .with_time_index(true), + .with_time_index(true), ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs index 7b12ef36ca..ab3b459311 100644 --- a/src/datanode/src/sql/flush_table.rs +++ b/src/datanode/src/sql/flush_table.rs @@ -23,23 +23,46 @@ use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result { if let Some(table) = &req.table_name { - self.flush_table_inner(&req.catalog_name, &req.schema_name, table, req.region_number).await?; + self.flush_table_inner( + &req.catalog_name, + &req.schema_name, + table, + req.region_number, + ) + .await?; } else { - let schema = self.catalog_manager.schema(&req.catalog_name, &req.schema_name).context(CatalogSnafu)?.context(DatabaseNotFoundSnafu { - catalog: &req.catalog_name, - schema: &req.schema_name, - })?; + let schema = self + .catalog_manager + .schema(&req.catalog_name, &req.schema_name) + .context(CatalogSnafu)? 
+ .context(DatabaseNotFoundSnafu { + catalog: &req.catalog_name, + schema: &req.schema_name, + })?; let all_table_names = schema.table_names().context(CatalogSnafu)?; futures::future::join_all(all_table_names.iter().map(|table| { - self.flush_table_inner(&req.catalog_name, &req.schema_name,table, req.region_number) - }) - ).await.into_iter().collect::>>()?; + self.flush_table_inner( + &req.catalog_name, + &req.schema_name, + table, + req.region_number, + ) + })) + .await + .into_iter() + .collect::>>()?; } Ok(Output::AffectedRows(0)) } - async fn flush_table_inner(&self, catalog:&str, schema:&str, table: &str, region: Option) -> Result<()> { + async fn flush_table_inner( + &self, + catalog: &str, + schema: &str, + table: &str, + region: Option, + ) -> Result<()> { let table_ref = TableReference { catalog, schema, diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 395a91d979..d4ab41dee8 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -150,7 +150,6 @@ impl Services { if let Some(http_options) = &opts.http_options { let http_addr = parse_addr(&http_options.addr)?; - let mut http_server = HttpServer::new( ServerSqlQueryHandlerAdaptor::arc(instance.clone()), ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 8e7c19f76e..92735341df 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -160,9 +160,9 @@ pub enum Error { }, #[snafu(display( - "Failed to put OpenTSDB data point: {:?}, source: {}", - data_point, - source + "Failed to put OpenTSDB data point: {:?}, source: {}", + data_point, + source ))] PutOpentsdbDataPoint { data_point: String, @@ -268,10 +268,7 @@ pub enum Error { InvalidPrepareStatement { err_msg: String }, #[snafu(display("Invalid flush argument: {}", err_msg))] - InvalidFlushArgument { - err_msg: String, - }, - + InvalidFlushArgument { err_msg: String }, // #[snafu(display("Failed to flush table, catalog: {}, schema: {}, table: {}, region: {:?}, source: {}", // catalog, schema, table, region_id, source // ))] diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 0c049fd4eb..0f7b9da563 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -19,9 +19,9 @@ pub mod opentsdb; pub mod prometheus; pub mod script; +mod admin; #[cfg(feature = "mem-prof")] pub mod mem_prof; -mod admin; use std::net::SocketAddr; use std::sync::Arc; @@ -58,12 +58,12 @@ use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write}; use crate::auth::UserProviderRef; use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; use crate::http::admin::flush; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef, ScriptHandlerRef, }; -use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::server::Server; /// create query context from database name information, catalog and schema are @@ -353,9 +353,11 @@ pub struct ApiState { } impl HttpServer { - pub fn new(sql_handler: ServerSqlQueryHandlerRef, - grpc_handler: ServerGrpcQueryHandlerRef, - options: HttpOptions) -> Self { + pub fn new( + sql_handler: ServerSqlQueryHandlerRef, + grpc_handler: ServerGrpcQueryHandlerRef, + options: HttpOptions, + ) -> Self { Self { sql_handler, grpc_handler, @@ -433,7 +435,10 @@ impl HttpServer { .layer(Extension(api)); let mut router = 
Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router); - router = router.nest(&format!("/{HTTP_API_VERSION}/admin"), self.route_admin(self.grpc_handler.clone())); + router = router.nest( + &format!("/{HTTP_API_VERSION}/admin"), + self.route_admin(self.grpc_handler.clone()), + ); if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { router = router.nest( @@ -527,7 +532,8 @@ impl HttpServer { } fn route_admin(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router { - Router::new().route("/flush", routing::post(flush)) + Router::new() + .route("/flush", routing::post(flush)) .with_state(grpc_handler) } } @@ -591,6 +597,7 @@ mod test { use std::future::pending; use std::sync::Arc; + use api::v1::greptime_request::Request; use axum::handler::Handler; use axum::http::StatusCode; use axum::routing::get; @@ -602,7 +609,6 @@ mod test { use query::parser::PromQuery; use session::context::QueryContextRef; use tokio::sync::mpsc; - use api::v1::greptime_request::Request; use super::*; use crate::error::Error; @@ -617,7 +623,11 @@ mod test { impl GrpcQueryHandler for DummyInstance { type Error = Error; - async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { unimplemented!() } } @@ -664,9 +674,7 @@ mod test { let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance); - let server = HttpServer::new(sql_instance, - grpc_instance, - HttpOptions::default()); + let server = HttpServer::new(sql_instance, grpc_instance, HttpOptions::default()); server.make_app().route( "/test/timeout", get(forever.layer( diff --git a/src/servers/src/http/admin.rs b/src/servers/src/http/admin.rs index 466bb313fb..d801b1703c 100644 --- a/src/servers/src/http/admin.rs +++ b/src/servers/src/http/admin.rs @@ -12,20 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; +use api::v1::ddl_request::Expr; +use api::v1::greptime_request::Request; +use api::v1::{DdlRequest, FlushTableExpr}; use axum::extract::{Query, RawBody, State}; use axum::http::StatusCode as HttpStatusCode; use axum::Json; -use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; -use std::collections::HashMap; -use snafu::OptionExt; -use api::v1::greptime_request::Request; -use api::v1::{DdlRequest, FlushTableExpr}; -use api::v1::ddl_request::Expr; use session::context::QueryContext; -use crate::error; +use snafu::OptionExt; -use crate::error::{Result}; +use crate::error; +use crate::error::Result; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; #[axum_macros::debug_handler] pub async fn flush( @@ -33,15 +33,27 @@ pub async fn flush( Query(params): Query>, RawBody(_): RawBody, ) -> Result<(HttpStatusCode, Json)> { - let catalog_name = params.get("catalog_name").cloned().unwrap_or("greptime".to_string()); - let schema_name = params.get("schema_name").cloned().context(error::InvalidFlushArgumentSnafu { - err_msg: "schema_name is not present", - })?; + let catalog_name = params + .get("catalog_name") + .cloned() + .unwrap_or("greptime".to_string()); + let schema_name = + params + .get("schema_name") + .cloned() + .context(error::InvalidFlushArgumentSnafu { + err_msg: "schema_name is not present", + })?; // if table name is not present, flush all tables inside schema let table_name = params.get("table_name").cloned().unwrap_or_default(); - let region_id: Option = params.get("region").map(|v| v.parse()).transpose().ok().flatten(); + let region_id: Option = params + .get("region") + .map(|v| v.parse()) + .transpose() + .ok() + .flatten(); let request = Request::Ddl(DdlRequest { expr: Some(Expr::FlushTable(FlushTableExpr { @@ -49,9 +61,9 @@ pub async fn flush( schema_name: schema_name.clone(), table_name: table_name.clone(), region_id, - })) + })), }); grpc_handler.do_query(request, QueryContext::arc()).await?; Ok((HttpStatusCode::OK, Json::from("hello, world".to_string()))) -} \ No newline at end of file +} diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 3f1e2ab7fd..086c6403c5 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::greptime_request::Request; use api::v1::InsertRequest; use async_trait::async_trait; use axum::{http, Router}; @@ -24,12 +25,11 @@ use query::parser::PromQuery; use servers::error::{Error, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::influxdb::InfluxdbRequest; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; -use api::v1::greptime_request::Request; -use servers::query_handler::grpc::GrpcQueryHandler; use crate::auth::{DatabaseAuthInfo, MockUserProvider}; @@ -39,9 +39,13 @@ struct DummyInstance { #[async_trait] impl GrpcQueryHandler for DummyInstance { - type Error =Error; + type Error = Error; - async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { unimplemented!() } } @@ -90,9 +94,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: Arc>, db_name: Option<&str>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut 
server = HttpServer::new(instance.clone(), - instance.clone(), - HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); let mut user_provider = MockUserProvider::default(); if let Some(name) = db_name { user_provider.set_authorization_info(DatabaseAuthInfo { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 453cb2f7ee..694751635e 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::greptime_request::Request; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; @@ -23,12 +24,11 @@ use query::parser::PromQuery; use servers::error::{self, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::opentsdb::codec::DataPoint; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; -use api::v1::greptime_request::Request; -use servers::query_handler::grpc::GrpcQueryHandler; struct DummyInstance { tx: mpsc::Sender, @@ -38,7 +38,11 @@ struct DummyInstance { impl GrpcQueryHandler for DummyInstance { type Error = crate::Error; - async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { unimplemented!() } } @@ -88,7 +92,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(),instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); server.set_opentsdb_handler(instance); server.make_app() } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 5677fb6c8d..173382b1e4 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use api::prometheus::remote::{ LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest, }; +use api::v1::greptime_request::Request; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; @@ -28,12 +29,11 @@ use servers::error::{Error, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::prometheus; use servers::prometheus::{snappy_compress, Metrics}; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use session::context::QueryContextRef; use tokio::sync::mpsc; -use api::v1::greptime_request::Request; -use servers::query_handler::grpc::GrpcQueryHandler; struct DummyInstance { tx: mpsc::Sender<(String, Vec)>, @@ -41,9 +41,13 @@ struct DummyInstance { #[async_trait] impl GrpcQueryHandler for DummyInstance { - type Error =Error; + type Error = Error; - async fn do_query(&self, _query: Request, _ctx: QueryContextRef) -> std::result::Result { + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { unimplemented!() } } @@ -113,9 +117,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = 
Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), - instance.clone(), - HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); server.set_prom_handler(instance); server.make_app() } From 0bd802c70dd76cc20ca0a230b792201dbaeabc95 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 14 Mar 2023 16:26:47 +0800 Subject: [PATCH 4/4] Update src/servers/src/error.rs Co-authored-by: Ruihang Xia --- src/servers/src/error.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 92735341df..4c7038bad0 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -269,17 +269,6 @@ pub enum Error { #[snafu(display("Invalid flush argument: {}", err_msg))] InvalidFlushArgument { err_msg: String }, - // #[snafu(display("Failed to flush table, catalog: {}, schema: {}, table: {}, region: {:?}, source: {}", - // catalog, schema, table, region_id, source - // ))] - // FlushTable { - // catalog: String, - // schema: String, - // table: String, - // region_id: Option, - // #[snafu(backtrace)] - // source: Error, - // }, } pub type Result = std::result::Result;
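
Usage note (not part of the patches above): once the series is applied, the manual flush endpoint is mounted under the versioned admin prefix, i.e. POST /{HTTP_API_VERSION}/admin/flush, with query parameters catalog_name (defaults to "greptime"), schema_name (required), table_name (optional; omitted means flush every table in the schema) and region (optional region number). A minimal client-side sketch follows, assuming HTTP_API_VERSION resolves to "v1", the frontend HTTP server listens on 127.0.0.1:4000, and a "monitor" table exists in the "public" schema; none of these values come from the patches themselves.

// Hypothetical client, not part of this change set; requires the `reqwest`
// and `tokio` crates.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    // Flush one table. Drop `table_name` to flush all tables in the schema,
    // or add `region=<number>` to flush a single region of the table.
    let resp = client
        .post("http://127.0.0.1:4000/v1/admin/flush?schema_name=public&table_name=monitor")
        .send()
        .await?;

    println!("flush status: {}", resp.status());
    Ok(())
}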