diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 33734ceae1..f15f248f5e 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -84,12 +84,19 @@ impl Instance { } pub(crate) async fn handle_flush_table(&self, expr: FlushTableExpr) -> Result { + let table_name = if expr.table_name.trim().is_empty() { + None + } else { + Some(expr.table_name) + }; + let req = FlushTableRequest { catalog_name: expr.catalog_name, schema_name: expr.schema_name, - table_name: expr.table_name, + table_name, region_number: expr.region_id, }; + println!("req: {:?}\n", req); self.sql_handler() .execute(SqlRequest::FlushTable(req), QueryContext::arc()) .await @@ -148,7 +155,6 @@ mod tests { } #[test] - fn test_create_column_schema() { let column_def = ColumnDef { name: "a".to_string(), @@ -242,7 +248,7 @@ mod tests { ConcreteDataType::timestamp_millisecond_datatype(), false, ) - .with_time_index(true), + .with_time_index(true), ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs index 35ea7361cc..7b12ef36ca 100644 --- a/src/datanode/src/sql/flush_table.rs +++ b/src/datanode/src/sql/flush_table.rs @@ -13,28 +13,43 @@ // limitations under the License. use common_query::Output; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table::engine::TableReference; use table::requests::FlushTableRequest; -use crate::error::{self, Result}; +use crate::error::{self, CatalogSnafu, DatabaseNotFoundSnafu, Result}; use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result { + if let Some(table) = &req.table_name { + self.flush_table_inner(&req.catalog_name, &req.schema_name, table, req.region_number).await?; + } else { + let schema = self.catalog_manager.schema(&req.catalog_name, &req.schema_name).context(CatalogSnafu)?.context(DatabaseNotFoundSnafu { + catalog: &req.catalog_name, + schema: &req.schema_name, + })?; + + let all_table_names = schema.table_names().context(CatalogSnafu)?; + futures::future::join_all(all_table_names.iter().map(|table| { + self.flush_table_inner(&req.catalog_name, &req.schema_name,table, req.region_number) + }) + ).await.into_iter().collect::>>()?; + } + Ok(Output::AffectedRows(0)) + } + + async fn flush_table_inner(&self, catalog:&str, schema:&str, table: &str, region: Option) -> Result<()> { let table_ref = TableReference { - catalog: &req.catalog_name, - schema: &req.schema_name, - table: &req.table_name, + catalog, + schema, + table, }; let full_table_name = table_ref.to_string(); - let table = self.get_table(&table_ref)?; - - table.flush(req).await.context(error::FlushTableSnafu { + table.flush(region).await.context(error::FlushTableSnafu { table_name: full_table_name, - })?; - Ok(Output::AffectedRows(0)) + }) } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index d6c03e2931..395a91d979 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -150,8 +150,10 @@ impl Services { if let Some(http_options) = &opts.http_options { let http_addr = parse_addr(&http_options.addr)?; + let mut http_server = HttpServer::new( ServerSqlQueryHandlerAdaptor::arc(instance.clone()), + ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), http_options.clone(), ); if let Some(user_provider) = user_provider.clone() { diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 8bf9a568ec..abb1188a54 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -44,7 +44,7 @@ use table::metadata::{ FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, }; use table::requests::{ - AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest, + AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest, }; use table::table::scan::SimpleTableScan; use table::table::{AlterContext, Table}; @@ -323,9 +323,9 @@ impl Table for MitoTable { Ok(rows_deleted) } - async fn flush(&self, request: FlushTableRequest) -> TableResult<()> { - if let Some(region_id) = request.region_number { - if let Some(region) = self.regions.get(®ion_id) { + async fn flush(&self, region_number: Option) -> TableResult<()> { + if let Some(region_number) = region_number { + if let Some(region) = self.regions.get(®ion_number) { region .flush() .await diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4e6a3b5e5f..8e7c19f76e 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -160,9 +160,9 @@ pub enum Error { }, #[snafu(display( - "Failed to put OpenTSDB data point: {:?}, source: {}", - data_point, - source + "Failed to put OpenTSDB data point: {:?}, source: {}", + data_point, + source ))] PutOpentsdbDataPoint { data_point: String, @@ -263,8 +263,26 @@ pub enum Error { #[snafu(backtrace)] source: common_mem_prof::error::Error, }, + #[snafu(display("Invalid prepare statement: {}", err_msg))] InvalidPrepareStatement { err_msg: String }, + + #[snafu(display("Invalid flush argument: {}", err_msg))] + InvalidFlushArgument { + err_msg: String, + }, + + // #[snafu(display("Failed to flush table, catalog: {}, schema: {}, table: {}, region: {:?}, source: {}", + // catalog, schema, table, region_id, source + // ))] + // FlushTable { + // catalog: String, + // schema: String, + // table: String, + // region_id: Option, + // #[snafu(backtrace)] + // source: Error, + // }, } pub type Result = std::result::Result; @@ -327,6 +345,7 @@ impl ErrorExt for Error { DatabaseNotFound { .. } => StatusCode::DatabaseNotFound, #[cfg(feature = "mem-prof")] DumpProfileData { source, .. } => source.status_code(), + InvalidFlushArgument { .. } => StatusCode::InvalidArguments, } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 4bdd945dc7..b731eb0542 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -21,6 +21,7 @@ pub mod script; #[cfg(feature = "mem-prof")] pub mod mem_prof; +mod admin; use std::net::SocketAddr; use std::sync::Arc; @@ -56,11 +57,13 @@ use self::authorize::HttpAuth; use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write}; use crate::auth::UserProviderRef; use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; +use crate::http::admin::flush; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef, ScriptHandlerRef, }; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::server::Server; /// create query context from database name information, catalog and schema are @@ -96,6 +99,7 @@ pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"] pub struct HttpServer { sql_handler: ServerSqlQueryHandlerRef, + grpc_handler: ServerGrpcQueryHandlerRef, options: HttpOptions, influxdb_handler: Option, opentsdb_handler: Option, @@ -345,13 +349,17 @@ async fn serve_docs() -> Html { #[derive(Clone)] pub struct ApiState { pub sql_handler: ServerSqlQueryHandlerRef, + pub grpc_handler: ServerGrpcQueryHandlerRef, pub script_handler: Option, } impl HttpServer { - pub fn new(sql_handler: ServerSqlQueryHandlerRef, options: HttpOptions) -> Self { + pub fn new(sql_handler: ServerSqlQueryHandlerRef, + grpc_handler: ServerGrpcQueryHandlerRef, + options: HttpOptions) -> Self { Self { sql_handler, + grpc_handler, options, opentsdb_handler: None, influxdb_handler: None, @@ -420,12 +428,14 @@ impl HttpServer { let sql_router = self .route_sql(ApiState { sql_handler: self.sql_handler.clone(), + grpc_handler: self.grpc_handler.clone(), script_handler: self.script_handler.clone(), }) .finish_api(&mut api) .layer(Extension(api)); let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router); + router = router.nest(&format!("/{HTTP_API_VERSION}/admin"), self.route_admin(self.grpc_handler.clone())); if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { router = router.nest( @@ -517,6 +527,11 @@ impl HttpServer { .route("/api/put", routing::post(opentsdb::put)) .with_state(opentsdb_handler) } + + fn route_admin(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router { + Router::new().route("/flush", routing::post(flush)) + .with_state(grpc_handler) + } } pub const HTTP_SERVER: &str = "HTTP_SERVER"; @@ -637,8 +652,10 @@ mod test { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); - let instance = ServerSqlQueryHandlerAdaptor::arc(instance); - let server = HttpServer::new(instance, HttpOptions::default()); + let instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); + let server = HttpServer::new(instance.clone(), + instance, + HttpOptions::default()); server.make_app().route( "/test/timeout", get(forever.layer( diff --git a/src/servers/src/http/admin.rs b/src/servers/src/http/admin.rs new file mode 100644 index 0000000000..ea60c7da38 --- /dev/null +++ b/src/servers/src/http/admin.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +use axum::extract::{Query, RawBody, State}; +use axum::http::StatusCode as HttpStatusCode; +use axum::Json; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; +use std::collections::HashMap; +use snafu::OptionExt; +use api::v1::greptime_request::Request; +use api::v1::{DdlRequest, FlushTableExpr}; +use api::v1::ddl_request::Expr; +use session::context::QueryContext; +use crate::error; + +use crate::error::{Result}; + +#[axum_macros::debug_handler] +pub async fn flush( + State(grpc_handler): State, + Query(params): Query>, + RawBody(_): RawBody, +) -> Result<(HttpStatusCode, Json)> { + let catalog_name = params.get("catalog_name").cloned().unwrap_or("greptime".to_string()); + let schema_name = params.get("schema_name").cloned().context(error::InvalidFlushArgumentSnafu { + err_msg: "schema_name is not present", + })?; + + // if table name is not present, flush all tables inside schema + let table_name = params.get("table_name").cloned().unwrap_or_default(); + + let region_id: Option = params.get("region_id").map(|v| v.parse()).transpose().ok().flatten(); + + let request = Request::Ddl(DdlRequest { + expr: Some(Expr::FlushTable(FlushTableExpr { + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + region_id, + })) + }); + + grpc_handler.do_query(request, QueryContext::arc()).await?; + Ok((HttpStatusCode::OK, Json::from("hello, world".to_string()))) +} \ No newline at end of file diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index ac77fc6334..8f7ede1c91 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -43,6 +43,7 @@ pub async fn sql( Form(form_params): Form, ) -> Json { let sql_handler = &state.sql_handler; + let start = Instant::now(); let sql = query_params.sql.or(form_params.sql); let db = query_params.db.or(form_params.db); diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index c2e6d31d9a..76fff8a986 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -213,7 +213,7 @@ pub struct CopyTableFromRequest { pub struct FlushTableRequest { pub catalog_name: String, pub schema_name: String, - pub table_name: String, + pub table_name: Option, pub region_number: Option, } diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 223f132dda..ad625df59e 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -23,10 +23,11 @@ use async_trait::async_trait; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use datatypes::schema::SchemaRef; +use store_api::storage::RegionNumber; use crate::error::{Result, UnsupportedSnafu}; use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType}; -use crate::requests::{AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest}; +use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest}; pub type AlterContext = anymap::Map; @@ -95,7 +96,8 @@ pub trait Table: Send + Sync { } /// Flush table. - async fn flush(&self, _request: FlushTableRequest) -> Result<()> { + async fn flush(&self, region_number: Option) -> Result<()> { + let _ = region_number; UnsupportedSnafu { operation: "FLUSH" }.fail()? } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 6013010d77..4b33b02b6e 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -275,7 +275,8 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router .await .unwrap(); let http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance))), + ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance.clone()))), + ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), HttpOptions::default(), ); (http_server.make_app(), guard) @@ -296,8 +297,10 @@ pub async fn setup_test_http_app_with_frontend( ) .await .unwrap(); + let frontend_ref = Arc::new(frontend); let mut http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(Arc::new(frontend)), + ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()), + ServerGrpcQueryHandlerAdaptor::arc(frontend_ref), HttpOptions::default(), ); http_server.set_script_handler(instance.clone());