diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index c58c7fdf68..4a8874428d 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -20,6 +20,7 @@ use crate::partitioning::range::RangePartitionRule; use crate::table::route::TableRoutes; use crate::table::DistTable; +#[derive(Clone)] pub struct FrontendCatalogManager { backend: KvBackendRef, table_routes: Arc, diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 996e5bb435..63b9a2055f 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -193,6 +193,33 @@ pub enum Error { table_name: String, backtrace: Backtrace, }, + + #[snafu(display("Failed to find catalog by name: {}", catalog_name))] + CatalogNotFound { + catalog_name: String, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to find schema, schema info: {}", schema_info))] + SchemaNotFound { + schema_info: String, + backtrace: Backtrace, + }, + + #[snafu(display("Table occurs error, source: {}", source))] + Table { + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Failed to get catalog manager"))] + CatalogManager { backtrace: Backtrace }, + + #[snafu(display("Failed to get full table name, source: {}", source))] + FullTableName { + #[snafu(backtrace)] + source: sql::error::Error, + }, } pub type Result = std::result::Result; @@ -208,6 +235,7 @@ impl ErrorExt for Error { | Error::InvalidInsertRequest { .. } | Error::FindPartitionColumn { .. } | Error::ColumnValuesNumberMismatch { .. } + | Error::CatalogManager { .. } | Error::RegionKeysSize { .. } => StatusCode::InvalidArguments, Error::RuntimeResource { source, .. } => source.status_code(), @@ -216,6 +244,10 @@ impl ErrorExt for Error { Error::ParseSql { source } => source.status_code(), + Error::FullTableName { source, .. } => source.status_code(), + + Error::Table { source } => source.status_code(), + Error::ConvertColumnDefaultConstraint { source, .. } | Error::ConvertScalarValue { source, .. } => source.status_code(), @@ -234,7 +266,9 @@ impl ErrorExt for Error { Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, - Error::JoinTask { .. } => StatusCode::Unexpected, + Error::JoinTask { .. } + | Error::SchemaNotFound { .. } + | Error::CatalogNotFound { .. } => StatusCode::Unexpected, Error::Catalog { source, .. } => source.status_code(), Error::ParseCatalogEntry { source, .. } => source.status_code(), diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index a4205e89d0..d69b69f2c2 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -13,6 +13,7 @@ use api::v1::{ }; use async_trait::async_trait; use catalog::remote::MetaKvBackend; +use catalog::{CatalogList, CatalogProviderRef, SchemaProviderRef}; use client::admin::{admin_result_to_output, Admin}; use client::{Client, Database, Select}; use common_error::prelude::BoxedError; @@ -29,6 +30,7 @@ use servers::query_handler::{ use snafu::prelude::*; use sql::ast::{ColumnDef, TableConstraint}; use sql::statements::create::{CreateTable, TIME_INDEX}; +use sql::statements::insert::Insert; use sql::statements::statement::Statement; use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use sql::{dialect::GenericDialect, parser::ParserContext}; @@ -37,6 +39,7 @@ use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result}; use crate::frontend::{FrontendOptions, Mode}; +use crate::sql::insert_to_request; use crate::table::route::TableRoutes; #[async_trait] @@ -56,13 +59,27 @@ pub trait FrontendInstance: pub type FrontendInstanceRef = Arc; -#[derive(Default)] +#[derive(Clone)] pub struct Instance { // TODO(hl): In standalone mode, there is only one client. // But in distribute mode, frontend should fetch datanodes' addresses from metasrv. client: Client, /// catalog manager is None in standalone mode, datanode will keep their own catalog_manager: Option, + // TODO(fys): it should be a trait that corresponds to two implementations: + // Standalone and Distributed, then the code behind it doesn't need to use so + // many match statements. + mode: Mode, +} + +impl Default for Instance { + fn default() -> Self { + Self { + client: Client::default(), + catalog_manager: None, + mode: Mode::Standalone, + } + } } impl Instance { @@ -115,6 +132,43 @@ impl Instance { pub fn admin(&self) -> Admin { Admin::new("greptime", self.client.clone()) } + + fn get_catalog(&self, catalog_name: &str) -> Result { + self.catalog_manager + .as_ref() + .context(error::CatalogManagerSnafu)? + .catalog(catalog_name) + .context(error::CatalogSnafu)? + .context(error::CatalogNotFoundSnafu { catalog_name }) + } + + fn get_schema(provider: CatalogProviderRef, schema_name: &str) -> Result { + provider + .schema(schema_name) + .context(error::CatalogSnafu)? + .context(error::SchemaNotFoundSnafu { + schema_info: schema_name, + }) + } + + async fn sql_dist_insert(&self, insert: Box) -> Result { + let (catalog, schema, table) = insert.full_table_name().context(error::ParseSqlSnafu)?; + + let catalog_provider = self.get_catalog(&catalog)?; + let schema_provider = Self::get_schema(catalog_provider, &schema)?; + + let insert_request = insert_to_request(&schema_provider, *insert)?; + + let table = schema_provider + .table(&table) + .context(error::CatalogSnafu)? + .context(error::TableNotFoundSnafu { table_name: &table })?; + + table + .insert(insert_request) + .await + .context(error::TableSnafu) + } } #[async_trait] @@ -131,6 +185,7 @@ impl Instance { Self { client, catalog_manager: None, + mode: Mode::Standalone, } } } @@ -156,26 +211,44 @@ impl SqlQueryHandler for Instance { .database() .select(Select::Sql(query.to_string())) .await - .and_then(|object_result| object_result.try_into()), + .and_then(|object_result| object_result.try_into()) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }), Statement::Insert(insert) => { - // TODO(dennis): respect schema_name when inserting data - let (_catalog_name, _schema_name, table_name) = insert - .full_table_name() - .context(error::ParseSqlSnafu) - .map_err(BoxedError::new) - .context(server_error::ExecuteInsertSnafu { - msg: "Failed to get table name", - })?; + match self.mode { + Mode::Standalone => { + // TODO(dennis): respect schema_name when inserting data + let (_catalog_name, _schema_name, table_name) = insert + .full_table_name() + .context(error::ParseSqlSnafu) + .map_err(BoxedError::new) + .context(server_error::ExecuteInsertSnafu { + msg: "Failed to get table name", + })?; - let expr = InsertExpr { - table_name, - expr: Some(insert_expr::Expr::Sql(query.to_string())), - options: HashMap::default(), - }; - self.database() - .insert(expr) - .await - .and_then(|object_result| object_result.try_into()) + let expr = InsertExpr { + table_name, + expr: Some(insert_expr::Expr::Sql(query.to_string())), + options: HashMap::default(), + }; + self.database() + .insert(expr) + .await + .and_then(|object_result| object_result.try_into()) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) + } + Mode::Distributed => { + let affected = self + .sql_dist_insert(insert) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteInsertSnafu { + msg: "execute insert failed", + })?; + Ok(Output::AffectedRows(affected)) + } + } } Statement::CreateTable(create) => { let expr = create_to_expr(create) @@ -185,13 +258,17 @@ impl SqlQueryHandler for Instance { .create(expr) .await .and_then(admin_result_to_output) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) } Statement::ShowDatabases(_) | Statement::ShowTables(_) => self .database() .select(Select::Sql(query.to_string())) .await - .and_then(|object_result| object_result.try_into()), + .and_then(|object_result| object_result.try_into()) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }), Statement::CreateDatabase(c) => { let expr = CreateDatabaseExpr { @@ -201,6 +278,8 @@ impl SqlQueryHandler for Instance { .create_database(expr) .await .and_then(admin_result_to_output) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) } Statement::Alter(alter_stmt) => self .admin() @@ -210,13 +289,13 @@ impl SqlQueryHandler for Instance { .context(server_error::ExecuteAlterSnafu { query })?, ) .await - .and_then(admin_result_to_output), + .and_then(admin_result_to_output) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }), Statement::ShowCreateTable(_) => { return server_error::NotSupportedSnafu { feat: query }.fail() } } - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query }) } async fn insert_script(&self, _name: &str, _script: &str) -> server_error::Result<()> { diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 09e26bfa24..da6de4e715 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -2,22 +2,72 @@ use api::v1::InsertExpr; use async_trait::async_trait; use common_error::prelude::BoxedError; use servers::influxdb::InfluxdbRequest; -use servers::{error::ExecuteQuerySnafu, query_handler::InfluxdbLineProtocolHandler}; -use snafu::ResultExt; +use servers::{error as server_error, query_handler::InfluxdbLineProtocolHandler}; +use snafu::{OptionExt, ResultExt}; +use table::requests::InsertRequest; +use crate::error; +use crate::error::Result; +use crate::frontend::Mode; use crate::instance::Instance; #[async_trait] impl InfluxdbLineProtocolHandler for Instance { async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> { - let exprs: Vec = request.try_into()?; - self.database() - .batch_insert(exprs) - .await - .map_err(BoxedError::new) - .context(ExecuteQuerySnafu { - query: &request.lines, - })?; + match self.mode { + Mode::Standalone => { + let exprs: Vec = request.try_into()?; + self.database() + .batch_insert(exprs) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { + query: &request.lines, + })?; + } + Mode::Distributed => { + self.dist_insert(request.try_into()?) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteInsertSnafu { + msg: "execute insert failed", + })?; + } + } + Ok(()) } } + +impl Instance { + pub(crate) async fn dist_insert(&self, inserts: Vec) -> Result { + let mut joins = Vec::with_capacity(inserts.len()); + + for insert in inserts { + let self_clone = self.clone(); + + // TODO(fys): need a separate runtime here + let join = tokio::spawn(async move { + let catalog = self_clone.get_catalog(&insert.catalog_name)?; + let schema = Self::get_schema(catalog, &insert.schema_name)?; + let table = schema + .table(&insert.table_name) + .context(error::CatalogSnafu)? + .context(error::TableNotFoundSnafu { + table_name: &insert.table_name, + })?; + + table.insert(insert).await.context(error::TableSnafu) + }); + joins.push(join); + } + + let mut affected = 0; + + for join in joins { + affected += join.await.context(error::JoinTaskSnafu)??; + } + + Ok(affected) + } +} diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index d2b6b13502..1645bc9778 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -7,6 +7,7 @@ use servers::query_handler::OpentsdbProtocolHandler; use snafu::prelude::*; use crate::error::{self, Result}; +use crate::frontend::Mode; use crate::instance::Instance; #[async_trait] @@ -14,12 +15,25 @@ impl OpentsdbProtocolHandler for Instance { async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> { // TODO(LFC): Insert metrics in batch, then make OpentsdbLineProtocolHandler::exec received multiple data points, when // metric table and tags can be created upon insertion. - self.insert_opentsdb_metric(data_point) - .await - .map_err(BoxedError::new) - .with_context(|_| server_error::PutOpentsdbDataPointSnafu { - data_point: format!("{:?}", data_point), - })?; + match self.mode { + Mode::Standalone => { + self.insert_opentsdb_metric(data_point) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::PutOpentsdbDataPointSnafu { + data_point: format!("{:?}", data_point), + })?; + } + Mode::Distributed => { + self.dist_insert(vec![data_point.as_insert_request()]) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteInsertSnafu { + msg: "execute insert failed", + })?; + } + } + Ok(()) } } diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index dc466aaa1c..f48c9e463f 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -12,6 +12,7 @@ use servers::prometheus::{self, Metrics}; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use snafu::{OptionExt, ResultExt}; +use crate::frontend::Mode; use crate::instance::Instance; const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32; @@ -90,15 +91,29 @@ async fn handle_remote_queries( #[async_trait] impl PrometheusProtocolHandler for Instance { async fn write(&self, request: WriteRequest) -> ServerResult<()> { - let exprs = prometheus::write_request_to_insert_exprs(request)?; + match self.mode { + Mode::Standalone => { + let exprs = prometheus::write_request_to_insert_exprs(request)?; - self.database() - .batch_insert(exprs) - .await - .map_err(BoxedError::new) - .context(error::ExecuteInsertSnafu { - msg: "failed to write prometheus remote request", - })?; + self.database() + .batch_insert(exprs) + .await + .map_err(BoxedError::new) + .context(error::ExecuteInsertSnafu { + msg: "failed to write prometheus remote request", + })?; + } + Mode::Distributed => { + let inserts = prometheus::write_request_to_insert_reqs(request)?; + + self.dist_insert(inserts) + .await + .map_err(BoxedError::new) + .context(error::ExecuteInsertSnafu { + msg: "execute insert failed", + })?; + } + } Ok(()) }