diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 44e5ce107f..5869f5971a 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -164,7 +164,7 @@ impl RegionRequester { } } -fn check_response_header(header: Option) -> Result<()> { +pub fn check_response_header(header: Option) -> Result<()> { let status = header .and_then(|header| header.status) .context(IllegalDatabaseResponseSnafu { diff --git a/src/client/src/region_handler.rs b/src/client/src/region_handler.rs index dc3ad1df93..9238389f6d 100644 --- a/src/client/src/region_handler.rs +++ b/src/client/src/region_handler.rs @@ -14,8 +14,9 @@ use std::sync::Arc; -use api::v1::region::{region_request, QueryRequest, RegionResponse}; +use api::v1::region::{region_request, QueryRequest}; use async_trait::async_trait; +use common_meta::datanode_manager::AffectedRows; use common_recordbatch::SendableRecordBatchStream; use session::context::QueryContextRef; @@ -27,7 +28,7 @@ pub trait RegionRequestHandler: Send + Sync { &self, request: region_request::Body, ctx: QueryContextRef, - ) -> Result; + ) -> Result; // TODO(ruihang): add trace id and span id in the request. async fn do_get(&self, request: QueryRequest) -> Result; diff --git a/src/frontend/src/delete.rs b/src/frontend/src/delete.rs new file mode 100644 index 0000000000..3c9c685086 --- /dev/null +++ b/src/frontend/src/delete.rs @@ -0,0 +1,168 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+use std::{iter, mem};
+
+use api::v1::region::region_request;
+use api::v1::{DeleteRequests, RowDeleteRequest, RowDeleteRequests};
+use catalog::CatalogManager;
+use client::region_handler::RegionRequestHandler;
+use common_query::Output;
+use session::context::QueryContextRef;
+use snafu::{ensure, OptionExt, ResultExt};
+use table::TableRef;
+
+use crate::error::{
+    CatalogSnafu, InvalidDeleteRequestSnafu, MissingTimeIndexColumnSnafu, RequestDatanodeSnafu,
+    Result, TableNotFoundSnafu,
+};
+use crate::req_convert::delete::{ColumnToRow, RowToRegion};
+
+/// Turns delete requests into region delete requests and hands them to the
+/// [`RegionRequestHandler`].
+///
+/// Both collaborators are borrowed, so a `Deleter` is cheap to build per request.
+pub(crate) struct Deleter<'a> {
+    /// Used to look up the target table of every delete request.
+    catalog_manager: &'a dyn CatalogManager,
+    /// Receives the final `region_request::Body::Deletes` request.
+    region_request_handler: &'a dyn RegionRequestHandler,
+}
+
+impl<'a> Deleter<'a> {
+    /// Creates a deleter that borrows the given catalog manager and region request handler.
+    pub fn new(
+        catalog_manager: &'a dyn CatalogManager,
+        region_request_handler: &'a dyn RegionRequestHandler,
+    ) -> Self {
+        Self {
+            catalog_manager,
+            region_request_handler,
+        }
+    }
+
+    /// Handles column-format delete requests.
+    ///
+    /// The requests are converted to row format with [`ColumnToRow`] and then processed by
+    /// [`Self::handle_row_deletes`].
+    ///
+    /// # Errors
+    ///
+    /// Returns the conversion error, or any error of [`Self::handle_row_deletes`].
+    pub async fn handle_column_deletes(
+        &self,
+        requests: DeleteRequests,
+        ctx: QueryContextRef,
+    ) -> Result<Output> {
+        let row_deletes = ColumnToRow::convert(requests)?;
+        self.handle_row_deletes(row_deletes, ctx).await
+    }
+
+    /// Handles row-format delete requests and returns [`Output::AffectedRows`].
+    ///
+    /// Steps, in order:
+    /// 1. requests without any row are dropped;
+    /// 2. every remaining row must carry exactly one value per schema column;
+    /// 3. columns that are neither row-key columns nor the time index are trimmed;
+    /// 4. the requests are converted with [`RowToRegion`] and sent to the region request handler.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if validation, the table lookup or the conversion fails; a failure of
+    /// the region request handler is wrapped with `RequestDatanodeSnafu`.
+    pub async fn handle_row_deletes(
+        &self,
+        mut requests: RowDeleteRequests,
+        ctx: QueryContextRef,
+    ) -> Result<Output> {
+        // remove empty requests
+        requests.deletes.retain(|req| {
+            req.rows
+                .as_ref()
+                .map(|r| !r.rows.is_empty())
+                .unwrap_or_default()
+        });
+        validate_row_count_match(&requests)?;
+
+        let requests = self.trim_columns(requests, &ctx).await?;
+        let region_request = RowToRegion::new(self.catalog_manager, &ctx)
+            .convert(requests)
+            .await?;
+        let region_request = region_request::Body::Deletes(region_request);
+
+        let affected_rows = self
+            .region_request_handler
+            .handle(region_request, ctx)
+            .await
+            .context(RequestDatanodeSnafu)?;
+        Ok(Output::AffectedRows(affected_rows as _))
+    }
+}
+
+impl<'a> Deleter<'a> {
+    /// Removes every column that is not a key column from the delete requests.
+    ///
+    /// For each request the target table is looked up and its key columns (row-key columns
+    /// plus the time index, see [`Self::key_column_names`]) are computed. Schema entries and
+    /// the matching row values of all other columns are dropped; requests that only contain
+    /// key columns are left untouched.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a table cannot be found or has no time index column.
+    async fn trim_columns(
+        &self,
+        mut requests: RowDeleteRequests,
+        ctx: &QueryContextRef,
+    ) -> Result<RowDeleteRequests> {
+        for req in &mut requests.deletes {
+            let table = self.get_table(req, ctx).await?;
+            let key_column_names = self.key_column_names(&table)?;
+
+            // `handle_row_deletes` drops row-less requests beforehand; skip defensively
+            // instead of panicking on request data.
+            let Some(rows) = req.rows.as_mut() else {
+                continue;
+            };
+
+            // Decide once per request which columns survive, instead of once per value.
+            let keep: Vec<bool> = rows
+                .schema
+                .iter()
+                .map(|column| key_column_names.contains(&column.column_name))
+                .collect();
+            if keep.iter().all(|&is_key| is_key) {
+                continue;
+            }
+
+            for row in &mut rows.rows {
+                // Values are positional: the i-th value belongs to the i-th schema column.
+                let source_values = mem::take(&mut row.values);
+                row.values = keep
+                    .iter()
+                    .zip(source_values)
+                    .filter_map(|(&is_key, v)| is_key.then_some(v))
+                    .collect();
+            }
+            rows.schema
+                .retain(|column| key_column_names.contains(&column.column_name));
+        }
+        Ok(requests)
+    }
+
+    /// Returns the names of the table's row-key columns plus its time index column.
+    ///
+    /// # Errors
+    ///
+    /// Returns `MissingTimeIndexColumn` if the table schema has no timestamp column.
+    fn key_column_names(&self, table: &TableRef) -> Result<HashSet<String>> {
+        let time_index = table
+            .schema()
+            .timestamp_column()
+            .with_context(|| table::error::MissingTimeIndexColumnSnafu {
+                table_name: table.table_info().name.clone(),
+            })
+            .context(MissingTimeIndexColumnSnafu)?
+            .name
+            .clone();
+
+        let key_column_names = table
+            .table_info()
+            .meta
+            .row_key_column_names()
+            .cloned()
+            .chain(iter::once(time_index))
+            .collect();
+
+        Ok(key_column_names)
+    }
+
+    /// Looks up the request's table in the current catalog and schema of `ctx`.
+    ///
+    /// # Errors
+    ///
+    /// Returns a catalog error if the lookup fails and `TableNotFound` if no such table exists.
+    async fn get_table(&self, req: &RowDeleteRequest, ctx: &QueryContextRef) -> Result<TableRef> {
+        self.catalog_manager
+            .table(ctx.current_catalog(), ctx.current_schema(), &req.table_name)
+            .await
+            .context(CatalogSnafu)?
+            .with_context(|| TableNotFoundSnafu {
+                table_name: req.table_name.clone(),
+            })
+    }
+}
+
+/// Checks that every row of every request carries exactly one value per schema column.
+///
+/// Requests without a `rows` payload have nothing to check and are skipped.
+///
+/// # Errors
+///
+/// Returns `InvalidDeleteRequest` if any row's value count differs from the column count.
+fn validate_row_count_match(requests: &RowDeleteRequests) -> Result<()> {
+    for request in &requests.deletes {
+        let Some(rows) = request.rows.as_ref() else {
+            continue;
+        };
+        let column_count = rows.schema.len();
+        ensure!(
+            rows.rows.iter().all(|r| r.values.len() == column_count),
+            InvalidDeleteRequestSnafu {
+                reason: "column count mismatch"
+            }
+        );
+    }
+    Ok(())
+}
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 3e1dcecc8a..75632fce06 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -63,6 +63,12 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to delete data, source: {}", source))] + RequestDeletes { + #[snafu(backtrace)] + source: common_meta::error::Error, + }, + #[snafu(display("Runtime resource error, source: {}", source))] RuntimeResource { #[snafu(backtrace)] @@ -146,6 +152,9 @@ pub enum Error { #[snafu(display("Invalid InsertRequest, reason: {}", reason))] InvalidInsertRequest { reason: String, location: Location }, + #[snafu(display("Invalid DeleteRequest, reason: {}", reason))] + InvalidDeleteRequest { reason: String, location: Location }, + #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String, @@ -663,6 +672,7 @@ impl ErrorExt for Error { Error::ParseAddr { .. } | Error::InvalidSql { .. } | Error::InvalidInsertRequest { .. } + | Error::InvalidDeleteRequest { .. } | Error::IllegalPrimaryKeysDef { .. } | Error::CatalogNotFound { .. } | Error::SchemaNotFound { .. } @@ -711,6 +721,7 @@ impl ErrorExt for Error { Error::RequestDatanode { source } => source.status_code(), Error::RequestInserts { source } => source.status_code(), + Error::RequestDeletes { source } => source.status_code(), Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. 
} => { source.status_code() diff --git a/src/frontend/src/inserter.rs b/src/frontend/src/insert.rs similarity index 87% rename from src/frontend/src/inserter.rs rename to src/frontend/src/insert.rs index 31d66235b0..4283be3e4b 100644 --- a/src/frontend/src/inserter.rs +++ b/src/frontend/src/insert.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod req_convert; - use api::v1::alter_expr::Kind; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; @@ -21,8 +19,8 @@ use api::v1::region::region_request; use api::v1::{ AlterExpr, ColumnSchema, DdlRequest, InsertRequests, RowInsertRequest, RowInsertRequests, }; -use catalog::CatalogManagerRef; -use client::region_handler::RegionRequestHandlerRef; +use catalog::CatalogManager; +use client::region_handler::RegionRequestHandler; use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; use common_query::Output; @@ -34,26 +32,26 @@ use snafu::prelude::*; use table::engine::TableReference; use table::TableRef; -use self::req_convert::{ColumnToRow, RowToRegion}; use crate::error::{ - CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, + CatalogSnafu, Error, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, RequestDatanodeSnafu, Result, }; use crate::expr_factory::CreateExprFactory; +use crate::req_convert::insert::{ColumnToRow, RowToRegion}; pub(crate) struct Inserter<'a> { - catalog_manager: &'a CatalogManagerRef, + catalog_manager: &'a dyn CatalogManager, create_expr_factory: &'a CreateExprFactory, grpc_query_handler: &'a GrpcQueryHandlerRef, - region_request_handler: &'a RegionRequestHandlerRef, + region_request_handler: &'a dyn RegionRequestHandler, } impl<'a> Inserter<'a> { pub fn new( - catalog_manager: &'a CatalogManagerRef, + catalog_manager: &'a dyn CatalogManager, create_expr_factory: &'a 
CreateExprFactory, grpc_query_handler: &'a GrpcQueryHandlerRef, - region_request_handler: &'a RegionRequestHandlerRef, + region_request_handler: &'a dyn RegionRequestHandler, ) -> Self { Self { catalog_manager, @@ -74,29 +72,31 @@ impl<'a> Inserter<'a> { pub async fn handle_row_inserts( &self, - requests: RowInsertRequests, + mut requests: RowInsertRequests, ctx: QueryContextRef, ) -> Result { - requests.inserts.iter().try_for_each(|req| { - let non_empty = req.rows.as_ref().map(|r| !r.rows.is_empty()); - let non_empty = non_empty.unwrap_or_default(); - non_empty.then_some(()).with_context(|| EmptyDataSnafu { - msg: format!("insert to table: {:?}", &req.table_name), - }) - })?; + // remove empty requests + requests.inserts.retain(|req| { + req.rows + .as_ref() + .map(|r| !r.rows.is_empty()) + .unwrap_or_default() + }); + validate_row_count_match(&requests)?; self.create_or_alter_tables_on_demand(&requests, &ctx) .await?; - let region_request = RowToRegion::new(self.catalog_manager.as_ref(), &ctx) + let region_request = RowToRegion::new(self.catalog_manager, &ctx) .convert(requests) .await?; let region_request = region_request::Body::Inserts(region_request); - let response = self + + let affected_rows = self .region_request_handler .handle(region_request, ctx) .await .context(RequestDatanodeSnafu)?; - Ok(Output::AffectedRows(response.affected_rows as _)) + Ok(Output::AffectedRows(affected_rows as _)) } } @@ -205,6 +205,20 @@ impl<'a> Inserter<'a> { } } +fn validate_row_count_match(requests: &RowInsertRequests) -> Result<()> { + for request in &requests.inserts { + let rows = request.rows.as_ref().unwrap(); + let column_count = rows.schema.len(); + ensure!( + rows.rows.iter().all(|r| r.values.len() == column_count), + InvalidInsertRequestSnafu { + reason: "row count mismatch" + } + ) + } + Ok(()) +} + fn validate_request_with_table(req: &RowInsertRequest, table: &TableRef) -> Result<()> { let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); let 
table_schema = table.schema(); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 5f6fc6c0ba..80becaea49 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Role; -use api::v1::{InsertRequests, RowInsertRequests}; +use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use catalog::remote::CachedMetaKvBackend; @@ -77,6 +77,7 @@ use sqlparser::ast::ObjectName; use self::distributed::DistRegionRequestHandler; use self::standalone::StandaloneRegionRequestHandler; use crate::catalog::FrontendCatalogManager; +use crate::delete::Deleter; use crate::error::{ self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, @@ -85,7 +86,7 @@ use crate::expr_factory::CreateExprFactory; use crate::frontend::FrontendOptions; use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use crate::heartbeat::HeartbeatTask; -use crate::inserter::Inserter; +use crate::insert::Inserter; use crate::instance::standalone::StandaloneGrpcQueryHandler; use crate::metrics; use crate::script::ScriptExecutor; @@ -310,10 +311,10 @@ impl Instance { ctx: QueryContextRef, ) -> Result { let inserter = Inserter::new( - &self.catalog_manager, + self.catalog_manager.as_ref(), &self.create_expr_factory, &self.grpc_query_handler, - &self.region_request_handler, + self.region_request_handler.as_ref(), ); inserter.handle_row_inserts(requests, ctx).await } @@ -325,14 +326,40 @@ impl Instance { ctx: QueryContextRef, ) -> Result { let inserter = Inserter::new( - &self.catalog_manager, + self.catalog_manager.as_ref(), &self.create_expr_factory, &self.grpc_query_handler, - &self.region_request_handler, + 
self.region_request_handler.as_ref(), ); inserter.handle_column_inserts(requests, ctx).await } + /// Handle batch deletes with row-format + pub async fn handle_row_deletes( + &self, + requests: RowDeleteRequests, + ctx: QueryContextRef, + ) -> Result { + let deleter = Deleter::new( + self.catalog_manager.as_ref(), + self.region_request_handler.as_ref(), + ); + deleter.handle_row_deletes(requests, ctx).await + } + + /// Handle batch deletes + pub async fn handle_deletes( + &self, + requests: DeleteRequests, + ctx: QueryContextRef, + ) -> Result { + let deleter = Deleter::new( + self.catalog_manager.as_ref(), + self.region_request_handler.as_ref(), + ); + deleter.handle_column_deletes(requests, ctx).await + } + pub fn set_plugins(&mut self, map: Arc) { self.plugins = map; } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 45d7ca1221..f4452d5447 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -21,10 +21,8 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; -use api::v1::region::{region_request, QueryRequest, RegionResponse}; -use api::v1::{ - column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, TruncateTableExpr, -}; +use api::v1::region::{region_request, QueryRequest}; +use api::v1::{column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, TruncateTableExpr}; use arrow_flight::Ticket; use async_trait::async_trait; use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest}; @@ -35,6 +33,7 @@ use client::region_handler::RegionRequestHandler; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; +use common_meta::datanode_manager::AffectedRows; use common_meta::ddl::{DdlExecutorRef, ExecutorContext}; use 
common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; @@ -71,7 +70,6 @@ use crate::error::{ TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, }; use crate::expr_factory; -use crate::inserter::req_convert::StatementToRegion; use crate::instance::distributed::deleter::DistDeleter; use crate::instance::distributed::inserter::DistInserter; use crate::table::DistTable; @@ -269,14 +267,6 @@ impl DistInstance { let table_name = TableName::new(catalog, schema, table); self.drop_table(table_name).await } - Statement::Insert(insert) => { - let request = StatementToRegion::new(self.catalog_manager.as_ref(), &query_ctx) - .convert(&insert) - .await?; - let inserter = DistInserter::new(&self.catalog_manager); - let affected_rows = inserter.insert(request).await?; - Ok(Output::AffectedRows(affected_rows as usize)) - } Statement::ShowCreateTable(show) => { let (catalog, schema, table) = table_idents_to_full_name(&show.table_name, query_ctx.clone()) @@ -507,20 +497,6 @@ impl DistInstance { .context(error::ExecuteDdlSnafu) } - async fn handle_dist_delete( - &self, - request: DeleteRequests, - ctx: QueryContextRef, - ) -> Result { - let deleter = DistDeleter::new( - ctx.current_catalog().to_string(), - ctx.current_schema().to_string(), - self.catalog_manager(), - ); - let affected_rows = deleter.grpc_delete(request).await?; - Ok(Output::AffectedRows(affected_rows)) - } - pub fn catalog_manager(&self) -> Arc { self.catalog_manager.clone() } @@ -547,6 +523,7 @@ impl GrpcQueryHandler for DistInstance { async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { match request { Request::Inserts(_) => NotSupportedSnafu { feat: "inserts" }.fail(), + Request::Deletes(_) => NotSupportedSnafu { feat: "deletes" }.fail(), Request::RowInserts(_) => NotSupportedSnafu { feat: "row inserts", } @@ -555,7 +532,6 @@ impl GrpcQueryHandler for DistInstance { feat: 
"row deletes", } .fail(), - Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await, Request::Query(_) => { unreachable!("Query should have been handled directly in Frontend Instance!") } @@ -602,7 +578,7 @@ impl RegionRequestHandler for DistRegionRequestHandler { &self, request: region_request::Body, ctx: QueryContextRef, - ) -> ClientResult { + ) -> ClientResult { self.handle_inner(request, ctx) .await .map_err(BoxedError::new) @@ -622,21 +598,17 @@ impl DistRegionRequestHandler { &self, request: region_request::Body, ctx: QueryContextRef, - ) -> Result { + ) -> Result { match request { region_request::Body::Inserts(inserts) => { let inserter = DistInserter::new(&self.catalog_manager).with_trace_id(ctx.trace_id()); - let affected_rows = inserter.insert(inserts).await? as _; - Ok(RegionResponse { - header: Some(Default::default()), - affected_rows, - }) + inserter.insert(inserts).await } - region_request::Body::Deletes(_) => NotSupportedSnafu { - feat: "region deletes", + region_request::Body::Deletes(deletes) => { + let deleter = DistDeleter::new(&self.catalog_manager).with_trace_id(ctx.trace_id()); + deleter.delete(deletes).await } - .fail(), region_request::Body::Create(_) => NotSupportedSnafu { feat: "region create", } diff --git a/src/frontend/src/instance/distributed/deleter.rs b/src/frontend/src/instance/distributed/deleter.rs index 3586e4a156..7a5c0d489b 100644 --- a/src/frontend/src/instance/distributed/deleter.rs +++ b/src/frontend/src/instance/distributed/deleter.rs @@ -13,164 +13,123 @@ // limitations under the License. 
use std::collections::HashMap; -use std::iter; -use std::sync::Arc; -use api::v1::DeleteRequests; -use catalog::CatalogManager; -use client::Database; -use common_grpc_expr::delete::to_table_delete_request; +use api::v1::region::{region_request, DeleteRequests, RegionRequest, RegionRequestHeader}; +use common_meta::datanode_manager::{AffectedRows, DatanodeManager}; use common_meta::peer::Peer; -use common_meta::table_name::TableName; use futures::future; +use metrics::counter; use snafu::{OptionExt, ResultExt}; -use table::requests::DeleteRequest; +use store_api::storage::RegionId; use crate::catalog::FrontendCatalogManager; use crate::error::{ - CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, - MissingTimeIndexColumnSnafu, RequestDatanodeSnafu, Result, SplitDeleteSnafu, - TableNotFoundSnafu, ToTableDeleteRequestSnafu, + FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDeletesSnafu, Result, + SplitDeleteSnafu, }; -use crate::table::delete::to_grpc_delete_request; -/// A distributed deleter. It ingests GRPC [DeleteRequests] or table [DeleteRequest] (so it can be -/// used in protocol handlers or table deletion API). +/// A distributed deleter. It ingests gRPC [DeleteRequests]. /// /// Table data partitioning and Datanode requests batching are handled inside. -/// -/// Note that the deleter is confined to a single catalog and schema. I.e., it cannot handle -/// multiple deletes requests with different catalog or schema (will throw "NotSupported" error). -/// This is because we currently do not have this kind of requirements. Let's keep it simple for now. 
-pub(crate) struct DistDeleter { - catalog: String, - schema: String, - catalog_manager: Arc, +pub struct DistDeleter<'a> { + catalog_manager: &'a FrontendCatalogManager, + trace_id: Option, + span_id: Option, } -impl DistDeleter { - pub(crate) fn new( - catalog: String, - schema: String, - catalog_manager: Arc, - ) -> Self { +impl<'a> DistDeleter<'a> { + pub(crate) fn new(catalog_manager: &'a FrontendCatalogManager) -> Self { Self { - catalog, - schema, catalog_manager, + trace_id: None, + span_id: None, } } - pub async fn grpc_delete(&self, requests: DeleteRequests) -> Result { - let deletes = requests - .deletes - .into_iter() - .map(|delete| { - to_table_delete_request(&self.catalog, &self.schema, delete) - .context(ToTableDeleteRequestSnafu) - }) - .collect::>>()?; - self.delete(deletes).await + pub fn with_trace_id(mut self, trace_id: u64) -> Self { + self.trace_id = Some(trace_id); + self } - pub(crate) async fn delete(&self, requests: Vec) -> Result { - debug_assert!(requests - .iter() - .all(|x| x.catalog_name == self.catalog && x.schema_name == self.schema)); - let deletes = self.split_deletes(requests).await?; - self.request_datanodes(deletes).await + #[allow(dead_code)] + pub fn with_span_id(mut self, span_id: u64) -> Self { + self.span_id = Some(span_id); + self } - async fn split_deletes( - &self, - requests: Vec, - ) -> Result> { - let partition_manager = self.catalog_manager.partition_manager(); - - let mut deletes = HashMap::new(); - - for request in requests { - let table_name = &request.table_name; - let table = self - .catalog_manager - .table(&self.catalog, &self.schema, table_name) - .await - .context(CatalogSnafu)? 
- .with_context(|| TableNotFoundSnafu { - table_name: table_name.to_string(), - })?; - let table_info = table.table_info(); - let table_meta = &table_info.meta; - - let table_id = table_info.table_id(); - let table_name = &request.table_name; - let schema = table.schema(); - let time_index = &schema - .timestamp_column() - .with_context(|| table::error::MissingTimeIndexColumnSnafu { - table_name: table_name.to_string(), - }) - .context(MissingTimeIndexColumnSnafu)? - .name; - let primary_key_column_names = table_info - .meta - .row_key_column_names() - .chain(iter::once(time_index)) - .collect::>(); - let table_name = request.table_name.clone(); - let split = partition_manager - .split_delete_request(table_id, request, primary_key_column_names) - .await - .context(SplitDeleteSnafu)?; - let table_route = partition_manager - .find_table_route(table_id) - .await - .with_context(|_| FindTableRouteSnafu { table_id })?; - - for (region_number, delete) in split { - let datanode = - table_route - .find_region_leader(region_number) - .context(FindDatanodeSnafu { - region: region_number, - })?; - let table_name = TableName::new(&self.catalog, &self.schema, &table_name); - let delete = - to_grpc_delete_request(table_meta, &table_name, region_number, delete)?; - deletes - .entry(datanode.clone()) - .or_insert_with(|| DeleteRequests { deletes: vec![] }) - .deletes - .push(delete); - } - } - Ok(deletes) - } - - async fn request_datanodes(&self, deletes: HashMap) -> Result { - let results = future::try_join_all(deletes.into_iter().map(|(peer, deletes)| { + pub(crate) async fn delete(&self, requests: DeleteRequests) -> Result { + let requests = self.split(requests).await?; + let trace_id = self.trace_id.unwrap_or_default(); + let span_id = self.span_id.unwrap_or_default(); + let results = future::try_join_all(requests.into_iter().map(|(peer, deletes)| { let datanode_clients = self.catalog_manager.datanode_clients(); - let catalog = self.catalog.clone(); - let schema = 
self.schema.clone(); - common_runtime::spawn_write(async move { - let client = datanode_clients.get_client(&peer).await; - let database = Database::new(&catalog, &schema, client); - database.delete(deletes).await.context(RequestDatanodeSnafu) + let request = RegionRequest { + header: Some(RegionRequestHeader { trace_id, span_id }), + body: Some(region_request::Body::Deletes(deletes)), + }; + datanode_clients + .datanode(&peer) + .await + .handle(request) + .await + .context(RequestDeletesSnafu) }) })) .await .context(JoinTaskSnafu)?; - let affected_rows = results.into_iter().sum::>()?; - Ok(affected_rows as usize) + let affected_rows = results.into_iter().sum::>()?; + counter!(crate::metrics::DIST_DELETE_ROW_COUNT, affected_rows); + Ok(affected_rows) + } + + /// Splits gRPC [DeleteRequests] into multiple gRPC [DeleteRequests]s, each of which + /// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write + /// method in Datanode. + async fn split(&self, requests: DeleteRequests) -> Result> { + let partition_manager = self.catalog_manager.partition_manager(); + let mut deletes: HashMap = HashMap::new(); + + for req in requests.requests { + let table_id = RegionId::from_u64(req.region_id).table_id(); + + let req_splits = partition_manager + .split_delete_request(table_id, req) + .await + .context(SplitDeleteSnafu)?; + let table_route = partition_manager + .find_table_route(table_id) + .await + .context(FindTableRouteSnafu { table_id })?; + + for (region_number, delete) in req_splits { + let peer = + table_route + .find_region_leader(region_number) + .context(FindDatanodeSnafu { + region: region_number, + })?; + deletes + .entry(peer.clone()) + .or_default() + .requests + .push(delete); + } + } + + Ok(deletes) } } #[cfg(test)] mod tests { - use api::v1::column::Values; - use api::v1::{Column, ColumnDataType, DeleteRequest as GrpcDeleteRequest, SemanticType}; + use std::sync::Arc; + + use api::helper::vectors_to_rows; + use 
api::v1::region::DeleteRequest; + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::helper::{CatalogValue, SchemaValue}; @@ -182,7 +141,7 @@ mod tests { use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::rpc::store::PutRequest; use datatypes::prelude::{ConcreteDataType, VectorRef}; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema}; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema, Schema}; use datatypes::vectors::Int32Vector; use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder}; @@ -220,14 +179,14 @@ mod tests { table_metadata_manager: &TableMetadataManagerRef, ) { let schema = Arc::new(Schema::new(vec![ - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) + DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) .with_time_index(true) .with_default_constraint(Some(ColumnDefaultConstraint::Function( "current_timestamp()".to_string(), ))) .unwrap(), - ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("value", ConcreteDataType::int32_datatype(), false), + DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), + DtColumnSchema::new("value", ConcreteDataType::int32_datatype(), false), ])); let table_meta = TableMetaBuilder::default() @@ -284,56 +243,60 @@ mod tests { )); let new_delete_request = |vector: VectorRef| -> DeleteRequest { + let row_count = vector.len(); DeleteRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - key_column_values: HashMap::from([("a".to_string(), vector)]), + region_id: RegionId::new(1, 0).into(), + rows: Some(Rows { + schema: vec![ColumnSchema { + column_name: "a".to_string(), + 
datatype: ColumnDataType::Int32 as i32, + semantic_type: SemanticType::Tag as i32, + }], + rows: vectors_to_rows([vector].iter(), row_count), + }), } }; - let requests = vec![ - new_delete_request(Arc::new(Int32Vector::from(vec![ - Some(1), - Some(11), - Some(50), - ]))), - new_delete_request(Arc::new(Int32Vector::from(vec![ - Some(2), - Some(12), - Some(102), - ]))), - ]; + let requests = DeleteRequests { + requests: vec![ + new_delete_request(Arc::new(Int32Vector::from(vec![ + Some(1), + Some(11), + Some(50), + ]))), + new_delete_request(Arc::new(Int32Vector::from(vec![ + Some(2), + Some(12), + Some(102), + ]))), + ], + }; - let deleter = DistDeleter::new( - DEFAULT_CATALOG_NAME.to_string(), - DEFAULT_SCHEMA_NAME.to_string(), - catalog_manager, - ); - let mut deletes = deleter.split_deletes(requests).await.unwrap(); + let deleter = DistDeleter::new(&catalog_manager); + let mut deletes = deleter.split(requests).await.unwrap(); assert_eq!(deletes.len(), 3); - let new_grpc_delete_request = |column_values: Vec, - null_mask: Vec, - row_count: u32, - region_number: u32| - -> GrpcDeleteRequest { - GrpcDeleteRequest { - table_name: table_name.to_string(), - key_columns: vec![Column { - column_name: "a".to_string(), - semantic_type: SemanticType::Tag as i32, - values: Some(Values { - i32_values: column_values, - ..Default::default() + let new_split_delete_request = + |rows: Vec>, region_id: RegionId| -> DeleteRequest { + DeleteRequest { + region_id: region_id.into(), + rows: Some(Rows { + schema: vec![ColumnSchema { + column_name: "a".to_string(), + datatype: ColumnDataType::Int32 as i32, + semantic_type: SemanticType::Tag as i32, + }], + rows: rows + .into_iter() + .map(|v| Row { + values: vec![Value { + value_data: v.map(ValueData::I32Value), + }], + }) + .collect(), }), - null_mask, - datatype: ColumnDataType::Int32 as i32, - }], - row_count, - region_number, - } - }; + } + }; // region to datanode placement: // 1 -> 1 @@ -345,38 +308,38 @@ mod tests { // 2 -> [10, 50) 
// 3 -> (min, 10) - let datanode_deletes = deletes.remove(&Peer::new(1, "")).unwrap().deletes; + let datanode_deletes = deletes.remove(&Peer::new(1, "")).unwrap().requests; assert_eq!(datanode_deletes.len(), 2); assert_eq!( datanode_deletes[0], - new_grpc_delete_request(vec![50], vec![0], 1, 1) + new_split_delete_request(vec![Some(50)], RegionId::new(1, 1)) ); assert_eq!( datanode_deletes[1], - new_grpc_delete_request(vec![102], vec![0], 1, 1) + new_split_delete_request(vec![Some(102)], RegionId::new(1, 1)) ); - let datanode_deletes = deletes.remove(&Peer::new(2, "")).unwrap().deletes; + let datanode_deletes = deletes.remove(&Peer::new(2, "")).unwrap().requests; assert_eq!(datanode_deletes.len(), 2); assert_eq!( datanode_deletes[0], - new_grpc_delete_request(vec![11], vec![0], 1, 2) + new_split_delete_request(vec![Some(11)], RegionId::new(1, 2)) ); assert_eq!( datanode_deletes[1], - new_grpc_delete_request(vec![12], vec![0], 1, 2) + new_split_delete_request(vec![Some(12)], RegionId::new(1, 2)) ); - let datanode_deletes = deletes.remove(&Peer::new(3, "")).unwrap().deletes; + let datanode_deletes = deletes.remove(&Peer::new(3, "")).unwrap().requests; assert_eq!(datanode_deletes.len(), 2); assert_eq!( datanode_deletes[0], - new_grpc_delete_request(vec![1], vec![0], 1, 3) + new_split_delete_request(vec![Some(1)], RegionId::new(1, 3)) ); assert_eq!( datanode_deletes[1], - new_grpc_delete_request(vec![2], vec![0], 1, 3) + new_split_delete_request(vec![Some(2)], RegionId::new(1, 3)) ); } } diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index 459e017bd2..83d8bf6097 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use api::v1::region::{region_request, InsertRequests, RegionRequest, RegionRequestHeader}; -use common_meta::datanode_manager::DatanodeManager; +use 
common_meta::datanode_manager::{AffectedRows, DatanodeManager}; use common_meta::peer::Peer; use futures_util::future; use metrics::counter; @@ -57,7 +57,7 @@ impl<'a> DistInserter<'a> { self } - pub(crate) async fn insert(&self, requests: InsertRequests) -> Result { + pub(crate) async fn insert(&self, requests: InsertRequests) -> Result { let requests = self.split(requests).await?; let trace_id = self.trace_id.unwrap_or_default(); let span_id = self.span_id.unwrap_or_default(); diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index e888cff1db..97c387a743 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -45,12 +45,8 @@ impl GrpcQueryHandler for Instance { let output = match request { Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?, - Request::RowDeletes(_) => { - return NotSupportedSnafu { - feat: "row deletes", - } - .fail(); - } + Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?, + Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?, Request::Query(query_request) => { let query = query_request.query.context(IncompleteGrpcResultSnafu { err_msg: "Missing field 'QueryRequest.query'", @@ -91,7 +87,7 @@ impl GrpcQueryHandler for Instance { } } } - Request::Ddl(_) | Request::Deletes(_) => { + Request::Ddl(_) => { GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone()) .await? 
} diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index f63c198170..6d151b75f4 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -15,11 +15,13 @@ use std::sync::Arc; use api::v1::greptime_request::Request; -use api::v1::region::{region_request, QueryRequest, RegionResponse}; +use api::v1::region::{region_request, QueryRequest}; use async_trait::async_trait; use client::error::{HandleRequestSnafu, Result as ClientResult}; +use client::region::check_response_header; use client::region_handler::RegionRequestHandler; use common_error::ext::BoxedError; +use common_meta::datanode_manager::AffectedRows; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use datanode::error::Error as DatanodeError; @@ -67,13 +69,17 @@ impl RegionRequestHandler for StandaloneRegionRequestHandler { &self, request: region_request::Body, _ctx: QueryContextRef, - ) -> ClientResult { - self.region_server + ) -> ClientResult { + let response = self + .region_server .handle(request) .await .context(InvokeRegionServerSnafu) .map_err(BoxedError::new) - .context(HandleRequestSnafu) + .context(HandleRequestSnafu)?; + + check_response_header(response.header)?; + Ok(response.affected_rows) } async fn do_get(&self, request: QueryRequest) -> ClientResult { diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 261b049f20..d9279012a2 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -16,13 +16,15 @@ #![feature(trait_upcasting)] pub mod catalog; +pub(crate) mod delete; pub mod error; pub mod expr_factory; pub mod frontend; pub mod heartbeat; -pub(crate) mod inserter; +pub(crate) mod insert; pub mod instance; pub(crate) mod metrics; +pub(crate) mod req_convert; mod script; mod server; pub mod service_config; diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index cbfcab962c..3414bf54bf 100644 --- a/src/frontend/src/metrics.rs +++ 
b/src/frontend/src/metrics.rs @@ -22,6 +22,7 @@ pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed" /// Metrics for creating table in dist mode. pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table"; pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows"; +pub const DIST_DELETE_ROW_COUNT: &str = "frontend.dist.delete_rows"; /// The samples count of Prometheus remote write. pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples"; diff --git a/src/frontend/src/req_convert.rs b/src/frontend/src/req_convert.rs new file mode 100644 index 0000000000..e3a04044be --- /dev/null +++ b/src/frontend/src/req_convert.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod common; +pub mod delete; +pub mod insert; diff --git a/src/frontend/src/inserter/req_convert/column_to_row.rs b/src/frontend/src/req_convert/common.rs similarity index 58% rename from src/frontend/src/inserter/req_convert/column_to_row.rs rename to src/frontend/src/req_convert/common.rs index 2655c4123d..73e5c4eadc 100644 --- a/src/frontend/src/inserter/req_convert/column_to_row.rs +++ b/src/frontend/src/req_convert/common.rs @@ -12,34 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; + use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; -use api::v1::{ - Column, ColumnDataType, ColumnSchema, InsertRequest, InsertRequests, Row, RowInsertRequest, - RowInsertRequests, Rows, Value, -}; +use api::v1::{Column, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use common_base::BitVec; +use datatypes::prelude::ConcreteDataType; +use datatypes::vectors::VectorRef; use snafu::prelude::*; use snafu::ResultExt; +use table::metadata::TableInfo; -use crate::error::{ColumnDataTypeSnafu, InvalidInsertRequestSnafu, Result}; +use crate::error::{ + ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu, + MissingTimeIndexColumnSnafu, Result, +}; -pub struct ColumnToRow; - -impl ColumnToRow { - pub fn convert(requests: InsertRequests) -> Result { - requests - .inserts - .into_iter() - .map(request_column_to_row) - .collect::>>() - .map(|inserts| RowInsertRequests { inserts }) - } -} - -fn request_column_to_row(request: InsertRequest) -> Result { - let row_count = request.row_count as usize; - let column_count = request.columns.len(); +pub fn columns_to_rows(columns: Vec, row_count: u32) -> Result { + let row_count = row_count as usize; + let column_count = columns.len(); let mut schema = Vec::with_capacity(column_count); let mut rows = vec![ Row { @@ -47,7 +39,7 @@ fn request_column_to_row(request: InsertRequest) -> Result { }; row_count ]; - for column in request.columns { + for column in columns { let column_schema = ColumnSchema { column_name: column.column_name.clone(), datatype: column.datatype, @@ -58,11 +50,7 @@ fn request_column_to_row(request: InsertRequest) -> Result { push_column_to_rows(column, &mut rows)?; } - Ok(RowInsertRequest { - table_name: request.table_name, - rows: Some(Rows { schema, rows }), - region_number: request.region_number, - }) + Ok(Rows { schema, rows }) } fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { @@ -168,6 +156,74 @@ fn 
push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { Ok(()) } +pub fn row_count(columns: &HashMap) -> Result { + let mut columns_iter = columns.values(); + + let len = columns_iter + .next() + .map(|column| column.len()) + .unwrap_or_default(); + ensure!( + columns_iter.all(|column| column.len() == len), + InvalidInsertRequestSnafu { + reason: "The row count of columns is not the same." + } + ); + + Ok(len) +} + +pub fn column_schema( + table_info: &TableInfo, + columns: &HashMap, +) -> Result> { + columns + .iter() + .map(|(column_name, vector)| { + Ok(ColumnSchema { + column_name: column_name.clone(), + datatype: data_type(vector.data_type())?.into(), + semantic_type: semantic_type(table_info, column_name)?.into(), + }) + }) + .collect::>>() +} + +fn semantic_type(table_info: &TableInfo, column: &str) -> Result { + let table_meta = &table_info.meta; + let table_schema = &table_meta.schema; + + let time_index_column = &table_schema + .timestamp_column() + .with_context(|| table::error::MissingTimeIndexColumnSnafu { + table_name: table_info.name.to_string(), + }) + .context(MissingTimeIndexColumnSnafu)? 
+ .name; + + let semantic_type = if column == time_index_column { + SemanticType::Timestamp + } else { + let column_index = table_schema.column_index_by_name(column); + let column_index = column_index.context(ColumnNotFoundSnafu { + msg: format!("unable to find column {column} in table schema"), + })?; + + if table_meta.primary_key_indices.contains(&column_index) { + SemanticType::Tag + } else { + SemanticType::Field + } + }; + + Ok(semantic_type) +} + +fn data_type(data_type: ConcreteDataType) -> Result { + let datatype: ColumnDataTypeWrapper = data_type.try_into().context(ColumnDataTypeSnafu)?; + Ok(datatype.datatype()) +} + #[cfg(test)] mod tests { use api::v1::column::Values; @@ -178,43 +234,36 @@ mod tests { #[test] fn test_request_column_to_row() { - let insert_request = InsertRequest { - table_name: String::from("test_table"), - row_count: 3, - region_number: 1, - columns: vec![ - Column { - column_name: String::from("col1"), - datatype: ColumnDataType::Int32.into(), - semantic_type: SemanticType::Field.into(), - null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(), - values: Some(Values { - i32_values: vec![42], - ..Default::default() - }), - }, - Column { - column_name: String::from("col2"), - datatype: ColumnDataType::String.into(), - semantic_type: SemanticType::Tag.into(), - null_mask: vec![], - values: Some(Values { - string_values: vec![ - String::from("value1"), - String::from("value2"), - String::from("value3"), - ], - ..Default::default() - }), - }, - ], - }; + let columns = vec![ + Column { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(), + values: Some(Values { + i32_values: vec![42], + ..Default::default() + }), + }, + Column { + column_name: String::from("col2"), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + null_mask: vec![], + values: Some(Values { + string_values: vec![ + 
String::from("value1"), + String::from("value2"), + String::from("value3"), + ], + ..Default::default() + }), + }, + ]; + let row_count = 3; - let result = request_column_to_row(insert_request); - let row_insert_request = result.unwrap(); - assert_eq!(row_insert_request.table_name, "test_table"); - assert_eq!(row_insert_request.region_number, 1); - let rows = row_insert_request.rows.unwrap(); + let result = columns_to_rows(columns, row_count); + let rows = result.unwrap(); assert_eq!(rows.schema.len(), 2); assert_eq!(rows.schema[0].column_name, "col1"); @@ -250,55 +299,46 @@ mod tests { Some(ValueData::StringValue(String::from("value3"))) ); - let invalid_request_with_wrong_type = InsertRequest { - table_name: String::from("test_table"), - row_count: 3, - region_number: 1, - columns: vec![Column { - column_name: String::from("col1"), - datatype: ColumnDataType::Int32.into(), - semantic_type: SemanticType::Field.into(), - null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(), - values: Some(Values { - i8_values: vec![42], - ..Default::default() - }), - }], - }; - assert!(request_column_to_row(invalid_request_with_wrong_type).is_err()); + // wrong type + let columns = vec![Column { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(), + values: Some(Values { + i8_values: vec![42], + ..Default::default() + }), + }]; + let row_count = 3; + assert!(columns_to_rows(columns, row_count).is_err()); - let invalid_request_with_wrong_row_count = InsertRequest { - table_name: String::from("test_table"), - row_count: 3, - region_number: 1, - columns: vec![Column { - column_name: String::from("col1"), - datatype: ColumnDataType::Int32.into(), - semantic_type: SemanticType::Field.into(), - null_mask: bitvec![u8, Lsb0; 0, 0, 1].into_vec(), - values: Some(Values { - i32_values: vec![42], - ..Default::default() - }), - }], - }; - 
assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err()); + // wrong row count + let columns = vec![Column { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + null_mask: bitvec![u8, Lsb0; 0, 0, 1].into_vec(), + values: Some(Values { + i32_values: vec![42], + ..Default::default() + }), + }]; + let row_count = 3; + assert!(columns_to_rows(columns, row_count).is_err()); - let invalid_request_with_wrong_row_count = InsertRequest { - table_name: String::from("test_table"), - row_count: 3, - region_number: 1, - columns: vec![Column { - column_name: String::from("col1"), - datatype: ColumnDataType::Int32.into(), - semantic_type: SemanticType::Field.into(), - null_mask: vec![], - values: Some(Values { - i32_values: vec![42], - ..Default::default() - }), - }], - }; - assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err()); + // wrong row count + let columns = vec![Column { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + null_mask: vec![], + values: Some(Values { + i32_values: vec![42], + ..Default::default() + }), + }]; + let row_count = 3; + assert!(columns_to_rows(columns, row_count).is_err()); } } diff --git a/src/frontend/src/req_convert/delete.rs b/src/frontend/src/req_convert/delete.rs new file mode 100644 index 0000000000..cdf9fed5ab --- /dev/null +++ b/src/frontend/src/req_convert/delete.rs @@ -0,0 +1,21 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod column_to_row; +mod row_to_region; +mod table_to_region; + +pub use column_to_row::ColumnToRow; +pub use row_to_region::RowToRegion; +pub use table_to_region::TableToRegion; diff --git a/src/frontend/src/req_convert/delete/column_to_row.rs b/src/frontend/src/req_convert/delete/column_to_row.rs new file mode 100644 index 0000000000..7e1cc3fda4 --- /dev/null +++ b/src/frontend/src/req_convert/delete/column_to_row.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use api::v1::{DeleteRequest, DeleteRequests, RowDeleteRequest, RowDeleteRequests}; + +use crate::error::Result; +use crate::req_convert::common::columns_to_rows; + +pub struct ColumnToRow; + +impl ColumnToRow { + pub fn convert(requests: DeleteRequests) -> Result { + requests + .deletes + .into_iter() + .map(request_column_to_row) + .collect::>>() + .map(|deletes| RowDeleteRequests { deletes }) + } +} + +fn request_column_to_row(request: DeleteRequest) -> Result { + let rows = columns_to_rows(request.key_columns, request.row_count)?; + Ok(RowDeleteRequest { + table_name: request.table_name, + rows: Some(rows), + region_number: request.region_number, + }) +} diff --git a/src/frontend/src/req_convert/delete/row_to_region.rs b/src/frontend/src/req_convert/delete/row_to_region.rs new file mode 100644 index 0000000000..5d09beffdf --- /dev/null +++ b/src/frontend/src/req_convert/delete/row_to_region.rs @@ -0,0 +1,69 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use api::v1::region::{ + DeleteRequest as RegionDeleteRequest, DeleteRequests as RegionDeleteRequests, +}; +use api::v1::RowDeleteRequests; +use catalog::CatalogManager; +use session::context::QueryContext; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; +use table::TableRef; + +use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu}; + +pub struct RowToRegion<'a> { + catalog_manager: &'a dyn CatalogManager, + ctx: &'a QueryContext, +} + +impl<'a> RowToRegion<'a> { + pub fn new(catalog_manager: &'a dyn CatalogManager, ctx: &'a QueryContext) -> Self { + Self { + catalog_manager, + ctx, + } + } + + pub async fn convert(&self, requests: RowDeleteRequests) -> Result { + let mut region_request = Vec::with_capacity(requests.deletes.len()); + for request in requests.deletes { + let table = self.get_table(&request.table_name).await?; + + let region_id = RegionId::new(table.table_info().table_id(), request.region_number); + let insert_request = RegionDeleteRequest { + region_id: region_id.into(), + rows: request.rows, + }; + region_request.push(insert_request); + } + + Ok(RegionDeleteRequests { + requests: region_request, + }) + } + + async fn get_table(&self, table_name: &str) -> Result { + let catalog_name = self.ctx.current_catalog(); + let schema_name = self.ctx.current_schema(); + self.catalog_manager + .table(catalog_name, schema_name, table_name) + .await + .context(CatalogSnafu)? 
+ .with_context(|| TableNotFoundSnafu { + table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name), + }) + } +} diff --git a/src/frontend/src/req_convert/delete/table_to_region.rs b/src/frontend/src/req_convert/delete/table_to_region.rs new file mode 100644 index 0000000000..ab51de97e0 --- /dev/null +++ b/src/frontend/src/req_convert/delete/table_to_region.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::region::{ + DeleteRequest as RegionDeleteRequest, DeleteRequests as RegionDeleteRequests, +}; +use api::v1::Rows; +use store_api::storage::RegionId; +use table::metadata::TableInfo; +use table::requests::DeleteRequest as TableDeleteRequest; + +use crate::error::Result; +use crate::req_convert::common::{column_schema, row_count}; + +pub struct TableToRegion<'a> { + table_info: &'a TableInfo, +} + +impl<'a> TableToRegion<'a> { + pub fn new(table_info: &'a TableInfo) -> Self { + Self { table_info } + } + + pub fn convert(&self, request: TableDeleteRequest) -> Result { + let region_id = RegionId::new(self.table_info.table_id(), 0).into(); + let row_count = row_count(&request.key_column_values)?; + let schema = column_schema(self.table_info, &request.key_column_values)?; + let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count); + Ok(RegionDeleteRequests { + requests: vec![RegionDeleteRequest { + region_id, + rows: Some(Rows { schema, rows }), + }], 
+ }) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, SemanticType}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use datatypes::prelude::ConcreteDataType; + use datatypes::scalars::ScalarVectorBuilder; + use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema}; + use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder}; + use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + + use super::*; + + #[test] + fn test_delete_request_table_to_region() { + let schema = Schema::new(vec![ + DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) + .with_time_index(true), + DtColumnSchema::new("id", ConcreteDataType::int16_datatype(), false), + DtColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ]); + + let table_meta = TableMetaBuilder::default() + .schema(Arc::new(schema)) + .primary_key_indices(vec![1, 2]) + .next_column_id(3) + .build() + .unwrap(); + + let table_info = Arc::new( + TableInfoBuilder::default() + .name("demo") + .meta(table_meta) + .table_id(1) + .build() + .unwrap(), + ); + + let delete_request = mock_delete_request(); + let mut request = TableToRegion::new(&table_info) + .convert(delete_request) + .unwrap(); + + assert_eq!(request.requests.len(), 1); + verify_region_insert_request(request.requests.pop().unwrap()); + } + + fn mock_delete_request() -> TableDeleteRequest { + let mut builder = StringVectorBuilder::with_capacity(3); + builder.push(Some("host1")); + builder.push(None); + builder.push(Some("host3")); + let host = builder.to_vector(); + + let mut builder = Int16VectorBuilder::with_capacity(3); + builder.push(Some(1_i16)); + builder.push(Some(2_i16)); + builder.push(Some(3_i16)); + let id = builder.to_vector(); + + let key_column_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]); + + TableDeleteRequest { + 
catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "demo".to_string(), + key_column_values, + } + } + + fn verify_region_insert_request(request: RegionDeleteRequest) { + assert_eq!(request.region_id, RegionId::new(1, 0).as_u64()); + + let rows = request.rows.unwrap(); + for (i, column) in rows.schema.iter().enumerate() { + let name = &column.column_name; + if name == "id" { + assert_eq!(ColumnDataType::Int16 as i32, column.datatype); + assert_eq!(SemanticType::Tag as i32, column.semantic_type); + let values = rows + .rows + .iter() + .map(|row| row.values[i].value_data.clone()) + .collect::>(); + assert_eq!( + vec![ + Some(ValueData::I16Value(1)), + Some(ValueData::I16Value(2)), + Some(ValueData::I16Value(3)) + ], + values + ); + } + if name == "host" { + assert_eq!(ColumnDataType::String as i32, column.datatype); + assert_eq!(SemanticType::Tag as i32, column.semantic_type); + let values = rows + .rows + .iter() + .map(|row| row.values[i].value_data.clone()) + .collect::>(); + assert_eq!( + vec![ + Some(ValueData::StringValue("host1".to_string())), + None, + Some(ValueData::StringValue("host3".to_string())) + ], + values + ); + } + } + } +} diff --git a/src/frontend/src/inserter/req_convert.rs b/src/frontend/src/req_convert/insert.rs similarity index 100% rename from src/frontend/src/inserter/req_convert.rs rename to src/frontend/src/req_convert/insert.rs diff --git a/src/frontend/src/req_convert/insert/column_to_row.rs b/src/frontend/src/req_convert/insert/column_to_row.rs new file mode 100644 index 0000000000..f3d4c50b36 --- /dev/null +++ b/src/frontend/src/req_convert/insert/column_to_row.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::{InsertRequest, InsertRequests, RowInsertRequest, RowInsertRequests}; + +use crate::error::Result; +use crate::req_convert::common::columns_to_rows; + +pub struct ColumnToRow; + +impl ColumnToRow { + pub fn convert(requests: InsertRequests) -> Result { + requests + .inserts + .into_iter() + .map(request_column_to_row) + .collect::>>() + .map(|inserts| RowInsertRequests { inserts }) + } +} + +fn request_column_to_row(request: InsertRequest) -> Result { + let rows = columns_to_rows(request.columns, request.row_count)?; + Ok(RowInsertRequest { + table_name: request.table_name, + rows: Some(rows), + region_number: request.region_number, + }) +} diff --git a/src/frontend/src/inserter/req_convert/row_to_region.rs b/src/frontend/src/req_convert/insert/row_to_region.rs similarity index 100% rename from src/frontend/src/inserter/req_convert/row_to_region.rs rename to src/frontend/src/req_convert/insert/row_to_region.rs diff --git a/src/frontend/src/inserter/req_convert/stmt_to_region.rs b/src/frontend/src/req_convert/insert/stmt_to_region.rs similarity index 100% rename from src/frontend/src/inserter/req_convert/stmt_to_region.rs rename to src/frontend/src/req_convert/insert/stmt_to_region.rs diff --git a/src/frontend/src/inserter/req_convert/table_to_region.rs b/src/frontend/src/req_convert/insert/table_to_region.rs similarity index 83% rename from src/frontend/src/inserter/req_convert/table_to_region.rs rename to src/frontend/src/req_convert/insert/table_to_region.rs index bf70249a0d..6ea09b3ad4 100644 --- 
a/src/frontend/src/inserter/req_convert/table_to_region.rs +++ b/src/frontend/src/req_convert/insert/table_to_region.rs @@ -12,20 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use api::v1::region::{ InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests, }; -use api::v1::{ColumnSchema, Rows}; -use datatypes::vectors::VectorRef; -use snafu::prelude::*; +use api::v1::Rows; use store_api::storage::RegionId; use table::metadata::TableInfo; use table::requests::InsertRequest as TableInsertRequest; -use super::{data_type, semantic_type}; -use crate::error::{InvalidInsertRequestSnafu, Result}; +use crate::error::Result; +use crate::req_convert::common::{column_schema, row_count}; pub struct TableToRegion<'a> { table_info: &'a TableInfo, @@ -50,41 +46,9 @@ impl<'a> TableToRegion<'a> { } } -fn row_count(columns: &HashMap) -> Result { - let mut columns_iter = columns.values(); - - let len = columns_iter - .next() - .map(|column| column.len()) - .unwrap_or_default(); - ensure!( - columns_iter.all(|column| column.len() == len), - InvalidInsertRequestSnafu { - reason: "The row count of columns is not the same." 
- } - ); - - Ok(len) -} - -fn column_schema( - table_info: &TableInfo, - columns: &HashMap, -) -> Result> { - columns - .iter() - .map(|(column_name, vector)| { - Ok(ColumnSchema { - column_name: column_name.clone(), - datatype: data_type(vector.data_type())?.into(), - semantic_type: semantic_type(table_info, column_name)?.into(), - }) - }) - .collect::>>() -} - #[cfg(test)] mod tests { + use std::collections::HashMap; use std::sync::Arc; use api::v1::value::ValueData; diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index 262a9b09bd..a98244af5b 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -22,7 +22,6 @@ mod tql; use std::collections::HashMap; use std::str::FromStr; -use std::sync::Arc; use api::v1::region::region_request; use catalog::CatalogManagerRef; @@ -41,19 +40,16 @@ use snafu::{OptionExt, ResultExt}; use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; use sql::statements::statement::Statement; use table::engine::TableReference; -use table::error::TableOperationSnafu; use table::requests::{ CopyDatabaseRequest, CopyDirection, CopyTableRequest, DeleteRequest, InsertRequest, }; use table::TableRef; -use crate::catalog::FrontendCatalogManager; use crate::error::{ - self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu, + self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu, }; -use crate::inserter::req_convert::TableToRegion; -use crate::instance::distributed::deleter::DistDeleter; +use crate::req_convert::{delete, insert}; use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; #[derive(Clone)] @@ -184,55 +180,35 @@ impl StatementExecutor { let table = self.get_table(&table_ref).await?; let table_info = table.table_info(); - let request = TableToRegion::new(&table_info).convert(request)?; - let 
region_response = self + let request = insert::TableToRegion::new(&table_info).convert(request)?; + let affected_rows = self .region_request_handler .handle(region_request::Body::Inserts(request), query_ctx) .await .context(RequestDatanodeSnafu)?; - - Ok(region_response.affected_rows as _) + Ok(affected_rows as _) } - // TODO(zhongzc): A middle state that eliminates calls to table.delete, - // For DistTable, its delete is not invoked; for MitoTable, it is still called but eventually eliminated. - async fn send_delete_request(&self, request: DeleteRequest) -> Result { - let frontend_catalog_manager = self - .catalog_manager - .as_any() - .downcast_ref::(); + async fn handle_table_delete_request( + &self, + request: DeleteRequest, + query_ctx: QueryContextRef, + ) -> Result { + let table_ref = TableReference::full( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ); + let table = self.get_table(&table_ref).await?; + let table_info = table.table_info(); - let table_name = request.table_name.clone(); - match frontend_catalog_manager { - Some(frontend_catalog_manager) => { - let inserter = DistDeleter::new( - request.catalog_name.clone(), - request.schema_name.clone(), - Arc::new(frontend_catalog_manager.clone()), - ); - let affected_rows = inserter - .delete(vec![request]) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu) - .context(InsertSnafu { table_name })?; - Ok(affected_rows) - } - None => { - let table_ref = TableReference::full( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ); - let affected_rows = self - .get_table(&table_ref) - .await? 
- .delete(request) - .await - .context(InsertSnafu { table_name })?; - Ok(affected_rows) - } - } + let request = delete::TableToRegion::new(&table_info).convert(request)?; + let affected_rows = self + .region_request_handler + .handle(region_request::Body::Deletes(request), query_ctx) + .await + .context(RequestDatanodeSnafu)?; + Ok(affected_rows as _) } } diff --git a/src/frontend/src/statement/dml.rs b/src/frontend/src/statement/dml.rs index 0b6c8cc47b..d5730fda32 100644 --- a/src/frontend/src/statement/dml.rs +++ b/src/frontend/src/statement/dml.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use api::v1::region::region_request; use common_query::Output; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp}; @@ -33,18 +34,24 @@ use table::TableRef; use super::StatementExecutor; use crate::error::{ - BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, - MissingTimeIndexColumnSnafu, ReadRecordBatchSnafu, Result, UnexpectedSnafu, + BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, MissingTimeIndexColumnSnafu, + ReadRecordBatchSnafu, RequestDatanodeSnafu, Result, UnexpectedSnafu, }; +use crate::req_convert::insert::StatementToRegion; impl StatementExecutor { pub async fn insert(&self, insert: Box, query_ctx: QueryContextRef) -> Result { if insert.can_extract_values() { // Fast path: plain insert ("insert with literal values") is executed directly - self.sql_stmt_executor - .execute_sql(Statement::Insert(insert), query_ctx) + let request = StatementToRegion::new(self.catalog_manager.as_ref(), &query_ctx) + .convert(&insert) + .await?; + let affected_rows = self + .region_request_handler + .handle(region_request::Body::Inserts(request), query_ctx) .await - .context(ExecuteStatementSnafu) + .context(RequestDatanodeSnafu)?; + Ok(Output::AffectedRows(affected_rows as _)) } else { // Slow path: insert with subquery. Execute the subquery first, via query engine. 
Then // insert the results by sending insert requests. @@ -108,7 +115,9 @@ impl StatementExecutor { while let Some(batch) = stream.next().await { let record_batch = batch.context(ReadRecordBatchSnafu)?; let delete_request = build_delete_request(record_batch, table.schema(), &table_info)?; - affected_rows += self.send_delete_request(delete_request).await?; + affected_rows += self + .handle_table_delete_request(delete_request, query_ctx.clone()) + .await?; } Ok(Output::AffectedRows(affected_rows)) diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index e5cac330aa..478cea7123 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -24,9 +24,6 @@ use table::TableRef; use crate::error::NotSupportedSnafu; -pub mod delete; -pub mod insert; - #[derive(Clone)] pub struct DistTable; diff --git a/src/frontend/src/table/delete.rs b/src/frontend/src/table/delete.rs deleted file mode 100644 index 9a6b3056db..0000000000 --- a/src/frontend/src/table/delete.rs +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use api::v1::DeleteRequest as GrpcDeleteRequest; -use common_meta::table_name::TableName; -use store_api::storage::RegionNumber; -use table::metadata::TableMeta; -use table::requests::DeleteRequest; - -use crate::error::Result; -use crate::table::insert::to_grpc_columns; - -pub fn to_grpc_delete_request( - table_meta: &TableMeta, - table_name: &TableName, - region_number: RegionNumber, - request: DeleteRequest, -) -> Result { - let (key_columns, row_count) = to_grpc_columns(table_meta, &request.key_column_values)?; - Ok(GrpcDeleteRequest { - table_name: table_name.table_name.clone(), - region_number, - key_columns, - row_count, - }) -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - - use api::v1::column::Values; - use api::v1::{Column, ColumnDataType, SemanticType}; - use datatypes::prelude::{ConcreteDataType, VectorRef}; - use datatypes::schema::{ColumnSchema, Schema}; - use datatypes::vectors::Int32Vector; - use table::metadata::TableMetaBuilder; - - use super::*; - - #[test] - fn test_to_grpc_delete_request() { - let schema = Schema::new(vec![ - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) - .with_time_index(true), - ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false), - ]); - - let table_meta = TableMetaBuilder::default() - .schema(Arc::new(schema)) - .primary_key_indices(vec![]) - .next_column_id(2) - .build() - .unwrap(); - - let table_name = TableName { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: "foo".to_string(), - }; - let region_number = 1; - - let key_column_values = HashMap::from([( - "id".to_string(), - Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef, - )]); - let request = DeleteRequest { - catalog_name: table_name.catalog_name.to_string(), - schema_name: table_name.schema_name.to_string(), - table_name: table_name.table_name.to_string(), - key_column_values, - }; - - let result = - 
to_grpc_delete_request(&table_meta, &table_name, region_number, request).unwrap(); - - assert_eq!(result.table_name, "foo"); - assert_eq!(result.region_number, region_number); - assert_eq!( - result.key_columns, - vec![Column { - column_name: "id".to_string(), - semantic_type: SemanticType::Field as i32, - values: Some(Values { - i32_values: vec![1, 2, 3], - ..Default::default() - }), - null_mask: vec![0], - datatype: ColumnDataType::Int32 as i32, - }] - ); - assert_eq!(result.row_count, 3); - } -} diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs deleted file mode 100644 index 0e7e5aa186..0000000000 --- a/src/frontend/src/table/insert.rs +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; - -use api::helper::{push_vals, ColumnDataTypeWrapper}; -use api::v1::column::Values; -use api::v1::{Column, SemanticType}; -use datatypes::prelude::*; -use snafu::{ensure, OptionExt, ResultExt}; -use table::metadata::TableMeta; - -use crate::error::{self, ColumnDataTypeSnafu, NotSupportedSnafu, Result, VectorToGrpcColumnSnafu}; - -pub(crate) fn to_grpc_columns( - table_meta: &TableMeta, - columns_values: &HashMap, -) -> Result<(Vec, u32)> { - let mut row_count = None; - - let columns = columns_values - .iter() - .map(|(column_name, vector)| { - match row_count { - Some(rows) => ensure!( - rows == vector.len(), - error::InvalidInsertRequestSnafu { - reason: "The row count of columns is not the same." - } - ), - - None => row_count = Some(vector.len()), - } - - let column = vector_to_grpc_column(table_meta, column_name, vector.clone())?; - Ok(column) - }) - .collect::>>()?; - - let row_count = row_count.unwrap_or(0) as u32; - - Ok((columns, row_count)) -} - -fn vector_to_grpc_column( - table_meta: &TableMeta, - column_name: &str, - vector: VectorRef, -) -> Result { - let time_index_column = &table_meta - .schema - .timestamp_column() - .context(NotSupportedSnafu { - feat: "Table without time index.", - })? 
- .name; - let semantic_type = if column_name == time_index_column { - SemanticType::Timestamp - } else { - let column_index = table_meta - .schema - .column_index_by_name(column_name) - .context(VectorToGrpcColumnSnafu { - reason: format!("unable to find column {column_name} in table schema"), - })?; - if table_meta.primary_key_indices.contains(&column_index) { - SemanticType::Tag - } else { - SemanticType::Field - } - }; - - let datatype: ColumnDataTypeWrapper = - vector.data_type().try_into().context(ColumnDataTypeSnafu)?; - - let mut column = Column { - column_name: column_name.to_string(), - semantic_type: semantic_type as i32, - null_mask: vec![], - datatype: datatype.datatype() as i32, - values: Some(Values::default()), // vector values will be pushed into it below - }; - push_vals(&mut column, 0, vector); - Ok(column) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::v1::ColumnDataType; - use datatypes::schema::{ColumnSchema, Schema}; - use datatypes::vectors::{Int32Vector, Int64Vector, StringVector}; - use table::metadata::TableMetaBuilder; - - use super::*; - - #[test] - fn test_vector_to_grpc_column() { - let schema = Arc::new(Schema::new(vec![ - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) - .with_time_index(true), - ColumnSchema::new("k", ConcreteDataType::int32_datatype(), false), - ColumnSchema::new("v", ConcreteDataType::string_datatype(), true), - ])); - - let table_meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices(vec![1]) - .next_column_id(3) - .build() - .unwrap(); - - let column = vector_to_grpc_column( - &table_meta, - "ts", - Arc::new(Int64Vector::from_slice([1, 2, 3])), - ) - .unwrap(); - assert_eq!(column.column_name, "ts"); - assert_eq!(column.semantic_type, SemanticType::Timestamp as i32); - assert_eq!(column.values.unwrap().i64_values, vec![1, 2, 3]); - assert_eq!(column.null_mask, vec![0]); - assert_eq!(column.datatype, ColumnDataType::Int64 as i32); - - let column = 
vector_to_grpc_column( - &table_meta, - "k", - Arc::new(Int32Vector::from_slice([3, 2, 1])), - ) - .unwrap(); - assert_eq!(column.column_name, "k"); - assert_eq!(column.semantic_type, SemanticType::Tag as i32); - assert_eq!(column.values.unwrap().i32_values, vec![3, 2, 1]); - assert_eq!(column.null_mask, vec![0]); - assert_eq!(column.datatype, ColumnDataType::Int32 as i32); - - let column = vector_to_grpc_column( - &table_meta, - "v", - Arc::new(StringVector::from(vec![ - Some("hello"), - None, - Some("greptime"), - ])), - ) - .unwrap(); - assert_eq!(column.column_name, "v"); - assert_eq!(column.semantic_type, SemanticType::Field as i32); - assert_eq!( - column.values.unwrap().string_values, - vec!["hello", "greptime"] - ); - assert_eq!(column.null_mask, vec![2]); - assert_eq!(column.datatype, ColumnDataType::String as i32); - } -} diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index dd05b7f6d8..9dba350441 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -21,7 +21,6 @@ pub mod metrics; pub mod partition; pub mod range; pub mod route; -pub mod row_splitter; pub mod splitter; pub use crate::partition::{PartitionRule, PartitionRuleRef}; diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 541313c577..f2b8856a36 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use api::v1::region::InsertRequest; +use api::v1::region::{DeleteRequest, InsertRequest}; use common_meta::peer::Peer; use common_meta::rpc::router::TableRoute; use common_query::prelude::Expr; @@ -24,15 +24,13 @@ use datatypes::prelude::Value; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use table::requests::DeleteRequest; use crate::columns::RangeColumnsPartitionRule; use crate::error::{FindLeaderSnafu, Result}; use crate::partition::{PartitionBound, 
PartitionDef, PartitionExpr}; use crate::range::RangePartitionRule; use crate::route::TableRoutes; -use crate::row_splitter::{InsertRequestSplits, RowSplitter}; -use crate::splitter::{DeleteRequestSplit, WriteSplitter}; +use crate::splitter::{DeleteRequestSplits, InsertRequestSplits, RowSplitter}; use crate::{error, PartitionRuleRef}; #[async_trait::async_trait] @@ -243,18 +241,18 @@ impl PartitionRuleManager { req: InsertRequest, ) -> Result { let partition_rule = self.find_table_partition_rule(table).await?; - RowSplitter::new(partition_rule).split(req) + RowSplitter::new(partition_rule).split_insert(req) } + /// Split [DeleteRequest] into [DeleteRequestSplits] according to the partition rule + /// of given table. pub async fn split_delete_request( &self, table: TableId, req: DeleteRequest, - primary_key_column_names: Vec<&String>, - ) -> Result { + ) -> Result { let partition_rule = self.find_table_partition_rule(table).await?; - let splitter = WriteSplitter::with_partition_rule(partition_rule); - splitter.split_delete(req, primary_key_column_names) + RowSplitter::new(partition_rule).split_delete(req) } } diff --git a/src/partition/src/row_splitter.rs b/src/partition/src/row_splitter.rs deleted file mode 100644 index cfbc25c9f2..0000000000 --- a/src/partition/src/row_splitter.rs +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; - -use api::helper; -use api::v1::region::InsertRequest; -use api::v1::{ColumnSchema, Row, Rows}; -use datatypes::value::Value; -use store_api::storage::{RegionId, RegionNumber, TableId}; - -use crate::error::Result; -use crate::PartitionRuleRef; - -pub type InsertRequestSplits = HashMap; - -pub struct RowSplitter { - partition_rule: PartitionRuleRef, -} - -impl RowSplitter { - pub fn new(partition_rule: PartitionRuleRef) -> Self { - Self { partition_rule } - } - - pub fn split(&self, req: InsertRequest) -> Result { - // No partition - let partition_columns = self.partition_rule.partition_columns(); - if partition_columns.is_empty() { - return Ok(HashMap::from([(0, req)])); - } - - // No data - let Some(rows) = req.rows else { - return Ok(HashMap::new()); - }; - - let table_id = RegionId::from_u64(req.region_id).table_id(); - SplitReadRowHelper::new(table_id, rows, &self.partition_rule).split_to_requests() - } -} - -struct SplitReadRowHelper<'a> { - table_id: TableId, - schema: Vec, - rows: Vec, - partition_rule: &'a PartitionRuleRef, - // Map from partition column name to index in the schema/row. - partition_cols_indexes: Vec>, -} - -impl<'a> SplitReadRowHelper<'a> { - fn new(table_id: TableId, rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self { - let col_name_to_idx = rows - .schema - .iter() - .enumerate() - .map(|(idx, col)| (&col.column_name, idx)) - .collect::>(); - let partition_cols = partition_rule.partition_columns(); - let partition_cols_indexes = partition_cols - .into_iter() - .map(|col_name| col_name_to_idx.get(&col_name).cloned()) - .collect::>(); - - Self { - table_id, - schema: rows.schema, - rows: rows.rows, - partition_rule, - partition_cols_indexes, - } - } - - fn split_to_requests(mut self) -> Result { - let request_splits = self - .split_to_regions()? 
- .into_iter() - .map(|(region_number, row_indexes)| { - let rows = row_indexes - .into_iter() - .map(|row_idx| std::mem::take(&mut self.rows[row_idx])) - .collect(); - let req = InsertRequest { - rows: Some(Rows { - schema: self.schema.clone(), - rows, - }), - region_id: RegionId::new(self.table_id, region_number).into(), - }; - (region_number, req) - }) - .collect::>(); - - Ok(request_splits) - } - - fn split_to_regions(&self) -> Result>> { - let mut regions_row_indexes: HashMap> = HashMap::new(); - for (row_idx, values) in self.iter_partition_values().enumerate() { - let region_number = self.partition_rule.find_region(&values)?; - regions_row_indexes - .entry(region_number) - .or_default() - .push(row_idx); - } - - Ok(regions_row_indexes) - } - - fn iter_partition_values(&'a self) -> impl Iterator> + 'a { - self.rows.iter().map(|row| { - self.partition_cols_indexes - .iter() - .map(|idx| { - idx.as_ref().map_or(Value::Null, |idx| { - helper::pb_value_to_value_ref(&row.values[*idx]).into() - }) - }) - .collect() - }) - } -} - -#[cfg(test)] -mod tests { - use std::any::Any; - use std::sync::Arc; - - use api::v1::value::ValueData; - use api::v1::{ColumnDataType, SemanticType}; - use serde::{Deserialize, Serialize}; - - use super::*; - use crate::partition::PartitionExpr; - use crate::PartitionRule; - - fn mock_insert_request() -> InsertRequest { - let schema = vec![ - ColumnSchema { - column_name: "id".to_string(), - datatype: ColumnDataType::String as i32, - semantic_type: SemanticType::Tag as i32, - }, - ColumnSchema { - column_name: "name".to_string(), - datatype: ColumnDataType::String as i32, - semantic_type: SemanticType::Tag as i32, - }, - ColumnSchema { - column_name: "age".to_string(), - datatype: ColumnDataType::Uint32 as i32, - semantic_type: SemanticType::Field as i32, - }, - ]; - let rows = vec![ - Row { - values: vec![ - ValueData::StringValue("1".to_string()).into(), - ValueData::StringValue("Smith".to_string()).into(), - 
ValueData::U32Value(20).into(), - ], - }, - Row { - values: vec![ - ValueData::StringValue("2".to_string()).into(), - ValueData::StringValue("Johnson".to_string()).into(), - ValueData::U32Value(21).into(), - ], - }, - Row { - values: vec![ - ValueData::StringValue("3".to_string()).into(), - ValueData::StringValue("Williams".to_string()).into(), - ValueData::U32Value(22).into(), - ], - }, - ]; - InsertRequest { - rows: Some(Rows { schema, rows }), - region_id: 0, - } - } - - #[derive(Debug, Serialize, Deserialize)] - struct MockPartitionRule; - - impl PartitionRule for MockPartitionRule { - fn as_any(&self) -> &dyn Any { - self - } - - fn partition_columns(&self) -> Vec { - vec!["id".to_string()] - } - - fn find_region(&self, values: &[Value]) -> Result { - let val = values.get(0).unwrap().clone(); - let val = match val { - Value::String(v) => v.as_utf8().to_string(), - _ => unreachable!(), - }; - - Ok(val.parse::().unwrap() % 2) - } - - fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { - unimplemented!() - } - } - - #[derive(Debug, Serialize, Deserialize)] - struct MockMissedColPartitionRule; - - impl PartitionRule for MockMissedColPartitionRule { - fn as_any(&self) -> &dyn Any { - self - } - - fn partition_columns(&self) -> Vec { - vec!["missed_col".to_string()] - } - - fn find_region(&self, values: &[Value]) -> Result { - let val = values.get(0).unwrap().clone(); - let val = match val { - Value::Null => 1, - _ => 0, - }; - - Ok(val) - } - - fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { - unimplemented!() - } - } - - #[derive(Debug, Serialize, Deserialize)] - struct EmptyPartitionRule; - - impl PartitionRule for EmptyPartitionRule { - fn as_any(&self) -> &dyn Any { - self - } - - fn partition_columns(&self) -> Vec { - vec![] - } - - fn find_region(&self, _values: &[Value]) -> Result { - Ok(0) - } - - fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { - unimplemented!() - } - } - - #[test] - fn 
test_writer_splitter() { - let insert_request = mock_insert_request(); - let rule = Arc::new(MockPartitionRule) as PartitionRuleRef; - let splitter = RowSplitter::new(rule); - let splits = splitter.split(insert_request).unwrap(); - - assert_eq!(splits.len(), 2); - - let req0 = &splits[&0]; - let req1 = &splits[&1]; - assert_eq!(req0.region_id, 0); - assert_eq!(req1.region_id, 1); - - let rows0 = req0.rows.as_ref().unwrap(); - let rows1 = req1.rows.as_ref().unwrap(); - assert_eq!(rows0.rows.len(), 1); - assert_eq!(rows1.rows.len(), 2); - } - - #[test] - fn test_missed_col_writer_splitter() { - let insert_request = mock_insert_request(); - let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef; - let splitter = RowSplitter::new(rule); - let splits = splitter.split(insert_request).unwrap(); - - assert_eq!(splits.len(), 1); - - let req = &splits[&1]; - assert_eq!(req.region_id, 1); - - let rows = req.rows.as_ref().unwrap(); - assert_eq!(rows.rows.len(), 3); - } - - #[test] - fn test_empty_partition_rule_writer_splitter() { - let insert_request = mock_insert_request(); - let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef; - let splitter = RowSplitter::new(rule); - let splits = splitter.split(insert_request).unwrap(); - - assert_eq!(splits.len(), 1); - - let req = &splits[&0]; - assert_eq!(req.region_id, 0); - - let rows = req.rows.as_ref().unwrap(); - assert_eq!(rows.rows.len(), 3); - } -} diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index ceeac2e1da..2bf2b0ce1b 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -14,666 +14,219 @@ use std::collections::HashMap; -use datatypes::data_type::DataType; -use datatypes::prelude::MutableVector; -use datatypes::schema::Schema; +use api::helper; +use api::v1::region::{DeleteRequest, InsertRequest}; +use api::v1::{ColumnSchema, Row, Rows}; use datatypes::value::Value; -use datatypes::vectors::VectorRef; -use snafu::{ensure, OptionExt, ResultExt}; 
-use store_api::storage::RegionNumber; -use table::requests::{DeleteRequest, InsertRequest}; +use store_api::storage::{RegionId, RegionNumber}; -use crate::error::{ - CreateDefaultToReadSnafu, FindPartitionColumnSnafu, FindRegionSnafu, InvalidDeleteRequestSnafu, - InvalidInsertRequestSnafu, MissingDefaultValueSnafu, Result, -}; +use crate::error::Result; use crate::PartitionRuleRef; -pub type InsertRequestSplit = HashMap; -pub type DeleteRequestSplit = HashMap; +pub type InsertRequestSplits = HashMap; +pub type DeleteRequestSplits = HashMap; -pub struct WriteSplitter { +pub struct RowSplitter { partition_rule: PartitionRuleRef, } -impl WriteSplitter { - pub fn with_partition_rule(rule: PartitionRuleRef) -> Self { - Self { - partition_rule: rule, - } +impl RowSplitter { + pub fn new(partition_rule: PartitionRuleRef) -> Self { + Self { partition_rule } } - pub fn split_insert( - &self, - insert: InsertRequest, - schema: &Schema, - ) -> Result { - let row_nums = check_req(&insert)?; - let mut insert = insert; - let partition_columns = self.partition_rule.partition_columns(); - if partition_columns.is_empty() { - // If no partition column, all rows are inserted into the first region. - let mut split = HashMap::new(); - split.insert(0, insert); - return Ok(split); - } - let missing_columns = schema - .column_schemas() - .iter() - .filter(|schema| { - partition_columns.contains(&schema.name) - && !insert.columns_values.contains_key(&schema.name) - }) - .collect::>(); - for column_schema in missing_columns { - let default_values = column_schema - .create_default_vector(row_nums) - .context(CreateDefaultToReadSnafu { - column: &column_schema.name, - })? 
- .context(MissingDefaultValueSnafu { - column: &column_schema.name, - })?; - let _ = insert - .columns_values - .insert(column_schema.name.clone(), default_values); - } - let partition_columns = - find_partitioning_values(&insert.columns_values, &partition_columns)?; - let region_map = self.split_partitioning_values(&partition_columns)?; - - Ok(split_insert_request(&insert, region_map)) - } - - pub fn split_delete( - &self, - request: DeleteRequest, - key_column_names: Vec<&String>, - ) -> Result { - Self::validate_delete_request(&request)?; - - let partition_columns = self.partition_rule.partition_columns(); - if partition_columns.is_empty() { - // If no partition column, all requests are sent to the first region. - let mut split = HashMap::new(); - split.insert(0, request); - return Ok(split); - } - let values = find_partitioning_values(&request.key_column_values, &partition_columns)?; - let regional_value_indexes = self.split_partitioning_values(&values)?; - - let requests = regional_value_indexes + pub fn split_insert(&self, req: InsertRequest) -> Result { + let table_id = RegionId::from_u64(req.region_id).table_id(); + Ok(self + .split(req.rows)? 
.into_iter() - .map(|(region_id, value_indexes)| { - let key_column_values = request - .key_column_values - .iter() - .filter(|(column_name, _)| key_column_names.contains(column_name)) - .map(|(column_name, vector)| { - let mut builder = vector - .data_type() - .create_mutable_vector(value_indexes.len()); - - value_indexes.iter().for_each(|&index| { - builder.push_value_ref(vector.get(index).as_value_ref()); - }); - - (column_name.to_string(), builder.to_vector()) - }) - .collect(); - ( - region_id, - DeleteRequest { - catalog_name: request.catalog_name.clone(), - schema_name: request.schema_name.clone(), - table_name: request.table_name.clone(), - key_column_values, - }, - ) + .map(|(region_number, rows)| { + let region_id = RegionId::new(table_id, region_number); + let req = InsertRequest { + rows: Some(rows), + region_id: region_id.into(), + }; + (region_number, req) }) - .collect(); - Ok(requests) + .collect()) } - fn validate_delete_request(request: &DeleteRequest) -> Result<()> { - let rows = request - .key_column_values - .values() - .next() - .map(|x| x.len()) - .context(InvalidDeleteRequestSnafu { - reason: "no key column values", - })?; - ensure!( - rows > 0, - InvalidDeleteRequestSnafu { - reason: "no rows in delete request" - } - ); - ensure!( - request - .key_column_values - .values() - .map(|x| x.len()) - .all(|x| x == rows), - InvalidDeleteRequestSnafu { - reason: "the lengths of key column values are not the same" - } - ); - Ok(()) + pub fn split_delete(&self, req: DeleteRequest) -> Result { + let table_id = RegionId::from_u64(req.region_id).table_id(); + Ok(self + .split(req.rows)? 
+ .into_iter() + .map(|(region_number, rows)| { + let region_id = RegionId::new(table_id, region_number); + let req = DeleteRequest { + rows: Some(rows), + region_id: region_id.into(), + }; + (region_number, req) + }) + .collect()) } - fn split_partitioning_values( - &self, - values: &[VectorRef], - ) -> Result>> { - if values.is_empty() { - return Ok(HashMap::default()); + fn split(&self, rows: Option) -> Result> { + // No data + let Some(rows) = rows else { + return Ok(HashMap::new()); + }; + if rows.rows.is_empty() { + return Ok(HashMap::new()); } - let mut region_map: HashMap> = HashMap::new(); - let row_count = values[0].len(); - for idx in 0..row_count { - let region_id = match self - .partition_rule - .find_region(&partition_values(values, idx)) - { - Ok(region_id) => region_id, - Err(e) => { - let reason = format!("{e:?}"); - return FindRegionSnafu { reason }.fail(); - } - }; - region_map.entry(region_id).or_default().push(idx); + + // No partition + let partition_columns = self.partition_rule.partition_columns(); + if partition_columns.is_empty() { + return Ok(HashMap::from([(0, rows)])); } - Ok(region_map) + + let splitter = SplitReadRowHelper::new(rows, &self.partition_rule); + splitter.split_rows() } } -fn check_req(insert: &InsertRequest) -> Result { - let mut len: Option = None; - for vector in insert.columns_values.values() { - match len { - Some(len) => ensure!( - len == vector.len(), - InvalidInsertRequestSnafu { - reason: "the lengths of vectors are not the same" - } - ), - None => len = Some(vector.len()), - } - } - let len = len.context(InvalidInsertRequestSnafu { - reason: "The columns in the insert statement are empty.", - })?; - Ok(len) +struct SplitReadRowHelper<'a> { + schema: Vec, + rows: Vec, + partition_rule: &'a PartitionRuleRef, + // Map from partition column name to index in the schema/row. 
+ partition_cols_indexes: Vec>, } -fn find_partitioning_values( - values: &HashMap, - partition_columns: &[String], -) -> Result> { - partition_columns - .iter() - .map(|column_name| { - values - .get(column_name) - .cloned() - .context(FindPartitionColumnSnafu { column_name }) +impl<'a> SplitReadRowHelper<'a> { + fn new(rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self { + let col_name_to_idx = rows + .schema + .iter() + .enumerate() + .map(|(idx, col)| (&col.column_name, idx)) + .collect::>(); + let partition_cols = partition_rule.partition_columns(); + let partition_cols_indexes = partition_cols + .into_iter() + .map(|col_name| col_name_to_idx.get(&col_name).cloned()) + .collect::>(); + + Self { + schema: rows.schema, + rows: rows.rows, + partition_rule, + partition_cols_indexes, + } + } + + fn split_rows(mut self) -> Result> { + let request_splits = self + .split_to_regions()? + .into_iter() + .map(|(region_number, row_indexes)| { + let rows = row_indexes + .into_iter() + .map(|row_idx| std::mem::take(&mut self.rows[row_idx])) + .collect(); + let rows = Rows { + schema: self.schema.clone(), + rows, + }; + (region_number, rows) + }) + .collect::>(); + + Ok(request_splits) + } + + fn split_to_regions(&self) -> Result>> { + let mut regions_row_indexes: HashMap> = HashMap::new(); + for (row_idx, values) in self.iter_partition_values().enumerate() { + let region_number = self.partition_rule.find_region(&values)?; + regions_row_indexes + .entry(region_number) + .or_default() + .push(row_idx); + } + + Ok(regions_row_indexes) + } + + fn iter_partition_values(&'a self) -> impl Iterator> + 'a { + self.rows.iter().map(|row| { + self.partition_cols_indexes + .iter() + .map(|idx| { + idx.as_ref().map_or(Value::Null, |idx| { + helper::pb_value_to_value_ref(&row.values[*idx]).into() + }) + }) + .collect() }) - .collect() -} - -fn partition_values(partition_columns: &[VectorRef], idx: usize) -> Vec { - partition_columns - .iter() - .map(|column| column.get(idx)) - 
.collect() -} - -fn split_insert_request( - insert: &InsertRequest, - region_map: HashMap>, -) -> InsertRequestSplit { - let mut dist_insert: HashMap>> = - HashMap::with_capacity(region_map.len()); - - let row_num = insert - .columns_values - .values() - .next() - .map(|v| v.len()) - .unwrap_or(0); - - let column_count = insert.columns_values.len(); - for (column_name, vector) in &insert.columns_values { - for (region_id, val_idxs) in ®ion_map { - let region_insert = dist_insert - .entry(*region_id) - .or_insert_with(|| HashMap::with_capacity(column_count)); - let builder = region_insert - .entry(column_name) - .or_insert_with(|| vector.data_type().create_mutable_vector(row_num)); - val_idxs.iter().for_each(|idx| { - // Safety: MutableVector is built according to column data type. - builder.push_value_ref(vector.get(*idx).as_value_ref()); - }); - } } - - let catalog_name = &insert.catalog_name; - let schema_name = &insert.schema_name; - let table_name = &insert.table_name; - dist_insert - .into_iter() - .map(|(region_number, vector_map)| { - let columns_values = vector_map - .into_iter() - .map(|(column_name, mut builder)| (column_name.to_string(), builder.to_vector())) - .collect(); - ( - region_number, - InsertRequest { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - columns_values, - region_number, - }, - ) - }) - .collect() } #[cfg(test)] mod tests { use std::any::Any; - use std::collections::HashMap; - use std::result::Result; use std::sync::Arc; - use datatypes::data_type::ConcreteDataType; - use datatypes::prelude::ScalarVectorBuilder; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema as DataTypesSchema}; - use datatypes::types::{BooleanType, Int16Type, StringType}; - use datatypes::value::Value; - use datatypes::vectors::{ - BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVector, StringVectorBuilder, - Vector, - }; + use api::v1::value::ValueData; 
+ use api::v1::{ColumnDataType, SemanticType}; use serde::{Deserialize, Serialize}; - use store_api::storage::RegionNumber; - use table::requests::InsertRequest; use super::*; - use crate::error::Error; - use crate::partition::{PartitionExpr, PartitionRule}; - use crate::PartitionRuleRef; - - #[test] - fn test_insert_req_check() { - let right = mock_insert_request(); - let ret = check_req(&right); - assert_eq!(ret.unwrap(), 3); - - let wrong = mock_wrong_insert_request(); - let ret = check_req(&wrong); - assert!(ret.is_err()); - } - - fn assert_columns(columns: &HashMap>, expected: &[(&str, &[Value])]) { - for (col_name, values) in expected { - for (idx, value) in values.iter().enumerate() { - assert_eq!(*value, columns.get(*col_name).unwrap().get(idx)); - } - } - } - - #[test] - fn test_writer_spliter() { - let insert = mock_insert_request(); - let rule = Arc::new(MockPartitionRule) as PartitionRuleRef; - let spliter = WriteSplitter::with_partition_rule(rule); - let mock_schema = DataTypesSchema::new(vec![ - ColumnSchema::new( - "enable_reboot", - ConcreteDataType::Boolean(BooleanType), - false, - ), - ColumnSchema::new("id", ConcreteDataType::Int16(Int16Type {}), false), - ColumnSchema::new("host", ConcreteDataType::String(StringType), true), - ]); - let ret = spliter.split_insert(insert, &mock_schema).unwrap(); - - assert_eq!(2, ret.len()); - - let r1_insert = ret.get(&0).unwrap(); - let r2_insert = ret.get(&1).unwrap(); - - assert_eq!("demo", r1_insert.table_name); - assert_eq!("demo", r2_insert.table_name); - - let r1_columns = &r1_insert.columns_values; - assert_eq!(3, r1_columns.len()); - assert_columns( - r1_columns, - &[ - ("id", &[Value::from(1_i16)]), - ("host", &[Value::from("host1")]), - ("enable_reboot", &[Value::from(true)]), - ], - ); - - let r2_columns = &r2_insert.columns_values; - assert_eq!(3, r2_columns.len()); - - assert_columns( - r2_columns, - &[ - ("id", &[Value::from(2_i16), Value::from(3_i16)]), - ("host", &[Value::Null, 
Value::from("host3")]), - ("enable_reboot", &[Value::from(false), Value::from(true)]), - ], - ); - } - - #[test] - fn test_writer_spliter_with_id_partition_columns() { - let (mock_schema, insert) = mock_schema_and_insert_request_without_partition_columns(); - let rule = Arc::new(MockPartitionRule) as PartitionRuleRef; - let spliter = WriteSplitter::with_partition_rule(rule); - let ret = spliter.split_insert(insert, &mock_schema).unwrap(); - - assert_eq!(1, ret.len()); - - let r1_insert = ret.get(&0).unwrap(); - - assert_eq!("demo", r1_insert.table_name); - - let r1_columns = &r1_insert.columns_values; - assert_eq!(3, r1_columns.len()); - assert_columns( - r1_columns, - &[ - ( - "id", - &[Value::from(1_i16), Value::from(1_i16), Value::from(1_i16)], - ), - ( - "host", - &[Value::from("host1"), Value::Null, Value::from("host3")], - ), - ( - "enable_reboot", - &[Value::from(true), Value::from(false), Value::from(true)], - ), - ], - ); - } - - #[test] - fn test_writer_spliter_without_partition_columns() { - let (mock_schema, insert) = mock_schema_and_insert_request_without_partition_columns(); - let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef; - let spliter = WriteSplitter::with_partition_rule(rule); - let ret = spliter.split_insert(insert, &mock_schema).unwrap(); - - assert_eq!(1, ret.len()); - - let r1_insert = ret.get(&0).unwrap(); - - assert_eq!("demo", r1_insert.table_name); - - let r1_columns = &r1_insert.columns_values; - assert_eq!(2, r1_columns.len()); - assert_columns( - r1_columns, - &[ - ( - "host", - &[Value::from("host1"), Value::Null, Value::from("host3")], - ), - ( - "enable_reboot", - &[Value::from(true), Value::from(false), Value::from(true)], - ), - ], - ); - } - - #[test] - fn test_delete_spliter_without_partition_columns() { - let mut key_column_values = HashMap::new(); - key_column_values.insert( - "host".to_string(), - Arc::new(StringVector::from(vec!["localhost"])) as _, - ); - let delete = DeleteRequest { - catalog_name: 
"foo_catalog".to_string(), - schema_name: "foo_schema".to_string(), - table_name: "foo_table".to_string(), - key_column_values, - }; - let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef; - let spliter = WriteSplitter::with_partition_rule(rule); - let ret = spliter - .split_delete(delete, vec![&String::from("host")]) - .unwrap(); - - assert_eq!(1, ret.len()); - } - - #[test] - fn test_partition_insert_request() { - let insert = mock_insert_request(); - let region_map = HashMap::from([(1, vec![2, 0]), (2, vec![1])]); - let dist_insert = split_insert_request(&insert, region_map); - - let r1_insert = dist_insert.get(&1_u32).unwrap(); - assert_eq!("demo", r1_insert.table_name); - let expected: Value = 3_i16.into(); - assert_eq!(expected, r1_insert.columns_values.get("id").unwrap().get(0)); - let expected: Value = 1_i16.into(); - assert_eq!(expected, r1_insert.columns_values.get("id").unwrap().get(1)); - let expected: Value = "host3".into(); - assert_eq!( - expected, - r1_insert.columns_values.get("host").unwrap().get(0) - ); - let expected: Value = "host1".into(); - assert_eq!( - expected, - r1_insert.columns_values.get("host").unwrap().get(1) - ); - let expected: Value = true.into(); - assert_eq!( - expected, - r1_insert - .columns_values - .get("enable_reboot") - .unwrap() - .get(0) - ); - let expected: Value = true.into(); - assert_eq!( - expected, - r1_insert - .columns_values - .get("enable_reboot") - .unwrap() - .get(1) - ); - - let r2_insert = dist_insert.get(&2_u32).unwrap(); - assert_eq!("demo", r2_insert.table_name); - let expected: Value = 2_i16.into(); - assert_eq!(expected, r2_insert.columns_values.get("id").unwrap().get(0)); - assert_eq!( - Value::Null, - r2_insert.columns_values.get("host").unwrap().get(0) - ); - let expected: Value = false.into(); - assert_eq!( - expected, - r2_insert - .columns_values - .get("enable_reboot") - .unwrap() - .get(0) - ); - } - - #[test] - fn test_partition_columns() { - let insert = mock_insert_request(); - - let 
partition_column_names = vec!["host".to_string(), "id".to_string()]; - let columns = - find_partitioning_values(&insert.columns_values, &partition_column_names).unwrap(); - - let host_column = columns[0].clone(); - assert_eq!( - ConcreteDataType::String(StringType), - host_column.data_type() - ); - assert_eq!(1, host_column.null_count()); - - let id_column = columns[1].clone(); - assert_eq!(ConcreteDataType::int16_datatype(), id_column.data_type()); - assert_eq!(0, id_column.null_count()); - } - - #[test] - fn test_partition_values() { - let mut builder = BooleanVectorBuilder::with_capacity(3); - builder.push(Some(true)); - builder.push(Some(false)); - builder.push(Some(true)); - let v1 = builder.to_vector(); - - let mut builder = StringVectorBuilder::with_capacity(3); - builder.push(Some("host1")); - builder.push(None); - builder.push(Some("host3")); - let v2 = builder.to_vector(); - - let vectors = vec![v1, v2]; - - let row_0_vals = partition_values(&vectors, 0); - let expected: Vec = vec![true.into(), "host1".into()]; - assert_eq!(expected, row_0_vals); - - let row_1_vals = partition_values(&vectors, 1); - let expected: Vec = vec![false.into(), Value::Null]; - assert_eq!(expected, row_1_vals); - - let row_2_vals = partition_values(&vectors, 2); - let expected: Vec = vec![true.into(), "host3".into()]; - assert_eq!(expected, row_2_vals); - } + use crate::partition::PartitionExpr; + use crate::PartitionRule; fn mock_insert_request() -> InsertRequest { - let mut columns_values = HashMap::with_capacity(4); - let mut builder = BooleanVectorBuilder::with_capacity(3); - builder.push(Some(true)); - builder.push(Some(false)); - builder.push(Some(true)); - let _ = columns_values.insert("enable_reboot".to_string(), builder.to_vector()); - - let mut builder = StringVectorBuilder::with_capacity(3); - builder.push(Some("host1")); - builder.push(None); - builder.push(Some("host3")); - let _ = columns_values.insert("host".to_string(), builder.to_vector()); - - let mut builder = 
Int16VectorBuilder::with_capacity(3); - builder.push(Some(1_i16)); - builder.push(Some(2_i16)); - builder.push(Some(3_i16)); - let _ = columns_values.insert("id".to_string(), builder.to_vector()); - + let schema = vec![ + ColumnSchema { + column_name: "id".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + }, + ColumnSchema { + column_name: "name".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + }, + ColumnSchema { + column_name: "age".to_string(), + datatype: ColumnDataType::Uint32 as i32, + semantic_type: SemanticType::Field as i32, + }, + ]; + let rows = vec![ + Row { + values: vec![ + ValueData::StringValue("1".to_string()).into(), + ValueData::StringValue("Smith".to_string()).into(), + ValueData::U32Value(20).into(), + ], + }, + Row { + values: vec![ + ValueData::StringValue("2".to_string()).into(), + ValueData::StringValue("Johnson".to_string()).into(), + ValueData::U32Value(21).into(), + ], + }, + Row { + values: vec![ + ValueData::StringValue("3".to_string()).into(), + ValueData::StringValue("Williams".to_string()).into(), + ValueData::U32Value(22).into(), + ], + }, + ]; InsertRequest { - catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(), - schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(), - table_name: "demo".to_string(), - columns_values, - region_number: 0, - } - } - - fn mock_schema_and_insert_request_without_partition_columns() -> (Schema, InsertRequest) { - let mut columns_values = HashMap::with_capacity(4); - let mut builder = BooleanVectorBuilder::with_capacity(3); - builder.push(Some(true)); - builder.push(Some(false)); - builder.push(Some(true)); - let _ = columns_values.insert("enable_reboot".to_string(), builder.to_vector()); - - let mut builder = StringVectorBuilder::with_capacity(3); - builder.push(Some("host1")); - builder.push(None); - builder.push(Some("host3")); - let _ = 
columns_values.insert("host".to_string(), builder.to_vector()); - - let insert_request = InsertRequest { - catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(), - schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(), - table_name: "demo".to_string(), - columns_values, - region_number: 0, - }; - - let id_column = ColumnSchema::new("id", ConcreteDataType::Int16(Int16Type {}), false); - let id_column = id_column - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(1_i16)))) - .unwrap(); - let mock_schema = DataTypesSchema::new(vec![ - ColumnSchema::new( - "enable_reboot", - ConcreteDataType::Boolean(BooleanType), - false, - ), - id_column, - ColumnSchema::new("host", ConcreteDataType::String(StringType), true), - ]); - - (mock_schema, insert_request) - } - - fn mock_wrong_insert_request() -> InsertRequest { - let mut columns_values = HashMap::with_capacity(4); - let mut builder = BooleanVectorBuilder::with_capacity(3); - builder.push(Some(true)); - builder.push(Some(false)); - builder.push(Some(true)); - let _ = columns_values.insert("enable_reboot".to_string(), builder.to_vector()); - - let mut builder = StringVectorBuilder::with_capacity(3); - builder.push(Some("host1")); - builder.push(None); - builder.push(Some("host3")); - let _ = columns_values.insert("host".to_string(), builder.to_vector()); - - let mut builder = Int16VectorBuilder::with_capacity(1); - builder.push(Some(1_i16)); - // two values are missing - let _ = columns_values.insert("id".to_string(), builder.to_vector()); - - InsertRequest { - catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(), - schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(), - table_name: "demo".to_string(), - columns_values, - region_number: 0, + rows: Some(Rows { schema, rows }), + region_id: 0, } } #[derive(Debug, Serialize, Deserialize)] struct MockPartitionRule; - // PARTITION BY LIST COLUMNS(id) ( - // PARTITION r0 VALUES IN(1), - // 
PARTITION r1 VALUES IN(2, 3), - // ); impl PartitionRule for MockPartitionRule { fn as_any(&self) -> &dyn Any { self @@ -683,21 +236,44 @@ mod tests { vec!["id".to_string()] } - fn find_region(&self, values: &[Value]) -> Result { + fn find_region(&self, values: &[Value]) -> Result { let val = values.get(0).unwrap().clone(); - let id_1: Value = 1_i16.into(); - let id_2: Value = 2_i16.into(); - let id_3: Value = 3_i16.into(); - if val == id_1 { - return Ok(0); - } - if val == id_2 || val == id_3 { - return Ok(1); - } - unreachable!() + let val = match val { + Value::String(v) => v.as_utf8().to_string(), + _ => unreachable!(), + }; + + Ok(val.parse::().unwrap() % 2) } - fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result, Error> { + fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { + unimplemented!() + } + } + + #[derive(Debug, Serialize, Deserialize)] + struct MockMissedColPartitionRule; + + impl PartitionRule for MockMissedColPartitionRule { + fn as_any(&self) -> &dyn Any { + self + } + + fn partition_columns(&self) -> Vec { + vec!["missed_col".to_string()] + } + + fn find_region(&self, values: &[Value]) -> Result { + let val = values.get(0).unwrap().clone(); + let val = match val { + Value::Null => 1, + _ => 0, + }; + + Ok(val) + } + + fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { unimplemented!() } } @@ -705,9 +281,6 @@ mod tests { #[derive(Debug, Serialize, Deserialize)] struct EmptyPartitionRule; - // PARTITION BY LIST COLUMNS() ( - // PARTITION r0 VALUES LESS THAN MAXVALUE, - // ); impl PartitionRule for EmptyPartitionRule { fn as_any(&self) -> &dyn Any { self @@ -717,12 +290,64 @@ mod tests { vec![] } - fn find_region(&self, _: &[Value]) -> Result { + fn find_region(&self, _values: &[Value]) -> Result { Ok(0) } - fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result, Error> { + fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { unimplemented!() } } + + #[test] + fn test_writer_splitter() { 
+ let insert_request = mock_insert_request(); + let rule = Arc::new(MockPartitionRule) as PartitionRuleRef; + let splitter = RowSplitter::new(rule); + let splits = splitter.split_insert(insert_request).unwrap(); + + assert_eq!(splits.len(), 2); + + let req0 = &splits[&0]; + let req1 = &splits[&1]; + assert_eq!(req0.region_id, 0); + assert_eq!(req1.region_id, 1); + + let rows0 = req0.rows.as_ref().unwrap(); + let rows1 = req1.rows.as_ref().unwrap(); + assert_eq!(rows0.rows.len(), 1); + assert_eq!(rows1.rows.len(), 2); + } + + #[test] + fn test_missed_col_writer_splitter() { + let insert_request = mock_insert_request(); + let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef; + let splitter = RowSplitter::new(rule); + let splits = splitter.split_insert(insert_request).unwrap(); + + assert_eq!(splits.len(), 1); + + let req = &splits[&1]; + assert_eq!(req.region_id, 1); + + let rows = req.rows.as_ref().unwrap(); + assert_eq!(rows.rows.len(), 3); + } + + #[test] + fn test_empty_partition_rule_writer_splitter() { + let insert_request = mock_insert_request(); + let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef; + let splitter = RowSplitter::new(rule); + let splits = splitter.split_insert(insert_request).unwrap(); + + assert_eq!(splits.len(), 1); + + let req = &splits[&0]; + assert_eq!(req.region_id, 0); + + let rows = req.rows.as_ref().unwrap(); + assert_eq!(rows.rows.len(), 3); + } }