feat(frontend): migrate delete to region server (#2329)

* feat(frontend): migrate delete to region server

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add more check and do trim columns

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: RegionRequestHandler.handle returns AffectedRows

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2023-09-06 16:10:06 +08:00
committed by Ruihang Xia
parent 1e44e86d81
commit dac6b2e80a
33 changed files with 1232 additions and 1708 deletions

View File

@@ -164,7 +164,7 @@ impl RegionRequester {
}
}
fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
pub fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
let status = header
.and_then(|header| header.status)
.context(IllegalDatabaseResponseSnafu {

View File

@@ -14,8 +14,9 @@
use std::sync::Arc;
use api::v1::region::{region_request, QueryRequest, RegionResponse};
use api::v1::region::{region_request, QueryRequest};
use async_trait::async_trait;
use common_meta::datanode_manager::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use session::context::QueryContextRef;
@@ -27,7 +28,7 @@ pub trait RegionRequestHandler: Send + Sync {
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<RegionResponse>;
) -> Result<AffectedRows>;
// TODO(ruihang): add trace id and span id in the request.
async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;

168
src/frontend/src/delete.rs Normal file
View File

@@ -0,0 +1,168 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::{iter, mem};
use api::v1::region::region_request;
use api::v1::{DeleteRequests, RowDeleteRequest, RowDeleteRequests};
use catalog::CatalogManager;
use client::region_handler::RegionRequestHandler;
use common_query::Output;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::TableRef;
use crate::error::{
CatalogSnafu, InvalidDeleteRequestSnafu, MissingTimeIndexColumnSnafu, RequestDatanodeSnafu,
Result, TableNotFoundSnafu,
};
use crate::req_convert::delete::{ColumnToRow, RowToRegion};
/// Frontend-side entry point for delete requests.
///
/// It borrows its collaborators for the duration of a single call: requests
/// are cleaned up, reduced to their key columns, converted into a region
/// request and handed to the region request handler.
pub(crate) struct Deleter<'a> {
/// Used to resolve the target table of each delete request by catalog,
/// schema and table name.
catalog_manager: &'a dyn CatalogManager,
/// Receives the region-level delete request and reports the affected rows.
region_request_handler: &'a dyn RegionRequestHandler,
}
impl<'a> Deleter<'a> {
    /// Builds a deleter on top of borrowed catalog and region-request handles.
    pub fn new(
        catalog_manager: &'a dyn CatalogManager,
        region_request_handler: &'a dyn RegionRequestHandler,
    ) -> Self {
        Self {
            catalog_manager,
            region_request_handler,
        }
    }

    /// Handles column-format deletes by converting them to row format and
    /// delegating to [`Self::handle_row_deletes`].
    ///
    /// # Errors
    ///
    /// Returns the conversion error if the column-format requests cannot be
    /// turned into row-format ones, or any error of the row-format path.
    pub async fn handle_column_deletes(
        &self,
        requests: DeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.handle_row_deletes(ColumnToRow::convert(requests)?, ctx)
            .await
    }

    /// Handles row-format deletes and returns the number of affected rows.
    ///
    /// Requests that carry no rows are dropped first. The remaining requests
    /// are validated, trimmed down to their key columns, converted into a
    /// region request and sent through the region request handler.
    ///
    /// # Errors
    ///
    /// Fails if validation, table lookup or conversion fails, or if the region
    /// request handler reports an error (wrapped as `RequestDatanode`).
    pub async fn handle_row_deletes(
        &self,
        mut requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        // Keep only the requests that actually carry at least one row.
        requests
            .deletes
            .retain(|req| matches!(&req.rows, Some(rows) if !rows.rows.is_empty()));
        validate_row_count_match(&requests)?;

        let trimmed = self.trim_columns(requests, &ctx).await?;
        let converter = RowToRegion::new(self.catalog_manager, &ctx);
        let body = region_request::Body::Deletes(converter.convert(trimmed).await?);

        let handled = self.region_request_handler.handle(body, ctx).await;
        let affected_rows = handled.context(RequestDatanodeSnafu)?;
        Ok(Output::AffectedRows(affected_rows as _))
    }
}
impl<'a> Deleter<'a> {
    /// Reduces every request to the columns that identify a row: the table's
    /// row key columns plus its time index.
    ///
    /// Requests whose schema already consists solely of key columns are left
    /// untouched. A request without a `rows` payload has nothing to trim and is
    /// passed through as is.
    ///
    /// # Errors
    ///
    /// Fails if the target table cannot be found or has no time index column.
    async fn trim_columns(
        &self,
        mut requests: RowDeleteRequests,
        ctx: &QueryContextRef,
    ) -> Result<RowDeleteRequests> {
        for req in &mut requests.deletes {
            let table = self.get_table(req, ctx).await?;
            let key_column_names = self.key_column_names(&table)?;

            let rows = match req.rows.as_mut() {
                Some(rows) => rows,
                None => continue,
            };

            // Decide once per request which schema positions survive, instead
            // of hashing every column name again for every single row.
            let keep: Vec<bool> = rows
                .schema
                .iter()
                .map(|column| key_column_names.contains(&column.column_name))
                .collect();
            if keep.iter().all(|&is_key| is_key) {
                continue;
            }

            for row in &mut rows.rows {
                // `zip` stops at the shorter side, so values beyond the schema
                // length are dropped exactly as before.
                row.values = mem::take(&mut row.values)
                    .into_iter()
                    .zip(&keep)
                    .filter_map(|(value, &is_key)| is_key.then_some(value))
                    .collect();
            }
            rows.schema
                .retain(|column| key_column_names.contains(&column.column_name));
        }
        Ok(requests)
    }

    /// Collects the names of the table's row key columns together with the
    /// name of its time index column.
    ///
    /// # Errors
    ///
    /// Fails with `MissingTimeIndexColumn` if the table schema has no
    /// timestamp column.
    fn key_column_names(&self, table: &TableRef) -> Result<HashSet<String>> {
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| table::error::MissingTimeIndexColumnSnafu {
                table_name: table.table_info().name.clone(),
            })
            .context(MissingTimeIndexColumnSnafu)?
            .name
            .clone();

        let key_column_names = table
            .table_info()
            .meta
            .row_key_column_names()
            .cloned()
            .chain(iter::once(time_index))
            .collect();
        Ok(key_column_names)
    }

    /// Looks up the table a request targets, using the catalog and schema of
    /// the current query context.
    ///
    /// # Errors
    ///
    /// Propagates catalog errors and fails with `TableNotFound` if the catalog
    /// has no such table.
    async fn get_table(&self, req: &RowDeleteRequest, ctx: &QueryContextRef) -> Result<TableRef> {
        self.catalog_manager
            .table(ctx.current_catalog(), ctx.current_schema(), &req.table_name)
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: req.table_name.clone(),
            })
    }
}
/// Checks that every row of every delete request carries exactly one value per
/// schema column.
///
/// Requests without a `rows` payload contain nothing to check and are skipped.
///
/// # Errors
///
/// Returns `InvalidDeleteRequest` for the first row whose number of values
/// differs from the number of columns declared in the request schema.
fn validate_row_count_match(requests: &RowDeleteRequests) -> Result<()> {
    for request in &requests.deletes {
        if let Some(rows) = request.rows.as_ref() {
            let column_count = rows.schema.len();
            for row in &rows.rows {
                ensure!(
                    row.values.len() == column_count,
                    InvalidDeleteRequestSnafu {
                        // What is compared is the number of values in a row
                        // against the number of columns, not a number of rows.
                        reason: format!(
                            "column count mismatch in table {}, columns: {}, values: {}",
                            request.table_name,
                            column_count,
                            row.values.len()
                        ),
                    }
                );
            }
        }
    }
    Ok(())
}

View File

@@ -63,6 +63,12 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to delete data, source: {}", source))]
RequestDeletes {
#[snafu(backtrace)]
source: common_meta::error::Error,
},
#[snafu(display("Runtime resource error, source: {}", source))]
RuntimeResource {
#[snafu(backtrace)]
@@ -146,6 +152,9 @@ pub enum Error {
#[snafu(display("Invalid InsertRequest, reason: {}", reason))]
InvalidInsertRequest { reason: String, location: Location },
#[snafu(display("Invalid DeleteRequest, reason: {}", reason))]
InvalidDeleteRequest { reason: String, location: Location },
#[snafu(display("Table not found: {}", table_name))]
TableNotFound {
table_name: String,
@@ -663,6 +672,7 @@ impl ErrorExt for Error {
Error::ParseAddr { .. }
| Error::InvalidSql { .. }
| Error::InvalidInsertRequest { .. }
| Error::InvalidDeleteRequest { .. }
| Error::IllegalPrimaryKeysDef { .. }
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
@@ -711,6 +721,7 @@ impl ErrorExt for Error {
Error::RequestDatanode { source } => source.status_code(),
Error::RequestInserts { source } => source.status_code(),
Error::RequestDeletes { source } => source.status_code(),
Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
source.status_code()

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod req_convert;
use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
@@ -21,8 +19,8 @@ use api::v1::region::region_request;
use api::v1::{
AlterExpr, ColumnSchema, DdlRequest, InsertRequests, RowInsertRequest, RowInsertRequests,
};
use catalog::CatalogManagerRef;
use client::region_handler::RegionRequestHandlerRef;
use catalog::CatalogManager;
use client::region_handler::RegionRequestHandler;
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_query::Output;
@@ -34,26 +32,26 @@ use snafu::prelude::*;
use table::engine::TableReference;
use table::TableRef;
use self::req_convert::{ColumnToRow, RowToRegion};
use crate::error::{
CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu,
CatalogSnafu, Error, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu,
RequestDatanodeSnafu, Result,
};
use crate::expr_factory::CreateExprFactory;
use crate::req_convert::insert::{ColumnToRow, RowToRegion};
pub(crate) struct Inserter<'a> {
catalog_manager: &'a CatalogManagerRef,
catalog_manager: &'a dyn CatalogManager,
create_expr_factory: &'a CreateExprFactory,
grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
region_request_handler: &'a RegionRequestHandlerRef,
region_request_handler: &'a dyn RegionRequestHandler,
}
impl<'a> Inserter<'a> {
pub fn new(
catalog_manager: &'a CatalogManagerRef,
catalog_manager: &'a dyn CatalogManager,
create_expr_factory: &'a CreateExprFactory,
grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
region_request_handler: &'a RegionRequestHandlerRef,
region_request_handler: &'a dyn RegionRequestHandler,
) -> Self {
Self {
catalog_manager,
@@ -74,29 +72,31 @@ impl<'a> Inserter<'a> {
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
requests.inserts.iter().try_for_each(|req| {
let non_empty = req.rows.as_ref().map(|r| !r.rows.is_empty());
let non_empty = non_empty.unwrap_or_default();
non_empty.then_some(()).with_context(|| EmptyDataSnafu {
msg: format!("insert to table: {:?}", &req.table_name),
})
})?;
// remove empty requests
requests.inserts.retain(|req| {
req.rows
.as_ref()
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_row_count_match(&requests)?;
self.create_or_alter_tables_on_demand(&requests, &ctx)
.await?;
let region_request = RowToRegion::new(self.catalog_manager.as_ref(), &ctx)
let region_request = RowToRegion::new(self.catalog_manager, &ctx)
.convert(requests)
.await?;
let region_request = region_request::Body::Inserts(region_request);
let response = self
let affected_rows = self
.region_request_handler
.handle(region_request, ctx)
.await
.context(RequestDatanodeSnafu)?;
Ok(Output::AffectedRows(response.affected_rows as _))
Ok(Output::AffectedRows(affected_rows as _))
}
}
@@ -205,6 +205,20 @@ impl<'a> Inserter<'a> {
}
}
/// Checks that every row of every insert request carries exactly one value per
/// schema column.
///
/// Requests without a `rows` payload contain nothing to check and are skipped.
///
/// # Errors
///
/// Returns `InvalidInsertRequest` for the first row whose number of values
/// differs from the number of columns declared in the request schema.
fn validate_row_count_match(requests: &RowInsertRequests) -> Result<()> {
    for request in &requests.inserts {
        if let Some(rows) = request.rows.as_ref() {
            let column_count = rows.schema.len();
            for row in &rows.rows {
                ensure!(
                    row.values.len() == column_count,
                    InvalidInsertRequestSnafu {
                        // What is compared is the number of values in a row
                        // against the number of columns, not a number of rows.
                        reason: format!(
                            "column count mismatch in table {}, columns: {}, values: {}",
                            request.table_name,
                            column_count,
                            row.values.len()
                        ),
                    }
                );
            }
        }
    }
    Ok(())
}
fn validate_request_with_table(req: &RowInsertRequest, table: &TableRef) -> Result<()> {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let table_schema = table.schema();

View File

@@ -26,7 +26,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use api::v1::{InsertRequests, RowInsertRequests};
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::remote::CachedMetaKvBackend;
@@ -77,6 +77,7 @@ use sqlparser::ast::ObjectName;
use self::distributed::DistRegionRequestHandler;
use self::standalone::StandaloneRegionRequestHandler;
use crate::catalog::FrontendCatalogManager;
use crate::delete::Deleter;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
@@ -85,7 +86,7 @@ use crate::expr_factory::CreateExprFactory;
use crate::frontend::FrontendOptions;
use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use crate::heartbeat::HeartbeatTask;
use crate::inserter::Inserter;
use crate::insert::Inserter;
use crate::instance::standalone::StandaloneGrpcQueryHandler;
use crate::metrics;
use crate::script::ScriptExecutor;
@@ -310,10 +311,10 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
let inserter = Inserter::new(
&self.catalog_manager,
self.catalog_manager.as_ref(),
&self.create_expr_factory,
&self.grpc_query_handler,
&self.region_request_handler,
self.region_request_handler.as_ref(),
);
inserter.handle_row_inserts(requests, ctx).await
}
@@ -325,14 +326,40 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
let inserter = Inserter::new(
&self.catalog_manager,
self.catalog_manager.as_ref(),
&self.create_expr_factory,
&self.grpc_query_handler,
&self.region_request_handler,
self.region_request_handler.as_ref(),
);
inserter.handle_column_inserts(requests, ctx).await
}
/// Handles a batch of delete requests given in row format.
///
/// A short-lived [`Deleter`] is built from this instance's catalog manager and
/// region request handler, and the requests are forwarded to it.
pub async fn handle_row_deletes(
    &self,
    requests: RowDeleteRequests,
    ctx: QueryContextRef,
) -> Result<Output> {
    Deleter::new(
        self.catalog_manager.as_ref(),
        self.region_request_handler.as_ref(),
    )
    .handle_row_deletes(requests, ctx)
    .await
}
/// Handles a batch of delete requests given in column format.
///
/// A short-lived [`Deleter`] is built from this instance's catalog manager and
/// region request handler, and the requests are forwarded to it.
pub async fn handle_deletes(
    &self,
    requests: DeleteRequests,
    ctx: QueryContextRef,
) -> Result<Output> {
    Deleter::new(
        self.catalog_manager.as_ref(),
        self.region_request_handler.as_ref(),
    )
    .handle_column_deletes(requests, ctx)
    .await
}
pub fn set_plugins(&mut self, map: Arc<Plugins>) {
self.plugins = map;
}

View File

@@ -21,10 +21,8 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::region::{region_request, QueryRequest, RegionResponse};
use api::v1::{
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, TruncateTableExpr,
};
use api::v1::region::{region_request, QueryRequest};
use api::v1::{column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, TruncateTableExpr};
use arrow_flight::Ticket;
use async_trait::async_trait;
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
@@ -35,6 +33,7 @@ use client::region_handler::RegionRequestHandler;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::AffectedRows;
use common_meta::ddl::{DdlExecutorRef, ExecutorContext};
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
@@ -71,7 +70,6 @@ use crate::error::{
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::inserter::req_convert::StatementToRegion;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::table::DistTable;
@@ -269,14 +267,6 @@ impl DistInstance {
let table_name = TableName::new(catalog, schema, table);
self.drop_table(table_name).await
}
Statement::Insert(insert) => {
let request = StatementToRegion::new(self.catalog_manager.as_ref(), &query_ctx)
.convert(&insert)
.await?;
let inserter = DistInserter::new(&self.catalog_manager);
let affected_rows = inserter.insert(request).await?;
Ok(Output::AffectedRows(affected_rows as usize))
}
Statement::ShowCreateTable(show) => {
let (catalog, schema, table) =
table_idents_to_full_name(&show.table_name, query_ctx.clone())
@@ -507,20 +497,6 @@ impl DistInstance {
.context(error::ExecuteDdlSnafu)
}
async fn handle_dist_delete(
&self,
request: DeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let deleter = DistDeleter::new(
ctx.current_catalog().to_string(),
ctx.current_schema().to_string(),
self.catalog_manager(),
);
let affected_rows = deleter.grpc_delete(request).await?;
Ok(Output::AffectedRows(affected_rows))
}
pub fn catalog_manager(&self) -> Arc<FrontendCatalogManager> {
self.catalog_manager.clone()
}
@@ -547,6 +523,7 @@ impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(_) => NotSupportedSnafu { feat: "inserts" }.fail(),
Request::Deletes(_) => NotSupportedSnafu { feat: "deletes" }.fail(),
Request::RowInserts(_) => NotSupportedSnafu {
feat: "row inserts",
}
@@ -555,7 +532,6 @@ impl GrpcQueryHandler for DistInstance {
feat: "row deletes",
}
.fail(),
Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await,
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}
@@ -602,7 +578,7 @@ impl RegionRequestHandler for DistRegionRequestHandler {
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> ClientResult<RegionResponse> {
) -> ClientResult<AffectedRows> {
self.handle_inner(request, ctx)
.await
.map_err(BoxedError::new)
@@ -622,21 +598,17 @@ impl DistRegionRequestHandler {
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<RegionResponse> {
) -> Result<AffectedRows> {
match request {
region_request::Body::Inserts(inserts) => {
let inserter =
DistInserter::new(&self.catalog_manager).with_trace_id(ctx.trace_id());
let affected_rows = inserter.insert(inserts).await? as _;
Ok(RegionResponse {
header: Some(Default::default()),
affected_rows,
})
inserter.insert(inserts).await
}
region_request::Body::Deletes(_) => NotSupportedSnafu {
feat: "region deletes",
region_request::Body::Deletes(deletes) => {
let deleter = DistDeleter::new(&self.catalog_manager).with_trace_id(ctx.trace_id());
deleter.delete(deletes).await
}
.fail(),
region_request::Body::Create(_) => NotSupportedSnafu {
feat: "region create",
}

View File

@@ -13,164 +13,123 @@
// limitations under the License.
use std::collections::HashMap;
use std::iter;
use std::sync::Arc;
use api::v1::DeleteRequests;
use catalog::CatalogManager;
use client::Database;
use common_grpc_expr::delete::to_table_delete_request;
use api::v1::region::{region_request, DeleteRequests, RegionRequest, RegionRequestHeader};
use common_meta::datanode_manager::{AffectedRows, DatanodeManager};
use common_meta::peer::Peer;
use common_meta::table_name::TableName;
use futures::future;
use metrics::counter;
use snafu::{OptionExt, ResultExt};
use table::requests::DeleteRequest;
use store_api::storage::RegionId;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu,
MissingTimeIndexColumnSnafu, RequestDatanodeSnafu, Result, SplitDeleteSnafu,
TableNotFoundSnafu, ToTableDeleteRequestSnafu,
FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDeletesSnafu, Result,
SplitDeleteSnafu,
};
use crate::table::delete::to_grpc_delete_request;
/// A distributed deleter. It ingests GRPC [DeleteRequests] or table [DeleteRequest] (so it can be
/// used in protocol handlers or table deletion API).
/// A distributed deleter. It ingests gRPC [DeleteRequests].
///
/// Table data partitioning and Datanode requests batching are handled inside.
///
/// Note that the deleter is confined to a single catalog and schema. I.e., it cannot handle
/// multiple deletes requests with different catalog or schema (will throw "NotSupported" error).
/// This is because we currently do not have this kind of requirements. Let's keep it simple for now.
pub(crate) struct DistDeleter {
catalog: String,
schema: String,
catalog_manager: Arc<FrontendCatalogManager>,
pub struct DistDeleter<'a> {
catalog_manager: &'a FrontendCatalogManager,
trace_id: Option<u64>,
span_id: Option<u64>,
}
impl DistDeleter {
pub(crate) fn new(
catalog: String,
schema: String,
catalog_manager: Arc<FrontendCatalogManager>,
) -> Self {
impl<'a> DistDeleter<'a> {
pub(crate) fn new(catalog_manager: &'a FrontendCatalogManager) -> Self {
Self {
catalog,
schema,
catalog_manager,
trace_id: None,
span_id: None,
}
}
pub async fn grpc_delete(&self, requests: DeleteRequests) -> Result<usize> {
let deletes = requests
.deletes
.into_iter()
.map(|delete| {
to_table_delete_request(&self.catalog, &self.schema, delete)
.context(ToTableDeleteRequestSnafu)
})
.collect::<Result<Vec<_>>>()?;
self.delete(deletes).await
pub fn with_trace_id(mut self, trace_id: u64) -> Self {
self.trace_id = Some(trace_id);
self
}
pub(crate) async fn delete(&self, requests: Vec<DeleteRequest>) -> Result<usize> {
debug_assert!(requests
.iter()
.all(|x| x.catalog_name == self.catalog && x.schema_name == self.schema));
let deletes = self.split_deletes(requests).await?;
self.request_datanodes(deletes).await
#[allow(dead_code)]
pub fn with_span_id(mut self, span_id: u64) -> Self {
self.span_id = Some(span_id);
self
}
async fn split_deletes(
&self,
requests: Vec<DeleteRequest>,
) -> Result<HashMap<Peer, DeleteRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut deletes = HashMap::new();
for request in requests {
let table_name = &request.table_name;
let table = self
.catalog_manager
.table(&self.catalog, &self.schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let table_info = table.table_info();
let table_meta = &table_info.meta;
let table_id = table_info.table_id();
let table_name = &request.table_name;
let schema = table.schema();
let time_index = &schema
.timestamp_column()
.with_context(|| table::error::MissingTimeIndexColumnSnafu {
table_name: table_name.to_string(),
})
.context(MissingTimeIndexColumnSnafu)?
.name;
let primary_key_column_names = table_info
.meta
.row_key_column_names()
.chain(iter::once(time_index))
.collect::<Vec<_>>();
let table_name = request.table_name.clone();
let split = partition_manager
.split_delete_request(table_id, request, primary_key_column_names)
.await
.context(SplitDeleteSnafu)?;
let table_route = partition_manager
.find_table_route(table_id)
.await
.with_context(|_| FindTableRouteSnafu { table_id })?;
for (region_number, delete) in split {
let datanode =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
let table_name = TableName::new(&self.catalog, &self.schema, &table_name);
let delete =
to_grpc_delete_request(table_meta, &table_name, region_number, delete)?;
deletes
.entry(datanode.clone())
.or_insert_with(|| DeleteRequests { deletes: vec![] })
.deletes
.push(delete);
}
}
Ok(deletes)
}
async fn request_datanodes(&self, deletes: HashMap<Peer, DeleteRequests>) -> Result<usize> {
let results = future::try_join_all(deletes.into_iter().map(|(peer, deletes)| {
pub(crate) async fn delete(&self, requests: DeleteRequests) -> Result<AffectedRows> {
let requests = self.split(requests).await?;
let trace_id = self.trace_id.unwrap_or_default();
let span_id = self.span_id.unwrap_or_default();
let results = future::try_join_all(requests.into_iter().map(|(peer, deletes)| {
let datanode_clients = self.catalog_manager.datanode_clients();
let catalog = self.catalog.clone();
let schema = self.schema.clone();
common_runtime::spawn_write(async move {
let client = datanode_clients.get_client(&peer).await;
let database = Database::new(&catalog, &schema, client);
database.delete(deletes).await.context(RequestDatanodeSnafu)
let request = RegionRequest {
header: Some(RegionRequestHeader { trace_id, span_id }),
body: Some(region_request::Body::Deletes(deletes)),
};
datanode_clients
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestDeletesSnafu)
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
Ok(affected_rows as usize)
let affected_rows = results.into_iter().sum::<Result<u64>>()?;
counter!(crate::metrics::DIST_DELETE_ROW_COUNT, affected_rows);
Ok(affected_rows)
}
/// Regroups region-level [DeleteRequests] by the Datanode peer that leads each
/// target region, so that all deletes bound for one Datanode can be sent in a
/// single batch.
///
/// Each incoming request is split per region by the partition manager, and
/// every resulting piece is appended to the batch of that region's leader.
///
/// # Errors
///
/// Fails if a request cannot be split, the table route cannot be found, or a
/// region has no leader peer.
async fn split(&self, requests: DeleteRequests) -> Result<HashMap<Peer, DeleteRequests>> {
    let partition_manager = self.catalog_manager.partition_manager();
    let mut grouped: HashMap<Peer, DeleteRequests> = HashMap::new();

    for req in requests.requests {
        let table_id = RegionId::from_u64(req.region_id).table_id();

        let split_result = partition_manager.split_delete_request(table_id, req).await;
        let per_region = split_result.context(SplitDeleteSnafu)?;

        let route_result = partition_manager.find_table_route(table_id).await;
        let table_route = route_result.context(FindTableRouteSnafu { table_id })?;

        for (region_number, delete) in per_region {
            let leader = table_route
                .find_region_leader(region_number)
                .context(FindDatanodeSnafu {
                    region: region_number,
                })?;
            grouped
                .entry(leader.clone())
                .or_default()
                .requests
                .push(delete);
        }
    }

    Ok(grouped)
}
}
#[cfg(test)]
mod tests {
use api::v1::column::Values;
use api::v1::{Column, ColumnDataType, DeleteRequest as GrpcDeleteRequest, SemanticType};
use std::sync::Arc;
use api::helper::vectors_to_rows;
use api::v1::region::DeleteRequest;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::helper::{CatalogValue, SchemaValue};
@@ -182,7 +141,7 @@ mod tests {
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::rpc::store::PutRequest;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder};
@@ -220,14 +179,14 @@ mod tests {
table_metadata_manager: &TableMetadataManagerRef,
) {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"current_timestamp()".to_string(),
)))
.unwrap(),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("value", ConcreteDataType::int32_datatype(), false),
DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
DtColumnSchema::new("value", ConcreteDataType::int32_datatype(), false),
]));
let table_meta = TableMetaBuilder::default()
@@ -284,56 +243,60 @@ mod tests {
));
let new_delete_request = |vector: VectorRef| -> DeleteRequest {
let row_count = vector.len();
DeleteRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
key_column_values: HashMap::from([("a".to_string(), vector)]),
region_id: RegionId::new(1, 0).into(),
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Tag as i32,
}],
rows: vectors_to_rows([vector].iter(), row_count),
}),
}
};
let requests = vec![
new_delete_request(Arc::new(Int32Vector::from(vec![
Some(1),
Some(11),
Some(50),
]))),
new_delete_request(Arc::new(Int32Vector::from(vec![
Some(2),
Some(12),
Some(102),
]))),
];
let requests = DeleteRequests {
requests: vec![
new_delete_request(Arc::new(Int32Vector::from(vec![
Some(1),
Some(11),
Some(50),
]))),
new_delete_request(Arc::new(Int32Vector::from(vec![
Some(2),
Some(12),
Some(102),
]))),
],
};
let deleter = DistDeleter::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
catalog_manager,
);
let mut deletes = deleter.split_deletes(requests).await.unwrap();
let deleter = DistDeleter::new(&catalog_manager);
let mut deletes = deleter.split(requests).await.unwrap();
assert_eq!(deletes.len(), 3);
let new_grpc_delete_request = |column_values: Vec<i32>,
null_mask: Vec<u8>,
row_count: u32,
region_number: u32|
-> GrpcDeleteRequest {
GrpcDeleteRequest {
table_name: table_name.to_string(),
key_columns: vec![Column {
column_name: "a".to_string(),
semantic_type: SemanticType::Tag as i32,
values: Some(Values {
i32_values: column_values,
..Default::default()
let new_split_delete_request =
|rows: Vec<Option<i32>>, region_id: RegionId| -> DeleteRequest {
DeleteRequest {
region_id: region_id.into(),
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Tag as i32,
}],
rows: rows
.into_iter()
.map(|v| Row {
values: vec![Value {
value_data: v.map(ValueData::I32Value),
}],
})
.collect(),
}),
null_mask,
datatype: ColumnDataType::Int32 as i32,
}],
row_count,
region_number,
}
};
}
};
// region to datanode placement:
// 1 -> 1
@@ -345,38 +308,38 @@ mod tests {
// 2 -> [10, 50)
// 3 -> (min, 10)
let datanode_deletes = deletes.remove(&Peer::new(1, "")).unwrap().deletes;
let datanode_deletes = deletes.remove(&Peer::new(1, "")).unwrap().requests;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_grpc_delete_request(vec![50], vec![0], 1, 1)
new_split_delete_request(vec![Some(50)], RegionId::new(1, 1))
);
assert_eq!(
datanode_deletes[1],
new_grpc_delete_request(vec![102], vec![0], 1, 1)
new_split_delete_request(vec![Some(102)], RegionId::new(1, 1))
);
let datanode_deletes = deletes.remove(&Peer::new(2, "")).unwrap().deletes;
let datanode_deletes = deletes.remove(&Peer::new(2, "")).unwrap().requests;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_grpc_delete_request(vec![11], vec![0], 1, 2)
new_split_delete_request(vec![Some(11)], RegionId::new(1, 2))
);
assert_eq!(
datanode_deletes[1],
new_grpc_delete_request(vec![12], vec![0], 1, 2)
new_split_delete_request(vec![Some(12)], RegionId::new(1, 2))
);
let datanode_deletes = deletes.remove(&Peer::new(3, "")).unwrap().deletes;
let datanode_deletes = deletes.remove(&Peer::new(3, "")).unwrap().requests;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_grpc_delete_request(vec![1], vec![0], 1, 3)
new_split_delete_request(vec![Some(1)], RegionId::new(1, 3))
);
assert_eq!(
datanode_deletes[1],
new_grpc_delete_request(vec![2], vec![0], 1, 3)
new_split_delete_request(vec![Some(2)], RegionId::new(1, 3))
);
}
}

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use api::v1::region::{region_request, InsertRequests, RegionRequest, RegionRequestHeader};
use common_meta::datanode_manager::DatanodeManager;
use common_meta::datanode_manager::{AffectedRows, DatanodeManager};
use common_meta::peer::Peer;
use futures_util::future;
use metrics::counter;
@@ -57,7 +57,7 @@ impl<'a> DistInserter<'a> {
self
}
pub(crate) async fn insert(&self, requests: InsertRequests) -> Result<u64> {
pub(crate) async fn insert(&self, requests: InsertRequests) -> Result<AffectedRows> {
let requests = self.split(requests).await?;
let trace_id = self.trace_id.unwrap_or_default();
let span_id = self.span_id.unwrap_or_default();

View File

@@ -45,12 +45,8 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
Request::RowDeletes(_) => {
return NotSupportedSnafu {
feat: "row deletes",
}
.fail();
}
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
Request::Query(query_request) => {
let query = query_request.query.context(IncompleteGrpcResultSnafu {
err_msg: "Missing field 'QueryRequest.query'",
@@ -91,7 +87,7 @@ impl GrpcQueryHandler for Instance {
}
}
}
Request::Ddl(_) | Request::Deletes(_) => {
Request::Ddl(_) => {
GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone())
.await?
}

View File

@@ -15,11 +15,13 @@
use std::sync::Arc;
use api::v1::greptime_request::Request;
use api::v1::region::{region_request, QueryRequest, RegionResponse};
use api::v1::region::{region_request, QueryRequest};
use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region::check_response_header;
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::AffectedRows;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use datanode::error::Error as DatanodeError;
@@ -67,13 +69,17 @@ impl RegionRequestHandler for StandaloneRegionRequestHandler {
&self,
request: region_request::Body,
_ctx: QueryContextRef,
) -> ClientResult<RegionResponse> {
self.region_server
) -> ClientResult<AffectedRows> {
let response = self
.region_server
.handle(request)
.await
.context(InvokeRegionServerSnafu)
.map_err(BoxedError::new)
.context(HandleRequestSnafu)
.context(HandleRequestSnafu)?;
check_response_header(response.header)?;
Ok(response.affected_rows)
}
async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {

View File

@@ -16,13 +16,15 @@
#![feature(trait_upcasting)]
pub mod catalog;
pub(crate) mod delete;
pub mod error;
pub mod expr_factory;
pub mod frontend;
pub mod heartbeat;
pub(crate) mod inserter;
pub(crate) mod insert;
pub mod instance;
pub(crate) mod metrics;
pub(crate) mod req_convert;
mod script;
mod server;
pub mod service_config;

View File

@@ -22,6 +22,7 @@ pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"
/// Metrics for creating table in dist mode.
pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table";
pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows";
pub const DIST_DELETE_ROW_COUNT: &str = "frontend.dist.delete_rows";
/// The samples count of Prometheus remote write.
pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
pub mod delete;
pub mod insert;

View File

@@ -12,34 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{
Column, ColumnDataType, ColumnSchema, InsertRequest, InsertRequests, Row, RowInsertRequest,
RowInsertRequests, Rows, Value,
};
use api::v1::{Column, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use common_base::BitVec;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::VectorRef;
use snafu::prelude::*;
use snafu::ResultExt;
use table::metadata::TableInfo;
use crate::error::{ColumnDataTypeSnafu, InvalidInsertRequestSnafu, Result};
use crate::error::{
ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu,
MissingTimeIndexColumnSnafu, Result,
};
pub struct ColumnToRow;
impl ColumnToRow {
pub fn convert(requests: InsertRequests) -> Result<RowInsertRequests> {
requests
.inserts
.into_iter()
.map(request_column_to_row)
.collect::<Result<Vec<_>>>()
.map(|inserts| RowInsertRequests { inserts })
}
}
fn request_column_to_row(request: InsertRequest) -> Result<RowInsertRequest> {
let row_count = request.row_count as usize;
let column_count = request.columns.len();
pub fn columns_to_rows(columns: Vec<Column>, row_count: u32) -> Result<Rows> {
let row_count = row_count as usize;
let column_count = columns.len();
let mut schema = Vec::with_capacity(column_count);
let mut rows = vec![
Row {
@@ -47,7 +39,7 @@ fn request_column_to_row(request: InsertRequest) -> Result<RowInsertRequest> {
};
row_count
];
for column in request.columns {
for column in columns {
let column_schema = ColumnSchema {
column_name: column.column_name.clone(),
datatype: column.datatype,
@@ -58,11 +50,7 @@ fn request_column_to_row(request: InsertRequest) -> Result<RowInsertRequest> {
push_column_to_rows(column, &mut rows)?;
}
Ok(RowInsertRequest {
table_name: request.table_name,
rows: Some(Rows { schema, rows }),
region_number: request.region_number,
})
Ok(Rows { schema, rows })
}
fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
@@ -168,6 +156,74 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
Ok(())
}
pub fn row_count(columns: &HashMap<String, VectorRef>) -> Result<usize> {
let mut columns_iter = columns.values();
let len = columns_iter
.next()
.map(|column| column.len())
.unwrap_or_default();
ensure!(
columns_iter.all(|column| column.len() == len),
InvalidInsertRequestSnafu {
reason: "The row count of columns is not the same."
}
);
Ok(len)
}
/// Builds one gRPC [`ColumnSchema`] per entry of `columns`.
///
/// The schemas are produced in the map's iteration order. The data type is
/// taken from the vector itself, and the semantic type is resolved against
/// `table_info`.
///
/// # Errors
///
/// Fails on the first column whose data type cannot be converted or whose
/// semantic type cannot be determined.
pub fn column_schema(
    table_info: &TableInfo,
    columns: &HashMap<String, VectorRef>,
) -> Result<Vec<ColumnSchema>> {
    let mut schemas = Vec::with_capacity(columns.len());

    for (name, vector) in columns {
        let datatype = data_type(vector.data_type())?;
        let semantic = semantic_type(table_info, name)?;
        schemas.push(ColumnSchema {
            column_name: name.clone(),
            datatype: datatype.into(),
            semantic_type: semantic.into(),
        });
    }

    Ok(schemas)
}
/// Resolves the semantic type of `column` within the table described by
/// `table_info`.
///
/// * The table's time index column maps to [`SemanticType::Timestamp`].
/// * A column whose schema index is listed in `primary_key_indices` maps to
///   [`SemanticType::Tag`].
/// * Every other column maps to [`SemanticType::Field`].
///
/// # Errors
///
/// * `MissingTimeIndexColumn` when the table schema has no timestamp column.
/// * `ColumnNotFound` when `column` is not part of the table schema.
fn semantic_type(table_info: &TableInfo, column: &str) -> Result<SemanticType> {
    let table_meta = &table_info.meta;
    let table_schema = &table_meta.schema;

    let time_index_column = &table_schema
        .timestamp_column()
        .with_context(|| table::error::MissingTimeIndexColumnSnafu {
            table_name: table_info.name.to_string(),
        })
        .context(MissingTimeIndexColumnSnafu)?
        .name;

    if column == time_index_column {
        return Ok(SemanticType::Timestamp);
    }

    // Build the error message lazily: this function runs once per column of
    // every request, and the message is only needed on the failure path.
    let column_index = table_schema
        .column_index_by_name(column)
        .with_context(|| ColumnNotFoundSnafu {
            msg: format!("unable to find column {column} in table schema"),
        })?;

    let semantic_type = if table_meta.primary_key_indices.contains(&column_index) {
        SemanticType::Tag
    } else {
        SemanticType::Field
    };
    Ok(semantic_type)
}
fn data_type(data_type: ConcreteDataType) -> Result<ColumnDataType> {
let datatype: ColumnDataTypeWrapper = data_type.try_into().context(ColumnDataTypeSnafu)?;
Ok(datatype.datatype())
}
#[cfg(test)]
mod tests {
use api::v1::column::Values;
@@ -178,43 +234,36 @@ mod tests {
#[test]
fn test_request_column_to_row() {
let insert_request = InsertRequest {
table_name: String::from("test_table"),
row_count: 3,
region_number: 1,
columns: vec![
Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
},
Column {
column_name: String::from("col2"),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
null_mask: vec![],
values: Some(Values {
string_values: vec![
String::from("value1"),
String::from("value2"),
String::from("value3"),
],
..Default::default()
}),
},
],
};
let columns = vec![
Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
},
Column {
column_name: String::from("col2"),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
null_mask: vec![],
values: Some(Values {
string_values: vec![
String::from("value1"),
String::from("value2"),
String::from("value3"),
],
..Default::default()
}),
},
];
let row_count = 3;
let result = request_column_to_row(insert_request);
let row_insert_request = result.unwrap();
assert_eq!(row_insert_request.table_name, "test_table");
assert_eq!(row_insert_request.region_number, 1);
let rows = row_insert_request.rows.unwrap();
let result = columns_to_rows(columns, row_count);
let rows = result.unwrap();
assert_eq!(rows.schema.len(), 2);
assert_eq!(rows.schema[0].column_name, "col1");
@@ -250,55 +299,46 @@ mod tests {
Some(ValueData::StringValue(String::from("value3")))
);
let invalid_request_with_wrong_type = InsertRequest {
table_name: String::from("test_table"),
row_count: 3,
region_number: 1,
columns: vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
values: Some(Values {
i8_values: vec![42],
..Default::default()
}),
}],
};
assert!(request_column_to_row(invalid_request_with_wrong_type).is_err());
// wrong type
let columns = vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
values: Some(Values {
i8_values: vec![42],
..Default::default()
}),
}];
let row_count = 3;
assert!(columns_to_rows(columns, row_count).is_err());
let invalid_request_with_wrong_row_count = InsertRequest {
table_name: String::from("test_table"),
row_count: 3,
region_number: 1,
columns: vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 0, 0, 1].into_vec(),
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
}],
};
assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err());
// wrong row count
let columns = vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: bitvec![u8, Lsb0; 0, 0, 1].into_vec(),
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
}];
let row_count = 3;
assert!(columns_to_rows(columns, row_count).is_err());
let invalid_request_with_wrong_row_count = InsertRequest {
table_name: String::from("test_table"),
row_count: 3,
region_number: 1,
columns: vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: vec![],
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
}],
};
assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err());
// wrong row count
let columns = vec![Column {
column_name: String::from("col1"),
datatype: ColumnDataType::Int32.into(),
semantic_type: SemanticType::Field.into(),
null_mask: vec![],
values: Some(Values {
i32_values: vec![42],
..Default::default()
}),
}];
let row_count = 3;
assert!(columns_to_rows(columns, row_count).is_err());
}
}

View File

@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod column_to_row;
mod row_to_region;
mod table_to_region;
pub use column_to_row::ColumnToRow;
pub use row_to_region::RowToRegion;
pub use table_to_region::TableToRegion;

View File

@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{DeleteRequest, DeleteRequests, RowDeleteRequest, RowDeleteRequests};
use crate::error::Result;
use crate::req_convert::common::columns_to_rows;
pub struct ColumnToRow;
impl ColumnToRow {
pub fn convert(requests: DeleteRequests) -> Result<RowDeleteRequests> {
requests
.deletes
.into_iter()
.map(request_column_to_row)
.collect::<Result<Vec<_>>>()
.map(|deletes| RowDeleteRequests { deletes })
}
}
fn request_column_to_row(request: DeleteRequest) -> Result<RowDeleteRequest> {
let rows = columns_to_rows(request.key_columns, request.row_count)?;
Ok(RowDeleteRequest {
table_name: request.table_name,
rows: Some(rows),
region_number: request.region_number,
})
}

View File

@@ -0,0 +1,69 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{
DeleteRequest as RegionDeleteRequest, DeleteRequests as RegionDeleteRequests,
};
use api::v1::RowDeleteRequests;
use catalog::CatalogManager;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::TableRef;
use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
/// Converts row-oriented delete requests into region delete requests.
///
/// Tables are looked up through the catalog manager, using the current
/// catalog and schema of the query context.
pub struct RowToRegion<'a> {
    catalog_manager: &'a dyn CatalogManager,
    ctx: &'a QueryContext,
}

impl<'a> RowToRegion<'a> {
    /// Creates a converter bound to `catalog_manager` and `ctx`.
    pub fn new(catalog_manager: &'a dyn CatalogManager, ctx: &'a QueryContext) -> Self {
        Self {
            catalog_manager,
            ctx,
        }
    }

    /// Converts each row delete request into a [`RegionDeleteRequest`].
    ///
    /// The region id is built from the id of the resolved table and the
    /// region number carried by the request.
    ///
    /// # Errors
    ///
    /// Fails on the first request whose table cannot be resolved.
    pub async fn convert(&self, requests: RowDeleteRequests) -> Result<RegionDeleteRequests> {
        let mut region_requests = Vec::with_capacity(requests.deletes.len());

        for delete in requests.deletes {
            let table = self.get_table(&delete.table_name).await?;
            let table_id = table.table_info().table_id();
            region_requests.push(RegionDeleteRequest {
                region_id: RegionId::new(table_id, delete.region_number).into(),
                rows: delete.rows,
            });
        }

        Ok(RegionDeleteRequests {
            requests: region_requests,
        })
    }

    /// Looks up `table_name` in the current catalog and schema of the query
    /// context.
    ///
    /// # Errors
    ///
    /// Returns a `Catalog` error when the lookup itself fails, and a
    /// `TableNotFound` error when the lookup succeeds but yields no table.
    async fn get_table(&self, table_name: &str) -> Result<TableRef> {
        let catalog = self.ctx.current_catalog();
        let schema = self.ctx.current_schema();

        let table = self
            .catalog_manager
            .table(catalog, schema, table_name)
            .await
            .context(CatalogSnafu)?;

        table.with_context(|| TableNotFoundSnafu {
            table_name: format!("{}.{}.{}", catalog, schema, table_name),
        })
    }
}

View File

@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{
DeleteRequest as RegionDeleteRequest, DeleteRequests as RegionDeleteRequests,
};
use api::v1::Rows;
use store_api::storage::RegionId;
use table::metadata::TableInfo;
use table::requests::DeleteRequest as TableDeleteRequest;
use crate::error::Result;
use crate::req_convert::common::{column_schema, row_count};
/// Converts a table-level delete request into region delete requests for the
/// table described by `table_info`.
pub struct TableToRegion<'a> {
    table_info: &'a TableInfo,
}

impl<'a> TableToRegion<'a> {
    /// Creates a converter for the table described by `table_info`.
    pub fn new(table_info: &'a TableInfo) -> Self {
        Self { table_info }
    }

    /// Converts `request` into a single [`RegionDeleteRequest`] addressed to
    /// region number `0` of the table.
    ///
    /// # Errors
    ///
    /// Fails when the key column vectors differ in length, or when the schema
    /// of a key column cannot be derived from the table info.
    pub fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
        let key_columns = &request.key_column_values;

        let num_rows = row_count(key_columns)?;
        let schema = column_schema(self.table_info, key_columns)?;
        let rows = api::helper::vectors_to_rows(key_columns.values(), num_rows);

        let delete = RegionDeleteRequest {
            region_id: RegionId::new(self.table_info.table_id(), 0).into(),
            rows: Some(Rows { schema, rows }),
        };
        Ok(RegionDeleteRequests {
            requests: vec![delete],
        })
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, SemanticType};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::scalars::ScalarVectorBuilder;
    use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema};
    use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder};
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};

    use super::*;

    /// Converts a mocked table delete request and checks the resulting
    /// region delete request.
    #[test]
    fn test_delete_request_table_to_region() {
        let column_schemas = vec![
            DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
                .with_time_index(true),
            DtColumnSchema::new("id", ConcreteDataType::int16_datatype(), false),
            DtColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
        ];
        let table_meta = TableMetaBuilder::default()
            .schema(Arc::new(Schema::new(column_schemas)))
            .primary_key_indices(vec![1, 2])
            .next_column_id(3)
            .build()
            .unwrap();
        let table_info = Arc::new(
            TableInfoBuilder::default()
                .name("demo")
                .meta(table_meta)
                .table_id(1)
                .build()
                .unwrap(),
        );

        let mut converted = TableToRegion::new(&table_info)
            .convert(mock_delete_request())
            .unwrap();

        assert_eq!(converted.requests.len(), 1);
        verify_region_delete_request(converted.requests.pop().unwrap());
    }

    /// Builds a delete request on table `demo` with three rows of `host` and
    /// `id` key columns; the second `host` value is null.
    fn mock_delete_request() -> TableDeleteRequest {
        let mut host_builder = StringVectorBuilder::with_capacity(3);
        for value in [Some("host1"), None, Some("host3")] {
            host_builder.push(value);
        }

        let mut id_builder = Int16VectorBuilder::with_capacity(3);
        for value in [1_i16, 2_i16, 3_i16] {
            id_builder.push(Some(value));
        }

        let key_column_values = HashMap::from([
            ("host".to_string(), host_builder.to_vector()),
            ("id".to_string(), id_builder.to_vector()),
        ]);

        TableDeleteRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "demo".to_string(),
            key_column_values,
        }
    }

    /// Checks the region id plus the type, semantic type and values of the
    /// `id` and `host` columns, whatever position they have in the schema.
    fn verify_region_delete_request(request: RegionDeleteRequest) {
        assert_eq!(request.region_id, RegionId::new(1, 0).as_u64());

        let rows = request.rows.unwrap();
        // Collects the values of the column at `index` across all rows.
        let values_at = |index: usize| {
            rows.rows
                .iter()
                .map(|row| row.values[index].value_data.clone())
                .collect::<Vec<_>>()
        };

        for (index, column) in rows.schema.iter().enumerate() {
            match column.column_name.as_str() {
                "id" => {
                    assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
                    assert_eq!(SemanticType::Tag as i32, column.semantic_type);
                    assert_eq!(
                        vec![
                            Some(ValueData::I16Value(1)),
                            Some(ValueData::I16Value(2)),
                            Some(ValueData::I16Value(3))
                        ],
                        values_at(index)
                    );
                }
                "host" => {
                    assert_eq!(ColumnDataType::String as i32, column.datatype);
                    assert_eq!(SemanticType::Tag as i32, column.semantic_type);
                    assert_eq!(
                        vec![
                            Some(ValueData::StringValue("host1".to_string())),
                            None,
                            Some(ValueData::StringValue("host3".to_string()))
                        ],
                        values_at(index)
                    );
                }
                _ => {}
            }
        }
    }
}

View File

@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{InsertRequest, InsertRequests, RowInsertRequest, RowInsertRequests};
use crate::error::Result;
use crate::req_convert::common::columns_to_rows;
pub struct ColumnToRow;
impl ColumnToRow {
pub fn convert(requests: InsertRequests) -> Result<RowInsertRequests> {
requests
.inserts
.into_iter()
.map(request_column_to_row)
.collect::<Result<Vec<_>>>()
.map(|inserts| RowInsertRequests { inserts })
}
}
fn request_column_to_row(request: InsertRequest) -> Result<RowInsertRequest> {
let rows = columns_to_rows(request.columns, request.row_count)?;
Ok(RowInsertRequest {
table_name: request.table_name,
rows: Some(rows),
region_number: request.region_number,
})
}

View File

@@ -12,20 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::{
InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
};
use api::v1::{ColumnSchema, Rows};
use datatypes::vectors::VectorRef;
use snafu::prelude::*;
use api::v1::Rows;
use store_api::storage::RegionId;
use table::metadata::TableInfo;
use table::requests::InsertRequest as TableInsertRequest;
use super::{data_type, semantic_type};
use crate::error::{InvalidInsertRequestSnafu, Result};
use crate::error::Result;
use crate::req_convert::common::{column_schema, row_count};
pub struct TableToRegion<'a> {
table_info: &'a TableInfo,
@@ -50,41 +46,9 @@ impl<'a> TableToRegion<'a> {
}
}
fn row_count(columns: &HashMap<String, VectorRef>) -> Result<usize> {
let mut columns_iter = columns.values();
let len = columns_iter
.next()
.map(|column| column.len())
.unwrap_or_default();
ensure!(
columns_iter.all(|column| column.len() == len),
InvalidInsertRequestSnafu {
reason: "The row count of columns is not the same."
}
);
Ok(len)
}
/// Builds one gRPC [`ColumnSchema`] per entry of `columns`, in the map's
/// iteration order.
///
/// The data type is taken from the vector and the semantic type is resolved
/// against `table_info`; the first failing column aborts the conversion.
fn column_schema(
    table_info: &TableInfo,
    columns: &HashMap<String, VectorRef>,
) -> Result<Vec<ColumnSchema>> {
    let mut schemas = Vec::with_capacity(columns.len());

    for (name, vector) in columns {
        let datatype = data_type(vector.data_type())?;
        let semantic = semantic_type(table_info, name)?;
        schemas.push(ColumnSchema {
            column_name: name.clone(),
            datatype: datatype.into(),
            semantic_type: semantic.into(),
        });
    }

    Ok(schemas)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::value::ValueData;

View File

@@ -22,7 +22,6 @@ mod tql;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use api::v1::region::region_request;
use catalog::CatalogManagerRef;
@@ -41,19 +40,16 @@ use snafu::{OptionExt, ResultExt};
use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::error::TableOperationSnafu;
use table::requests::{
CopyDatabaseRequest, CopyDirection, CopyTableRequest, DeleteRequest, InsertRequest,
};
use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu,
self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu,
};
use crate::inserter::req_convert::TableToRegion;
use crate::instance::distributed::deleter::DistDeleter;
use crate::req_convert::{delete, insert};
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
#[derive(Clone)]
@@ -184,55 +180,35 @@ impl StatementExecutor {
let table = self.get_table(&table_ref).await?;
let table_info = table.table_info();
let request = TableToRegion::new(&table_info).convert(request)?;
let region_response = self
let request = insert::TableToRegion::new(&table_info).convert(request)?;
let affected_rows = self
.region_request_handler
.handle(region_request::Body::Inserts(request), query_ctx)
.await
.context(RequestDatanodeSnafu)?;
Ok(region_response.affected_rows as _)
Ok(affected_rows as _)
}
// TODO(zhongzc): A middle state that eliminates calls to table.delete,
// For DistTable, its delete is not invoked; for MitoTable, it is still called but eventually eliminated.
async fn send_delete_request(&self, request: DeleteRequest) -> Result<usize> {
let frontend_catalog_manager = self
.catalog_manager
.as_any()
.downcast_ref::<FrontendCatalogManager>();
async fn handle_table_delete_request(
&self,
request: DeleteRequest,
query_ctx: QueryContextRef,
) -> Result<usize> {
let table_ref = TableReference::full(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
let table = self.get_table(&table_ref).await?;
let table_info = table.table_info();
let table_name = request.table_name.clone();
match frontend_catalog_manager {
Some(frontend_catalog_manager) => {
let inserter = DistDeleter::new(
request.catalog_name.clone(),
request.schema_name.clone(),
Arc::new(frontend_catalog_manager.clone()),
);
let affected_rows = inserter
.delete(vec![request])
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)
.context(InsertSnafu { table_name })?;
Ok(affected_rows)
}
None => {
let table_ref = TableReference::full(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
let affected_rows = self
.get_table(&table_ref)
.await?
.delete(request)
.await
.context(InsertSnafu { table_name })?;
Ok(affected_rows)
}
}
let request = delete::TableToRegion::new(&table_info).convert(request)?;
let affected_rows = self
.region_request_handler
.handle(region_request::Body::Deletes(request), query_ctx)
.await
.context(RequestDatanodeSnafu)?;
Ok(affected_rows as _)
}
}

View File

@@ -14,6 +14,7 @@
use std::collections::HashMap;
use api::v1::region::region_request;
use common_query::Output;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
@@ -33,18 +34,24 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu,
MissingTimeIndexColumnSnafu, ReadRecordBatchSnafu, Result, UnexpectedSnafu,
BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, MissingTimeIndexColumnSnafu,
ReadRecordBatchSnafu, RequestDatanodeSnafu, Result, UnexpectedSnafu,
};
use crate::req_convert::insert::StatementToRegion;
impl StatementExecutor {
pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly
self.sql_stmt_executor
.execute_sql(Statement::Insert(insert), query_ctx)
let request = StatementToRegion::new(self.catalog_manager.as_ref(), &query_ctx)
.convert(&insert)
.await?;
let affected_rows = self
.region_request_handler
.handle(region_request::Body::Inserts(request), query_ctx)
.await
.context(ExecuteStatementSnafu)
.context(RequestDatanodeSnafu)?;
Ok(Output::AffectedRows(affected_rows as _))
} else {
// Slow path: insert with subquery. Execute the subquery first, via query engine. Then
// insert the results by sending insert requests.
@@ -108,7 +115,9 @@ impl StatementExecutor {
while let Some(batch) = stream.next().await {
let record_batch = batch.context(ReadRecordBatchSnafu)?;
let delete_request = build_delete_request(record_batch, table.schema(), &table_info)?;
affected_rows += self.send_delete_request(delete_request).await?;
affected_rows += self
.handle_table_delete_request(delete_request, query_ctx.clone())
.await?;
}
Ok(Output::AffectedRows(affected_rows))

View File

@@ -24,9 +24,6 @@ use table::TableRef;
use crate::error::NotSupportedSnafu;
pub mod delete;
pub mod insert;
#[derive(Clone)]
pub struct DistTable;

View File

@@ -1,106 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::DeleteRequest as GrpcDeleteRequest;
use common_meta::table_name::TableName;
use store_api::storage::RegionNumber;
use table::metadata::TableMeta;
use table::requests::DeleteRequest;
use crate::error::Result;
use crate::table::insert::to_grpc_columns;
/// Builds a gRPC delete request for one region from a table delete request.
///
/// The key column vectors of `request` are converted into gRPC columns using
/// `table_meta`. The resulting request carries the table name from
/// `table_name`, the given `region_number`, and the converted columns with
/// their row count.
///
/// # Errors
///
/// Propagates any error from converting the key columns.
pub fn to_grpc_delete_request(
    table_meta: &TableMeta,
    table_name: &TableName,
    region_number: RegionNumber,
    request: DeleteRequest,
) -> Result<GrpcDeleteRequest> {
    to_grpc_columns(table_meta, &request.key_column_values).map(|(key_columns, row_count)| {
        GrpcDeleteRequest {
            table_name: table_name.table_name.clone(),
            region_number,
            key_columns,
            row_count,
        }
    })
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::column::Values;
    use api::v1::{Column, ColumnDataType, SemanticType};
    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::Int32Vector;
    use table::metadata::TableMetaBuilder;

    use super::*;

    /// A delete request keyed on a non-primary-key `id` column is converted
    /// into a gRPC request with a single `Field` column of three rows.
    #[test]
    fn test_to_grpc_delete_request() {
        let column_schemas = vec![
            ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
                .with_time_index(true),
            ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
        ];
        let table_meta = TableMetaBuilder::default()
            .schema(Arc::new(Schema::new(column_schemas)))
            .primary_key_indices(vec![])
            .next_column_id(2)
            .build()
            .unwrap();

        let table_name = TableName {
            catalog_name: "greptime".to_string(),
            schema_name: "public".to_string(),
            table_name: "foo".to_string(),
        };
        let region_number = 1;

        let ids = Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef;
        let request = DeleteRequest {
            catalog_name: table_name.catalog_name.to_string(),
            schema_name: table_name.schema_name.to_string(),
            table_name: table_name.table_name.to_string(),
            key_column_values: HashMap::from([("id".to_string(), ids)]),
        };

        let result =
            to_grpc_delete_request(&table_meta, &table_name, region_number, request).unwrap();

        let expected_column = Column {
            column_name: "id".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(Values {
                i32_values: vec![1, 2, 3],
                ..Default::default()
            }),
            null_mask: vec![0],
            datatype: ColumnDataType::Int32 as i32,
        };

        assert_eq!(result.table_name, "foo");
        assert_eq!(result.region_number, region_number);
        assert_eq!(result.key_columns, vec![expected_column]);
        assert_eq!(result.row_count, 3);
    }
}

View File

@@ -1,168 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::helper::{push_vals, ColumnDataTypeWrapper};
use api::v1::column::Values;
use api::v1::{Column, SemanticType};
use datatypes::prelude::*;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableMeta;
use crate::error::{self, ColumnDataTypeSnafu, NotSupportedSnafu, Result, VectorToGrpcColumnSnafu};
/// Converts every `(column name, vector)` entry of `columns_values` into a gRPC
/// [`Column`] and reports how many rows the columns hold.
///
/// Returns the converted columns together with the shared row count, which is
/// `0` when `columns_values` is empty.
///
/// # Errors
///
/// * `InvalidInsertRequest` when two vectors have different lengths.
/// * Any error produced by [`vector_to_grpc_column`] for an individual column.
pub(crate) fn to_grpc_columns(
    table_meta: &TableMeta,
    columns_values: &HashMap<String, VectorRef>,
) -> Result<(Vec<Column>, u32)> {
    // Length of the first vector seen; every later vector must match it.
    let mut expected_rows: Option<usize> = None;
    let mut grpc_columns = Vec::with_capacity(columns_values.len());

    for (name, values) in columns_values {
        let len = values.len();
        if let Some(rows) = expected_rows {
            ensure!(
                rows == len,
                error::InvalidInsertRequestSnafu {
                    reason: "The row count of columns is not the same."
                }
            );
        } else {
            expected_rows = Some(len);
        }
        grpc_columns.push(vector_to_grpc_column(table_meta, name, values.clone())?);
    }

    // NOTE(review): `as u32` truncates silently if a vector ever holds more than
    // `u32::MAX` rows.
    Ok((grpc_columns, expected_rows.unwrap_or(0) as u32))
}
/// Builds the gRPC [`Column`] for one named vector of a table.
///
/// The semantic type is derived from the table metadata: the time index column
/// becomes `Timestamp`, a column listed in `primary_key_indices` becomes `Tag`,
/// and everything else becomes `Field`. The vector's values are appended to the
/// column through `push_vals`.
///
/// # Errors
///
/// * `NotSupported` when the table schema has no time index column.
/// * `VectorToGrpcColumn` when `column_name` is not part of the table schema.
/// * `ColumnDataType` when the vector's data type has no gRPC counterpart.
fn vector_to_grpc_column(
    table_meta: &TableMeta,
    column_name: &str,
    vector: VectorRef,
) -> Result<Column> {
    let schema = &table_meta.schema;

    // The time index is resolved first, so a table without one is rejected even
    // when the requested column would not have been the time index.
    let time_index = schema.timestamp_column().context(NotSupportedSnafu {
        feat: "Table without time index.",
    })?;

    let semantic_type = if time_index.name == column_name {
        SemanticType::Timestamp
    } else {
        let position = schema
            .column_index_by_name(column_name)
            .context(VectorToGrpcColumnSnafu {
                reason: format!("unable to find column {column_name} in table schema"),
            })?;
        let is_primary_key = table_meta.primary_key_indices.contains(&position);
        if is_primary_key {
            SemanticType::Tag
        } else {
            SemanticType::Field
        }
    };

    let wrapper: ColumnDataTypeWrapper =
        vector.data_type().try_into().context(ColumnDataTypeSnafu)?;

    // Start from an empty column; `push_vals` fills in `values` and `null_mask`.
    let mut grpc_column = Column {
        column_name: column_name.to_string(),
        semantic_type: semantic_type as i32,
        values: Some(Values::default()),
        null_mask: vec![],
        datatype: wrapper.datatype() as i32,
    };
    push_vals(&mut grpc_column, 0, vector);

    Ok(grpc_column)
}
// Unit tests for `vector_to_grpc_column`.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::ColumnDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Int32Vector, Int64Vector, StringVector};
use table::metadata::TableMetaBuilder;
use super::*;
// Converts one column of each semantic type (timestamp, tag, field) and checks
// the name, semantic type, values, null mask and data type of the result.
#[test]
fn test_vector_to_grpc_column() {
// Schema: "ts" is the time index, "k" is the only primary key column
// (index 1 below), and "v" is a nullable string column.
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true),
ColumnSchema::new("k", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("v", ConcreteDataType::string_datatype(), true),
]));
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![1])
.next_column_id(3)
.build()
.unwrap();
// Time index column -> SemanticType::Timestamp.
let column = vector_to_grpc_column(
&table_meta,
"ts",
Arc::new(Int64Vector::from_slice([1, 2, 3])),
)
.unwrap();
assert_eq!(column.column_name, "ts");
assert_eq!(column.semantic_type, SemanticType::Timestamp as i32);
assert_eq!(column.values.unwrap().i64_values, vec![1, 2, 3]);
// No nulls among the three rows: a single zero mask byte is expected.
assert_eq!(column.null_mask, vec![0]);
assert_eq!(column.datatype, ColumnDataType::Int64 as i32);
// Primary key column -> SemanticType::Tag.
let column = vector_to_grpc_column(
&table_meta,
"k",
Arc::new(Int32Vector::from_slice([3, 2, 1])),
)
.unwrap();
assert_eq!(column.column_name, "k");
assert_eq!(column.semantic_type, SemanticType::Tag as i32);
assert_eq!(column.values.unwrap().i32_values, vec![3, 2, 1]);
assert_eq!(column.null_mask, vec![0]);
assert_eq!(column.datatype, ColumnDataType::Int32 as i32);
// Any other column -> SemanticType::Field. The middle value is null.
let column = vector_to_grpc_column(
&table_meta,
"v",
Arc::new(StringVector::from(vec![
Some("hello"),
None,
Some("greptime"),
])),
)
.unwrap();
assert_eq!(column.column_name, "v");
assert_eq!(column.semantic_type, SemanticType::Field as i32);
// Only the non-null values are expected in `string_values`.
assert_eq!(
column.values.unwrap().string_values,
vec!["hello", "greptime"]
);
// Mask byte 2 (0b010) matches the single `None` at row index 1.
assert_eq!(column.null_mask, vec![2]);
assert_eq!(column.datatype, ColumnDataType::String as i32);
}
}

View File

@@ -21,7 +21,6 @@ pub mod metrics;
pub mod partition;
pub mod range;
pub mod route;
pub mod row_splitter;
pub mod splitter;
pub use crate::partition::{PartitionRule, PartitionRuleRef};

View File

@@ -15,7 +15,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::region::InsertRequest;
use api::v1::region::{DeleteRequest, InsertRequest};
use common_meta::peer::Peer;
use common_meta::rpc::router::TableRoute;
use common_query::prelude::Expr;
@@ -24,15 +24,13 @@ use datatypes::prelude::Value;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use table::requests::DeleteRequest;
use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, Result};
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::route::TableRoutes;
use crate::row_splitter::{InsertRequestSplits, RowSplitter};
use crate::splitter::{DeleteRequestSplit, WriteSplitter};
use crate::splitter::{DeleteRequestSplits, InsertRequestSplits, RowSplitter};
use crate::{error, PartitionRuleRef};
#[async_trait::async_trait]
@@ -243,18 +241,18 @@ impl PartitionRuleManager {
req: InsertRequest,
) -> Result<InsertRequestSplits> {
let partition_rule = self.find_table_partition_rule(table).await?;
RowSplitter::new(partition_rule).split(req)
RowSplitter::new(partition_rule).split_insert(req)
}
/// Split [DeleteRequest] into [DeleteRequestSplits] according to the partition rule
/// of given table.
pub async fn split_delete_request(
&self,
table: TableId,
req: DeleteRequest,
primary_key_column_names: Vec<&String>,
) -> Result<DeleteRequestSplit> {
) -> Result<DeleteRequestSplits> {
let partition_rule = self.find_table_partition_rule(table).await?;
let splitter = WriteSplitter::with_partition_rule(partition_rule);
splitter.split_delete(req, primary_key_column_names)
RowSplitter::new(partition_rule).split_delete(req)
}
}

View File

@@ -1,322 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::helper;
use api::v1::region::InsertRequest;
use api::v1::{ColumnSchema, Row, Rows};
use datatypes::value::Value;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::Result;
use crate::PartitionRuleRef;
/// The [`InsertRequest`]s produced by splitting one request, keyed by the
/// [`RegionNumber`] of the region each of them is addressed to.
pub type InsertRequestSplits = HashMap<RegionNumber, InsertRequest>;
/// Splits a row-based [`InsertRequest`] into per-region requests by asking a
/// partition rule which region every row belongs to.
pub struct RowSplitter {
// Rule that names the partition columns and maps their values to a region.
partition_rule: PartitionRuleRef,
}
impl RowSplitter {
pub fn new(partition_rule: PartitionRuleRef) -> Self {
Self { partition_rule }
}
pub fn split(&self, req: InsertRequest) -> Result<InsertRequestSplits> {
// No partition
let partition_columns = self.partition_rule.partition_columns();
if partition_columns.is_empty() {
return Ok(HashMap::from([(0, req)]));
}
// No data
let Some(rows) = req.rows else {
return Ok(HashMap::new());
};
let table_id = RegionId::from_u64(req.region_id).table_id();
SplitReadRowHelper::new(table_id, rows, &self.partition_rule).split_to_requests()
}
}
/// Working state for splitting the rows of one insert request by region.
struct SplitReadRowHelper<'a> {
// Table the rows belong to; combined with a region number to build region ids.
table_id: TableId,
// Column schemas shared by all rows; cloned into every per-region request.
schema: Vec<ColumnSchema>,
// Rows still to be distributed; taken out one by one when requests are built.
rows: Vec<Row>,
partition_rule: &'a PartitionRuleRef,
// One entry per partition column, in the order the rule reports them: the
// column's index in the schema/row, or `None` if the schema lacks the column.
partition_cols_indexes: Vec<Option<usize>>,
}
impl<'a> SplitReadRowHelper<'a> {
    /// Prepares the helper by resolving each partition column of
    /// `partition_rule` to its position in the schema of `rows`.
    ///
    /// A partition column that is absent from the schema is recorded as `None`.
    fn new(table_id: TableId, rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self {
        let Rows { schema, rows } = rows;

        // Name -> position lookup; a later duplicate name overwrites an earlier one.
        let mut index_by_name: HashMap<&String, usize> = HashMap::with_capacity(schema.len());
        for (position, column) in schema.iter().enumerate() {
            index_by_name.insert(&column.column_name, position);
        }

        let partition_cols_indexes: Vec<Option<usize>> = partition_rule
            .partition_columns()
            .iter()
            .map(|name| index_by_name.get(name).copied())
            .collect();

        Self {
            table_id,
            schema,
            rows,
            partition_rule,
            partition_cols_indexes,
        }
    }

    /// Consumes the helper and builds one [`InsertRequest`] per region that
    /// received at least one row.
    ///
    /// # Errors
    ///
    /// Propagates the error of [`Self::split_to_regions`].
    fn split_to_requests(mut self) -> Result<InsertRequestSplits> {
        let assignments = self.split_to_regions()?;
        let mut splits = HashMap::with_capacity(assignments.len());

        for (region_number, row_indexes) in assignments {
            // Every row index was assigned to exactly one region, so taking the
            // row (leaving a default in its place) never hands out a row twice.
            let mut region_rows = Vec::with_capacity(row_indexes.len());
            for row_idx in row_indexes {
                region_rows.push(std::mem::take(&mut self.rows[row_idx]));
            }

            let request = InsertRequest {
                rows: Some(Rows {
                    schema: self.schema.clone(),
                    rows: region_rows,
                }),
                region_id: RegionId::new(self.table_id, region_number).into(),
            };
            splits.insert(region_number, request);
        }

        Ok(splits)
    }

    /// Groups row indexes by the region the partition rule selects for them.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `PartitionRule::find_region`.
    fn split_to_regions(&self) -> Result<HashMap<RegionNumber, Vec<usize>>> {
        self.iter_partition_values().enumerate().try_fold(
            HashMap::<RegionNumber, Vec<usize>>::new(),
            |mut grouped, (row_idx, values)| -> Result<HashMap<RegionNumber, Vec<usize>>> {
                let region_number = self.partition_rule.find_region(&values)?;
                grouped.entry(region_number).or_default().push(row_idx);
                Ok(grouped)
            },
        )
    }

    /// Yields, for every row, the values of the partition columns in rule order.
    ///
    /// A partition column missing from the schema contributes `Value::Null`.
    /// Indexing `row.values` panics if a row has fewer values than the schema
    /// position of a partition column.
    fn iter_partition_values(&'a self) -> impl Iterator<Item = Vec<Value>> + 'a {
        self.rows.iter().map(|row| {
            self.partition_cols_indexes
                .iter()
                .map(|idx| match idx {
                    None => Value::Null,
                    Some(idx) => helper::pb_value_to_value_ref(&row.values[*idx]).into(),
                })
                .collect()
        })
    }
}
// Unit tests for `RowSplitter::split`, driven by three mock partition rules.
#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, SemanticType};
use serde::{Deserialize, Serialize};
use super::*;
use crate::partition::PartitionExpr;
use crate::PartitionRule;
// Builds a three-row insert request with columns (id, name, age), ids "1",
// "2" and "3", and `region_id` 0.
fn mock_insert_request() -> InsertRequest {
let schema = vec![
ColumnSchema {
column_name: "id".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
},
ColumnSchema {
column_name: "name".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
},
ColumnSchema {
column_name: "age".to_string(),
datatype: ColumnDataType::Uint32 as i32,
semantic_type: SemanticType::Field as i32,
},
];
let rows = vec![
Row {
values: vec![
ValueData::StringValue("1".to_string()).into(),
ValueData::StringValue("Smith".to_string()).into(),
ValueData::U32Value(20).into(),
],
},
Row {
values: vec![
ValueData::StringValue("2".to_string()).into(),
ValueData::StringValue("Johnson".to_string()).into(),
ValueData::U32Value(21).into(),
],
},
Row {
values: vec![
ValueData::StringValue("3".to_string()).into(),
ValueData::StringValue("Williams".to_string()).into(),
ValueData::U32Value(22).into(),
],
},
];
InsertRequest {
rows: Some(Rows { schema, rows }),
region_id: 0,
}
}
// Partitions on "id": parses the string value as u32 and returns it modulo 2.
#[derive(Debug, Serialize, Deserialize)]
struct MockPartitionRule;
impl PartitionRule for MockPartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec!["id".to_string()]
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
let val = values.get(0).unwrap().clone();
let val = match val {
Value::String(v) => v.as_utf8().to_string(),
_ => unreachable!(),
};
Ok(val.parse::<u32>().unwrap() % 2)
}
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
// Partitions on a column the mock request does not contain; a null partition
// value maps to region 1, anything else to region 0.
#[derive(Debug, Serialize, Deserialize)]
struct MockMissedColPartitionRule;
impl PartitionRule for MockMissedColPartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec!["missed_col".to_string()]
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
let val = values.get(0).unwrap().clone();
let val = match val {
Value::Null => 1,
_ => 0,
};
Ok(val)
}
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
// Rule with no partition columns; every lookup resolves to region 0.
#[derive(Debug, Serialize, Deserialize)]
struct EmptyPartitionRule;
impl PartitionRule for EmptyPartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec![]
}
fn find_region(&self, _values: &[Value]) -> Result<RegionNumber> {
Ok(0)
}
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
// Ids 1, 2, 3 modulo 2 give regions 1, 0, 1: one row in region 0, two in region 1.
#[test]
fn test_writer_splitter() {
let insert_request = mock_insert_request();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split(insert_request).unwrap();
assert_eq!(splits.len(), 2);
let req0 = &splits[&0];
let req1 = &splits[&1];
assert_eq!(req0.region_id, 0);
assert_eq!(req1.region_id, 1);
let rows0 = req0.rows.as_ref().unwrap();
let rows1 = req1.rows.as_ref().unwrap();
assert_eq!(rows0.rows.len(), 1);
assert_eq!(rows1.rows.len(), 2);
}
// The partition column is absent from the request, so every row is routed
// with a null partition value and all three rows land in region 1.
#[test]
fn test_missed_col_writer_splitter() {
let insert_request = mock_insert_request();
let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split(insert_request).unwrap();
assert_eq!(splits.len(), 1);
let req = &splits[&1];
assert_eq!(req.region_id, 1);
let rows = req.rows.as_ref().unwrap();
assert_eq!(rows.rows.len(), 3);
}
// With no partition columns the whole request is kept under region 0.
#[test]
fn test_empty_partition_rule_writer_splitter() {
let insert_request = mock_insert_request();
let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split(insert_request).unwrap();
assert_eq!(splits.len(), 1);
let req = &splits[&0];
assert_eq!(req.region_id, 0);
let rows = req.rows.as_ref().unwrap();
assert_eq!(rows.rows.len(), 3);
}
}

View File

@@ -14,666 +14,219 @@
use std::collections::HashMap;
use datatypes::data_type::DataType;
use datatypes::prelude::MutableVector;
use datatypes::schema::Schema;
use api::helper;
use api::v1::region::{DeleteRequest, InsertRequest};
use api::v1::{ColumnSchema, Row, Rows};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::requests::{DeleteRequest, InsertRequest};
use store_api::storage::{RegionId, RegionNumber};
use crate::error::{
CreateDefaultToReadSnafu, FindPartitionColumnSnafu, FindRegionSnafu, InvalidDeleteRequestSnafu,
InvalidInsertRequestSnafu, MissingDefaultValueSnafu, Result,
};
use crate::error::Result;
use crate::PartitionRuleRef;
pub type InsertRequestSplit = HashMap<RegionNumber, InsertRequest>;
pub type DeleteRequestSplit = HashMap<RegionNumber, DeleteRequest>;
/// Per-region [`InsertRequest`]s, keyed by the number of the region each one targets.
pub type InsertRequestSplits = HashMap<RegionNumber, InsertRequest>;
/// Per-region [`DeleteRequest`]s, keyed by the number of the region each one targets.
pub type DeleteRequestSplits = HashMap<RegionNumber, DeleteRequest>;
pub struct WriteSplitter {
pub struct RowSplitter {
partition_rule: PartitionRuleRef,
}
impl WriteSplitter {
pub fn with_partition_rule(rule: PartitionRuleRef) -> Self {
Self {
partition_rule: rule,
}
impl RowSplitter {
pub fn new(partition_rule: PartitionRuleRef) -> Self {
Self { partition_rule }
}
pub fn split_insert(
&self,
insert: InsertRequest,
schema: &Schema,
) -> Result<InsertRequestSplit> {
let row_nums = check_req(&insert)?;
let mut insert = insert;
let partition_columns = self.partition_rule.partition_columns();
if partition_columns.is_empty() {
// If no partition column, all rows are inserted into the first region.
let mut split = HashMap::new();
split.insert(0, insert);
return Ok(split);
}
let missing_columns = schema
.column_schemas()
.iter()
.filter(|schema| {
partition_columns.contains(&schema.name)
&& !insert.columns_values.contains_key(&schema.name)
})
.collect::<Vec<_>>();
for column_schema in missing_columns {
let default_values = column_schema
.create_default_vector(row_nums)
.context(CreateDefaultToReadSnafu {
column: &column_schema.name,
})?
.context(MissingDefaultValueSnafu {
column: &column_schema.name,
})?;
let _ = insert
.columns_values
.insert(column_schema.name.clone(), default_values);
}
let partition_columns =
find_partitioning_values(&insert.columns_values, &partition_columns)?;
let region_map = self.split_partitioning_values(&partition_columns)?;
Ok(split_insert_request(&insert, region_map))
}
pub fn split_delete(
&self,
request: DeleteRequest,
key_column_names: Vec<&String>,
) -> Result<DeleteRequestSplit> {
Self::validate_delete_request(&request)?;
let partition_columns = self.partition_rule.partition_columns();
if partition_columns.is_empty() {
// If no partition column, all requests are sent to the first region.
let mut split = HashMap::new();
split.insert(0, request);
return Ok(split);
}
let values = find_partitioning_values(&request.key_column_values, &partition_columns)?;
let regional_value_indexes = self.split_partitioning_values(&values)?;
let requests = regional_value_indexes
pub fn split_insert(&self, req: InsertRequest) -> Result<InsertRequestSplits> {
let table_id = RegionId::from_u64(req.region_id).table_id();
Ok(self
.split(req.rows)?
.into_iter()
.map(|(region_id, value_indexes)| {
let key_column_values = request
.key_column_values
.iter()
.filter(|(column_name, _)| key_column_names.contains(column_name))
.map(|(column_name, vector)| {
let mut builder = vector
.data_type()
.create_mutable_vector(value_indexes.len());
value_indexes.iter().for_each(|&index| {
builder.push_value_ref(vector.get(index).as_value_ref());
});
(column_name.to_string(), builder.to_vector())
})
.collect();
(
region_id,
DeleteRequest {
catalog_name: request.catalog_name.clone(),
schema_name: request.schema_name.clone(),
table_name: request.table_name.clone(),
key_column_values,
},
)
.map(|(region_number, rows)| {
let region_id = RegionId::new(table_id, region_number);
let req = InsertRequest {
rows: Some(rows),
region_id: region_id.into(),
};
(region_number, req)
})
.collect();
Ok(requests)
.collect())
}
fn validate_delete_request(request: &DeleteRequest) -> Result<()> {
let rows = request
.key_column_values
.values()
.next()
.map(|x| x.len())
.context(InvalidDeleteRequestSnafu {
reason: "no key column values",
})?;
ensure!(
rows > 0,
InvalidDeleteRequestSnafu {
reason: "no rows in delete request"
}
);
ensure!(
request
.key_column_values
.values()
.map(|x| x.len())
.all(|x| x == rows),
InvalidDeleteRequestSnafu {
reason: "the lengths of key column values are not the same"
}
);
Ok(())
pub fn split_delete(&self, req: DeleteRequest) -> Result<DeleteRequestSplits> {
let table_id = RegionId::from_u64(req.region_id).table_id();
Ok(self
.split(req.rows)?
.into_iter()
.map(|(region_number, rows)| {
let region_id = RegionId::new(table_id, region_number);
let req = DeleteRequest {
rows: Some(rows),
region_id: region_id.into(),
};
(region_number, req)
})
.collect())
}
fn split_partitioning_values(
&self,
values: &[VectorRef],
) -> Result<HashMap<RegionNumber, Vec<usize>>> {
if values.is_empty() {
return Ok(HashMap::default());
fn split(&self, rows: Option<Rows>) -> Result<HashMap<RegionNumber, Rows>> {
// No data
let Some(rows) = rows else {
return Ok(HashMap::new());
};
if rows.rows.is_empty() {
return Ok(HashMap::new());
}
let mut region_map: HashMap<RegionNumber, Vec<usize>> = HashMap::new();
let row_count = values[0].len();
for idx in 0..row_count {
let region_id = match self
.partition_rule
.find_region(&partition_values(values, idx))
{
Ok(region_id) => region_id,
Err(e) => {
let reason = format!("{e:?}");
return FindRegionSnafu { reason }.fail();
}
};
region_map.entry(region_id).or_default().push(idx);
// No partition
let partition_columns = self.partition_rule.partition_columns();
if partition_columns.is_empty() {
return Ok(HashMap::from([(0, rows)]));
}
Ok(region_map)
let splitter = SplitReadRowHelper::new(rows, &self.partition_rule);
splitter.split_rows()
}
}
fn check_req(insert: &InsertRequest) -> Result<usize> {
let mut len: Option<usize> = None;
for vector in insert.columns_values.values() {
match len {
Some(len) => ensure!(
len == vector.len(),
InvalidInsertRequestSnafu {
reason: "the lengths of vectors are not the same"
}
),
None => len = Some(vector.len()),
}
}
let len = len.context(InvalidInsertRequestSnafu {
reason: "The columns in the insert statement are empty.",
})?;
Ok(len)
/// Working state for distributing the rows of one request across regions.
struct SplitReadRowHelper<'a> {
// Column schemas shared by all rows; cloned into every per-region `Rows`.
schema: Vec<ColumnSchema>,
// Rows still to be distributed; taken out one by one when the split is built.
rows: Vec<Row>,
partition_rule: &'a PartitionRuleRef,
// One entry per partition column, in the order the rule reports them: the
// column's index in the schema/row, or `None` if the schema lacks the column.
partition_cols_indexes: Vec<Option<usize>>,
}
fn find_partitioning_values(
values: &HashMap<String, VectorRef>,
partition_columns: &[String],
) -> Result<Vec<VectorRef>> {
partition_columns
.iter()
.map(|column_name| {
values
.get(column_name)
.cloned()
.context(FindPartitionColumnSnafu { column_name })
impl<'a> SplitReadRowHelper<'a> {
fn new(rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self {
let col_name_to_idx = rows
.schema
.iter()
.enumerate()
.map(|(idx, col)| (&col.column_name, idx))
.collect::<HashMap<_, _>>();
let partition_cols = partition_rule.partition_columns();
let partition_cols_indexes = partition_cols
.into_iter()
.map(|col_name| col_name_to_idx.get(&col_name).cloned())
.collect::<Vec<_>>();
Self {
schema: rows.schema,
rows: rows.rows,
partition_rule,
partition_cols_indexes,
}
}
/// Consumes the helper and returns the rows grouped by target region.
///
/// Each group carries its own clone of the schema.
///
/// # Errors
///
/// Propagates the error of `split_to_regions`.
fn split_rows(mut self) -> Result<HashMap<RegionNumber, Rows>> {
    let assignments = self.split_to_regions()?;
    let mut rows_by_region = HashMap::with_capacity(assignments.len());

    for (region_number, row_indexes) in assignments {
        // Every row index was assigned to exactly one region, so taking the row
        // (leaving a default in its place) never hands out a row twice.
        let mut region_rows = Vec::with_capacity(row_indexes.len());
        for row_idx in row_indexes {
            region_rows.push(std::mem::take(&mut self.rows[row_idx]));
        }
        rows_by_region.insert(
            region_number,
            Rows {
                schema: self.schema.clone(),
                rows: region_rows,
            },
        );
    }

    Ok(rows_by_region)
}
/// Groups row indexes by the region the partition rule selects for them.
///
/// # Errors
///
/// Returns the first error reported by `PartitionRule::find_region`.
fn split_to_regions(&self) -> Result<HashMap<RegionNumber, Vec<usize>>> {
    self.iter_partition_values().enumerate().try_fold(
        HashMap::<RegionNumber, Vec<usize>>::new(),
        |mut grouped, (row_idx, values)| -> Result<HashMap<RegionNumber, Vec<usize>>> {
            let region_number = self.partition_rule.find_region(&values)?;
            grouped.entry(region_number).or_default().push(row_idx);
            Ok(grouped)
        },
    )
}
fn iter_partition_values(&'a self) -> impl Iterator<Item = Vec<Value>> + 'a {
self.rows.iter().map(|row| {
self.partition_cols_indexes
.iter()
.map(|idx| {
idx.as_ref().map_or(Value::Null, |idx| {
helper::pb_value_to_value_ref(&row.values[*idx]).into()
})
})
.collect()
})
.collect()
}
fn partition_values(partition_columns: &[VectorRef], idx: usize) -> Vec<Value> {
partition_columns
.iter()
.map(|column| column.get(idx))
.collect()
}
fn split_insert_request(
insert: &InsertRequest,
region_map: HashMap<RegionNumber, Vec<usize>>,
) -> InsertRequestSplit {
let mut dist_insert: HashMap<RegionNumber, HashMap<&str, Box<dyn MutableVector>>> =
HashMap::with_capacity(region_map.len());
let row_num = insert
.columns_values
.values()
.next()
.map(|v| v.len())
.unwrap_or(0);
let column_count = insert.columns_values.len();
for (column_name, vector) in &insert.columns_values {
for (region_id, val_idxs) in &region_map {
let region_insert = dist_insert
.entry(*region_id)
.or_insert_with(|| HashMap::with_capacity(column_count));
let builder = region_insert
.entry(column_name)
.or_insert_with(|| vector.data_type().create_mutable_vector(row_num));
val_idxs.iter().for_each(|idx| {
// Safety: MutableVector is built according to column data type.
builder.push_value_ref(vector.get(*idx).as_value_ref());
});
}
}
let catalog_name = &insert.catalog_name;
let schema_name = &insert.schema_name;
let table_name = &insert.table_name;
dist_insert
.into_iter()
.map(|(region_number, vector_map)| {
let columns_values = vector_map
.into_iter()
.map(|(column_name, mut builder)| (column_name.to_string(), builder.to_vector()))
.collect();
(
region_number,
InsertRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
columns_values,
region_number,
},
)
})
.collect()
}
#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashMap;
use std::result::Result;
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema as DataTypesSchema};
use datatypes::types::{BooleanType, Int16Type, StringType};
use datatypes::value::Value;
use datatypes::vectors::{
BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVector, StringVectorBuilder,
Vector,
};
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, SemanticType};
use serde::{Deserialize, Serialize};
use store_api::storage::RegionNumber;
use table::requests::InsertRequest;
use super::*;
use crate::error::Error;
use crate::partition::{PartitionExpr, PartitionRule};
use crate::PartitionRuleRef;
#[test]
fn test_insert_req_check() {
let right = mock_insert_request();
let ret = check_req(&right);
assert_eq!(ret.unwrap(), 3);
let wrong = mock_wrong_insert_request();
let ret = check_req(&wrong);
assert!(ret.is_err());
}
fn assert_columns(columns: &HashMap<String, Arc<dyn Vector>>, expected: &[(&str, &[Value])]) {
for (col_name, values) in expected {
for (idx, value) in values.iter().enumerate() {
assert_eq!(*value, columns.get(*col_name).unwrap().get(idx));
}
}
}
#[test]
fn test_writer_spliter() {
let insert = mock_insert_request();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
let spliter = WriteSplitter::with_partition_rule(rule);
let mock_schema = DataTypesSchema::new(vec![
ColumnSchema::new(
"enable_reboot",
ConcreteDataType::Boolean(BooleanType),
false,
),
ColumnSchema::new("id", ConcreteDataType::Int16(Int16Type {}), false),
ColumnSchema::new("host", ConcreteDataType::String(StringType), true),
]);
let ret = spliter.split_insert(insert, &mock_schema).unwrap();
assert_eq!(2, ret.len());
let r1_insert = ret.get(&0).unwrap();
let r2_insert = ret.get(&1).unwrap();
assert_eq!("demo", r1_insert.table_name);
assert_eq!("demo", r2_insert.table_name);
let r1_columns = &r1_insert.columns_values;
assert_eq!(3, r1_columns.len());
assert_columns(
r1_columns,
&[
("id", &[Value::from(1_i16)]),
("host", &[Value::from("host1")]),
("enable_reboot", &[Value::from(true)]),
],
);
let r2_columns = &r2_insert.columns_values;
assert_eq!(3, r2_columns.len());
assert_columns(
r2_columns,
&[
("id", &[Value::from(2_i16), Value::from(3_i16)]),
("host", &[Value::Null, Value::from("host3")]),
("enable_reboot", &[Value::from(false), Value::from(true)]),
],
);
}
#[test]
fn test_writer_spliter_with_id_partition_columns() {
let (mock_schema, insert) = mock_schema_and_insert_request_without_partition_columns();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
let spliter = WriteSplitter::with_partition_rule(rule);
let ret = spliter.split_insert(insert, &mock_schema).unwrap();
assert_eq!(1, ret.len());
let r1_insert = ret.get(&0).unwrap();
assert_eq!("demo", r1_insert.table_name);
let r1_columns = &r1_insert.columns_values;
assert_eq!(3, r1_columns.len());
assert_columns(
r1_columns,
&[
(
"id",
&[Value::from(1_i16), Value::from(1_i16), Value::from(1_i16)],
),
(
"host",
&[Value::from("host1"), Value::Null, Value::from("host3")],
),
(
"enable_reboot",
&[Value::from(true), Value::from(false), Value::from(true)],
),
],
);
}
#[test]
fn test_writer_spliter_without_partition_columns() {
let (mock_schema, insert) = mock_schema_and_insert_request_without_partition_columns();
let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef;
let spliter = WriteSplitter::with_partition_rule(rule);
let ret = spliter.split_insert(insert, &mock_schema).unwrap();
assert_eq!(1, ret.len());
let r1_insert = ret.get(&0).unwrap();
assert_eq!("demo", r1_insert.table_name);
let r1_columns = &r1_insert.columns_values;
assert_eq!(2, r1_columns.len());
assert_columns(
r1_columns,
&[
(
"host",
&[Value::from("host1"), Value::Null, Value::from("host3")],
),
(
"enable_reboot",
&[Value::from(true), Value::from(false), Value::from(true)],
),
],
);
}
#[test]
fn test_delete_spliter_without_partition_columns() {
let mut key_column_values = HashMap::new();
key_column_values.insert(
"host".to_string(),
Arc::new(StringVector::from(vec!["localhost"])) as _,
);
let delete = DeleteRequest {
catalog_name: "foo_catalog".to_string(),
schema_name: "foo_schema".to_string(),
table_name: "foo_table".to_string(),
key_column_values,
};
let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef;
let spliter = WriteSplitter::with_partition_rule(rule);
let ret = spliter
.split_delete(delete, vec![&String::from("host")])
.unwrap();
assert_eq!(1, ret.len());
}
#[test]
fn test_partition_insert_request() {
let insert = mock_insert_request();
let region_map = HashMap::from([(1, vec![2, 0]), (2, vec![1])]);
let dist_insert = split_insert_request(&insert, region_map);
let r1_insert = dist_insert.get(&1_u32).unwrap();
assert_eq!("demo", r1_insert.table_name);
let expected: Value = 3_i16.into();
assert_eq!(expected, r1_insert.columns_values.get("id").unwrap().get(0));
let expected: Value = 1_i16.into();
assert_eq!(expected, r1_insert.columns_values.get("id").unwrap().get(1));
let expected: Value = "host3".into();
assert_eq!(
expected,
r1_insert.columns_values.get("host").unwrap().get(0)
);
let expected: Value = "host1".into();
assert_eq!(
expected,
r1_insert.columns_values.get("host").unwrap().get(1)
);
let expected: Value = true.into();
assert_eq!(
expected,
r1_insert
.columns_values
.get("enable_reboot")
.unwrap()
.get(0)
);
let expected: Value = true.into();
assert_eq!(
expected,
r1_insert
.columns_values
.get("enable_reboot")
.unwrap()
.get(1)
);
let r2_insert = dist_insert.get(&2_u32).unwrap();
assert_eq!("demo", r2_insert.table_name);
let expected: Value = 2_i16.into();
assert_eq!(expected, r2_insert.columns_values.get("id").unwrap().get(0));
assert_eq!(
Value::Null,
r2_insert.columns_values.get("host").unwrap().get(0)
);
let expected: Value = false.into();
assert_eq!(
expected,
r2_insert
.columns_values
.get("enable_reboot")
.unwrap()
.get(0)
);
}
#[test]
fn test_partition_columns() {
let insert = mock_insert_request();
let partition_column_names = vec!["host".to_string(), "id".to_string()];
let columns =
find_partitioning_values(&insert.columns_values, &partition_column_names).unwrap();
let host_column = columns[0].clone();
assert_eq!(
ConcreteDataType::String(StringType),
host_column.data_type()
);
assert_eq!(1, host_column.null_count());
let id_column = columns[1].clone();
assert_eq!(ConcreteDataType::int16_datatype(), id_column.data_type());
assert_eq!(0, id_column.null_count());
}
#[test]
fn test_partition_values() {
let mut builder = BooleanVectorBuilder::with_capacity(3);
builder.push(Some(true));
builder.push(Some(false));
builder.push(Some(true));
let v1 = builder.to_vector();
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("host1"));
builder.push(None);
builder.push(Some("host3"));
let v2 = builder.to_vector();
let vectors = vec![v1, v2];
let row_0_vals = partition_values(&vectors, 0);
let expected: Vec<Value> = vec![true.into(), "host1".into()];
assert_eq!(expected, row_0_vals);
let row_1_vals = partition_values(&vectors, 1);
let expected: Vec<Value> = vec![false.into(), Value::Null];
assert_eq!(expected, row_1_vals);
let row_2_vals = partition_values(&vectors, 2);
let expected: Vec<Value> = vec![true.into(), "host3".into()];
assert_eq!(expected, row_2_vals);
}
use crate::partition::PartitionExpr;
use crate::PartitionRule;
fn mock_insert_request() -> InsertRequest {
let mut columns_values = HashMap::with_capacity(4);
let mut builder = BooleanVectorBuilder::with_capacity(3);
builder.push(Some(true));
builder.push(Some(false));
builder.push(Some(true));
let _ = columns_values.insert("enable_reboot".to_string(), builder.to_vector());
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("host1"));
builder.push(None);
builder.push(Some("host3"));
let _ = columns_values.insert("host".to_string(), builder.to_vector());
let mut builder = Int16VectorBuilder::with_capacity(3);
builder.push(Some(1_i16));
builder.push(Some(2_i16));
builder.push(Some(3_i16));
let _ = columns_values.insert("id".to_string(), builder.to_vector());
let schema = vec![
ColumnSchema {
column_name: "id".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
},
ColumnSchema {
column_name: "name".to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
},
ColumnSchema {
column_name: "age".to_string(),
datatype: ColumnDataType::Uint32 as i32,
semantic_type: SemanticType::Field as i32,
},
];
let rows = vec![
Row {
values: vec![
ValueData::StringValue("1".to_string()).into(),
ValueData::StringValue("Smith".to_string()).into(),
ValueData::U32Value(20).into(),
],
},
Row {
values: vec![
ValueData::StringValue("2".to_string()).into(),
ValueData::StringValue("Johnson".to_string()).into(),
ValueData::U32Value(21).into(),
],
},
Row {
values: vec![
ValueData::StringValue("3".to_string()).into(),
ValueData::StringValue("Williams".to_string()).into(),
ValueData::U32Value(22).into(),
],
},
];
InsertRequest {
catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(),
schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
region_number: 0,
}
}
/// Returns a table schema together with an insert request that carries no `id` column.
///
/// The request targets table `demo` in the default catalog and schema (region number 0)
/// and holds only the `enable_reboot` and `host` columns, three values each. The schema
/// declares `enable_reboot` (non-null boolean), `id` (non-null Int16 with a default
/// constraint of `1`) and `host` (nullable string), in that order.
///
/// # Panics
///
/// Panics if attaching the default constraint to the `id` column fails.
fn mock_schema_and_insert_request_without_partition_columns() -> (Schema, InsertRequest) {
    // Column data: fill one builder per column before wiring them into the map.
    let mut reboot_flags = BooleanVectorBuilder::with_capacity(3);
    for flag in [true, false, true] {
        reboot_flags.push(Some(flag));
    }

    let mut hosts = StringVectorBuilder::with_capacity(3);
    for host in [Some("host1"), None, Some("host3")] {
        hosts.push(host);
    }

    let mut columns_values = HashMap::with_capacity(4);
    columns_values.extend([
        ("enable_reboot".to_string(), reboot_flags.to_vector()),
        ("host".to_string(), hosts.to_vector()),
    ]);

    let request = InsertRequest {
        catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(),
        schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(),
        table_name: "demo".to_string(),
        columns_values,
        region_number: 0,
    };

    // `id` is absent from the request; the schema gives it a default value of 1.
    let default_id = ColumnDefaultConstraint::Value(Value::from(1_i16));
    let id_column = ColumnSchema::new("id", ConcreteDataType::Int16(Int16Type {}), false)
        .with_default_constraint(Some(default_id))
        .unwrap();

    let column_schemas = vec![
        ColumnSchema::new(
            "enable_reboot",
            ConcreteDataType::Boolean(BooleanType),
            false,
        ),
        id_column,
        ColumnSchema::new("host", ConcreteDataType::String(StringType), true),
    ];

    (DataTypesSchema::new(column_schemas), request)
}
/// Builds an insert request whose columns have mismatched lengths: `enable_reboot`
/// and `host` hold three values each, while `id` holds only one.
///
/// NOTE(review): the returned struct literal lists `rows: Some(Rows { schema, rows })`
/// and `region_id` next to the catalog/schema/table/`columns_values`/`region_number`
/// fields, yet neither `schema` nor `rows` is defined in this function. This looks
/// like lines from two revisions merged together; confirm the intended field set.
fn mock_wrong_insert_request() -> InsertRequest {
let mut columns_values = HashMap::with_capacity(4);
// `enable_reboot`: true, false, true.
let mut builder = BooleanVectorBuilder::with_capacity(3);
builder.push(Some(true));
builder.push(Some(false));
builder.push(Some(true));
let _ = columns_values.insert("enable_reboot".to_string(), builder.to_vector());
// `host`: the middle value is null.
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("host1"));
builder.push(None);
builder.push(Some("host3"));
let _ = columns_values.insert("host".to_string(), builder.to_vector());
// `id`: a single value, so this column is shorter than the other two.
let mut builder = Int16VectorBuilder::with_capacity(1);
builder.push(Some(1_i16));
// two values are missing
let _ = columns_values.insert("id".to_string(), builder.to_vector());
InsertRequest {
catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(),
schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
region_number: 0,
rows: Some(Rows { schema, rows }),
region_id: 0,
}
}
/// Unit-struct partition rule used as a test double by `test_writer_splitter`.
#[derive(Debug, Serialize, Deserialize)]
struct MockPartitionRule;
// PARTITION BY LIST COLUMNS(id) (
// PARTITION r0 VALUES IN(1),
// PARTITION r1 VALUES IN(2, 3),
// );
// NOTE(review): the mapping above matches the Int16 comparison variant of
// `find_region` below (1 -> region 0; 2 or 3 -> region 1). The string-parsing
// variant routes by `id % 2` instead ("2" -> region 0; "1" and "3" -> region 1),
// so this comment does not describe it. Confirm which variant is current.
impl PartitionRule for MockPartitionRule {
fn as_any(&self) -> &dyn Any {
self
@@ -683,21 +236,44 @@ mod tests {
vec!["id".to_string()]
}
// NOTE(review): two `find_region` signatures appear back to back, one returning
// `Result<RegionNumber, Error>` and one returning `Result<RegionNumber>`, followed
// by two alternative bodies. This looks like before/after lines of a diff with the
// markers stripped; only one signature/body pair can be kept.
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error> {
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
// Only the first value is inspected; an empty `values` slice panics on `unwrap`.
let val = values.get(0).unwrap().clone();
// Variant 1: compare against the Int16 values 1, 2 and 3.
let id_1: Value = 1_i16.into();
let id_2: Value = 2_i16.into();
let id_3: Value = 3_i16.into();
if val == id_1 {
return Ok(0);
}
if val == id_2 || val == id_3 {
return Ok(1);
}
unreachable!()
// Variant 2: require a string value, parse it as `u32` and use the remainder
// modulo 2 as the region number. A non-string value hits `unreachable!()`,
// and a string that does not parse panics on `unwrap`.
let val = match val {
Value::String(v) => v.as_utf8().to_string(),
_ => unreachable!(),
};
Ok(val.parse::<u32>().unwrap() % 2)
}
// Expression-based lookup is not implemented by this mock (same signature duplication
// as above).
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
/// Partition rule test double that partitions on `missed_col`, a column name that
/// does not appear in the mock insert request built in this module.
#[derive(Debug, Serialize, Deserialize)]
struct MockMissedColPartitionRule;

impl PartitionRule for MockMissedColPartitionRule {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The single partition column, `missed_col`.
    fn partition_columns(&self) -> Vec<String> {
        vec![String::from("missed_col")]
    }

    /// Maps a null first value to region 1 and any other first value to region 0.
    ///
    /// Panics if `values` is empty.
    fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
        let region = if matches!(values.first().unwrap(), Value::Null) {
            1
        } else {
            0
        };
        Ok(region)
    }

    /// Expression-based lookup is not implemented by this mock.
    fn find_regions_by_exprs(&self, _exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
        unimplemented!()
    }
}
@@ -705,9 +281,6 @@ mod tests {
/// Unit-struct partition rule used as a test double by
/// `test_empty_partition_rule_writer_splitter`.
#[derive(Debug, Serialize, Deserialize)]
struct EmptyPartitionRule;
// PARTITION BY LIST COLUMNS() (
// PARTITION r0 VALUES LESS THAN MAXVALUE,
// );
impl PartitionRule for EmptyPartitionRule {
fn as_any(&self) -> &dyn Any {
self
@@ -717,12 +290,64 @@ mod tests {
vec![]
}
// NOTE(review): each method below shows two signatures back to back, one with
// an explicit `Error` type parameter on `Result` and one without. This looks like
// before/after lines of a diff with the markers stripped; keep only one of each.
// Every input maps to region 0; the values are ignored.
fn find_region(&self, _: &[Value]) -> Result<RegionNumber, Error> {
fn find_region(&self, _values: &[Value]) -> Result<RegionNumber> {
Ok(0)
}
// Expression-based lookup is not implemented by this mock.
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
/// Splitting the mock request with `MockPartitionRule` must produce two region
/// requests: region 0 with one row and region 1 with two rows.
#[test]
fn test_writer_splitter() {
    let rule: PartitionRuleRef = Arc::new(MockPartitionRule);
    let splits = RowSplitter::new(rule)
        .split_insert(mock_insert_request())
        .unwrap();
    assert_eq!(splits.len(), 2);

    let (first, second) = (&splits[&0], &splits[&1]);
    assert_eq!(first.region_id, 0);
    assert_eq!(second.region_id, 1);

    let first_rows = first.rows.as_ref().unwrap();
    let second_rows = second.rows.as_ref().unwrap();
    assert_eq!(first_rows.rows.len(), 1);
    assert_eq!(second_rows.rows.len(), 2);
}
/// With a rule that partitions on `missed_col`, the split must yield a single
/// request for region 1 that carries all three rows.
#[test]
fn test_missed_col_writer_splitter() {
    let rule: PartitionRuleRef = Arc::new(MockMissedColPartitionRule);
    let splits = RowSplitter::new(rule)
        .split_insert(mock_insert_request())
        .unwrap();
    assert_eq!(splits.len(), 1);

    let request = &splits[&1];
    assert_eq!(request.region_id, 1);

    let row_count = request.rows.as_ref().unwrap().rows.len();
    assert_eq!(row_count, 3);
}
/// With `EmptyPartitionRule`, the split must yield a single request for region 0
/// that carries all three rows.
#[test]
fn test_empty_partition_rule_writer_splitter() {
    let rule: PartitionRuleRef = Arc::new(EmptyPartitionRule);
    let splits = RowSplitter::new(rule)
        .split_insert(mock_insert_request())
        .unwrap();
    assert_eq!(splits.len(), 1);

    let request = &splits[&0];
    assert_eq!(request.region_id, 0);

    let row_count = request.rows.as_ref().unwrap().rows.len();
    assert_eq!(row_count, 3);
}
}