From 62fcb54258b1db36592487347a44e9c2b3192a4b Mon Sep 17 00:00:00 2001
From: LFC
Date: Mon, 21 Nov 2022 15:15:14 +0800
Subject: [PATCH] fix: correctly open table when distributed datanode restarts
 (#576)

Co-authored-by: luofucong
---
 Cargo.lock                                   |  4 +-
 src/api/src/column_def.rs                    | 38 +++++++++++++
 src/api/src/error.rs                         | 24 ++++++++
 src/api/src/lib.rs                           |  1 +
 src/catalog/src/local/manager.rs             |  1 +
 src/catalog/src/remote/manager.rs            |  6 +-
 src/catalog/src/system.rs                    |  1 +
 src/cmd/src/datanode.rs                      | 52 ++++++++----------
 src/datanode/src/error.rs                    | 19 +++++--
 src/datanode/src/server/grpc/ddl.rs          | 56 ++++++++-----------
 src/frontend/src/error.rs                    | 18 +++++-
 src/frontend/src/instance/distributed.rs     | 58 ++++++++++----------
 src/mito/src/engine.rs                       |  4 +-
 src/query/src/optimizer.rs                   |  5 --
 src/servers/Cargo.toml                       |  2 +-
 src/servers/src/mysql/handler.rs             | 40 ++++++++------
 src/servers/src/mysql/server.rs              | 28 ++++++++--
 src/servers/src/mysql/writer.rs              | 53 +++++++++++-------
 src/servers/tests/mysql/mysql_server_test.rs |  1 +
 src/table/src/requests.rs                    |  2 +
 20 files changed, 262 insertions(+), 151 deletions(-)
 create mode 100644 src/api/src/column_def.rs

diff --git a/Cargo.lock b/Cargo.lock
index a26a0a365f..b6bf3838e9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3616,9 +3616,9 @@ dependencies = [
 
 [[package]]
 name = "opensrv-mysql"
-version = "0.1.0"
+version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8bcb5fc2fda7e5e5f8478cd637285bbdd6196a9601e32293d0897e469a7dd020"
+checksum = "e4c24c12fd688cb5aa5b1a54c6ccb2e30fb9b5132debb0e89fcb432b3f73db8f"
 dependencies = [
  "async-trait",
  "byteorder",
diff --git a/src/api/src/column_def.rs b/src/api/src/column_def.rs
new file mode 100644
index 0000000000..131ad75764
--- /dev/null
+++ b/src/api/src/column_def.rs
@@ -0,0 +1,38 @@
+// Copyright 2022 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
+use snafu::ResultExt;
+
+use crate::error::{self, Result};
+use crate::helper::ColumnDataTypeWrapper;
+use crate::v1::ColumnDef;
+
+impl ColumnDef {
+    pub fn try_as_column_schema(&self) -> Result<ColumnSchema> {
+        let data_type = ColumnDataTypeWrapper::try_new(self.datatype)?;
+
+        let constraint = match &self.default_constraint {
+            None => None,
+            Some(v) => Some(
+                ColumnDefaultConstraint::try_from(&v[..])
+                    .context(error::ConvertColumnDefaultConstraintSnafu { column: &self.name })?,
+            ),
+        };
+
+        ColumnSchema::new(&self.name, data_type.into(), self.is_nullable)
+            .with_default_constraint(constraint)
+            .context(error::InvalidColumnDefaultConstraintSnafu { column: &self.name })
+    }
+}
diff --git a/src/api/src/error.rs b/src/api/src/error.rs
index 2320e199d7..562ea5a818 100644
--- a/src/api/src/error.rs
+++ b/src/api/src/error.rs
@@ -33,6 +33,28 @@ pub enum Error {
         from: ConcreteDataType,
         backtrace: Backtrace,
     },
+
+    #[snafu(display(
+        "Failed to convert column default constraint, column: {}, source: {}",
+        column,
+        source
+    ))]
+    ConvertColumnDefaultConstraint {
+        column: String,
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display(
+        "Invalid column default constraint, column: {}, source: {}",
+        column,
+        source
+    ))]
+    InvalidColumnDefaultConstraint {
+        column: String,
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
 }
 
 impl ErrorExt for Error {
@@ -40,6 +62,8 @@ impl ErrorExt for Error {
         match self {
             Error::UnknownColumnDataType { .. } => StatusCode::InvalidArguments,
             Error::IntoColumnDataType { .. } => StatusCode::Unexpected,
+            Error::ConvertColumnDefaultConstraint { source, .. }
+            | Error::InvalidColumnDefaultConstraint { source, .. } => source.status_code(),
         }
     }
 
     fn backtrace_opt(&self) -> Option<&Backtrace> {
diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs
index d6c415d8cf..73aa6c4363 100644
--- a/src/api/src/lib.rs
+++ b/src/api/src/lib.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod column_def;
 pub mod error;
 pub mod helper;
 pub mod prometheus;
diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs
index 89fc62b6b9..ed6783c68f 100644
--- a/src/catalog/src/local/manager.rs
+++ b/src/catalog/src/local/manager.rs
@@ -241,6 +241,7 @@ impl LocalCatalogManager {
                 schema_name: t.schema_name.clone(),
                 table_name: t.table_name.clone(),
                 table_id: t.table_id,
+                region_numbers: vec![0],
             };
 
             let option = self
diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs
index 5c4ddd680e..5369f6ce0d 100644
--- a/src/catalog/src/remote/manager.rs
+++ b/src/catalog/src/remote/manager.rs
@@ -316,11 +316,15 @@ impl RemoteCatalogManager {
             ..
         } = table_value;
 
+        // unwrap safety: checked when yielding this table in `iter_remote_tables`
+        let region_numbers = regions_id_map.get(&self.node_id).unwrap();
+
         let request = OpenTableRequest {
             catalog_name: catalog_name.clone(),
             schema_name: schema_name.clone(),
             table_name: table_name.clone(),
             table_id,
+            region_numbers: region_numbers.clone(),
         };
         match self
             .engine
@@ -361,7 +365,7 @@ impl RemoteCatalogManager {
                     table_name: table_name.clone(),
                     desc: None,
                     schema: Arc::new(schema),
-                    region_numbers: regions_id_map.get(&self.node_id).unwrap().clone(), // this unwrap is safe because region_id_map is checked in `iter_remote_tables`
+                    region_numbers: region_numbers.clone(),
                     primary_key_indices: meta.primary_key_indices.clone(),
                     create_if_not_exists: true,
                     table_options: meta.options.clone(),
diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs
index 07084248f2..564acc7ba5 100644
--- a/src/catalog/src/system.rs
+++ b/src/catalog/src/system.rs
@@ -87,6 +87,7 @@ impl SystemCatalogTable {
             schema_name: INFORMATION_SCHEMA_NAME.to_string(),
             table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
             table_id: SYSTEM_CATALOG_TABLE_ID,
+            region_numbers: vec![0],
         };
         let schema = Arc::new(build_system_catalog_schema());
         let ctx = EngineContext::default();
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 44d4d4c4f5..d386bfa64e 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -14,7 +14,7 @@
 
 use clap::Parser;
 use common_telemetry::logging;
-use datanode::datanode::{Datanode, DatanodeOptions};
+use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
 use meta_client::MetaClientOpts;
 use servers::Mode;
 use snafu::ResultExt;
@@ -47,7 +47,7 @@ impl SubCommand {
     }
 }
 
-#[derive(Debug, Parser)]
+#[derive(Debug, Parser, Default)]
 struct StartCommand {
     #[clap(long)]
     node_id: Option<u64>,
     #[clap(long)]
     rpc_addr: Option<String>,
     #[clap(long)]
     mysql_addr: Option<String>,
     #[clap(long)]
     metasrv_addr: Option<String>,
     #[clap(short, long)]
     config_file: Option<String>,
+    #[clap(long)]
+    data_dir: Option<String>,
+    #[clap(long)]
+    wal_dir: Option<String>,
 }
 
 impl StartCommand {
@@ -115,6 +119,14 @@ impl TryFrom<StartCommand> for DatanodeOptions {
             }
             .fail();
         }
+
+        if let Some(data_dir) = cmd.data_dir {
+            opts.storage = ObjectStoreConfig::File { data_dir };
+        }
+
+        if let Some(wal_dir) = cmd.wal_dir {
+            opts.wal_dir = wal_dir;
+        }
         Ok(opts)
     }
 }
@@ -131,14 +143,11 @@ mod tests {
     #[test]
     fn test_read_from_config_file() {
         let cmd = StartCommand {
-            node_id: None,
-            rpc_addr: None,
-            mysql_addr: None,
-            metasrv_addr: None,
             config_file: Some(format!(
                 "{}/../../config/datanode.example.toml",
                 std::env::current_dir().unwrap().as_path().to_str().unwrap()
             )),
+            ..Default::default()
         };
         let options: DatanodeOptions = cmd.try_into().unwrap();
         assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
@@ -168,44 +177,30 @@ mod tests {
     fn test_try_from_cmd() {
         assert_eq!(
             Mode::Standalone,
-            DatanodeOptions::try_from(StartCommand {
-                node_id: None,
-                rpc_addr: None,
-                mysql_addr: None,
-                metasrv_addr: None,
-                config_file: None
-            })
-            .unwrap()
-            .mode
+            DatanodeOptions::try_from(StartCommand::default())
+                .unwrap()
+                .mode
         );
 
         let mode = DatanodeOptions::try_from(StartCommand {
             node_id: Some(42),
-            rpc_addr: None,
-            mysql_addr: None,
             metasrv_addr: Some("127.0.0.1:3002".to_string()),
-            config_file: None,
+            ..Default::default()
         })
         .unwrap()
         .mode;
         assert_matches!(mode, Mode::Distributed);
 
         assert!(DatanodeOptions::try_from(StartCommand {
-            node_id: None,
-            rpc_addr: None,
-            mysql_addr: None,
             metasrv_addr: Some("127.0.0.1:3002".to_string()),
-            config_file: None,
+            ..Default::default()
         })
         .is_err());
 
         // Providing node_id but leaving metasrv_addr absent is ok, since metasrv_addr has a default value
         DatanodeOptions::try_from(StartCommand {
             node_id: Some(42),
-            rpc_addr: None,
-            mysql_addr: None,
-            metasrv_addr: None,
-            config_file: None,
+            ..Default::default()
         })
         .unwrap();
     }
 
     #[test]
     fn test_merge_config() {
         let dn_opts = DatanodeOptions::try_from(StartCommand {
-            node_id: None,
-            rpc_addr: None,
-            mysql_addr: None,
-            metasrv_addr: None,
             config_file: Some(format!(
                 "{}/../../config/datanode.example.toml",
                 std::env::current_dir().unwrap().as_path().to_str().unwrap()
             )),
+            ..Default::default()
         })
         .unwrap();
         assert_eq!(Some(42), dn_opts.node_id);
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 5acb415900..a6ecd963a4 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -208,10 +208,15 @@ pub enum Error {
         source: api::error::Error,
     },
 
-    #[snafu(display("Invalid column default constraint, source: {}", source))]
-    ColumnDefaultConstraint {
+    #[snafu(display(
+        "Invalid column proto definition, column: {}, source: {}",
+        column,
+        source
+    ))]
+    InvalidColumnDef {
+        column: String,
         #[snafu(backtrace)]
-        source: datatypes::error::Error,
+        source: api::error::Error,
     },
 
     #[snafu(display("Failed to parse SQL, source: {}", source))]
@@ -311,8 +316,7 @@ impl ErrorExt for Error {
                 source.status_code()
             }
 
-            Error::ColumnDefaultConstraint { source, .. }
-            | Error::CreateSchema { source, .. }
+            Error::CreateSchema { source, .. }
             | Error::ConvertSchema { source, .. }
             | Error::VectorComputation { source } => source.status_code(),
 
@@ -337,9 +341,12 @@ impl ErrorExt for Error {
             | Error::RegisterSchema { .. }
             | Error::IntoPhysicalPlan { .. }
             | Error::UnsupportedExpr { .. }
-            | Error::ColumnDataType { .. }
             | Error::Catalog { .. } => StatusCode::Internal,
 
+            Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
+                source.status_code()
+            }
+
             Error::InitBackend { .. } => StatusCode::StorageUnavailable,
             Error::OpenLogStore { source } => source.status_code(),
             Error::StartScriptManager { source } => source.status_code(),
diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs
index 1bb2c5d7f9..7a3980c6f6 100644
--- a/src/datanode/src/server/grpc/ddl.rs
+++ b/src/datanode/src/server/grpc/ddl.rs
@@ -14,23 +14,20 @@
 
 use std::sync::Arc;
 
-use api::helper::ColumnDataTypeWrapper;
 use api::result::AdminResultBuilder;
 use api::v1::alter_expr::Kind;
-use api::v1::{AdminResult, AlterExpr, ColumnDef, CreateExpr, DropColumns};
+use api::v1::{AdminResult, AlterExpr, CreateExpr, DropColumns};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::prelude::{ErrorExt, StatusCode};
 use common_query::Output;
 use common_telemetry::{error, info};
-use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef};
+use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
 use futures::TryFutureExt;
 use snafu::prelude::*;
 use table::metadata::TableId;
 use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
 
-use crate::error::{
-    self, BumpTableIdSnafu, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result,
-};
+use crate::error::{self, BumpTableIdSnafu, MissingFieldSnafu, Result};
 use crate::instance::Instance;
 use crate::sql::SqlRequest;
 
@@ -172,7 +169,12 @@ fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
                     field: "column_def",
                 })?;
 
-                let schema = create_column_schema(&column_def)?;
+                let schema =
+                    column_def
+                        .try_as_column_schema()
+                        .context(error::InvalidColumnDefSnafu {
+                            column: &column_def.name,
+                        })?;
                 add_column_requests.push(AddColumnRequest {
                     column_schema: schema,
                     is_key: add_column_expr.is_key,
@@ -212,7 +214,10 @@ fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
     let column_schemas = expr
         .column_defs
         .iter()
-        .map(create_column_schema)
+        .map(|x| {
+            x.try_as_column_schema()
+                .context(error::InvalidColumnDefSnafu { column: &x.name })
+        })
        .collect::<Result<Vec<ColumnSchema>>>()?;
 
     ensure!(
@@ -243,28 +248,12 @@ fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
     ))
 }
 
-fn create_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
-    let data_type =
-        ColumnDataTypeWrapper::try_new(column_def.datatype).context(error::ColumnDataTypeSnafu)?;
-    let default_constraint = match &column_def.default_constraint {
-        None => None,
-        Some(v) => {
-            Some(ColumnDefaultConstraint::try_from(&v[..]).context(ColumnDefaultConstraintSnafu)?)
-        }
-    };
-    ColumnSchema::new(
-        column_def.name.clone(),
-        data_type.into(),
-        column_def.is_nullable,
-    )
-    .with_default_constraint(default_constraint)
-    .context(ColumnDefaultConstraintSnafu)
-}
-
 #[cfg(test)]
 mod tests {
+    use api::v1::ColumnDef;
     use common_catalog::consts::MIN_USER_TABLE_ID;
     use datatypes::prelude::ConcreteDataType;
+    use datatypes::schema::ColumnDefaultConstraint;
     use datatypes::value::Value;
 
     use super::*;
@@ -321,12 +310,11 @@ mod tests {
             is_nullable: true,
             default_constraint: None,
         };
-        let result = create_column_schema(&column_def);
-        assert!(result.is_err());
-        assert_eq!(
-            result.unwrap_err().to_string(),
-            "Column datatype error, source: Unknown proto column datatype: 1024"
-        );
+        let result = column_def.try_as_column_schema();
+        assert!(matches!(
+            result.unwrap_err(),
+            api::error::Error::UnknownColumnDataType { .. }
+        ));
 
         let column_def = ColumnDef {
             name: "a".to_string(),
             datatype: 12,
             is_nullable: true,
             default_constraint: None,
         };
-        let column_schema = create_column_schema(&column_def).unwrap();
+        let column_schema = column_def.try_as_column_schema().unwrap();
         assert_eq!(column_schema.name, "a");
         assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
         assert!(column_schema.is_nullable());
 
         let column_def = ColumnDef {
             name: "a".to_string(),
             datatype: 12,
             is_nullable: true,
             default_constraint: Some(default_constraint.clone().try_into().unwrap()),
         };
-        let column_schema = create_column_schema(&column_def).unwrap();
+        let column_schema = column_def.try_as_column_schema().unwrap();
         assert_eq!(column_schema.name, "a");
         assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
         assert!(column_schema.is_nullable());
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index e3a268a8de..9a23a2320a 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -65,6 +65,17 @@ pub enum Error {
         source: api::error::Error,
     },
 
+    #[snafu(display(
+        "Invalid column proto definition, column: {}, source: {}",
+        column,
+        source
+    ))]
+    InvalidColumnDef {
+        column: String,
+        #[snafu(backtrace)]
+        source: api::error::Error,
+    },
+
     #[snafu(display(
         "Failed to convert column default constraint, column: {}, source: {}",
         column_name,
@@ -452,8 +463,11 @@ impl ErrorExt for Error {
             | Error::RequestDatanode { source }
             | Error::InvalidAdminResult { source } => source.status_code(),
 
-            Error::ColumnDataType { .. }
-            | Error::FindDatanode { .. }
+            Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
+                source.status_code()
+            }
+
+            Error::FindDatanode { .. }
             | Error::GetCache { .. }
             | Error::FindTableRoutes { .. }
             | Error::SerializeJson { .. }
diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs
index 689b957c96..f3b133096f 100644
--- a/src/frontend/src/instance/distributed.rs
+++ b/src/frontend/src/instance/distributed.rs
@@ -24,7 +24,7 @@ use common_catalog::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
 use common_query::Output;
 use common_telemetry::{debug, info};
 use datatypes::prelude::ConcreteDataType;
-use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema};
+use datatypes::schema::RawSchema;
 use meta_client::client::MetaClient;
 use meta_client::rpc::{
     CreateRequest as MetaCreateRequest, Partition as MetaPartition, PutRequest, RouteResponse,
@@ -42,8 +42,8 @@ use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
 use crate::catalog::FrontendCatalogManager;
 use crate::datanode::DatanodeClients;
 use crate::error::{
-    self, CatalogEntrySerdeSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
-    PrimaryKeyNotFoundSnafu, RequestMetaSnafu, Result, StartMetaClientSnafu,
+    self, CatalogEntrySerdeSnafu, ColumnDataTypeSnafu, PrimaryKeyNotFoundSnafu, RequestMetaSnafu,
+    Result, StartMetaClientSnafu,
 };
 use crate::partitioning::{PartitionBound, PartitionDef};
 
@@ -229,17 +229,40 @@ fn create_table_global_value(
     let node_id = region_routes[0]
         .leader_peer
         .as_ref()
-        .context(error::FindLeaderPeerSnafu {
+        .with_context(|| error::FindLeaderPeerSnafu {
             region: region_routes[0].region.id,
             table_name: table_name.to_string(),
         })?
         .id;
 
+    let mut regions_id_map = HashMap::new();
+    for route in region_routes.iter() {
+        let node_id = route
+            .leader_peer
+            .as_ref()
+            .with_context(|| error::FindLeaderPeerSnafu {
+                region: route.region.id,
+                table_name: table_name.to_string(),
+            })?
+            .id;
+        regions_id_map
+            .entry(node_id)
+            .or_insert_with(Vec::new)
+            .push(route.region.id as u32);
+    }
+
     let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
     let mut column_name_to_index_map = HashMap::new();
 
     for (idx, column) in create_table.column_defs.iter().enumerate() {
-        column_schemas.push(create_column_schema(column)?);
+        let schema = column
+            .try_as_column_schema()
+            .context(error::InvalidColumnDefSnafu {
+                column: &column.name,
+            })?;
+        let schema = schema.with_time_index(column.name == create_table.time_index);
+
+        column_schemas.push(schema);
         column_name_to_index_map.insert(column.name.clone(), idx);
     }
 
@@ -291,34 +314,11 @@ fn create_table_global_value(
 
     Ok(TableGlobalValue {
         node_id,
-        regions_id_map: HashMap::new(),
+        regions_id_map,
         table_info,
     })
 }
 
-// Remove this duplication in the future
-fn create_column_schema(column_def: &api::v1::ColumnDef) -> Result<ColumnSchema> {
-    let data_type =
-        ColumnDataTypeWrapper::try_new(column_def.datatype).context(error::ColumnDataTypeSnafu)?;
-    let default_constraint = match &column_def.default_constraint {
-        None => None,
-        Some(v) => Some(ColumnDefaultConstraint::try_from(&v[..]).context(
-            ConvertColumnDefaultConstraintSnafu {
-                column_name: &column_def.name,
-            },
-        )?),
-    };
-    ColumnSchema::new(
-        column_def.name.clone(),
-        data_type.into(),
-        column_def.is_nullable,
-    )
-    .with_default_constraint(default_constraint)
-    .context(ConvertColumnDefaultConstraintSnafu {
-        column_name: &column_def.name,
-    })
-}
-
 fn parse_partitions(
     create_table: &CreateExpr,
     partitions: Option<Partitions>,
diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index 7fbc6479fa..627ef5bdf4 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -404,7 +404,8 @@ impl MitoEngineInner {
         let table_id = request.table_id;
 
         // TODO(dennis): supports multi regions;
-        let region_number = 0;
+        assert_eq!(request.region_numbers.len(), 1);
+        let region_number = request.region_numbers[0];
         let region_name = region_name(table_id, region_number);
 
         let region = match self
@@ -804,6 +805,7 @@ mod tests {
             table_name: test_util::TABLE_NAME.to_string(),
             // the test table id is 1
             table_id: 1,
+            region_numbers: vec![0],
         };
 
         let (engine, table, object_store, _dir) = {
diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs
index e8841f95f1..cee0c2727a 100644
--- a/src/query/src/optimizer.rs
+++ b/src/query/src/optimizer.rs
@@ -15,7 +15,6 @@
 use std::str::FromStr;
 use std::sync::Arc;
 
-use common_telemetry::debug;
 use common_time::timestamp::{TimeUnit, Timestamp};
 use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_plan::plan::Filter;
@@ -169,10 +168,6 @@ impl<'a> TypeConverter<'a> {
         match (left, right) {
             (Expr::Column(col), Expr::Literal(value)) => {
                 let casted_right = Self::cast_scalar_value(value, left_type)?;
-                debug!(
-                    "Converting type, origin_left:{:?}, type:{:?}, right:{:?}, casted_right:{:?}",
-                    col, left_type, value, casted_right
-                );
                 if casted_right.is_null() {
                     return Err(DataFusionError::Plan(format!(
                         "column:{:?} value:{:?} is invalid",
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index baf8845bd7..5a74c223fa 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -29,7 +29,7 @@ metrics = "0.20"
 num_cpus = "1.13"
 once_cell = "1.16"
 openmetrics-parser = "0.4"
-opensrv-mysql = "0.1"
+opensrv-mysql = "0.2"
 pgwire = "0.5"
 prost = "0.11"
 regex = "1.6"
diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs
index ce77d952cc..c1614377a7 100644
--- a/src/servers/src/mysql/handler.rs
+++ b/src/servers/src/mysql/handler.rs
@@ -12,15 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::io;
 use std::sync::Arc;
+use std::time::Instant;
 
 use async_trait::async_trait;
-use common_telemetry::error;
+use common_telemetry::{debug, error};
 use opensrv_mysql::{
     AsyncMysqlShim, ErrorKind, ParamParser, QueryResultWriter, StatementMetaWriter,
 };
 use rand::RngCore;
+use tokio::io::AsyncWrite;
 use tokio::sync::RwLock;
 
 use crate::context::AuthHashMethod::DoubleSha1;
@@ -63,7 +64,7 @@ impl MysqlInstanceShim {
 }
 
 #[async_trait]
-impl<W: io::Write> AsyncMysqlShim<W> for MysqlInstanceShim {
+impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShim {
     type Error = error::Error;
 
     fn salt(&self) -> [u8; 20] {
@@ -108,15 +109,12 @@ impl<W: io::Write> AsyncMysqlShim<W> for MysqlInstanceShim {
         };
     }
 
-    async fn on_prepare<'a>(
-        &'a mut self,
-        _: &'a str,
-        writer: StatementMetaWriter<'a, W>,
-    ) -> Result<()> {
-        writer.error(
+    async fn on_prepare<'a>(&'a mut self, _: &'a str, w: StatementMetaWriter<'a, W>) -> Result<()> {
+        w.error(
             ErrorKind::ER_UNKNOWN_ERROR,
-            "prepare statement is not supported yet".as_bytes(),
-        )?;
+            b"prepare statement is not supported yet",
+        )
+        .await?;
         Ok(())
     }
 
     async fn on_execute<'a>(
         &'a mut self,
         _: u32,
         _: ParamParser<'a>,
-        writer: QueryResultWriter<'a, W>,
+        w: QueryResultWriter<'a, W>,
     ) -> Result<()> {
-        writer.error(
+        w.error(
             ErrorKind::ER_UNKNOWN_ERROR,
-            "prepare statement is not supported yet".as_bytes(),
-        )?;
+            b"prepare statement is not supported yet",
+        )
+        .await?;
         Ok(())
     }
 
@@ -145,6 +144,9 @@ impl<W: io::Write> AsyncMysqlShim<W> for MysqlInstanceShim {
         query: &'a str,
         writer: QueryResultWriter<'a, W>,
     ) -> Result<()> {
+        debug!("Start executing query: '{}'", query);
+        let start = Instant::now();
+
         // TODO(LFC): Find a better way:
         // `check` uses regex to filter out unsupported statements emitted by MySQL's federated
         // components; this is quick and dirty, there must be a better way to do it.
@@ -154,7 +156,13 @@
             self.query_handler.do_query(query).await
         };
 
+        debug!(
+            "Finished executing query: '{}', total time cost in microseconds: {}",
+            query,
+            start.elapsed().as_micros()
+        );
+
         let mut writer = MysqlResultWriter::new(writer);
-        writer.write(output).await
+        writer.write(query, output).await
     }
 }
diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs
index e8064d20f1..f66669303c 100644
--- a/src/servers/src/mysql/server.rs
+++ b/src/servers/src/mysql/server.rs
@@ -22,6 +22,7 @@ use common_telemetry::logging::{error, info};
 use futures::StreamExt;
 use opensrv_mysql::AsyncMysqlIntermediary;
 use tokio;
+use tokio::io::BufWriter;
 use tokio::net::TcpStream;
 
 use crate::error::Result;
@@ -29,6 +30,9 @@ use crate::mysql::handler::MysqlInstanceShim;
 use crate::query_handler::SqlQueryHandlerRef;
 use crate::server::{AbortableStream, BaseTcpServer, Server};
 
+// Default size of ResultSet write buffer: 100KB
+const DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE: usize = 100 * 1024;
+
 pub struct MysqlServer {
     base_server: BaseTcpServer,
     query_handler: SqlQueryHandlerRef,
@@ -58,7 +62,8 @@ impl MysqlServer {
             match tcp_stream {
                 Err(error) => error!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt.
                 Ok(io_stream) => {
-                    if let Err(error) = Self::handle(io_stream, io_runtime, query_handler) {
+                    if let Err(error) = Self::handle(io_stream, io_runtime, query_handler).await
+                    {
                        error!(error; "Unexpected error when handling TcpStream");
                    };
                }
            }
        })
    }
 
-    pub fn handle(
+    async fn handle(
         stream: TcpStream,
         io_runtime: Arc<Runtime>,
         query_handler: SqlQueryHandlerRef,
     ) -> Result<()> {
         info!("MySQL connection coming from: {}", stream.peer_addr()?);
         let shim = MysqlInstanceShim::create(query_handler, stream.peer_addr()?.to_string());
-        // TODO(LFC): Relate "handler" with MySQL session; also deal with panics there.
-        let _handler = io_runtime.spawn(AsyncMysqlIntermediary::run_on(shim, stream));
+
+        let (r, w) = stream.into_split();
+        let w = BufWriter::with_capacity(DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE, w);
+        // TODO(LFC): Use `output_stream` to write large MySQL ResultSet to client.
+        let spawn_result = io_runtime
+            .spawn(AsyncMysqlIntermediary::run_on(shim, r, w))
+            .await;
+        match spawn_result {
+            Ok(run_result) => {
+                if let Err(e) = run_result {
+                    // TODO(LFC): Write this error and the below one to client as well, in MySQL text protocol.
+                    // Looks like we have to expose opensrv-mysql's `PacketWriter`?
+                    error!(e; "Internal error occurred during query exec, server actively closes the channel to let the client retry next time.")
+                }
+            }
+            Err(e) => error!("IO runtime cannot execute task, error: {}", e),
+        }
         Ok(())
     }
 }
diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs
index 66e4278687..5a0373fbc3 100644
--- a/src/servers/src/mysql/writer.rs
+++ b/src/servers/src/mysql/writer.rs
@@ -12,11 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::io;
 use std::ops::Deref;
 
 use common_query::Output;
 use common_recordbatch::{util, RecordBatch};
+use common_telemetry::error;
 use common_time::datetime::DateTime;
 use common_time::timestamp::TimeUnit;
 use datatypes::prelude::{ConcreteDataType, Value};
@@ -25,6 +25,7 @@ use opensrv_mysql::{
     Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
 };
 use snafu::prelude::*;
+use tokio::io::AsyncWrite;
 
 use crate::error::{self, Error, Result};
 
@@ -33,18 +34,18 @@ struct QueryResult {
     schema: SchemaRef,
 }
 
-pub struct MysqlResultWriter<'a, W: io::Write> {
+pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
     // `QueryResultWriter` will be consumed when the write completed (see
     // QueryResultWriter::completed), thus we use an option to wrap it.
     inner: Option<QueryResultWriter<'a, W>>,
 }
 
-impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
+impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
     pub fn new(inner: QueryResultWriter<'a, W>) -> MysqlResultWriter<'a, W> {
         MysqlResultWriter::<'a, W> { inner: Some(inner) }
     }
 
-    pub async fn write(&mut self, output: Result<Output>) -> Result<()> {
+    pub async fn write(&mut self, query: &str, output: Result<Output>) -> Result<()> {
         let writer = self.inner.take().context(error::InternalSnafu {
             err_msg: "inner MySQL writer is consumed",
         })?;
@@ -59,48 +60,53 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
                         recordbatches,
                         schema,
                     };
-                    Self::write_query_result(query_result, writer)?
+                    Self::write_query_result(query, query_result, writer).await?
                 }
                 Output::RecordBatches(recordbatches) => {
                     let query_result = QueryResult {
                         schema: recordbatches.schema(),
                         recordbatches: recordbatches.take(),
                     };
-                    Self::write_query_result(query_result, writer)?
+                    Self::write_query_result(query, query_result, writer).await?
                 }
-                Output::AffectedRows(rows) => Self::write_affected_rows(writer, rows)?,
+                Output::AffectedRows(rows) => Self::write_affected_rows(writer, rows).await?,
             },
-            Err(error) => Self::write_query_error(error, writer)?,
+            Err(error) => Self::write_query_error(query, error, writer).await?,
         }
         Ok(())
    }
 
-    fn write_affected_rows(writer: QueryResultWriter<W>, rows: usize) -> Result<()> {
-        writer.completed(OkResponse {
+    async fn write_affected_rows(w: QueryResultWriter<'a, W>, rows: usize) -> Result<()> {
+        w.completed(OkResponse {
             affected_rows: rows as u64,
             ..Default::default()
-        })?;
+        })
+        .await?;
         Ok(())
     }
 
-    fn write_query_result(
+    async fn write_query_result(
+        query: &str,
         query_result: QueryResult,
         writer: QueryResultWriter<'a, W>,
     ) -> Result<()> {
         match create_mysql_column_def(&query_result.schema) {
             Ok(column_def) => {
-                let mut row_writer = writer.start(&column_def)?;
+                let mut row_writer = writer.start(&column_def).await?;
                 for recordbatch in &query_result.recordbatches {
-                    Self::write_recordbatch(&mut row_writer, recordbatch)?;
+                    Self::write_recordbatch(&mut row_writer, recordbatch).await?;
                 }
-                row_writer.finish()?;
+                row_writer.finish().await?;
                 Ok(())
             }
-            Err(error) => Self::write_query_error(error, writer),
+            Err(error) => Self::write_query_error(query, error, writer).await,
         }
     }
 
-    fn write_recordbatch(row_writer: &mut RowWriter<W>, recordbatch: &RecordBatch) -> Result<()> {
+    async fn write_recordbatch(
+        row_writer: &mut RowWriter<'_, W>,
+        recordbatch: &RecordBatch,
+    ) -> Result<()> {
        for row in recordbatch.rows() {
             let row = row.context(error::CollectRecordbatchSnafu)?;
             for value in row.into_iter() {
@@ -133,13 +139,20 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
                 }
             }
-            row_writer.end_row()?;
+            row_writer.end_row().await?;
         }
         Ok(())
     }
-    fn write_query_error(error: Error, writer: QueryResultWriter<'a, W>) -> Result<()> {
-        writer.error(ErrorKind::ER_INTERNAL_ERROR, error.to_string().as_bytes())?;
+    async fn write_query_error(
+        query: &str,
+        error: Error,
+        w: QueryResultWriter<'a, W>,
+    ) -> Result<()> {
+        error!(error; "Failed to execute query '{}'", query);
+
+        let kind = ErrorKind::ER_INTERNAL_ERROR;
+        w.error(kind, error.to_string().as_bytes()).await?;
         Ok(())
     }
 }
diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs
index ce351fce6b..ba82e8d68f 100644
--- a/src/servers/tests/mysql/mysql_server_test.rs
+++ b/src/servers/tests/mysql/mysql_server_test.rs
@@ -184,6 +184,7 @@ async fn test_query_concurrently() -> Result<()> {
 
                 let should_recreate_conn = expected == 1;
                 if should_recreate_conn {
+                    connection.disconnect().await.unwrap();
                     connection = create_connection(server_port, index % 2 == 0)
                         .await
                         .unwrap();
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index a921cf3bd2..bc0b1a8e34 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -17,6 +17,7 @@ use std::collections::HashMap;
 
 use datatypes::prelude::VectorRef;
 use datatypes::schema::{ColumnSchema, SchemaRef};
+use store_api::storage::RegionNumber;
 
 use crate::metadata::TableId;
 
@@ -56,6 +57,7 @@ pub struct OpenTableRequest {
     pub schema_name: String,
     pub table_name: String,
     pub table_id: TableId,
+    pub region_numbers: Vec<RegionNumber>,
 }
 
 /// Alter table request
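
Note on the core of the change: before this patch, a distributed datanode assumed region 0 when reopening tables after a restart. Now the frontend records which regions of a table live on which datanode (the `regions_id_map` written into `TableGlobalValue`), and a restarting datanode passes exactly its own regions into `OpenTableRequest::region_numbers`. The following is a minimal illustrative sketch of that flow, not code from the patch; the structs are simplified stand-ins for the real GreptimeDB types, and `open_request_for_node` is a hypothetical helper.

// Sketch only: simplified stand-ins for TableGlobalValue / OpenTableRequest.
use std::collections::HashMap;

type RegionNumber = u32;

struct TableGlobalValue {
    // Persisted in the remote catalog: node id -> regions that node serves.
    regions_id_map: HashMap<u64, Vec<RegionNumber>>,
}

struct OpenTableRequest {
    table_name: String,
    // New in this patch: the regions this datanode must reopen on restart.
    region_numbers: Vec<RegionNumber>,
}

fn open_request_for_node(
    table_name: &str,
    value: &TableGlobalValue,
    node_id: u64,
) -> Option<OpenTableRequest> {
    // A table is only reopened on a node that actually hosts some of its
    // regions; other nodes skip it entirely.
    let regions = value.regions_id_map.get(&node_id)?;
    Some(OpenTableRequest {
        table_name: table_name.to_string(),
        region_numbers: regions.clone(),
    })
}

fn main() {
    let mut regions_id_map = HashMap::new();
    regions_id_map.insert(42u64, vec![0, 2]);
    let value = TableGlobalValue { regions_id_map };

    // Node 42 reopens regions [0, 2]; node 7 hosts nothing and opens nothing.
    let req = open_request_for_node("demo", &value, 42).unwrap();
    assert_eq!(req.region_numbers, vec![0, 2]);
    assert!(open_request_for_node("demo", &value, 7).is_none());
}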
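The other half of the patch follows from the opensrv-mysql 0.1 to 0.2 upgrade: result writing moves from blocking `std::io::Write` to `tokio::io::AsyncWrite`, which is why every writer call above gains an `.await`, and the TCP write half is wrapped in a 100 KB `BufWriter` so the many small packets of a result set coalesce into fewer writes. Below is a small sketch of that buffering setup using plain tokio I/O, under the assumption of a bare `TcpStream`; the real server hands the split halves to opensrv-mysql's `AsyncMysqlIntermediary::run_on` rather than writing directly.

use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;

// Same value as DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE in the patch: 100 KB.
const WRITE_BUFFER_SIZE: usize = 100 * 1024;

async fn write_result(stream: TcpStream) -> std::io::Result<()> {
    let (_reader, writer) = stream.into_split();
    // Coalesce many small row writes into fewer TCP writes.
    let mut writer = BufWriter::with_capacity(WRITE_BUFFER_SIZE, writer);
    writer.write_all(b"...result set packets...").await?;
    writer.flush().await?; // buffered bytes reach the client only on flush
    Ok(())
}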