From 7dde9ce3ceb2460f1f492c8b0ee69ebee753f1ab Mon Sep 17 00:00:00 2001
From: Zhenchi
Date: Tue, 5 Sep 2023 11:23:54 +0800
Subject: [PATCH] feat(frontend): migrate insert to region server (#2318)

* feat(frontend): migrate insert to region server

Signed-off-by: Zhenchi

* refactor: move converter to Inserter

Signed-off-by: Zhenchi

* chore: rename convert function

Signed-off-by: Zhenchi

* fix: address comments

Signed-off-by: Zhenchi

* fix: address comments

Signed-off-by: Zhenchi

* fix: add span id

Signed-off-by: Zhenchi

* fix: compilation

Signed-off-by: Zhenchi

* retrigger action

* retrigger action

---------

Signed-off-by: Zhenchi
---
 src/api/src/helper.rs                         | 135 ++++++-
 src/client/src/region.rs                      |  34 +-
 src/cmd/src/standalone.rs                     |   7 +-
 src/common/meta/src/datanode_manager.rs       |   4 +-
 src/common/meta/src/ddl/create_table.rs       |  11 +-
 src/common/meta/src/ddl/drop_table.rs         |  20 +-
 src/datanode/src/datanode.rs                  |  10 +-
 src/frontend/src/error.rs                     |  20 +-
 src/frontend/src/inserter.rs                  | 247 +++++++++++-
 src/frontend/src/instance.rs                  |  22 +-
 src/frontend/src/instance/distributed.rs      | 122 +++---
 .../src/instance/distributed/deleter.rs       |   4 +-
 .../src/instance/distributed/inserter.rs      | 353 +++++++++---------
 .../src/instance/distributed/row_inserter.rs  | 125 -------
 src/frontend/src/instance/region_handler.rs   |  32 ++
 src/frontend/src/instance/standalone.rs       |  34 +-
 src/frontend/src/statement.rs                 |  71 ++--
 src/frontend/src/statement/copy_table_from.rs |  26 +-
 src/frontend/src/statement/dml.rs             |   4 +-
 src/frontend/src/table/insert.rs              |  99 +----
 src/partition/src/manager.rs                  |  26 +-
 src/partition/src/row_splitter.rs             |  40 +-
 22 files changed, 837 insertions(+), 609 deletions(-)
 delete mode 100644 src/frontend/src/instance/distributed/row_inserter.rs
 create mode 100644 src/frontend/src/instance/region_handler.rs

diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs
index cc2134302b..6a426084e8 100644
--- a/src/api/src/helper.rs
+++ b/src/api/src/helper.rs
@@ -33,17 +33,16 @@ use datatypes::vectors::{
     TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
     UInt64Vector, VectorRef,
 };
-use greptime_proto::v1;
 use greptime_proto::v1::ddl_request::Expr;
 use greptime_proto::v1::greptime_request::Request;
 use greptime_proto::v1::query_request::Query;
 use greptime_proto::v1::value::ValueData;
-use greptime_proto::v1::{DdlRequest, IntervalMonthDayNano, QueryRequest, SemanticType};
+use greptime_proto::v1::{self, DdlRequest, IntervalMonthDayNano, QueryRequest, Row, SemanticType};
 use snafu::prelude::*;
 
 use crate::error::{self, Result};
 use crate::v1::column::Values;
-use crate::v1::{Column, ColumnDataType};
+use crate::v1::{Column, ColumnDataType, Value as GrpcValue};
 
 #[derive(Debug, PartialEq, Eq)]
 pub struct ColumnDataTypeWrapper(ColumnDataType);
@@ -804,6 +803,59 @@ pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataTy
     }
 }
 
+pub fn vectors_to_rows<'a>(
+    columns: impl Iterator<Item = &'a VectorRef>,
+    row_count: usize,
+) -> Vec<Row> {
+    let mut rows = vec![Row { values: vec![] }; row_count];
+    for column in columns {
+        for (row_index, row) in rows.iter_mut().enumerate() {
+            row.values.push(GrpcValue {
+                value_data: match column.get(row_index) {
+                    Value::Null => None,
+                    Value::Boolean(v) => Some(ValueData::BoolValue(v)),
+                    Value::UInt8(v) => Some(ValueData::U8Value(v as _)),
+                    Value::UInt16(v) => Some(ValueData::U16Value(v as _)),
+                    Value::UInt32(v) => Some(ValueData::U32Value(v)),
+                    Value::UInt64(v) => Some(ValueData::U64Value(v)),
+                    Value::Int8(v) => Some(ValueData::I8Value(v as _)),
+                    Value::Int16(v) => Some(ValueData::I16Value(v as _)),
+                    Value::Int32(v) => Some(ValueData::I32Value(v)),
+                    Value::Int64(v) => Some(ValueData::I64Value(v)),
+                    Value::Float32(v) => Some(ValueData::F32Value(*v)),
+                    Value::Float64(v) => Some(ValueData::F64Value(*v)),
+                    Value::String(v) => Some(ValueData::StringValue(v.as_utf8().to_string())),
+                    Value::Binary(v) => Some(ValueData::BinaryValue(v.to_vec())),
+                    Value::Date(v) => Some(ValueData::DateValue(v.val())),
+                    Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())),
+                    Value::Timestamp(v) => Some(match v.unit() {
+                        TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
+                        TimeUnit::Millisecond => ValueData::TimeMillisecondValue(v.value()),
+                        TimeUnit::Microsecond => ValueData::TimeMicrosecondValue(v.value()),
+                        TimeUnit::Nanosecond => ValueData::TimeNanosecondValue(v.value()),
+                    }),
+                    Value::Time(v) => Some(match v.unit() {
+                        TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
+                        TimeUnit::Millisecond => ValueData::TimeMillisecondValue(v.value()),
+                        TimeUnit::Microsecond => ValueData::TimeMicrosecondValue(v.value()),
+                        TimeUnit::Nanosecond => ValueData::TimeNanosecondValue(v.value()),
+                    }),
+                    Value::Interval(v) => Some(match v.unit() {
+                        IntervalUnit::YearMonth => ValueData::IntervalYearMonthValues(v.to_i32()),
+                        IntervalUnit::DayTime => ValueData::IntervalDayTimeValues(v.to_i64()),
+                        IntervalUnit::MonthDayNano => ValueData::IntervalMonthDayNanoValues(
+                            convert_i128_to_interval(v.to_i128()),
+                        ),
+                    }),
+                    Value::List(_) => unreachable!(),
+                },
+            })
+        }
+    }
+
+    rows
+}
+
 /// Returns true if the column type is equal to expected type.
 fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
     if let Some(expect) = to_column_data_type(expect_type) {
@@ -818,8 +870,9 @@ mod tests {
     use std::sync::Arc;
 
     use datatypes::types::{
-        IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, TimeMillisecondType,
-        TimeSecondType, TimestampMillisecondType, TimestampSecondType,
+        Int32Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
+        TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType,
+        UInt32Type,
     };
     use datatypes::vectors::{
         BooleanVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector,
@@ -1522,4 +1575,76 @@ mod tests {
             Value::DateTime(3.into())
         ]
     );
+
+    #[test]
+    fn test_vectors_to_rows_for_different_types() {
+        let boolean_vec = BooleanVector::from_vec(vec![true, false, true]);
+        let int8_vec = PrimitiveVector::<Int8Type>::from_iter_values(vec![1, 2, 3]);
+        let int32_vec = PrimitiveVector::<Int32Type>::from_iter_values(vec![100, 200, 300]);
+        let uint8_vec = PrimitiveVector::<UInt8Type>::from_iter_values(vec![10, 20, 30]);
+        let uint32_vec = PrimitiveVector::<UInt32Type>::from_iter_values(vec![1000, 2000, 3000]);
+        let float32_vec = Float32Vector::from_vec(vec![1.1, 2.2, 3.3]);
+        let date_vec = DateVector::from_vec(vec![10, 20, 30]);
+        let string_vec = StringVector::from_vec(vec!["a", "b", "c"]);
+
+        let vector_refs: Vec<VectorRef> = vec![
+            Arc::new(boolean_vec),
+            Arc::new(int8_vec),
+            Arc::new(int32_vec),
+            Arc::new(uint8_vec),
+            Arc::new(uint32_vec),
+            Arc::new(float32_vec),
+            Arc::new(date_vec),
+            Arc::new(string_vec),
+        ];
+
+        let result = vectors_to_rows(vector_refs.iter(), 3);
+
+        assert_eq!(result.len(), 3);
+
+        assert_eq!(result[0].values.len(), 8);
+        let values = result[0]
+            .values
+            .iter()
+            .map(|v| v.value_data.clone().unwrap())
+            .collect::<Vec<_>>();
+        assert_eq!(values[0], ValueData::BoolValue(true));
+        assert_eq!(values[1], ValueData::I8Value(1));
+        assert_eq!(values[2], ValueData::I32Value(100));
+        assert_eq!(values[3], ValueData::U8Value(10));
+        assert_eq!(values[4], ValueData::U32Value(1000));
+        assert_eq!(values[5], ValueData::F32Value(1.1));
+        assert_eq!(values[6], ValueData::DateValue(10));
+        assert_eq!(values[7], ValueData::StringValue("a".to_string()));
+
+        assert_eq!(result[1].values.len(), 8);
+        let values = result[1]
+            .values
+            .iter()
+            .map(|v| v.value_data.clone().unwrap())
+            .collect::<Vec<_>>();
+        assert_eq!(values[0], ValueData::BoolValue(false));
+        assert_eq!(values[1], ValueData::I8Value(2));
+        assert_eq!(values[2], ValueData::I32Value(200));
+        assert_eq!(values[3], ValueData::U8Value(20));
+        assert_eq!(values[4], ValueData::U32Value(2000));
+        assert_eq!(values[5], ValueData::F32Value(2.2));
+        assert_eq!(values[6], ValueData::DateValue(20));
+        assert_eq!(values[7], ValueData::StringValue("b".to_string()));
+
+        assert_eq!(result[2].values.len(), 8);
+        let values = result[2]
+            .values
+            .iter()
+            .map(|v| v.value_data.clone().unwrap())
+            .collect::<Vec<_>>();
+        assert_eq!(values[0], ValueData::BoolValue(true));
+        assert_eq!(values[1], ValueData::I8Value(3));
+        assert_eq!(values[2], ValueData::I32Value(300));
+        assert_eq!(values[3], ValueData::U8Value(30));
+        assert_eq!(values[4], ValueData::U32Value(3000));
+        assert_eq!(values[5], ValueData::F32Value(3.3));
+        assert_eq!(values[6], ValueData::DateValue(30));
+        assert_eq!(values[7], ValueData::StringValue("c".to_string()));
+    }
 }
diff --git a/src/client/src/region.rs b/src/client/src/region.rs
index 2ebb80ae55..4c325a0a56 100644
--- a/src/client/src/region.rs
+++ b/src/client/src/region.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::region::{region_request, RegionRequest, RegionRequestHeader, RegionResponse};
+use api::v1::region::{RegionRequest, RegionResponse};
 use api::v1::ResponseHeader;
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
@@ -23,19 +23,17 @@ use common_telemetry::timer;
 use snafu::{location, Location, OptionExt};
 
 use crate::error::Error::FlightGet;
-use crate::error::{IllegalDatabaseResponseSnafu, Result, ServerSnafu};
+use crate::error::{IllegalDatabaseResponseSnafu, MissingFieldSnafu, Result, ServerSnafu};
 use crate::{metrics, Client};
 
 #[derive(Debug)]
 pub struct RegionRequester {
-    trace_id: u64,
-    span_id: u64,
     client: Client,
 }
 
 #[async_trait]
 impl Datanode for RegionRequester {
-    async fn handle(&self, request: region_request::Body) -> MetaResult<AffectedRows> {
+    async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
         self.handle_inner(request).await.map_err(|err| {
             if matches!(err, FlightGet { .. }) {
                 meta_error::Error::RetryLater {
@@ -53,24 +51,16 @@ impl RegionRequester {
     pub fn new(client: Client) -> Self {
-        // TODO(LFC): Pass in trace_id and span_id from some context when we have it.
-        Self {
-            trace_id: 0,
-            span_id: 0,
-            client,
-        }
+        Self { client }
     }
 
-    async fn handle_inner(&self, request: region_request::Body) -> Result<AffectedRows> {
-        let request_type = request.as_ref().to_string();
-
-        let request = RegionRequest {
-            header: Some(RegionRequestHeader {
-                trace_id: self.trace_id,
-                span_id: self.span_id,
-            }),
-            body: Some(request),
-        };
+    async fn handle_inner(&self, request: RegionRequest) -> Result<AffectedRows> {
+        let request_type = request
+            .body
+            .as_ref()
+            .with_context(|| MissingFieldSnafu { field: "body" })?
+            .as_ref()
+            .to_string();
 
         let _timer = timer!(
             metrics::METRIC_REGION_REQUEST_GRPC,
@@ -89,7 +79,7 @@ impl RegionRequester {
         Ok(affected_rows)
     }
 
-    pub async fn handle(&self, request: region_request::Body) -> Result<AffectedRows> {
+    pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
         self.handle_inner(request).await
     }
 }
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index c373524516..d91ea42beb 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -21,6 +21,7 @@ use common_telemetry::info;
 use common_telemetry::logging::LoggingOptions;
 use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
 use datanode::instance::InstanceRef;
+use datanode::region_server::RegionServer;
 use frontend::frontend::FrontendOptions;
 use frontend::instance::{FrontendInstance, Instance as FeInstance};
 use frontend::service_config::{
@@ -293,7 +294,8 @@ impl StartCommand {
             .context(StartDatanodeSnafu)?;
 
         // TODO: build frontend instance like in distributed mode
-        let mut frontend = build_frontend(plugins.clone(), todo!()).await?;
+        let mut frontend =
+            build_frontend(plugins.clone(), todo!(), datanode.region_server()).await?;
 
         frontend
             .build_servers(&fe_opts)
@@ -308,8 +310,9 @@
 async fn build_frontend(
     plugins: Arc<Plugins>,
     datanode_instance: InstanceRef,
+    region_server: RegionServer,
 ) -> Result<FeInstance> {
-    let mut frontend_instance = FeInstance::try_new_standalone(datanode_instance.clone())
+    let mut frontend_instance = FeInstance::try_new_standalone(datanode_instance, region_server)
         .await
         .context(StartFrontendSnafu)?;
     frontend_instance.set_plugins(plugins.clone());
diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs
index 18e70f94c9..93733664ad 100644
--- a/src/common/meta/src/datanode_manager.rs
+++ b/src/common/meta/src/datanode_manager.rs
@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use api::v1::region::region_request;
+use api::v1::region::RegionRequest;
 
 use crate::error::Result;
 use crate::peer::Peer;
@@ -24,7 +24,7 @@ pub type AffectedRows = u64;
 #[async_trait::async_trait]
 pub trait Datanode: Send + Sync {
     /// Handles DML, and DDL requests.
-    async fn handle(&self, request: region_request::Body) -> Result<AffectedRows>;
+    async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;
 }
 
 pub type DatanodeRef = Arc<dyn Datanode>;
diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index e4c2c68258..82f2903a7a 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -13,7 +13,9 @@
 // limitations under the License.
 
 use api::v1::region::region_request::Body as PbRegionRequest;
-use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest};
+use api::v1::region::{
+    ColumnDef, CreateRequest as PbCreateRegionRequest, RegionRequest, RegionRequestHeader,
+};
 use api::v1::SemanticType;
 use async_trait::async_trait;
 use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
@@ -197,6 +199,13 @@ impl CreateTableProcedure {
 
         for request in requests {
             let requester = manager.datanode(&datanode).await;
+            let request = RegionRequest {
+                header: Some(RegionRequestHeader {
+                    trace_id: 0,
+                    span_id: 0,
+                }),
+                body: Some(request),
+            };
 
             if let Err(err) = requester.handle(request).await {
                 return Err(handle_operate_region_error(datanode)(err));
             }
diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs
index d0f38617d4..f6dfb72262 100644
--- a/src/common/meta/src/ddl/drop_table.rs
+++ b/src/common/meta/src/ddl/drop_table.rs
@@ -12,7 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::region::{region_request, DropRequest as PbDropRegionRequest};
+use api::v1::region::{
+    region_request, DropRequest as PbDropRegionRequest, RegionRequest, RegionRequestHeader,
+};
 use async_trait::async_trait;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
@@ -162,13 +164,17 @@ impl DropTableProcedure {
         for region_id in region_ids {
             debug!("Dropping region {region_id} on Datanode {datanode:?}");
 
-            let request = region_request::Body::Drop(PbDropRegionRequest {
-                region_id: region_id.as_u64(),
-            });
+            let request = RegionRequest {
+                header: Some(RegionRequestHeader {
+                    trace_id: 0,
+                    span_id: 0,
+                }),
+                body: Some(region_request::Body::Drop(PbDropRegionRequest {
+                    region_id: region_id.as_u64(),
+                })),
+            };
 
-            let requester = clients.datanode(&datanode).await;
-
-            if let Err(err) = requester.handle(request).await {
+            if let Err(err) = clients.datanode(&datanode).await.handle(request).await {
                 if err.status_code() != StatusCode::RegionNotFound {
                     return Err(handle_operate_region_error(datanode)(err));
                 }
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index d9ff9756c6..ca25f45d10 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -398,6 +398,7 @@ pub struct Datanode {
     opts: DatanodeOptions,
     services: Option<Services>,
     heartbeat_task: Option<HeartbeatTask>,
+    region_server: RegionServer,
 }
 
 impl Datanode {
@@ -434,7 +435,9 @@
             Mode::Standalone => None,
         };
         let heartbeat_task = match opts.mode {
-            Mode::Distributed => Some(HeartbeatTask::try_new(&opts, Some(region_server)).await?),
+            Mode::Distributed => {
+                Some(HeartbeatTask::try_new(&opts, Some(region_server.clone())).await?)
+            }
             Mode::Standalone => None,
         };
 
@@ -442,6 +445,7 @@
             opts,
             services,
             heartbeat_task,
+            region_server,
         })
     }
 
@@ -483,6 +487,10 @@
         Ok(())
     }
 
+    pub fn region_server(&self) -> RegionServer {
+        self.region_server.clone()
+    }
+
     // internal utils
 
     /// Build [RaftEngineLogStore]
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index 84577d5fcd..dc3c6ebefc 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -57,6 +57,12 @@ pub enum Error {
         source: client::Error,
     },
 
+    #[snafu(display("Failed to insert data, source: {}", source))]
+    RequestInserts {
+        #[snafu(backtrace)]
+        source: common_meta::error::Error,
+    },
+
     #[snafu(display("Runtime resource error, source: {}", source))]
     RuntimeResource {
         #[snafu(backtrace)]
@@ -189,12 +195,12 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "Failed to find table route for table {}, source: {}",
-        table_name,
+        "Failed to find table route for table id {}, source: {}",
+        table_id,
         source
     ))]
     FindTableRoute {
-        table_name: String,
+        table_id: u32,
         #[snafu(backtrace)]
         source: partition::error::Error,
     },
@@ -325,6 +331,12 @@ pub enum Error {
         source: datanode::error::Error,
     },
 
+    #[snafu(display("{source}"))]
+    InvokeRegionServer {
+        #[snafu(backtrace)]
+        source: servers::error::Error,
+    },
+
     #[snafu(display("Missing meta_client_options section in config"))]
     MissingMetasrvOpts { location: Location },
 
@@ -674,6 +686,7 @@ impl ErrorExt for Error {
             | Error::IntoVectors { source } => source.status_code(),
 
             Error::RequestDatanode { source } => source.status_code(),
+            Error::RequestInserts { source } => source.status_code(),
 
             Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
                 source.status_code()
@@ -725,6 +738,7 @@
             Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
             Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
             Error::InvokeDatanode { source } => source.status_code(),
+            Error::InvokeRegionServer { source } => source.status_code(),
             Error::External { source } => source.status_code(),
 
             Error::DeserializePartition { source, .. }
diff --git a/src/frontend/src/inserter.rs b/src/frontend/src/inserter.rs
index 078a82984a..a5d06173cf 100644
--- a/src/frontend/src/inserter.rs
+++ b/src/frontend/src/inserter.rs
@@ -12,14 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::alter_expr::Kind;
 use api::v1::ddl_request::Expr as DdlExpr;
 use api::v1::greptime_request::Request;
+use api::v1::region::{
+    region_request, InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
+};
 use api::v1::value::ValueData;
 use api::v1::{
     AlterExpr, Column, ColumnDataType, ColumnSchema, DdlRequest, InsertRequest, InsertRequests,
-    Row, RowInsertRequest, RowInsertRequests, Rows, Value,
+    Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
 };
 use catalog::CatalogManagerRef;
 use common_base::BitVec;
@@ -28,22 +33,29 @@ use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
 use common_query::Output;
 use common_telemetry::info;
 use datatypes::schema::Schema;
+use datatypes::vectors::VectorRef;
 use servers::query_handler::grpc::GrpcQueryHandlerRef;
 use session::context::QueryContextRef;
 use snafu::prelude::*;
+use store_api::storage::RegionId;
 use table::engine::TableReference;
+use table::metadata::TableInfoRef;
+use table::requests::InsertRequest as TableInsertRequest;
 use table::TableRef;
 
 use crate::error::{
-    CatalogSnafu, ColumnDataTypeSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu,
-    InvalidInsertRequestSnafu, Result,
+    CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, EmptyDataSnafu, Error,
+    FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, MissingTimeIndexColumnSnafu, Result,
+    TableNotFoundSnafu,
 };
 use crate::expr_factory::CreateExprFactory;
+use crate::instance::region_handler::RegionRequestHandlerRef;
 
 pub(crate) struct Inserter<'a> {
     catalog_manager: &'a CatalogManagerRef,
     create_expr_factory: &'a CreateExprFactory,
     grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
+    region_request_handler: &'a RegionRequestHandlerRef,
 }
 
 impl<'a> Inserter<'a> {
@@ -51,11 +63,13 @@ impl<'a> Inserter<'a> {
         catalog_manager: &'a CatalogManagerRef,
         create_expr_factory: &'a CreateExprFactory,
         grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
+        region_request_handler: &'a RegionRequestHandlerRef,
     ) -> Self {
         Self {
             catalog_manager,
             create_expr_factory,
             grpc_query_handler,
+            region_request_handler,
         }
     }
 
@@ -81,10 +95,30 @@ impl<'a> Inserter<'a> {
             })
         })?;
 
-        self.create_or_alter_tables_on_demand(&requests, ctx.clone())
+        self.create_or_alter_tables_on_demand(&requests, &ctx)
             .await?;
-        let query = Request::RowInserts(requests);
-        self.grpc_query_handler.do_query(query, ctx).await
+
+        let region_request = self.convert_req_row_to_region(requests, &ctx).await?;
+        let response = self
+            .region_request_handler
+            .handle(region_request, ctx)
+            .await?;
+        Ok(Output::AffectedRows(response.affected_rows as _))
+    }
+
+    pub fn convert_req_table_to_region(
+        table_info: &TableInfoRef,
+        insert: TableInsertRequest,
+    ) -> Result<RegionInsertRequests> {
+        let region_id = RegionId::new(table_info.table_id(), insert.region_number).into();
+        let row_count = row_count(&insert.columns_values)?;
+        let schema = column_schema(table_info, &insert.columns_values)?;
+        let rows = api::helper::vectors_to_rows(insert.columns_values.values(), row_count);
+        Ok(RegionInsertRequests {
+            requests: vec![RegionInsertRequest {
+                region_id,
+                rows: Some(Rows { schema, rows }),
+            }],
+        })
     }
 }
@@ -95,16 +129,16 @@ impl<'a> Inserter<'a> {
     async fn create_or_alter_tables_on_demand(
         &self,
         requests: &RowInsertRequests,
-        ctx: QueryContextRef,
+        ctx: &QueryContextRef,
     ) -> Result<()> {
         // TODO(jeremy): create and alter in batch?
         for req in &requests.inserts {
-            match self.get_table(req, &ctx).await? {
+            match self.get_table(req, ctx).await? {
                 Some(table) => {
                     validate_request_with_table(req, &table)?;
-                    self.alter_table_on_demand(req, table, &ctx).await?
+                    self.alter_table_on_demand(req, table, ctx).await?
                 }
-                None => self.create_table(req, &ctx).await?,
+                None => self.create_table(req, ctx).await?,
             }
         }
 
         Ok(())
     }
@@ -192,6 +226,31 @@ impl<'a> Inserter<'a> {
 
         Ok(())
     }
+
+    async fn convert_req_row_to_region(
+        &self,
+        requests: RowInsertRequests,
+        ctx: &QueryContextRef,
+    ) -> Result<region_request::Body> {
+        let mut region_request = Vec::with_capacity(requests.inserts.len());
+        for request in requests.inserts {
+            let table = self.get_table(&request, ctx).await?;
+            let table = table.with_context(|| TableNotFoundSnafu {
+                table_name: request.table_name.clone(),
+            })?;
+
+            let region_id = RegionId::new(table.table_info().table_id(), request.region_number);
+            let insert_request = RegionInsertRequest {
+                region_id: region_id.into(),
+                rows: request.rows,
+            };
+            region_request.push(insert_request);
+        }
+
+        Ok(region_request::Body::Inserts(RegionInsertRequests {
+            requests: region_request,
+        }))
+    }
 }
 
 fn requests_column_to_row(requests: InsertRequests) -> Result<RowInsertRequests> {
@@ -363,13 +422,81 @@ fn validate_required_columns(request_schema: &[ColumnSchema], table_schema: &Sch
     Ok(())
 }
 
+fn row_count(columns: &HashMap<String, VectorRef>) -> Result<usize> {
+    let mut columns_iter = columns.values();
+
+    let len = columns_iter
+        .next()
+        .map(|column| column.len())
+        .unwrap_or_default();
+    ensure!(
+        columns_iter.all(|column| column.len() == len),
+        InvalidInsertRequestSnafu {
+            reason: "The row count of columns is not the same."
+        }
+    );
+
+    Ok(len)
+}
+
+fn column_schema(
+    table_info: &TableInfoRef,
+    columns: &HashMap<String, VectorRef>,
+) -> Result<Vec<ColumnSchema>> {
+    let table_meta = &table_info.meta;
+    let mut schema = vec![];
+
+    for (column_name, vector) in columns {
+        let time_index_column = &table_meta
+            .schema
+            .timestamp_column()
+            .with_context(|| table::error::MissingTimeIndexColumnSnafu {
+                table_name: table_info.name.to_string(),
+            })
+            .context(MissingTimeIndexColumnSnafu)?
+            .name;
+        let semantic_type = if column_name == time_index_column {
+            SemanticType::Timestamp
+        } else {
+            let column_index = table_meta
+                .schema
+                .column_index_by_name(column_name)
+                .context(ColumnNotFoundSnafu {
+                    msg: format!("unable to find column {column_name} in table schema"),
+                })?;
+
+            if table_meta.primary_key_indices.contains(&column_index) {
+                SemanticType::Tag
+            } else {
+                SemanticType::Field
+            }
+        };
+
+        let datatype: ColumnDataTypeWrapper =
+            vector.data_type().try_into().context(ColumnDataTypeSnafu)?;
+
+        schema.push(ColumnSchema {
+            column_name: column_name.clone(),
+            datatype: datatype.datatype().into(),
+            semantic_type: semantic_type.into(),
+        });
+    }
+
+    Ok(schema)
+}
+
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use api::v1::column::Values;
     use api::v1::SemanticType;
     use common_base::bit_vec::prelude::*;
+    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use datatypes::prelude::{ConcreteDataType, Value as DtValue};
+    use datatypes::scalars::ScalarVectorBuilder;
     use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema};
+    use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder};
+    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
 
     use super::*;
@@ -551,4 +678,104 @@ mod tests {
         };
         assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err());
     }
+
+    #[test]
+    fn test_insert_request_table_to_region() {
+        let schema = Schema::new(vec![
+            DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
+                .with_time_index(true),
+            DtColumnSchema::new("id", ConcreteDataType::int16_datatype(), false),
+            DtColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
+        ]);
+
+        let table_meta = TableMetaBuilder::default()
+            .schema(Arc::new(schema))
+            .primary_key_indices(vec![2])
+            .next_column_id(3)
+            .build()
+            .unwrap();
+
+        let table_info = Arc::new(
+            TableInfoBuilder::default()
+                .name("demo")
+                .meta(table_meta)
+                .table_id(1)
+                .build()
+                .unwrap(),
+        );
+
+        let insert_request = mock_insert_request();
+        let mut request =
+            Inserter::convert_req_table_to_region(&table_info, insert_request).unwrap();
+
+        assert_eq!(request.requests.len(), 1);
+        verify_region_insert_request(request.requests.pop().unwrap());
+    }
+
+    fn mock_insert_request() -> TableInsertRequest {
+        let mut builder = StringVectorBuilder::with_capacity(3);
+        builder.push(Some("host1"));
+        builder.push(None);
+        builder.push(Some("host3"));
+        let host = builder.to_vector();
+
+        let mut builder = Int16VectorBuilder::with_capacity(3);
+        builder.push(Some(1_i16));
+        builder.push(Some(2_i16));
+        builder.push(Some(3_i16));
+        let id = builder.to_vector();
+
+        let columns_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]);
+
+        TableInsertRequest {
+            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
+            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
+            table_name: "demo".to_string(),
+            columns_values,
+            region_number: 0,
+        }
+    }
+
+    fn verify_region_insert_request(request: RegionInsertRequest) {
+        assert_eq!(request.region_id, RegionId::new(1, 0).as_u64());
+
+        let rows = request.rows.unwrap();
+        for (i, column) in rows.schema.iter().enumerate() {
+            let name = &column.column_name;
+            if name == "id" {
+                assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
+                assert_eq!(SemanticType::Field as i32, column.semantic_type);
+                let values = rows
+                    .rows
+                    .iter()
+                    .map(|row| row.values[i].value_data.clone())
+                    .collect::<Vec<_>>();
+                assert_eq!(
+                    vec![
+                        Some(ValueData::I16Value(1)),
+                        Some(ValueData::I16Value(2)),
+                        Some(ValueData::I16Value(3))
+                    ],
+                    values
+                );
+            }
+            if name == "host" {
+                assert_eq!(ColumnDataType::String as i32, column.datatype);
+                assert_eq!(SemanticType::Tag as i32, column.semantic_type);
+                let values = rows
+                    .rows
+                    .iter()
+                    .map(|row| row.values[i].value_data.clone())
+                    .collect::<Vec<_>>();
+                assert_eq!(
+                    vec![
+                        Some(ValueData::StringValue("host1".to_string())),
+                        None,
+                        Some(ValueData::StringValue("host3".to_string()))
+                    ],
+                    values
+                );
+            }
+        }
+    }
 }
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 2ee768af5b..020b674f89 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -18,6 +18,7 @@ mod influxdb;
 mod opentsdb;
 mod otlp;
 mod prom_store;
+pub mod region_handler;
 mod script;
 mod standalone;
 
@@ -43,6 +44,7 @@ use common_telemetry::logging::info;
 use common_telemetry::{error, timer};
 use datanode::instance::sql::table_idents_to_full_name;
 use datanode::instance::InstanceRef as DnInstanceRef;
+use datanode::region_server::RegionServer;
 use distributed::DistInstance;
 use meta_client::client::{MetaClient, MetaClientBuilder};
 use partition::manager::PartitionRuleManager;
@@ -72,6 +74,9 @@ use sql::statements::copy::CopyTable;
 use sql::statements::statement::Statement;
 use sqlparser::ast::ObjectName;
 
+use self::distributed::DistRegionRequestHandler;
+use self::region_handler::RegionRequestHandlerRef;
+use self::standalone::StandaloneRegionRequestHandler;
 use crate::catalog::FrontendCatalogManager;
 use crate::error::{
     self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
@@ -115,6 +120,7 @@ pub struct Instance {
     statement_executor: Arc<StatementExecutor>,
     query_engine: QueryEngineRef,
     grpc_query_handler: GrpcQueryHandlerRef<Error>,
+    region_request_handler: RegionRequestHandlerRef,
     create_expr_factory: CreateExprFactory,
     /// plugins: this map holds extensions to customize query or auth
     /// behaviours.
@@ -165,7 +171,7 @@ impl Instance {
             catalog_manager.clone(),
             true,
             Some(partition_manager.clone()),
-            Some(datanode_clients),
+            Some(datanode_clients.clone()),
             plugins.clone(),
         )
         .query_engine();
@@ -173,10 +179,13 @@ impl Instance {
         let script_executor =
             Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
 
+        let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
+
         let statement_executor = Arc::new(StatementExecutor::new(
             catalog_manager.clone(),
             query_engine.clone(),
             dist_instance.clone(),
+            region_request_handler.clone(),
         ));
 
         plugins.insert::<StatementExecutorRef>(statement_executor.clone());
@@ -205,6 +214,7 @@ impl Instance {
             create_expr_factory,
             statement_executor,
             query_engine,
+            region_request_handler,
             grpc_query_handler: dist_instance,
             plugins: plugins.clone(),
             servers: Arc::new(HashMap::new()),
@@ -249,16 +259,21 @@ impl Instance {
         Ok(Arc::new(meta_client))
     }
 
-    pub async fn try_new_standalone(dn_instance: DnInstanceRef) -> Result<Self> {
+    pub async fn try_new_standalone(
+        dn_instance: DnInstanceRef,
+        region_server: RegionServer,
+    ) -> Result<Self> {
         let catalog_manager = dn_instance.catalog_manager();
         let query_engine = dn_instance.query_engine();
         let script_executor =
             Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
 
+        let region_request_handler = StandaloneRegionRequestHandler::arc(region_server);
         let statement_executor = Arc::new(StatementExecutor::new(
             catalog_manager.clone(),
             query_engine.clone(),
             dn_instance.clone(),
+            region_request_handler.clone(),
         ));
 
         let create_expr_factory = CreateExprFactory;
@@ -271,6 +286,7 @@ impl Instance {
             statement_executor,
             query_engine,
             grpc_query_handler,
+            region_request_handler,
             plugins: Default::default(),
             servers: Arc::new(HashMap::new()),
             heartbeat_task: None,
@@ -298,6 +314,7 @@ impl Instance {
             &self.catalog_manager,
             &self.create_expr_factory,
             &self.grpc_query_handler,
+            &self.region_request_handler,
         );
         inserter.handle_row_inserts(requests, ctx).await
     }
@@ -312,6 +329,7 @@ impl Instance {
             &self.catalog_manager,
             &self.create_expr_factory,
             &self.grpc_query_handler,
+            &self.region_request_handler,
         );
         inserter.handle_column_inserts(requests, ctx).await
     }
diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs
index 3fbddce0e0..d7ee060986 100644
--- a/src/frontend/src/instance/distributed.rs
+++ b/src/frontend/src/instance/distributed.rs
@@ -14,7 +14,6 @@
 
 pub mod deleter;
 pub(crate) mod inserter;
-pub(crate) mod row_inserter;
 
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -22,9 +21,9 @@ use std::sync::Arc;
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::ddl_request::Expr as DdlExpr;
 use api::v1::greptime_request::Request;
+use api::v1::region::{region_request, RegionResponse};
 use api::v1::{
-    column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, InsertRequests,
-    RowInsertRequests, TruncateTableExpr,
+    column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, TruncateTableExpr,
 };
 use async_trait::async_trait;
 use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
@@ -54,22 +53,21 @@ use sql::ast::{Ident, Value as SqlValue};
 use sql::statements::create::{PartitionEntry, Partitions};
 use sql::statements::statement::Statement;
 use sql::statements::{self, sql_value_to_value};
-use table::error::TableOperationSnafu;
 use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
 use table::requests::{AlterTableRequest, TableOptions};
 use table::TableRef;
 
+use super::region_handler::RegionRequestHandler;
 use crate::catalog::FrontendCatalogManager;
 use crate::error::{
     self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
     DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu, Result,
     SchemaExistsSnafu, TableAlreadyExistSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
-    TableSnafu, UnrecognizedTableOptionSnafu,
+    UnrecognizedTableOptionSnafu,
 };
 use crate::expr_factory;
 use crate::instance::distributed::deleter::DistDeleter;
 use crate::instance::distributed::inserter::DistInserter;
-use crate::instance::distributed::row_inserter::RowDistInserter;
 use crate::table::DistTable;
 
 const MAX_VALUE: &str = "MAXVALUE";
@@ -266,23 +264,13 @@ impl DistInstance {
                 self.drop_table(table_name).await
             }
             Statement::Insert(insert) => {
-                let (catalog, schema, _) =
-                    table_idents_to_full_name(insert.table_name(), query_ctx.clone())
-                        .map_err(BoxedError::new)
-                        .context(error::ExternalSnafu)?;
-
                 let insert_request =
                     SqlHandler::insert_to_request(self.catalog_manager.clone(), &insert, query_ctx)
                         .await
                         .context(InvokeDatanodeSnafu)?;
-                let inserter = DistInserter::new(catalog, schema, self.catalog_manager.clone());
-                let affected_rows = inserter
-                    .insert(vec![insert_request])
-                    .await
-                    .map_err(BoxedError::new)
-                    .context(TableOperationSnafu)
-                    .context(TableSnafu)?;
+
+                let inserter = DistInserter::new(&self.catalog_manager);
+                let affected_rows = inserter.insert_table_request(insert_request).await?;
                 Ok(Output::AffectedRows(affected_rows as usize))
             }
             Statement::ShowCreateTable(show) => {
@@ -517,34 +505,6 @@ impl DistInstance {
             .context(error::ExecuteDdlSnafu)
     }
 
-    async fn handle_dist_insert(
-        &self,
-        requests: InsertRequests,
-        ctx: QueryContextRef,
-    ) -> Result<Output> {
-        let inserter = DistInserter::new(
-            ctx.current_catalog().to_owned(),
-            ctx.current_schema().to_owned(),
-            self.catalog_manager.clone(),
-        );
-        let affected_rows = inserter.grpc_insert(requests).await?;
-        Ok(Output::AffectedRows(affected_rows as usize))
-    }
-
-    async fn handle_row_dist_insert(
-        &self,
-        requests: RowInsertRequests,
-        ctx: QueryContextRef,
-    ) -> Result<Output> {
-        let inserter = RowDistInserter::new(
-            ctx.current_catalog().to_owned(),
-            ctx.current_schema().to_owned(),
-            self.catalog_manager.clone(),
-        );
-        let affected_rows = inserter.insert(requests).await?;
-        Ok(Output::AffectedRows(affected_rows as usize))
-    }
-
     async fn handle_dist_delete(
         &self,
         request: DeleteRequests,
@@ -584,8 +544,11 @@ impl GrpcQueryHandler for DistInstance {
 
     async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
         match request {
-            Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
-            Request::RowInserts(requests) => self.handle_row_dist_insert(requests, ctx).await,
+            Request::Inserts(_) => NotSupportedSnafu { feat: "inserts" }.fail(),
+            Request::RowInserts(_) => NotSupportedSnafu {
+                feat: "row inserts",
+            }
+            .fail(),
             Request::RowDeletes(_) => NotSupportedSnafu {
                 feat: "row deletes",
             }
             .fail(),
@@ -621,6 +584,69 @@ impl GrpcQueryHandler for DistInstance {
     }
 }
 
+pub(crate) struct DistRegionRequestHandler {
+    catalog_manager: Arc<FrontendCatalogManager>,
+}
+
+impl DistRegionRequestHandler {
+    pub fn arc(catalog_manager: Arc<FrontendCatalogManager>) -> Arc<Self> {
+        Arc::new(Self { catalog_manager })
+    }
+}
+
+#[async_trait]
+impl RegionRequestHandler for DistRegionRequestHandler {
+    async fn handle(
+        &self,
+        request: region_request::Body,
+        ctx: QueryContextRef,
+    ) -> Result<RegionResponse> {
+        match request {
+            region_request::Body::Inserts(inserts) => {
+                let inserter =
+                    DistInserter::new(&self.catalog_manager).with_trace_id(ctx.trace_id());
+                let affected_rows = inserter.insert_region_requests(inserts).await? as _;
+                Ok(RegionResponse {
+                    header: Some(Default::default()),
+                    affected_rows,
+                })
+            }
+            region_request::Body::Deletes(_) => NotSupportedSnafu {
+                feat: "region deletes",
+            }
+            .fail(),
+            region_request::Body::Create(_) => NotSupportedSnafu {
+                feat: "region create",
+            }
+            .fail(),
+            region_request::Body::Drop(_) => NotSupportedSnafu {
+                feat: "region drop",
+            }
+            .fail(),
+            region_request::Body::Open(_) => NotSupportedSnafu {
+                feat: "region open",
+            }
+            .fail(),
+            region_request::Body::Close(_) => NotSupportedSnafu {
+                feat: "region close",
+            }
+            .fail(),
+            region_request::Body::Alter(_) => NotSupportedSnafu {
+                feat: "region alter",
+            }
+            .fail(),
+            region_request::Body::Flush(_) => NotSupportedSnafu {
+                feat: "region flush",
+            }
+            .fail(),
+            region_request::Body::Compact(_) => NotSupportedSnafu {
+                feat: "region compact",
+            }
+            .fail(),
+        }
+    }
+}
+
 fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
     if partitions.is_empty() {
         return Ok(None);
diff --git a/src/frontend/src/instance/distributed/deleter.rs b/src/frontend/src/instance/distributed/deleter.rs
index 65c5cc95a5..3586e4a156 100644
--- a/src/frontend/src/instance/distributed/deleter.rs
+++ b/src/frontend/src/instance/distributed/deleter.rs
@@ -125,9 +125,7 @@ impl DistDeleter {
             let table_route = partition_manager
                 .find_table_route(table_id)
                 .await
-                .with_context(|_| FindTableRouteSnafu {
-                    table_name: table_name.to_string(),
-                })?;
+                .with_context(|_| FindTableRouteSnafu { table_id })?;
 
             for (region_number, delete) in split {
                 let datanode =
diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs
index 9039f41987..41216fd8ac 100644
--- a/src/frontend/src/instance/distributed/inserter.rs
+++ b/src/frontend/src/instance/distributed/inserter.rs
@@ -13,172 +13,149 @@
 // limitations under the License.
 
 use std::collections::HashMap;
-use std::sync::Arc;
 
-use api::v1::InsertRequests;
+use api::v1::region::{region_request, InsertRequests, RegionRequest, RegionRequestHeader};
 use catalog::CatalogManager;
-use client::Database;
-use common_grpc_expr::insert::to_table_insert_request;
+use common_meta::datanode_manager::DatanodeManager;
 use common_meta::peer::Peer;
-use common_meta::table_name::TableName;
-use futures::future;
+use futures_util::future;
 use metrics::counter;
 use snafu::{OptionExt, ResultExt};
-use table::metadata::TableInfoRef;
-use table::meter_insert_request;
-use table::requests::InsertRequest;
+use store_api::storage::RegionId;
+use table::requests::InsertRequest as TableInsertRequest;
 
 use crate::catalog::FrontendCatalogManager;
 use crate::error::{
-    CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDatanodeSnafu,
-    Result, SplitInsertSnafu, TableNotFoundSnafu, ToTableInsertRequestSnafu,
+    CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestInsertsSnafu,
+    Result, SplitInsertSnafu, TableNotFoundSnafu,
 };
-use crate::table::insert::to_grpc_insert_request;
+use crate::inserter::Inserter;
 
-/// A distributed inserter. It ingests GRPC [InsertRequests] or table [InsertRequest] (so it can be
-/// used in protocol handlers or table insertion API).
+/// A distributed inserter. It ingests gRPC [InsertRequests].
 ///
 /// Table data partitioning and Datanode requests batching are handled inside.
-///
-/// Note that the inserter is confined to a single catalog and schema. I.e., it cannot handle
-/// multiple insert requests with different catalog or schema (will throw "NotSupported" error).
-/// This is because we currently do not have this kind of requirements. Let's keep it simple for now.
-pub(crate) struct DistInserter {
-    catalog: String,
-    schema: String,
-    catalog_manager: Arc<FrontendCatalogManager>,
+pub struct DistInserter<'a> {
+    catalog_manager: &'a FrontendCatalogManager,
+    trace_id: Option<u64>,
+    span_id: Option<u64>,
 }
 
-impl DistInserter {
-    pub(crate) fn new(
-        catalog: String,
-        schema: String,
-        catalog_manager: Arc<FrontendCatalogManager>,
-    ) -> Self {
+impl<'a> DistInserter<'a> {
+    pub fn new(catalog_manager: &'a FrontendCatalogManager) -> Self {
         Self {
-            catalog,
-            schema,
             catalog_manager,
+            trace_id: None,
+            span_id: None,
         }
     }
 
-    pub(crate) async fn grpc_insert(&self, requests: InsertRequests) -> Result<u32> {
-        let inserts = requests
-            .inserts
-            .into_iter()
-            .map(|x| {
-                to_table_insert_request(&self.catalog, &self.schema, x)
-                    .context(ToTableInsertRequestSnafu)
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        self.insert(inserts).await
+    pub fn with_trace_id(mut self, trace_id: u64) -> Self {
+        self.trace_id = Some(trace_id);
+        self
     }
 
-    pub(crate) async fn insert(&self, requests: Vec<InsertRequest>) -> Result<u32> {
-        debug_assert!(requests
-            .iter()
-            .all(|x| x.catalog_name == self.catalog && x.schema_name == self.schema));
-
-        let inserts = self.split_inserts(requests).await?;
-
-        self.request_datanodes(inserts).await
+    #[allow(dead_code)]
+    pub fn with_span_id(mut self, span_id: u64) -> Self {
+        self.span_id = Some(span_id);
+        self
     }
 
-    /// Splits multiple table [InsertRequest]s into multiple GRPC [InsertRequests]s, each of which
-    /// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write
-    /// method in Datanode.
-    async fn split_inserts(
-        &self,
-        requests: Vec<InsertRequest>,
-    ) -> Result<HashMap<Peer, InsertRequests>> {
-        let partition_manager = self.catalog_manager.partition_manager();
-
-        let mut inserts = HashMap::new();
-
-        for request in requests {
-            meter_insert_request!(request);
-
-            let table_name = TableName::new(&self.catalog, &self.schema, &request.table_name);
-            let table_info = self.find_table_info(&request.table_name).await?;
-            let table_meta = &table_info.meta;
-
-            let table_id = table_info.table_id();
-            let split = partition_manager
-                .split_insert_request(table_id, request, table_meta.schema.as_ref())
-                .await
-                .context(SplitInsertSnafu)?;
-
-            let table_route = partition_manager
-                .find_table_route(table_id)
-                .await
-                .with_context(|_| FindTableRouteSnafu {
-                    table_name: table_name.to_string(),
-                })?;
-
-            for (region_number, insert) in split {
-                let datanode =
-                    table_route
-                        .find_region_leader(region_number)
-                        .context(FindDatanodeSnafu {
-                            region: region_number,
-                        })?;
-
-                let insert = to_grpc_insert_request(table_meta, region_number, insert)?;
-
-                inserts
-                    .entry(datanode.clone())
-                    .or_insert_with(|| InsertRequests { inserts: vec![] })
-                    .inserts
-                    .push(insert);
-            }
-        }
-
-        Ok(inserts)
-    }
-
-    async fn find_table_info(&self, table_name: &str) -> Result<TableInfoRef> {
-        let table = self
-            .catalog_manager
-            .table(&self.catalog, &self.schema, table_name)
-            .await
-            .context(CatalogSnafu)?
-            .with_context(|| TableNotFoundSnafu {
-                table_name: common_catalog::format_full_table_name(
-                    &self.catalog,
-                    &self.schema,
-                    table_name,
-                ),
-            })?;
-        Ok(table.table_info())
-    }
-
-    async fn request_datanodes(&self, inserts: HashMap<Peer, InsertRequests>) -> Result<u32> {
-        let results = future::try_join_all(inserts.into_iter().map(|(peer, inserts)| {
+    pub(crate) async fn insert_region_requests(&self, requests: InsertRequests) -> Result<u64> {
+        let requests = self.split(requests).await?;
+        let trace_id = self.trace_id.unwrap_or_default();
+        let span_id = self.span_id.unwrap_or_default();
+
+        let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| {
             let datanode_clients = self.catalog_manager.datanode_clients();
-            let catalog = self.catalog.clone();
-            let schema = self.schema.clone();
 
             common_runtime::spawn_write(async move {
-                let client = datanode_clients.get_client(&peer).await;
-                let database = Database::new(&catalog, &schema, client);
-                database.insert(inserts).await.context(RequestDatanodeSnafu)
+                let request = RegionRequest {
+                    header: Some(RegionRequestHeader { trace_id, span_id }),
+                    body: Some(region_request::Body::Inserts(inserts)),
+                };
+                datanode_clients
+                    .datanode(&peer)
+                    .await
+                    .handle(request)
+                    .await
+                    .context(RequestInsertsSnafu)
             })
         }))
        .await
         .context(JoinTaskSnafu)?;
 
-        let affected_rows = results.into_iter().sum::<Result<u32>>()?;
-        counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64);
+        let affected_rows = results.into_iter().sum::<Result<u64>>()?;
+        counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows);
         Ok(affected_rows)
     }
+
+    pub(crate) async fn insert_table_request(&self, request: TableInsertRequest) -> Result<u64> {
+        let table = self
+            .catalog_manager
+            .table(
+                &request.catalog_name,
+                &request.schema_name,
+                &request.table_name,
+            )
+            .await
+            .context(CatalogSnafu)?
+            .with_context(|| TableNotFoundSnafu {
+                table_name: format!(
+                    "{}.{}.{}",
+                    request.catalog_name, request.schema_name, request.table_name
+                ),
+            })?;
+
+        let table_info = table.table_info();
+        let request = Inserter::convert_req_table_to_region(&table_info, request)?;
+        self.insert_region_requests(request).await
+    }
+
+    /// Splits gRPC [InsertRequests] into multiple gRPC [InsertRequests]s, each of which
+    /// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write
+    /// method in Datanode.
+    async fn split(&self, requests: InsertRequests) -> Result<HashMap<Peer, InsertRequests>> {
+        let partition_manager = self.catalog_manager.partition_manager();
+        let mut inserts: HashMap<Peer, InsertRequests> = HashMap::new();
+
+        for req in requests.requests {
+            let table_id = RegionId::from_u64(req.region_id).table_id();
+
+            let req_splits = partition_manager
+                .split_insert_request(table_id, req)
+                .await
+                .context(SplitInsertSnafu)?;
+            let table_route = partition_manager
+                .find_table_route(table_id)
+                .await
+                .context(FindTableRouteSnafu { table_id })?;
+
+            for (region_number, insert) in req_splits {
+                let peer =
+                    table_route
+                        .find_region_leader(region_number)
+                        .context(FindDatanodeSnafu {
+                            region: region_number,
+                        })?;
+                inserts
+                    .entry(peer.clone())
+                    .or_default()
+                    .requests
+                    .push(insert);
+            }
+        }
+
+        Ok(inserts)
+    }
 }
 
 #[cfg(test)]
 mod tests {
-    use api::v1::column::Values;
-    use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest, SemanticType};
+    use std::sync::Arc;
+
+    use api::helper::vectors_to_rows;
+    use api::v1::region::InsertRequest;
+    use api::v1::value::ValueData;
+    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
     use client::client_manager::DatanodeClients;
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
     use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
     use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
@@ -186,7 +163,7 @@ mod tests {
     use common_meta::kv_backend::KvBackendRef;
     use common_meta::rpc::router::{Region, RegionRoute};
     use datatypes::prelude::{ConcreteDataType, VectorRef};
-    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
+    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema, Schema};
     use datatypes::vectors::Int32Vector;
     use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder};
 
@@ -217,13 +194,13 @@ mod tests {
         table_metadata_manager: &TableMetadataManagerRef,
     ) {
         let schema = Arc::new(Schema::new(vec![
-            ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
+            DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
                 .with_time_index(true)
                 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                     "current_timestamp()".to_string(),
                 )))
                 .unwrap(),
-            ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
+            DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
         ]));
 
         let table_meta = TableMetaBuilder::default()
@@ -279,61 +256,65 @@ mod tests {
             table_metadata_manager,
         ));
 
-        let inserter = DistInserter::new(
-            DEFAULT_CATALOG_NAME.to_string(),
-            DEFAULT_SCHEMA_NAME.to_string(),
-            catalog_manager,
-        );
+        let inserter = DistInserter::new(&catalog_manager);
 
         let new_insert_request = |vector: VectorRef| -> InsertRequest {
+            let row_count = vector.len();
             InsertRequest {
-                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
-                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
-                table_name: table_name.to_string(),
-                columns_values: HashMap::from([("a".to_string(), vector)]),
-                region_number: 0,
+                region_id: RegionId::new(1, 0).into(),
+                rows: Some(Rows {
+                    schema: vec![ColumnSchema {
+                        column_name: "a".to_string(),
+                        datatype: ColumnDataType::Int32 as i32,
+                        semantic_type: SemanticType::Field as i32,
+                    }],
+                    rows: vectors_to_rows([vector].iter(), row_count),
+                }),
             }
         };
-        let requests = vec![
-            new_insert_request(Arc::new(Int32Vector::from(vec![
-                Some(1),
-                None,
-                Some(11),
-                Some(101),
-            ]))),
-            new_insert_request(Arc::new(Int32Vector::from(vec![
-                Some(2),
-                Some(12),
-                None,
-                Some(102),
-            ]))),
-        ];
 
-        let mut inserts = inserter.split_inserts(requests).await.unwrap();
+        let requests = InsertRequests {
+            requests: vec![
+                new_insert_request(Arc::new(Int32Vector::from(vec![
+                    Some(1),
+                    None,
+                    Some(11),
+                    Some(101),
+                ]))),
+                new_insert_request(Arc::new(Int32Vector::from(vec![
+                    Some(2),
+                    Some(12),
+                    None,
+                    Some(102),
+                ]))),
+            ],
+        };
+
+        let mut inserts = inserter.split(requests).await.unwrap();
 
         assert_eq!(inserts.len(), 3);
 
-        let new_grpc_insert_request = |column_values: Vec<i32>,
-                                       null_mask: Vec<u8>,
-                                       row_count: u32,
-                                       region_number: u32|
-         -> GrpcInsertRequest {
-            GrpcInsertRequest {
-                table_name: table_name.to_string(),
-                columns: vec![Column {
-                    column_name: "a".to_string(),
-                    semantic_type: SemanticType::Field as i32,
-                    values: Some(Values {
-                        i32_values: column_values,
-                        ..Default::default()
+        let new_split_insert_request =
+            |rows: Vec<Option<i32>>, region_id: RegionId| -> InsertRequest {
+                InsertRequest {
+                    region_id: region_id.into(),
+                    rows: Some(Rows {
+                        schema: vec![ColumnSchema {
+                            column_name: "a".to_string(),
+                            datatype: ColumnDataType::Int32 as i32,
+                            semantic_type: SemanticType::Field as i32,
+                        }],
+                        rows: rows
+                            .into_iter()
+                            .map(|v| Row {
+                                values: vec![Value {
+                                    value_data: v.map(ValueData::I32Value),
+                                }],
+                            })
+                            .collect(),
                     }),
-                    null_mask,
-                    datatype: ColumnDataType::Int32 as i32,
-                }],
-                row_count,
-                region_number,
-            }
-        };
+                }
+            };
 
         // region to datanode placement:
         // 1 -> 1
         // 2 -> 2
         // 3 -> 3
         //
         // region value range:
         // 1 -> [50, max)
         // 2 -> [10, 50)
         // 3 -> (min, 10)
 
-        let datanode_inserts = inserts.remove(&Peer::new(1, "")).unwrap().inserts;
+        let datanode_inserts = inserts.remove(&Peer::new(1, "")).unwrap().requests;
         assert_eq!(datanode_inserts.len(), 2);
         assert_eq!(
             datanode_inserts[0],
-            new_grpc_insert_request(vec![101], vec![0], 1, 1)
+            new_split_insert_request(vec![Some(101)], RegionId::new(1, 1))
         );
         assert_eq!(
             datanode_inserts[1],
-            new_grpc_insert_request(vec![102], vec![0], 1, 1)
+            new_split_insert_request(vec![Some(102)], RegionId::new(1, 1))
         );
 
-        let datanode_inserts = inserts.remove(&Peer::new(2, "")).unwrap().inserts;
+        let datanode_inserts = inserts.remove(&Peer::new(2, "")).unwrap().requests;
         assert_eq!(datanode_inserts.len(), 2);
         assert_eq!(
             datanode_inserts[0],
-            new_grpc_insert_request(vec![11], vec![0], 1, 2)
+            new_split_insert_request(vec![Some(11)], RegionId::new(1, 2))
        );
         assert_eq!(
             datanode_inserts[1],
-            new_grpc_insert_request(vec![12], vec![0], 1, 2)
+            new_split_insert_request(vec![Some(12)], RegionId::new(1, 2))
         );
 
-        let datanode_inserts = inserts.remove(&Peer::new(3, "")).unwrap().inserts;
+        let datanode_inserts = inserts.remove(&Peer::new(3, "")).unwrap().requests;
         assert_eq!(datanode_inserts.len(), 2);
         assert_eq!(
             datanode_inserts[0],
-            new_grpc_insert_request(vec![1], vec![2], 2, 3)
+            new_split_insert_request(vec![Some(1), None], RegionId::new(1, 3))
         );
         assert_eq!(
             datanode_inserts[1],
-            new_grpc_insert_request(vec![2], vec![2], 2, 3)
+            new_split_insert_request(vec![Some(2), None], RegionId::new(1, 3))
         );
     }
 }
diff --git a/src/frontend/src/instance/distributed/row_inserter.rs b/src/frontend/src/instance/distributed/row_inserter.rs
deleted file mode 100644
index 4eabb21de9..0000000000
--- a/src/frontend/src/instance/distributed/row_inserter.rs
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use api::v1::RowInsertRequests;
-use catalog::CatalogManager;
-use client::Database;
-use common_meta::peer::Peer;
-use futures_util::future;
-use metrics::counter;
-use snafu::{OptionExt, ResultExt};
-use table::metadata::TableId;
-
-use crate::catalog::FrontendCatalogManager;
-use crate::error::{
-    CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDatanodeSnafu,
-    Result, SplitInsertSnafu, TableNotFoundSnafu,
-};
-
-pub struct RowDistInserter {
-    catalog_name: String,
-    schema_name: String,
-    catalog_manager: Arc<FrontendCatalogManager>,
-}
-
-impl RowDistInserter {
-    pub fn new(
-        catalog_name: String,
-        schema_name: String,
-        catalog_manager: Arc<FrontendCatalogManager>,
-    ) -> Self {
-        Self {
-            catalog_name,
-            schema_name,
-            catalog_manager,
-        }
-    }
-
-    pub(crate) async fn insert(&self, requests: RowInsertRequests) -> Result<u32> {
-        let requests = self.split(requests).await?;
-        let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| {
-            let datanode_clients = self.catalog_manager.datanode_clients();
-            let catalog = self.catalog_name.clone();
-            let schema = self.schema_name.clone();
-
-            common_runtime::spawn_write(async move {
-                let client = datanode_clients.get_client(&peer).await;
-                let database = Database::new(catalog, schema, client);
-                database
-                    .row_insert(inserts)
-                    .await
-                    .context(RequestDatanodeSnafu)
-            })
-        }))
-        .await
-        .context(JoinTaskSnafu)?;
-
-        let affected_rows = results.into_iter().sum::<Result<u32>>()?;
-        counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64);
-        Ok(affected_rows)
-    }
-
-    async fn split(&self, requests: RowInsertRequests) -> Result<HashMap<Peer, RowInsertRequests>> {
-        let partition_manager = self.catalog_manager.partition_manager();
-        let mut inserts: HashMap<Peer, RowInsertRequests> = HashMap::new();
-
-        for req in requests.inserts {
-            let table_name = req.table_name.clone();
-            let table_id = self.get_table_id(table_name.as_str()).await?;
-
-            let req_splits = partition_manager
-                .split_row_insert_request(table_id, req)
-                .await
-                .context(SplitInsertSnafu)?;
-            let table_route = partition_manager
-                .find_table_route(table_id)
-                .await
-                .context(FindTableRouteSnafu { table_name })?;
-
-            for (region_number, insert) in req_splits {
-                let peer =
-                    table_route
-                        .find_region_leader(region_number)
-                        .context(FindDatanodeSnafu {
-                            region: region_number,
-                        })?;
-                inserts
-                    .entry(peer.clone())
-                    .or_default()
-                    .inserts
-                    .push(insert);
-            }
-        }
-
-        Ok(inserts)
-    }
-
-    async fn get_table_id(&self, table_name: &str) -> Result<TableId> {
-        self.catalog_manager
-            .table(&self.catalog_name, &self.schema_name, table_name)
-            .await
-            .context(CatalogSnafu)?
-            .with_context(|| TableNotFoundSnafu {
-                table_name: common_catalog::format_full_table_name(
-                    &self.catalog_name,
-                    &self.schema_name,
-                    table_name,
-                ),
-            })
-            .map(|table| table.table_info().table_id())
-    }
-}
diff --git a/src/frontend/src/instance/region_handler.rs b/src/frontend/src/instance/region_handler.rs
new file mode 100644
index 0000000000..5d0c9d5b63
--- /dev/null
+++ b/src/frontend/src/instance/region_handler.rs
@@ -0,0 +1,32 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use api::v1::region::{region_request, RegionResponse};
+use async_trait::async_trait;
+use session::context::QueryContextRef;
+
+use crate::error::Result;
+
+#[async_trait]
+pub trait RegionRequestHandler: Send + Sync {
+    async fn handle(
+        &self,
+        request: region_request::Body,
+        ctx: QueryContextRef,
+    ) -> Result<RegionResponse>;
+}
+
+pub type RegionRequestHandlerRef = Arc<dyn RegionRequestHandler>;
diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs
index af5c224e7c..2607c3fb98 100644
--- a/src/frontend/src/instance/standalone.rs
+++ b/src/frontend/src/instance/standalone.rs
@@ -15,14 +15,18 @@
 use std::sync::Arc;
 
 use api::v1::greptime_request::Request;
+use api::v1::region::{region_request, RegionResponse};
 use async_trait::async_trait;
 use common_query::Output;
 use datanode::error::Error as DatanodeError;
+use datanode::region_server::RegionServer;
+use servers::grpc::region_server::RegionServerHandler;
 use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
 use session::context::QueryContextRef;
 use snafu::ResultExt;
 
-use crate::error::{self, Result};
+use super::region_handler::RegionRequestHandler;
+use crate::error::{Error, InvokeDatanodeSnafu, InvokeRegionServerSnafu, Result};
 
 pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef<DatanodeError>);
 
 impl StandaloneGrpcQueryHandler {
@@ -34,12 +38,36 @@ impl StandaloneGrpcQueryHandler {
 
 #[async_trait]
 impl GrpcQueryHandler for StandaloneGrpcQueryHandler {
-    type Error = error::Error;
+    type Error = Error;
 
     async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output> {
         self.0
             .do_query(query, ctx)
             .await
-            .context(error::InvokeDatanodeSnafu)
+            .context(InvokeDatanodeSnafu)
+    }
+}
+
+pub(crate) struct StandaloneRegionRequestHandler {
+    region_server: RegionServer,
+}
+
+impl StandaloneRegionRequestHandler {
+    pub fn arc(region_server: RegionServer) -> Arc<Self> {
+        Arc::new(Self { region_server })
+    }
+}
+
+#[async_trait]
+impl RegionRequestHandler for StandaloneRegionRequestHandler {
+    async fn handle(
+        &self,
+        request: region_request::Body,
+        _ctx: QueryContextRef,
+    ) -> Result<RegionResponse> {
+        self.region_server
+            .handle(request)
+            .await
+            .context(InvokeRegionServerSnafu)
     }
 }
diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs
index 1105ac9299..3e0dd05354 100644
--- a/src/frontend/src/statement.rs
+++ b/src/frontend/src/statement.rs
@@ -24,6 +24,7 @@ use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
 
+use api::v1::region::region_request;
catalog::CatalogManagerRef; use common_error::ext::BoxedError; use common_query::Output; @@ -50,8 +51,9 @@ use crate::error::{ self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu, PlanStatementSnafu, Result, TableNotFoundSnafu, }; +use crate::inserter::Inserter; use crate::instance::distributed::deleter::DistDeleter; -use crate::instance::distributed::inserter::DistInserter; +use crate::instance::region_handler::RegionRequestHandlerRef; use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; #[derive(Clone)] @@ -59,6 +61,7 @@ pub struct StatementExecutor { catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, sql_stmt_executor: SqlStatementExecutorRef, + region_request_handler: RegionRequestHandlerRef, } impl StatementExecutor { @@ -66,11 +69,13 @@ impl StatementExecutor { catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, sql_stmt_executor: SqlStatementExecutorRef, + region_request_handler: RegionRequestHandlerRef, ) -> Self { Self { catalog_manager, query_engine, sql_stmt_executor, + region_request_handler, } } @@ -110,9 +115,10 @@ impl StatementExecutor { .copy_table_to(req, query_ctx) .await .map(Output::AffectedRows), - CopyDirection::Import => { - self.copy_table_from(req).await.map(Output::AffectedRows) - } + CopyDirection::Import => self + .copy_table_from(req, query_ctx) + .await + .map(Output::AffectedRows), } } @@ -165,45 +171,26 @@ impl StatementExecutor { }) } - // TODO(zhongzc): A middle state that eliminates calls to table.insert, - // For DistTable, its insert is not invoked; for MitoTable, it is still called but eventually eliminated. - async fn send_insert_request(&self, request: InsertRequest) -> Result { - let frontend_catalog_manager = self - .catalog_manager - .as_any() - .downcast_ref::(); + async fn handle_table_insert_request( + &self, + request: InsertRequest, + query_ctx: QueryContextRef, + ) -> Result { + let table_ref = TableReference::full( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ); + let table = self.get_table(&table_ref).await?; + let table_info = table.table_info(); - let table_name = request.table_name.clone(); - match frontend_catalog_manager { - Some(frontend_catalog_manager) => { - let inserter = DistInserter::new( - request.catalog_name.clone(), - request.schema_name.clone(), - Arc::new(frontend_catalog_manager.clone()), - ); - let affected_rows = inserter - .insert(vec![request]) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu) - .context(InsertSnafu { table_name })?; - Ok(affected_rows as usize) - } - None => { - let table_ref = TableReference::full( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ); - let affected_rows = self - .get_table(&table_ref) - .await? 
-                    .insert(request)
-                    .await
-                    .context(InsertSnafu { table_name })?;
-                Ok(affected_rows)
-            }
-        }
+        let request = Inserter::convert_req_table_to_region(&table_info, request)?;
+        let region_response = self
+            .region_request_handler
+            .handle(region_request::Body::Inserts(request), query_ctx)
+            .await?;
+
+        Ok(region_response.affected_rows as _)
     }
 
     // TODO(zhongzc): A middle state that eliminates calls to table.delete,
diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs
index 30548d737f..034443f0ec 100644
--- a/src/frontend/src/statement/copy_table_from.rs
+++ b/src/frontend/src/statement/copy_table_from.rs
@@ -40,6 +40,7 @@ use datatypes::vectors::Helper;
 use futures_util::StreamExt;
 use object_store::{Entry, EntryMode, Metakey, ObjectStore};
 use regex::Regex;
+use session::context::QueryContextRef;
 use snafu::ResultExt;
 use table::engine::TableReference;
 use table::requests::{CopyTableRequest, InsertRequest};
@@ -235,7 +236,11 @@ impl StatementExecutor {
         }
     }
 
-    pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result<usize> {
+    pub async fn copy_table_from(
+        &self,
+        req: CopyTableRequest,
+        query_ctx: QueryContextRef,
+    ) -> Result<usize> {
         let table_ref = TableReference {
             catalog: &req.catalog_name,
             schema: &req.schema_name,
@@ -322,14 +327,17 @@
                     .zip(vectors)
                     .collect::<HashMap<_, _>>();
 
-                pending.push(self.send_insert_request(InsertRequest {
-                    catalog_name: req.catalog_name.to_string(),
-                    schema_name: req.schema_name.to_string(),
-                    table_name: req.table_name.to_string(),
-                    columns_values,
-                    //TODO: support multi-regions
-                    region_number: 0,
-                }));
+                pending.push(self.handle_table_insert_request(
+                    InsertRequest {
+                        catalog_name: req.catalog_name.to_string(),
+                        schema_name: req.schema_name.to_string(),
+                        table_name: req.table_name.to_string(),
+                        columns_values,
+                        // TODO: support multi-regions
+                        region_number: 0,
+                    },
+                    query_ctx.clone(),
+                ));
 
                 if pending_mem_size as u64 >= pending_mem_threshold {
                     rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
diff --git a/src/frontend/src/statement/dml.rs b/src/frontend/src/statement/dml.rs
index 9d6deabbc5..0b6c8cc47b 100644
--- a/src/frontend/src/statement/dml.rs
+++ b/src/frontend/src/statement/dml.rs
@@ -74,7 +74,9 @@ impl StatementExecutor {
             let record_batch = batch.context(ReadRecordBatchSnafu)?;
             let insert_request =
                 build_insert_request(record_batch, table.schema(), &table_info)?;
-            affected_rows += self.send_insert_request(insert_request).await?;
+            affected_rows += self
+                .handle_table_insert_request(insert_request, query_ctx.clone())
+                .await?;
         }
 
         Ok(Output::AffectedRows(affected_rows))
diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs
index 035eda040c..0e7e5aa186 100644
--- a/src/frontend/src/table/insert.rs
+++ b/src/frontend/src/table/insert.rs
@@ -16,12 +16,10 @@ use std::collections::HashMap;
 
 use api::helper::{push_vals, ColumnDataTypeWrapper};
 use api::v1::column::Values;
-use api::v1::{Column, InsertRequest as GrpcInsertRequest, SemanticType};
+use api::v1::{Column, SemanticType};
 use datatypes::prelude::*;
 use snafu::{ensure, OptionExt, ResultExt};
-use store_api::storage::RegionNumber;
 use table::metadata::TableMeta;
-use table::requests::InsertRequest;
 
 use crate::error::{self, ColumnDataTypeSnafu, NotSupportedSnafu, Result, VectorToGrpcColumnSnafu};
 
@@ -55,21 +53,6 @@ pub(crate) fn to_grpc_columns(
     Ok((columns, row_count))
 }
 
-pub(crate) fn to_grpc_insert_request(
-    table_meta: &TableMeta,
-    region_number: RegionNumber,
-    insert: InsertRequest,
-) -> Result<GrpcInsertRequest> {
-    let table_name = insert.table_name.clone();
-    let (columns, row_count) = to_grpc_columns(table_meta, &insert.columns_values)?;
-    Ok(GrpcInsertRequest {
-        table_name,
-        region_number,
-        columns,
-        row_count,
-    })
-}
-
 fn vector_to_grpc_column(
     table_meta: &TableMeta,
     column_name: &str,
@@ -114,19 +97,12 @@ fn vector_to_grpc_column(
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
     use std::sync::Arc;
 
     use api::v1::ColumnDataType;
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-    use datatypes::prelude::ScalarVectorBuilder;
     use datatypes::schema::{ColumnSchema, Schema};
-    use datatypes::vectors::{
-        Int16VectorBuilder, Int32Vector, Int64Vector, MutableVector, StringVector,
-        StringVectorBuilder,
-    };
+    use datatypes::vectors::{Int32Vector, Int64Vector, StringVector};
     use table::metadata::TableMetaBuilder;
-    use table::requests::InsertRequest;
 
     use super::*;
 
@@ -189,75 +165,4 @@ mod tests {
         assert_eq!(column.null_mask, vec![2]);
         assert_eq!(column.datatype, ColumnDataType::String as i32);
     }
-
-    #[test]
-    fn test_to_grpc_insert_request() {
-        let schema = Schema::new(vec![
-            ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
-                .with_time_index(true),
-            ColumnSchema::new("id", ConcreteDataType::int16_datatype(), false),
-            ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
-        ]);
-
-        let table_meta = TableMetaBuilder::default()
-            .schema(Arc::new(schema))
-            .primary_key_indices(vec![])
-            .next_column_id(3)
-            .build()
-            .unwrap();
-
-        let insert_request = mock_insert_request();
-        let request = to_grpc_insert_request(&table_meta, 12, insert_request).unwrap();
-
-        verify_grpc_insert_request(request);
-    }
-
-    fn mock_insert_request() -> InsertRequest {
-        let mut builder = StringVectorBuilder::with_capacity(3);
-        builder.push(Some("host1"));
-        builder.push(None);
-        builder.push(Some("host3"));
-        let host = builder.to_vector();
-
-        let mut builder = Int16VectorBuilder::with_capacity(3);
-        builder.push(Some(1_i16));
-        builder.push(Some(2_i16));
-        builder.push(Some(3_i16));
-        let id = builder.to_vector();
-
-        let columns_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]);
-
-        InsertRequest {
-            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
-            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
-            table_name: "demo".to_string(),
-            columns_values,
-            region_number: 0,
-        }
-    }
-
-    fn verify_grpc_insert_request(request: GrpcInsertRequest) {
-        let table_name = request.table_name;
-        assert_eq!("demo", table_name);
-
-        for column in request.columns {
-            let name = column.column_name;
-            if name == "id" {
-                assert_eq!(0, column.null_mask[0]);
-                assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
-                assert_eq!(vec![1, 2, 3], column.values.as_ref().unwrap().i16_values);
-            }
-            if name == "host" {
-                assert_eq!(2, column.null_mask[0]);
-                assert_eq!(ColumnDataType::String as i32, column.datatype);
-                assert_eq!(
-                    vec!["host1", "host3"],
-                    column.values.as_ref().unwrap().string_values
-                );
-            }
-        }
-
-        let region_number = request.region_number;
-        assert_eq!(12, region_number);
-    }
 }
diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs
index 36353b13f3..541313c577 100644
--- a/src/partition/src/manager.rs
+++ b/src/partition/src/manager.rs
@@ -15,25 +15,24 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-use api::v1::RowInsertRequest;
+use api::v1::region::InsertRequest;
 use common_meta::peer::Peer;
 use common_meta::rpc::router::TableRoute;
 use common_query::prelude::Expr;
 use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
 use datatypes::prelude::Value;
-use datatypes::schema::Schema;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::{RegionId, RegionNumber};
 use table::metadata::TableId;
-use table::requests::{DeleteRequest, InsertRequest};
+use table::requests::DeleteRequest;
 
 use crate::columns::RangeColumnsPartitionRule;
 use crate::error::{FindLeaderSnafu, Result};
 use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
 use crate::range::RangePartitionRule;
 use crate::route::TableRoutes;
-use crate::row_splitter::{RowInsertRequestSplits, RowSplitter};
-use crate::splitter::{DeleteRequestSplit, InsertRequestSplit, WriteSplitter};
+use crate::row_splitter::{InsertRequestSplits, RowSplitter};
+use crate::splitter::{DeleteRequestSplit, WriteSplitter};
 use crate::{error, PartitionRuleRef};
 
 #[async_trait::async_trait]
@@ -236,26 +235,13 @@ impl PartitionRuleManager {
         Ok(regions)
     }
 
-    /// Split [InsertRequest] into [InsertRequestSplit] according to the partition rule
+    /// Split [InsertRequest] into [InsertRequestSplits] according to the partition rule
     /// of given table.
     pub async fn split_insert_request(
         &self,
         table: TableId,
         req: InsertRequest,
-        schema: &Schema,
-    ) -> Result<InsertRequestSplit> {
-        let partition_rule = self.find_table_partition_rule(table).await?;
-        let splitter = WriteSplitter::with_partition_rule(partition_rule);
-        splitter.split_insert(req, schema)
-    }
-
-    /// Split [RowInsertRequest] into [RowInsertRequestSplits] according to the partition rule
-    /// of given table.
-    pub async fn split_row_insert_request(
-        &self,
-        table: TableId,
-        req: RowInsertRequest,
-    ) -> Result<RowInsertRequestSplits> {
+    ) -> Result<InsertRequestSplits> {
         let partition_rule = self.find_table_partition_rule(table).await?;
         RowSplitter::new(partition_rule).split(req)
     }
diff --git a/src/partition/src/row_splitter.rs b/src/partition/src/row_splitter.rs
index e3f0dda83b..cfbc25c9f2 100644
--- a/src/partition/src/row_splitter.rs
+++ b/src/partition/src/row_splitter.rs
@@ -15,14 +15,15 @@
 use std::collections::HashMap;
 
 use api::helper;
-use api::v1::{ColumnSchema, Row, RowInsertRequest, Rows};
+use api::v1::region::InsertRequest;
+use api::v1::{ColumnSchema, Row, Rows};
 use datatypes::value::Value;
-use store_api::storage::RegionNumber;
+use store_api::storage::{RegionId, RegionNumber, TableId};
 
 use crate::error::Result;
 use crate::PartitionRuleRef;
 
-pub type RowInsertRequestSplits = HashMap<RegionNumber, RowInsertRequest>;
+pub type InsertRequestSplits = HashMap<RegionNumber, InsertRequest>;
 
 pub struct RowSplitter {
     partition_rule: PartitionRuleRef,
@@ -33,7 +34,7 @@ impl RowSplitter {
         Self { partition_rule }
     }
 
-    pub fn split(&self, req: RowInsertRequest) -> Result<RowInsertRequestSplits> {
+    pub fn split(&self, req: InsertRequest) -> Result<InsertRequestSplits> {
         // No partition
         let partition_columns = self.partition_rule.partition_columns();
         if partition_columns.is_empty() {
@@ -45,12 +46,13 @@ impl RowSplitter {
             return Ok(HashMap::new());
         };
 
-        SplitReadRowHelper::new(req.table_name, rows, &self.partition_rule).split_to_requests()
+        let table_id = RegionId::from_u64(req.region_id).table_id();
+        SplitReadRowHelper::new(table_id, rows, &self.partition_rule).split_to_requests()
     }
 }
 
 struct SplitReadRowHelper<'a> {
-    table_name: String,
+    table_id: TableId,
     schema: Vec<ColumnSchema>,
     rows: Vec<Row>,
     partition_rule: &'a PartitionRuleRef,
@@ -59,7 +61,7 @@ struct SplitReadRowHelper<'a> {
 }
 
 impl<'a> SplitReadRowHelper<'a> {
-    fn new(table_name: String, rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self {
+    fn new(table_id: TableId, rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self {
         let col_name_to_idx = rows
             .schema
             .iter()
@@ -73,7 +75,7 @@ impl<'a> SplitReadRowHelper<'a> {
             .collect::<Vec<_>>();
 
         Self {
-            table_name,
+            table_id,
             schema: rows.schema,
             rows: rows.rows,
             partition_rule,
@@ -81,7 +83,7 @@ impl<'a> SplitReadRowHelper<'a> {
         }
     }
 
-    fn split_to_requests(mut self) -> Result<RowInsertRequestSplits> {
+    fn split_to_requests(mut self) -> Result<InsertRequestSplits> {
         let request_splits = self
             .split_to_regions()?
            .into_iter()
            .map(|(region_number, row_idxs)| {
                let rows = row_idxs
                    .into_iter()
                    .map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
                    .collect();
-                let req = RowInsertRequest {
-                    table_name: self.table_name.clone(),
+                let req = InsertRequest {
                     rows: Some(Rows {
                         schema: self.schema.clone(),
                         rows,
                     }),
-                    region_number,
+                    region_id: RegionId::new(self.table_id, region_number).into(),
                 };
                 (region_number, req)
             })
@@ -145,7 +146,7 @@ mod tests {
     use crate::partition::PartitionExpr;
     use crate::PartitionRule;
 
-    fn mock_insert_request() -> RowInsertRequest {
+    fn mock_insert_request() -> InsertRequest {
         let schema = vec![
             ColumnSchema {
                 column_name: "id".to_string(),
@@ -186,10 +187,9 @@ mod tests {
             ],
         },
     ];
-        RowInsertRequest {
-            table_name: "t".to_string(),
+        InsertRequest {
             rows: Some(Rows { schema, rows }),
-            region_number: 0,
+            region_id: 0,
         }
     }
 
@@ -279,8 +279,8 @@
         let req0 = &splits[&0];
         let req1 = &splits[&1];
 
-        assert_eq!(req0.region_number, 0);
-        assert_eq!(req1.region_number, 1);
+        assert_eq!(req0.region_id, 0);
+        assert_eq!(req1.region_id, 1);
 
         let rows0 = req0.rows.as_ref().unwrap();
         let rows1 = req1.rows.as_ref().unwrap();
@@ -298,7 +298,7 @@
         assert_eq!(splits.len(), 1);
 
         let req = &splits[&1];
-        assert_eq!(req.region_number, 1);
+        assert_eq!(req.region_id, 1);
 
         let rows = req.rows.as_ref().unwrap();
         assert_eq!(rows.rows.len(), 3);
@@ -314,7 +314,7 @@
         assert_eq!(splits.len(), 1);
 
         let req = &splits[&0];
-        assert_eq!(req.region_number, 0);
+        assert_eq!(req.region_id, 0);
 
         let rows = req.rows.as_ref().unwrap();
        assert_eq!(rows.rows.len(), 3);
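
A note on the `region_id` values asserted above: the split tests rely on `store_api::storage::RegionId` combining a table id and a region number into one u64. The standalone sketch below models that packing under the assumption (not spelled out in this patch) that the table id occupies the high 32 bits and the region number the low 32 bits; `pack_region_id` and `table_id_of` are hypothetical names used for illustration only.

// Minimal, self-contained model of the assumed RegionId packing.
fn pack_region_id(table_id: u32, region_number: u32) -> u64 {
    ((table_id as u64) << 32) | region_number as u64
}

// Mirrors what `RegionId::from_u64(req.region_id).table_id()` recovers above.
fn table_id_of(region_id: u64) -> u32 {
    (region_id >> 32) as u32
}

fn main() {
    // The mock request carries region_id 0, i.e. table id 0, so a split for
    // region 1 yields region_id 1 -- matching `assert_eq!(req1.region_id, 1)`.
    assert_eq!(pack_region_id(0, 1), 1);
    assert_eq!(table_id_of(pack_region_id(42, 7)), 42);
}

Under this scheme the old per-request `table_name` plus `region_number` pair collapses into a single routable id, which is why the splitter only needs `RegionId::new(self.table_id, region_number)` when fanning rows out to regions.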