From 17eb99bc52f26e4193f585887ad30361279c9c98 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 15 Mar 2023 20:15:34 +0800 Subject: [PATCH] feat: allow manual table flush through HTTP API (#1184) --- src/datanode/src/server/grpc.rs | 9 ++- src/datanode/src/sql/flush_table.rs | 58 +++++++++++++++---- src/frontend/src/server.rs | 1 + src/mito/src/engine/tests.rs | 8 +-- src/mito/src/table.rs | 8 +-- src/servers/src/error.rs | 5 ++ src/servers/src/http.rs | 42 +++++++++++++- src/servers/src/http/admin.rs | 69 +++++++++++++++++++++++ src/servers/src/http/handler.rs | 1 + src/servers/tests/http/http_test.rs | 3 +- src/servers/tests/http/influxdb_test.rs | 17 +++++- src/servers/tests/http/opentsdb_test.rs | 17 +++++- src/servers/tests/http/prometheus_test.rs | 17 +++++- src/servers/tests/mod.rs | 2 +- src/table/src/requests.rs | 2 +- src/table/src/table.rs | 6 +- tests-integration/src/test_util.rs | 7 ++- 17 files changed, 239 insertions(+), 33 deletions(-) create mode 100644 src/servers/src/http/admin.rs diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 33734ceae1..a38093dc98 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -84,10 +84,16 @@ impl Instance { } pub(crate) async fn handle_flush_table(&self, expr: FlushTableExpr) -> Result { + let table_name = if expr.table_name.trim().is_empty() { + None + } else { + Some(expr.table_name) + }; + let req = FlushTableRequest { catalog_name: expr.catalog_name, schema_name: expr.schema_name, - table_name: expr.table_name, + table_name, region_number: expr.region_id, }; self.sql_handler() @@ -148,7 +154,6 @@ mod tests { } #[test] - fn test_create_column_schema() { let column_def = ColumnDef { name: "a".to_string(), diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs index 35ea7361cc..ab3b459311 100644 --- a/src/datanode/src/sql/flush_table.rs +++ 
b/src/datanode/src/sql/flush_table.rs @@ -13,28 +13,66 @@ // limitations under the License. use common_query::Output; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table::engine::TableReference; use table::requests::FlushTableRequest; -use crate::error::{self, Result}; +use crate::error::{self, CatalogSnafu, DatabaseNotFoundSnafu, Result}; use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result { + if let Some(table) = &req.table_name { + self.flush_table_inner( + &req.catalog_name, + &req.schema_name, + table, + req.region_number, + ) + .await?; + } else { + let schema = self + .catalog_manager + .schema(&req.catalog_name, &req.schema_name) + .context(CatalogSnafu)? + .context(DatabaseNotFoundSnafu { + catalog: &req.catalog_name, + schema: &req.schema_name, + })?; + + let all_table_names = schema.table_names().context(CatalogSnafu)?; + futures::future::join_all(all_table_names.iter().map(|table| { + self.flush_table_inner( + &req.catalog_name, + &req.schema_name, + table, + req.region_number, + ) + })) + .await + .into_iter() + .collect::>>()?; + } + Ok(Output::AffectedRows(0)) + } + + async fn flush_table_inner( + &self, + catalog: &str, + schema: &str, + table: &str, + region: Option, + ) -> Result<()> { let table_ref = TableReference { - catalog: &req.catalog_name, - schema: &req.schema_name, - table: &req.table_name, + catalog, + schema, + table, }; let full_table_name = table_ref.to_string(); - let table = self.get_table(&table_ref)?; - - table.flush(req).await.context(error::FlushTableSnafu { + table.flush(region).await.context(error::FlushTableSnafu { table_name: full_table_name, - })?; - Ok(Output::AffectedRows(0)) + }) } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index d6c03e2931..d4ab41dee8 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -152,6 +152,7 @@ impl Services { let mut http_server = HttpServer::new( 
ServerSqlQueryHandlerAdaptor::arc(instance.clone()), + ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), http_options.clone(), ); if let Some(user_provider) = user_provider.clone() { diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 9771cb668e..c252d4f0c1 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -793,10 +793,10 @@ async fn test_flush_table_all_regions() { assert!(!has_parquet_file(&region_dir)); // Trigger flush all region - table.flush(FlushTableRequest::default()).await.unwrap(); + table.flush(None).await.unwrap(); // Trigger again, wait for the previous task finished - table.flush(FlushTableRequest::default()).await.unwrap(); + table.flush(None).await.unwrap(); assert!(has_parquet_file(&region_dir)); } @@ -832,10 +832,10 @@ async fn test_flush_table_with_region_id() { }; // Trigger flush all region - table.flush(req.clone()).await.unwrap(); + table.flush(req.region_number).await.unwrap(); // Trigger again, wait for the previous task finished - table.flush(req).await.unwrap(); + table.flush(req.region_number).await.unwrap(); assert!(has_parquet_file(&region_dir)); } diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 2e837a04db..9404fb3be3 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -44,7 +44,7 @@ use table::metadata::{ FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, }; use table::requests::{ - AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest, + AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest, }; use table::table::scan::SimpleTableScan; use table::table::{AlterContext, RegionStat, Table}; @@ -323,9 +323,9 @@ impl Table for MitoTable { Ok(rows_deleted) } - async fn flush(&self, request: FlushTableRequest) -> TableResult<()> { - if let Some(region_id) = request.region_number { - if let Some(region) = self.regions.get(&region_id) { + async fn flush(&self, 
region_number: Option) -> TableResult<()> { + if let Some(region_number) = region_number { + if let Some(region) = self.regions.get(&region_number) { region .flush() .await diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4e6a3b5e5f..4c7038bad0 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -263,8 +263,12 @@ pub enum Error { #[snafu(backtrace)] source: common_mem_prof::error::Error, }, + #[snafu(display("Invalid prepare statement: {}", err_msg))] InvalidPrepareStatement { err_msg: String }, + + #[snafu(display("Invalid flush argument: {}", err_msg))] + InvalidFlushArgument { err_msg: String }, } pub type Result = std::result::Result; @@ -327,6 +331,7 @@ impl ErrorExt for Error { DatabaseNotFound { .. } => StatusCode::DatabaseNotFound, #[cfg(feature = "mem-prof")] DumpProfileData { source, .. } => source.status_code(), + InvalidFlushArgument { .. } => StatusCode::InvalidArguments, } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 4bdd945dc7..0f7b9da563 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -19,6 +19,7 @@ pub mod opentsdb; pub mod prometheus; pub mod script; +mod admin; #[cfg(feature = "mem-prof")] pub mod mem_prof; @@ -56,6 +57,8 @@ use self::authorize::HttpAuth; use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write}; use crate::auth::UserProviderRef; use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; +use crate::http::admin::flush; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef, @@ -96,6 +99,7 @@ pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"] pub struct HttpServer { sql_handler: ServerSqlQueryHandlerRef, + grpc_handler: ServerGrpcQueryHandlerRef, options: HttpOptions, influxdb_handler: Option, opentsdb_handler: Option, @@ 
-349,9 +353,14 @@ pub struct ApiState { } impl HttpServer { - pub fn new(sql_handler: ServerSqlQueryHandlerRef, options: HttpOptions) -> Self { + pub fn new( + sql_handler: ServerSqlQueryHandlerRef, + grpc_handler: ServerGrpcQueryHandlerRef, + options: HttpOptions, + ) -> Self { Self { sql_handler, + grpc_handler, options, opentsdb_handler: None, influxdb_handler: None, @@ -426,6 +435,10 @@ impl HttpServer { .layer(Extension(api)); let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router); + router = router.nest( + &format!("/{HTTP_API_VERSION}/admin"), + self.route_admin(self.grpc_handler.clone()), + ); if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { router = router.nest( @@ -517,6 +530,12 @@ impl HttpServer { .route("/api/put", routing::post(opentsdb::put)) .with_state(opentsdb_handler) } + + fn route_admin(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router { + Router::new() + .route("/flush", routing::post(flush)) + .with_state(grpc_handler) + } } pub const HTTP_SERVER: &str = "HTTP_SERVER"; @@ -578,6 +597,7 @@ mod test { use std::future::pending; use std::sync::Arc; + use api::v1::greptime_request::Request; use axum::handler::Handler; use axum::http::StatusCode; use axum::routing::get; @@ -592,12 +612,26 @@ mod test { use super::*; use crate::error::Error; + use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdaptor}; use crate::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler}; struct DummyInstance { _tx: mpsc::Sender<(String, Vec)>, } + #[async_trait] + impl GrpcQueryHandler for DummyInstance { + type Error = Error; + + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { + unimplemented!() + } + } + #[async_trait] impl SqlQueryHandler for DummyInstance { type Error = Error; @@ -637,8 +671,10 @@ mod test { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); - let 
instance = ServerSqlQueryHandlerAdaptor::arc(instance); - let server = HttpServer::new(instance, HttpOptions::default()); + let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); + let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance); + + let server = HttpServer::new(sql_instance, grpc_instance, HttpOptions::default()); server.make_app().route( "/test/timeout", get(forever.layer( diff --git a/src/servers/src/http/admin.rs b/src/servers/src/http/admin.rs new file mode 100644 index 0000000000..d801b1703c --- /dev/null +++ b/src/servers/src/http/admin.rs @@ -0,0 +1,69 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; + +use api::v1::ddl_request::Expr; +use api::v1::greptime_request::Request; +use api::v1::{DdlRequest, FlushTableExpr}; +use axum::extract::{Query, RawBody, State}; +use axum::http::StatusCode as HttpStatusCode; +use axum::Json; +use session::context::QueryContext; +use snafu::OptionExt; + +use crate::error; +use crate::error::Result; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; + +#[axum_macros::debug_handler] +pub async fn flush( + State(grpc_handler): State, + Query(params): Query>, + RawBody(_): RawBody, +) -> Result<(HttpStatusCode, Json)> { + let catalog_name = params + .get("catalog_name") + .cloned() + .unwrap_or("greptime".to_string()); + let schema_name = + params + .get("schema_name") + .cloned() + .context(error::InvalidFlushArgumentSnafu { + err_msg: "schema_name is not present", + })?; + + // if table name is not present, flush all tables inside schema + let table_name = params.get("table_name").cloned().unwrap_or_default(); + + let region_id: Option = params + .get("region") + .map(|v| v.parse()) + .transpose() + .ok() + .flatten(); + + let request = Request::Ddl(DdlRequest { + expr: Some(Expr::FlushTable(FlushTableExpr { + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + region_id, + })), + }); + + grpc_handler.do_query(request, QueryContext::arc()).await?; + Ok((HttpStatusCode::OK, Json::from("hello, world".to_string()))) +} diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index ac77fc6334..8f7ede1c91 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -43,6 +43,7 @@ pub async fn sql( Form(form_params): Form, ) -> Json { let sql_handler = &state.sql_handler; + let start = Instant::now(); let sql = query_params.sql.or(form_params.sql); let db = query_params.db.or(form_params.db); diff --git a/src/servers/tests/http/http_test.rs b/src/servers/tests/http/http_test.rs index 
cf4ca38b07..c6398022b0 100644 --- a/src/servers/tests/http/http_test.rs +++ b/src/servers/tests/http/http_test.rs @@ -17,11 +17,12 @@ use axum_test_helper::TestClient; use servers::http::{HttpOptions, HttpServer}; use table::test_util::MemTable; -use crate::create_testing_sql_query_handler; +use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler}; fn make_test_app() -> Router { let server = HttpServer::new( create_testing_sql_query_handler(MemTable::default_numbers_table()), + create_testing_grpc_query_handler(MemTable::default_numbers_table()), HttpOptions::default(), ); server.make_app() diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 123d9e0d02..086c6403c5 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::greptime_request::Request; use api::v1::InsertRequest; use async_trait::async_trait; use axum::{http, Router}; @@ -24,6 +25,7 @@ use query::parser::PromQuery; use servers::error::{Error, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::influxdb::InfluxdbRequest; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; @@ -35,6 +37,19 @@ struct DummyInstance { tx: Arc>, } +#[async_trait] +impl GrpcQueryHandler for DummyInstance { + type Error = Error; + + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl InfluxdbLineProtocolHandler for DummyInstance { async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> { @@ -79,7 +94,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: Arc>, db_name: Option<&str>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = 
HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); let mut user_provider = MockUserProvider::default(); if let Some(name) = db_name { user_provider.set_authorization_info(DatabaseAuthInfo { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 96e40d8fd0..694751635e 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::greptime_request::Request; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; @@ -23,6 +24,7 @@ use query::parser::PromQuery; use servers::error::{self, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::opentsdb::codec::DataPoint; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; @@ -32,6 +34,19 @@ struct DummyInstance { tx: mpsc::Sender, } +#[async_trait] +impl GrpcQueryHandler for DummyInstance { + type Error = crate::Error; + + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl OpentsdbProtocolHandler for DummyInstance { async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { @@ -77,7 +92,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); server.set_opentsdb_handler(instance); server.make_app() } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 2a40af43b0..173382b1e4 100644 --- 
a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use api::prometheus::remote::{ LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest, }; +use api::v1::greptime_request::Request; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; @@ -28,6 +29,7 @@ use servers::error::{Error, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::prometheus; use servers::prometheus::{snappy_compress, Metrics}; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use session::context::QueryContextRef; @@ -37,6 +39,19 @@ struct DummyInstance { tx: mpsc::Sender<(String, Vec)>, } +#[async_trait] +impl GrpcQueryHandler for DummyInstance { + type Error = Error; + + async fn do_query( + &self, + _query: Request, + _ctx: QueryContextRef, + ) -> std::result::Result { + unimplemented!() + } +} + #[async_trait] impl PrometheusProtocolHandler for DummyInstance { async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()> { @@ -102,7 +117,7 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), HttpOptions::default()); + let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); server.set_prom_handler(instance); server.make_app() } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 789b952c5b..785614d570 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -47,7 +47,7 @@ mod py_script; const LOCALHOST_WITH_0: &str = "127.0.0.1:0"; -struct DummyInstance { +pub struct DummyInstance { query_engine: QueryEngineRef, py_engine: Arc, scripts: RwLock>>, diff --git a/src/table/src/requests.rs 
b/src/table/src/requests.rs index c2e6d31d9a..76fff8a986 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -213,7 +213,7 @@ pub struct CopyTableFromRequest { pub struct FlushTableRequest { pub catalog_name: String, pub schema_name: String, - pub table_name: String, + pub table_name: Option, pub region_number: Option, } diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 844fd8af18..a733473a77 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -23,10 +23,11 @@ use async_trait::async_trait; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use datatypes::schema::SchemaRef; +use store_api::storage::RegionNumber; use crate::error::{Result, UnsupportedSnafu}; use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType}; -use crate::requests::{AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest}; +use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest}; pub type AlterContext = anymap::Map; @@ -95,7 +96,8 @@ pub trait Table: Send + Sync { } /// Flush table. - async fn flush(&self, _request: FlushTableRequest) -> Result<()> { + async fn flush(&self, region_number: Option) -> Result<()> { + let _ = region_number; UnsupportedSnafu { operation: "FLUSH" }.fail()? 
} diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 6013010d77..4b33b02b6e 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -275,7 +275,8 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router .await .unwrap(); let http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance))), + ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance.clone()))), + ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), HttpOptions::default(), ); (http_server.make_app(), guard) @@ -296,8 +297,10 @@ pub async fn setup_test_http_app_with_frontend( ) .await .unwrap(); + let frontend_ref = Arc::new(frontend); let mut http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(Arc::new(frontend)), + ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()), + ServerGrpcQueryHandlerAdaptor::arc(frontend_ref), HttpOptions::default(), ); http_server.set_script_handler(instance.clone());