feat: consolidate Insert request related partitioning and distributed processing operations into Inserter (#2346)

* refactor: RegionRequest as param of RegionRequestHandler.handle

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: partition insert & delete reqs for both standalone and distributed modes

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: nit change

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: wrong function name

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: do request in inserter & deleter

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: remove RegionRequestHandler.handle

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: rename table_creator

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: nit change

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: nit change

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Author: Zhenchi
Date: 2023-09-12 12:01:39 +08:00
Committed by: Ruihang Xia
Parent commit: fe954b78a2
Commit: 4af126eb1b
31 changed files with 893 additions and 1267 deletions
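
Note: a minimal sketch of how a caller drives the consolidated Inserter after this change. Names follow the diffs below; the managers, requests, and context values are assumed to be already in scope, and error handling is elided.

// Sketch only. `catalog_manager`, `partition_manager`, and `datanode_manager`
// are the Arc'd managers built during instance startup; `requests` and `ctx`
// come from the gRPC layer.
let inserter = Arc::new(Inserter::new(
    catalog_manager.clone(),
    partition_manager.clone(),
    datanode_manager.clone(),
));
// Partitioning and per-datanode fan-out now happen inside the call; the
// StatementExecutor is passed in for on-demand table create/alter.
let output = inserter
    .handle_row_inserts(requests, ctx, statement_executor.as_ref())
    .await?;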

View File

@@ -14,22 +14,14 @@
use std::sync::Arc;
use api::v1::region::{region_request, QueryRequest};
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use common_meta::datanode_manager::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use session::context::QueryContextRef;
use crate::error::Result;
#[async_trait]
pub trait RegionRequestHandler: Send + Sync {
async fn handle(
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
// TODO(ruihang): add trace id and span id in the request.
async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}

View File

@@ -49,7 +49,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_creator: TableMetadataAllocatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
}
impl DdlManager {
@@ -58,14 +58,14 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_creator: TableMetadataAllocatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Self {
Self {
procedure_manager,
datanode_manager: datanode_clients,
cache_invalidator,
table_metadata_manager,
table_creator,
table_meta_allocator,
}
}
@@ -333,7 +333,7 @@ async fn handle_create_table_task(
mut create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let (table_id, region_routes) = ddl_manager
.table_creator
.table_meta_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&mut create_table_task.table_info,

View File

@@ -12,37 +12,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::{iter, mem};
use api::v1::region::region_request;
use api::v1::{DeleteRequests, RowDeleteRequest, RowDeleteRequests};
use catalog::CatalogManager;
use client::region_handler::RegionRequestHandler;
use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader};
use api::v1::{DeleteRequests, RowDeleteRequests};
use catalog::CatalogManagerRef;
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use futures_util::future;
use metrics::counter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::DeleteRequest as TableDeleteRequest;
use table::TableRef;
use crate::error::{
CatalogSnafu, InvalidDeleteRequestSnafu, MissingTimeIndexColumnSnafu, RequestDatanodeSnafu,
Result, TableNotFoundSnafu,
CatalogSnafu, FindRegionLeaderSnafu, InvalidDeleteRequestSnafu, JoinTaskSnafu,
MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu,
};
use crate::req_convert::delete::{ColumnToRow, RowToRegion};
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion};
pub(crate) struct Deleter<'a> {
catalog_manager: &'a dyn CatalogManager,
region_request_handler: &'a dyn RegionRequestHandler,
pub struct Deleter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
}
impl<'a> Deleter<'a> {
pub type DeleterRef = Arc<Deleter>;
impl Deleter {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
region_request_handler: &'a dyn RegionRequestHandler,
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
) -> Self {
Self {
catalog_manager,
region_request_handler,
partition_manager,
datanode_manager,
}
}
@@ -67,31 +79,99 @@ impl<'a> Deleter<'a> {
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_row_count_match(&requests)?;
validate_column_count_match(&requests)?;
let requests = self.trim_columns(requests, &ctx).await?;
let region_request = RowToRegion::new(self.catalog_manager, &ctx)
.convert(requests)
.await?;
let region_request = region_request::Body::Deletes(region_request);
let deletes = RowToRegion::new(
self.catalog_manager.as_ref(),
self.partition_manager.as_ref(),
&ctx,
)
.convert(requests)
.await?;
let affected_rows = self
.region_request_handler
.handle(region_request, ctx)
.await
.context(RequestDatanodeSnafu)?;
let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?;
Ok(Output::AffectedRows(affected_rows as _))
}
pub async fn handle_table_delete(
&self,
request: TableDeleteRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table = request.table_name.as_str();
let table = self.get_table(catalog, schema, table).await?;
let table_info = table.table_info();
let deletes = TableToRegion::new(&table_info, &self.partition_manager)
.convert(request)
.await?;
self.do_request(deletes, ctx.trace_id(), 0).await
}
}
impl<'a> Deleter<'a> {
impl Deleter {
async fn do_request(
&self,
requests: RegionDeleteRequests,
trace_id: u64,
span_id: u64,
) -> Result<AffectedRows> {
let header = RegionRequestHeader { trace_id, span_id };
let request_factory = RegionRequestFactory::new(header);
let tasks = self
.group_requests_by_peer(requests)
.await?
.into_iter()
.map(|(peer, deletes)| {
let request = request_factory.build_delete(deletes);
let datanode_manager = self.datanode_manager.clone();
common_runtime::spawn_write(async move {
datanode_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestDeletesSnafu)
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u64>>()?;
counter!(crate::metrics::DIST_DELETE_ROW_COUNT, affected_rows);
Ok(affected_rows)
}
async fn group_requests_by_peer(
&self,
requests: RegionDeleteRequests,
) -> Result<HashMap<Peer, RegionDeleteRequests>> {
let mut deletes: HashMap<Peer, RegionDeleteRequests> = HashMap::new();
for req in requests.requests {
let peer = self
.partition_manager
.find_region_leader(req.region_id.into())
.await
.context(FindRegionLeaderSnafu)?;
deletes.entry(peer).or_default().requests.push(req);
}
Ok(deletes)
}
async fn trim_columns(
&self,
mut requests: RowDeleteRequests,
ctx: &QueryContextRef,
) -> Result<RowDeleteRequests> {
for req in &mut requests.deletes {
let table = self.get_table(req, ctx).await?;
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let key_column_names = self.key_column_names(&table)?;
let rows = req.rows.as_mut().unwrap();
@@ -142,25 +222,25 @@ impl<'a> Deleter<'a> {
Ok(key_column_names)
}
async fn get_table(&self, req: &RowDeleteRequest, ctx: &QueryContextRef) -> Result<TableRef> {
async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
self.catalog_manager
.table(ctx.current_catalog(), ctx.current_schema(), &req.table_name)
.table(catalog, schema, table)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: req.table_name.clone(),
table_name: common_catalog::format_full_table_name(catalog, schema, table),
})
}
}
fn validate_row_count_match(requests: &RowDeleteRequests) -> Result<()> {
fn validate_column_count_match(requests: &RowDeleteRequests) -> Result<()> {
for request in &requests.deletes {
let rows = request.rows.as_ref().unwrap();
let column_count = rows.schema.len();
ensure!(
rows.rows.iter().all(|r| r.values.len() == column_count),
InvalidDeleteRequestSnafu {
reason: "row count mismatch"
reason: "column count mismatch"
}
)
}
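
Note: the do_request/group_requests_by_peer pair above is a group-then-fan-out: bucket the region requests by the datanode that leads each region, send one batched request per datanode concurrently, then sum the affected rows. A self-contained sketch of the grouping step, with stand-in types for the peer and the request:

use std::collections::HashMap;

// Stand-ins for the real types; the actual code resolves the leader via
// PartitionRuleManager::find_region_leader.
#[derive(Clone, PartialEq, Eq, Hash)]
struct Peer(u64);
struct RegionReq {
    region_id: u64,
}

fn group_by_peer(
    reqs: Vec<RegionReq>,
    leader_of: impl Fn(u64) -> Peer,
) -> HashMap<Peer, Vec<RegionReq>> {
    let mut buckets: HashMap<Peer, Vec<RegionReq>> = HashMap::new();
    for req in reqs {
        // One bucket per datanode, so each peer receives a single batch.
        buckets.entry(leader_of(req.region_id)).or_default().push(req);
    }
    buckets
}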

View File

@@ -258,6 +258,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to find leader for region, source: {}", source))]
FindRegionLeader {
source: partition::error::Error,
location: Location,
},
#[snafu(display("Failed to create table info, source: {}", source))]
CreateTableInfo {
#[snafu(backtrace)]
@@ -683,6 +689,9 @@ pub enum Error {
column,
))]
ColumnNoneDefaultValue { column: String, location: Location },
#[snafu(display("Invalid region request, reason: {}", reason))]
InvalidRegionRequest { reason: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -761,7 +770,8 @@ impl ErrorExt for Error {
| Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. }
| Error::VectorToGrpcColumn { .. }
| Error::MissingInsertBody { .. } => StatusCode::Internal,
| Error::MissingInsertBody { .. }
| Error::InvalidRegionRequest { .. } => StatusCode::Internal,
Error::IncompleteGrpcResult { .. }
| Error::ContextValueNotFound { .. }
@@ -808,7 +818,8 @@ impl ErrorExt for Error {
| Error::FindTablePartitionRule { source, .. }
| Error::FindTableRoute { source, .. }
| Error::SplitInsert { source, .. }
| Error::SplitDelete { source, .. } => source.status_code(),
| Error::SplitDelete { source, .. }
| Error::FindRegionLeader { source, .. } => source.status_code(),
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,

View File

@@ -12,48 +12,59 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::region::region_request;
use api::v1::{AlterExpr, ColumnSchema, InsertRequests, RowInsertRequest, RowInsertRequests};
use catalog::CatalogManager;
use client::region_handler::RegionRequestHandler;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
use api::v1::{
AlterExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest, RowInsertRequests,
};
use catalog::CatalogManagerRef;
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::{error, info};
use datatypes::schema::Schema;
use futures_util::future;
use metrics::counter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::statements::insert::Insert;
use table::engine::TableReference;
use table::requests::InsertRequest as TableInsertRequest;
use table::TableRef;
use crate::error::{
CatalogSnafu, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, RequestDatanodeSnafu,
Result,
CatalogSnafu, FindNewColumnsOnInsertionSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu,
JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
};
use crate::expr_factory::CreateExprFactory;
use crate::req_convert::insert::{ColumnToRow, RowToRegion};
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion};
use crate::statement::StatementExecutor;
pub(crate) struct Inserter<'a> {
catalog_manager: &'a dyn CatalogManager,
create_expr_factory: &'a CreateExprFactory,
statement_executor: &'a StatementExecutor,
region_request_handler: &'a dyn RegionRequestHandler,
pub struct Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
}
impl<'a> Inserter<'a> {
pub type InserterRef = Arc<Inserter>;
impl Inserter {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
create_expr_factory: &'a CreateExprFactory,
statement_executor: &'a StatementExecutor,
region_request_handler: &'a dyn RegionRequestHandler,
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
) -> Self {
Self {
catalog_manager,
create_expr_factory,
statement_executor,
region_request_handler,
partition_manager,
datanode_manager,
}
}
@@ -61,15 +72,18 @@ impl<'a> Inserter<'a> {
&self,
requests: InsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx).await
self.handle_row_inserts(row_inserts, ctx, statement_executor)
.await
}
pub async fn handle_row_inserts(
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
@@ -78,25 +92,110 @@ impl<'a> Inserter<'a> {
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_row_count_match(&requests)?;
validate_column_count_match(&requests)?;
self.create_or_alter_tables_on_demand(&requests, &ctx)
self.create_or_alter_tables_on_demand(&requests, &ctx, statement_executor)
.await?;
let region_request = RowToRegion::new(self.catalog_manager, &ctx)
.convert(requests)
.await?;
let region_request = region_request::Body::Inserts(region_request);
let inserts = RowToRegion::new(
self.catalog_manager.as_ref(),
self.partition_manager.as_ref(),
&ctx,
)
.convert(requests)
.await?;
let affected_rows = self
.region_request_handler
.handle(region_request, ctx)
.await
.context(RequestDatanodeSnafu)?;
let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?;
Ok(Output::AffectedRows(affected_rows as _))
}
pub async fn handle_table_insert(
&self,
request: TableInsertRequest,
ctx: QueryContextRef,
) -> Result<usize> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table_name = request.table_name.as_str();
let table = self.get_table(catalog, schema, table_name).await?;
let table = table.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
})?;
let table_info = table.table_info();
let inserts = TableToRegion::new(&table_info, &self.partition_manager)
.convert(request)
.await?;
let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?;
Ok(affected_rows as _)
}
pub async fn handle_statement_insert(
&self,
insert: &Insert,
ctx: &QueryContextRef,
) -> Result<Output> {
let inserts =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert)
.await?;
let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?;
Ok(Output::AffectedRows(affected_rows as _))
}
}
impl<'a> Inserter<'a> {
impl Inserter {
async fn do_request(
&self,
requests: RegionInsertRequests,
trace_id: u64,
span_id: u64,
) -> Result<AffectedRows> {
let header = RegionRequestHeader { trace_id, span_id };
let request_factory = RegionRequestFactory::new(header);
let tasks = self
.group_requests_by_peer(requests)
.await?
.into_iter()
.map(|(peer, inserts)| {
let request = request_factory.build_insert(inserts);
let datanode_manager = self.datanode_manager.clone();
common_runtime::spawn_write(async move {
datanode_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestInsertsSnafu)
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u64>>()?;
counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows);
Ok(affected_rows)
}
async fn group_requests_by_peer(
&self,
requests: RegionInsertRequests,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
for req in requests.requests {
let peer = self
.partition_manager
.find_region_leader(req.region_id.into())
.await
.context(FindRegionLeaderSnafu)?;
inserts.entry(peer).or_default().requests.push(req);
}
Ok(inserts)
}
// check if tables already exist:
// - if the table does not exist, create it from the inferred CreateExpr
// - if the table exists, check whether the schema matches; if any new column is found, alter the table with the inferred `AlterExpr`
@@ -104,15 +203,20 @@ impl<'a> Inserter<'a> {
&self,
requests: &RowInsertRequests,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<()> {
// TODO(jeremy): create and alter in batch?
for req in &requests.inserts {
match self.get_table(req, ctx).await? {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
match table {
Some(table) => {
validate_request_with_table(req, &table)?;
self.alter_table_on_demand(req, table, ctx).await?
self.alter_table_on_demand(req, table, ctx, statement_executor)
.await?
}
None => self.create_table(req, ctx).await?,
None => self.create_table(req, ctx, statement_executor).await?,
}
}
@@ -121,11 +225,12 @@ impl<'a> Inserter<'a> {
async fn get_table(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
catalog: &str,
schema: &str,
table: &str,
) -> Result<Option<TableRef>> {
self.catalog_manager
.table(ctx.current_catalog(), ctx.current_schema(), &req.table_name)
.table(catalog, schema, table)
.await
.context(CatalogSnafu)
}
@@ -135,9 +240,11 @@ impl<'a> Inserter<'a> {
req: &RowInsertRequest,
table: TableRef,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
let table_name = table.table_info().name.clone();
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let column_exprs = ColumnExpr::from_column_schemas(request_schema);
@@ -147,7 +254,6 @@ impl<'a> Inserter<'a> {
return Ok(());
};
let table_name = table.table_info().name.clone();
info!(
"Adding new columns: {:?} to table: {}.{}.{}",
add_columns, catalog_name, schema_name, table_name
@@ -160,54 +266,75 @@ impl<'a> Inserter<'a> {
kind: Some(Kind::AddColumns(add_columns)),
};
self.statement_executor
.alter_table_inner(alter_table_expr)
.await?;
let res = statement_executor.alter_table_inner(alter_table_expr).await;
info!(
"Successfully added new columns to table: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(())
match res {
Ok(_) => {
info!(
"Successfully added new columns to table: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(())
}
Err(err) => {
error!(
"Failed to add new columns to table: {}.{}.{}: {}",
catalog_name, schema_name, table_name, err
);
Err(err)
}
}
}
async fn create_table(&self, req: &RowInsertRequest, ctx: &QueryContextRef) -> Result<()> {
async fn create_table(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<()> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
info!(
"Table {}.{}.{} does not exist, try create table",
table_ref.catalog, table_ref.schema, table_ref.table,
);
let mut create_table_expr = self
.create_expr_factory
.create_table_expr_by_column_schemas(&table_ref, request_schema, default_engine())?;
// TODO(weny): multiple regions table.
self.statement_executor
.create_table_inner(&mut create_table_expr, None)
.await?;
let res = statement_executor
.create_table_inner(create_table_expr, None)
.await;
info!(
"Successfully created table on insertion: {}.{}.{}",
table_ref.catalog, table_ref.schema, table_ref.table,
);
Ok(())
match res {
Ok(_) => {
info!(
"Successfully created table {}.{}.{}",
table_ref.catalog, table_ref.schema, table_ref.table,
);
Ok(())
}
Err(err) => {
error!(
"Failed to create table {}.{}.{}: {}",
table_ref.catalog, table_ref.schema, table_ref.table, err
);
Err(err)
}
}
}
}
fn validate_row_count_match(requests: &RowInsertRequests) -> Result<()> {
fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
for request in &requests.inserts {
let rows = request.rows.as_ref().unwrap();
let column_count = rows.schema.len();
ensure!(
rows.rows.iter().all(|r| r.values.len() == column_count),
InvalidInsertRequestSnafu {
reason: "row count mismatch"
reason: "column count mismatch"
}
)
}
@@ -243,6 +370,13 @@ fn validate_required_columns(request_schema: &[ColumnSchema], table_schema: &Sch
Ok(())
}
fn build_create_table_expr(
table: &TableReference,
request_schema: &[ColumnSchema],
) -> Result<CreateTableExpr> {
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, default_engine())
}
#[cfg(test)]
mod tests {
use datatypes::prelude::{ConcreteDataType, Value as DtValue};
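
Note: the renamed check matches what it actually enforces — not the number of rows, but that every row carries exactly one value per schema column. The same predicate in isolation, with simplified stand-in types:

// Stand-ins for api::v1::{Rows, Row}; values reduced to i32 for brevity.
struct Row {
    values: Vec<i32>,
}
struct Rows {
    schema: Vec<String>,
    rows: Vec<Row>,
}

fn column_counts_match(rows: &Rows) -> bool {
    let column_count = rows.schema.len();
    // Same condition the ensure! above evaluates.
    rows.rows.iter().all(|r| r.values.len() == column_count)
}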

View File

@@ -25,14 +25,12 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::local::manager::SystemTableInitializer;
use catalog::remote::CachedMetaKvBackend;
use catalog::CatalogManagerRef;
use client::client_manager::DatanodeClients;
use client::region_handler::RegionRequestHandlerRef;
use common_base::Plugins;
use common_config::KvStoreConfig;
use common_error::ext::BoxedError;
@@ -53,6 +51,7 @@ use common_telemetry::{error, timer};
use datanode::region_server::RegionServer;
use log_store::raft_engine::RaftEngineBackend;
use meta_client::client::{MetaClient, MetaClientBuilder};
use partition::manager::PartitionRuleManager;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
@@ -82,19 +81,18 @@ pub use standalone::StandaloneDatanodeManager;
use table::engine::manager::MemoryTableEngineManager;
use self::distributed::DistRegionRequestHandler;
use self::standalone::{StandaloneRegionRequestHandler, StandaloneTableMetadataCreator};
use self::standalone::StandaloneTableMetadataCreator;
use crate::catalog::FrontendCatalogManager;
use crate::delete::Deleter;
use crate::delete::{Deleter, DeleterRef};
use crate::error::{
self, CatalogSnafu, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu,
MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result,
SqlExecInterceptedSnafu,
};
use crate::expr_factory::CreateExprFactory;
use crate::frontend::FrontendOptions;
use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use crate::heartbeat::HeartbeatTask;
use crate::insert::Inserter;
use crate::insert::{Inserter, InserterRef};
use crate::metrics;
use crate::script::ScriptExecutor;
use crate::server::{start_server, ServerHandlers, Services};
@@ -127,13 +125,13 @@ pub struct Instance {
script_executor: Arc<ScriptExecutor>,
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
region_request_handler: RegionRequestHandlerRef,
create_expr_factory: CreateExprFactory,
/// plugins: this map holds extensions to customize query or auth
/// behaviours.
plugins: Arc<Plugins>,
servers: Arc<ServerHandlers>,
heartbeat_task: Option<HeartbeatTask>,
inserter: InserterRef,
deleter: DeleterRef,
}
impl Instance {
@@ -172,15 +170,27 @@ impl Instance {
)
.query_engine();
let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone()));
let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
datanode_clients.clone(),
));
let deleter = Arc::new(Deleter::new(
catalog_manager.clone(),
partition_manager,
datanode_clients,
));
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
region_request_handler.clone(),
meta_client.clone(),
meta_backend.clone(),
catalog_manager.clone(),
inserter.clone(),
deleter.clone(),
));
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
@@ -194,25 +204,23 @@ impl Instance {
]);
let heartbeat_task = Some(HeartbeatTask::new(
meta_client,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(handlers_executor),
));
common_telemetry::init_node_id(opts.node_id.clone());
let create_expr_factory = CreateExprFactory;
Ok(Instance {
catalog_manager,
script_executor,
create_expr_factory,
statement_executor,
query_engine,
region_request_handler,
plugins: plugins.clone(),
servers: Arc::new(HashMap::new()),
heartbeat_task,
inserter,
deleter,
})
}
@@ -293,40 +301,51 @@ impl Instance {
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
let region_request_handler = StandaloneRegionRequestHandler::arc(region_server.clone());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server));
let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
Arc::new(StandaloneDatanodeManager(region_server)),
datanode_manager.clone(),
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
));
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
datanode_manager.clone(),
));
let deleter = Arc::new(Deleter::new(
catalog_manager.clone(),
partition_manager,
datanode_manager,
));
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
region_request_handler.clone(),
ddl_executor,
kv_backend.clone(),
cache_invalidator,
inserter.clone(),
deleter.clone(),
));
let create_expr_factory = CreateExprFactory;
Ok(Instance {
catalog_manager: catalog_manager.clone(),
script_executor,
create_expr_factory,
statement_executor,
query_engine,
region_request_handler,
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
inserter,
deleter,
})
}
@@ -341,62 +360,6 @@ impl Instance {
&self.catalog_manager
}
// Handle batch inserts with row-format
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let inserter = Inserter::new(
self.catalog_manager.as_ref(),
&self.create_expr_factory,
&self.statement_executor,
self.region_request_handler.as_ref(),
);
inserter.handle_row_inserts(requests, ctx).await
}
/// Handle batch inserts
pub async fn handle_inserts(
&self,
requests: InsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let inserter = Inserter::new(
self.catalog_manager.as_ref(),
&self.create_expr_factory,
&self.statement_executor,
self.region_request_handler.as_ref(),
);
inserter.handle_column_inserts(requests, ctx).await
}
/// Handle batch deletes with row-format
pub async fn handle_row_deletes(
&self,
requests: RowDeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let deleter = Deleter::new(
self.catalog_manager.as_ref(),
self.region_request_handler.as_ref(),
);
deleter.handle_row_deletes(requests, ctx).await
}
/// Handle batch deletes
pub async fn handle_deletes(
&self,
requests: DeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let deleter = Deleter::new(
self.catalog_manager.as_ref(),
self.region_request_handler.as_ref(),
);
deleter.handle_column_deletes(requests, ctx).await
}
pub fn set_plugins(&mut self, map: Arc<Plugins>) {
self.plugins = map;
}
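
Note: both construction paths end with the Instance and the StatementExecutor holding the same Arc'd Inserter and Deleter. A reduced sketch of that ownership shape, with stand-in types:

use std::sync::Arc;

struct Inserter;
type InserterRef = Arc<Inserter>;

struct StatementExecutor {
    inserter: InserterRef,
}
struct Instance {
    statement_executor: Arc<StatementExecutor>,
    inserter: InserterRef,
}

fn wire(inserter: InserterRef) -> Instance {
    // Cloning the Arc shares one Inserter between both owners, so SQL
    // statements and gRPC requests write through the same partitioning logic.
    let statement_executor = Arc::new(StatementExecutor {
        inserter: inserter.clone(),
    });
    Instance {
        statement_executor,
        inserter,
    }
}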

View File

@@ -12,28 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod deleter;
pub(crate) mod inserter;
use std::sync::Arc;
use api::v1::region::{region_request, QueryRequest};
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu, RequestQuerySnafu, Result,
};
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result};
pub(crate) struct DistRegionRequestHandler {
catalog_manager: Arc<FrontendCatalogManager>,
@@ -47,17 +38,6 @@ impl DistRegionRequestHandler {
#[async_trait]
impl RegionRequestHandler for DistRegionRequestHandler {
async fn handle(
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> ClientResult<AffectedRows> {
self.handle_inner(request, ctx)
.await
.map_err(BoxedError::new)
.context(HandleRequestSnafu)
}
async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
self.do_get_inner(request)
.await
@@ -67,52 +47,6 @@ impl RegionRequestHandler for DistRegionRequestHandler {
}
impl DistRegionRequestHandler {
async fn handle_inner(
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
match request {
region_request::Body::Inserts(inserts) => {
let inserter =
DistInserter::new(&self.catalog_manager).with_trace_id(ctx.trace_id());
inserter.insert(inserts).await
}
region_request::Body::Deletes(deletes) => {
let deleter = DistDeleter::new(&self.catalog_manager).with_trace_id(ctx.trace_id());
deleter.delete(deletes).await
}
region_request::Body::Create(_) => NotSupportedSnafu {
feat: "region create",
}
.fail(),
region_request::Body::Drop(_) => NotSupportedSnafu {
feat: "region drop",
}
.fail(),
region_request::Body::Open(_) => NotSupportedSnafu {
feat: "region open",
}
.fail(),
region_request::Body::Close(_) => NotSupportedSnafu {
feat: "region close",
}
.fail(),
region_request::Body::Alter(_) => NotSupportedSnafu {
feat: "region alter",
}
.fail(),
region_request::Body::Flush(_) => NotSupportedSnafu {
feat: "region flush",
}
.fail(),
region_request::Body::Compact(_) => NotSupportedSnafu {
feat: "region compact",
}
.fail(),
}
}
async fn do_get_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
let region_id = RegionId::from_u64(request.region_id);

View File

@@ -1,284 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::{region_request, DeleteRequests, RegionRequest, RegionRequestHeader};
use common_meta::datanode_manager::AffectedRows;
use common_meta::peer::Peer;
use futures::future;
use metrics::counter;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDeletesSnafu, Result,
SplitDeleteSnafu,
};
/// A distributed deleter. It ingests gRPC [DeleteRequests].
///
/// Table data partitioning and Datanode requests batching are handled inside.
pub struct DistDeleter<'a> {
catalog_manager: &'a FrontendCatalogManager,
trace_id: Option<u64>,
span_id: Option<u64>,
}
impl<'a> DistDeleter<'a> {
pub(crate) fn new(catalog_manager: &'a FrontendCatalogManager) -> Self {
Self {
catalog_manager,
trace_id: None,
span_id: None,
}
}
pub fn with_trace_id(mut self, trace_id: u64) -> Self {
self.trace_id = Some(trace_id);
self
}
#[allow(dead_code)]
pub fn with_span_id(mut self, span_id: u64) -> Self {
self.span_id = Some(span_id);
self
}
pub(crate) async fn delete(&self, requests: DeleteRequests) -> Result<AffectedRows> {
let requests = self.split(requests).await?;
let trace_id = self.trace_id.unwrap_or_default();
let span_id = self.span_id.unwrap_or_default();
let results = future::try_join_all(requests.into_iter().map(|(peer, deletes)| {
let datanode_clients = self.catalog_manager.datanode_manager();
common_runtime::spawn_write(async move {
let request = RegionRequest {
header: Some(RegionRequestHeader { trace_id, span_id }),
body: Some(region_request::Body::Deletes(deletes)),
};
datanode_clients
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestDeletesSnafu)
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u64>>()?;
counter!(crate::metrics::DIST_DELETE_ROW_COUNT, affected_rows);
Ok(affected_rows)
}
/// Splits gRPC [DeleteRequests] into multiple gRPC [DeleteRequests]s, each of which
/// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write
/// method in Datanode.
async fn split(&self, requests: DeleteRequests) -> Result<HashMap<Peer, DeleteRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut deletes: HashMap<Peer, DeleteRequests> = HashMap::new();
for req in requests.requests {
let table_id = RegionId::from_u64(req.region_id).table_id();
let req_splits = partition_manager
.split_delete_request(table_id, req)
.await
.context(SplitDeleteSnafu)?;
let table_route = partition_manager
.find_table_route(table_id)
.await
.context(FindTableRouteSnafu { table_id })?;
for (region_number, delete) in req_splits {
let peer =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
deletes
.entry(peer.clone())
.or_default()
.requests
.push(delete);
}
}
Ok(deletes)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::helper::vectors_to_rows;
use api::v1::region::DeleteRequest;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::helper::{CatalogValue, SchemaValue};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::PutRequest;
use datatypes::prelude::VectorRef;
use datatypes::vectors::Int32Vector;
use super::*;
use crate::heartbeat::handler::tests::MockKvCacheInvalidator;
use crate::table::test::create_partition_rule_manager;
async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let default_catalog = CatalogNameKey {
catalog: DEFAULT_CATALOG_NAME,
}
.to_string();
let req = PutRequest::new()
.with_key(default_catalog.as_bytes())
.with_value(CatalogValue.as_bytes().unwrap());
backend.put(req).await.unwrap();
let default_schema = SchemaNameKey {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
}
.to_string();
let req = PutRequest::new()
.with_key(default_schema.as_bytes())
.with_value(SchemaValue.as_bytes().unwrap());
backend.put(req).await.unwrap();
backend
}
#[tokio::test]
async fn test_split_deletes() {
let backend = prepare_mocked_backend().await;
create_partition_rule_manager(backend.clone()).await;
let catalog_manager = Arc::new(FrontendCatalogManager::new(
backend,
Arc::new(MockKvCacheInvalidator::default()),
Arc::new(DatanodeClients::default()),
));
let new_delete_request = |vector: VectorRef| -> DeleteRequest {
let row_count = vector.len();
DeleteRequest {
region_id: RegionId::new(1, 0).into(),
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Tag as i32,
}],
rows: vectors_to_rows([vector].iter(), row_count),
}),
}
};
let requests = DeleteRequests {
requests: vec![
new_delete_request(Arc::new(Int32Vector::from(vec![
Some(1),
Some(11),
Some(50),
]))),
new_delete_request(Arc::new(Int32Vector::from(vec![
Some(2),
Some(12),
Some(102),
]))),
],
};
let deleter = DistDeleter::new(&catalog_manager);
let mut deletes = deleter.split(requests).await.unwrap();
assert_eq!(deletes.len(), 3);
let new_split_delete_request =
|rows: Vec<Option<i32>>, region_id: RegionId| -> DeleteRequest {
DeleteRequest {
region_id: region_id.into(),
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Tag as i32,
}],
rows: rows
.into_iter()
.map(|v| Row {
values: vec![Value {
value_data: v.map(ValueData::I32Value),
}],
})
.collect(),
}),
}
};
// region to datanode placement:
// 1 -> 1
// 2 -> 2
// 3 -> 3
//
// region value ranges:
// 1 -> [50, max)
// 2 -> [10, 50)
// 3 -> (min, 10)
let datanode_deletes = deletes.remove(&Peer::new(1, "")).unwrap().requests;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_split_delete_request(vec![Some(50)], RegionId::new(1, 1))
);
assert_eq!(
datanode_deletes[1],
new_split_delete_request(vec![Some(102)], RegionId::new(1, 1))
);
let datanode_deletes = deletes.remove(&Peer::new(2, "")).unwrap().requests;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_split_delete_request(vec![Some(11)], RegionId::new(1, 2))
);
assert_eq!(
datanode_deletes[1],
new_split_delete_request(vec![Some(12)], RegionId::new(1, 2))
);
let datanode_deletes = deletes.remove(&Peer::new(3, "")).unwrap().requests;
assert_eq!(datanode_deletes.len(), 2);
assert_eq!(
datanode_deletes[0],
new_split_delete_request(vec![Some(1)], RegionId::new(1, 3))
);
assert_eq!(
datanode_deletes[1],
new_split_delete_request(vec![Some(2)], RegionId::new(1, 3))
);
}
}

View File

@@ -1,275 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::{region_request, InsertRequests, RegionRequest, RegionRequestHeader};
use common_meta::datanode_manager::AffectedRows;
use common_meta::peer::Peer;
use futures_util::future;
use metrics::counter;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result,
SplitInsertSnafu,
};
/// A distributed inserter. It ingests gRPC [InsertRequests].
///
/// Table data partitioning and Datanode requests batching are handled inside.
pub struct DistInserter<'a> {
catalog_manager: &'a FrontendCatalogManager,
trace_id: Option<u64>,
span_id: Option<u64>,
}
impl<'a> DistInserter<'a> {
pub fn new(catalog_manager: &'a FrontendCatalogManager) -> Self {
Self {
catalog_manager,
trace_id: None,
span_id: None,
}
}
pub fn with_trace_id(mut self, trace_id: u64) -> Self {
self.trace_id = Some(trace_id);
self
}
#[allow(dead_code)]
pub fn with_span_id(mut self, span_id: u64) -> Self {
self.span_id = Some(span_id);
self
}
pub(crate) async fn insert(&self, requests: InsertRequests) -> Result<AffectedRows> {
let requests = self.split(requests).await?;
let trace_id = self.trace_id.unwrap_or_default();
let span_id = self.span_id.unwrap_or_default();
let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| {
let datanode_clients = self.catalog_manager.datanode_manager();
common_runtime::spawn_write(async move {
let request = RegionRequest {
header: Some(RegionRequestHeader { trace_id, span_id }),
body: Some(region_request::Body::Inserts(inserts)),
};
datanode_clients
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestInsertsSnafu)
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u64>>()?;
counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows);
Ok(affected_rows)
}
/// Splits gRPC [InsertRequests] into multiple gRPC [InsertRequests]s, each of which
/// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write
/// method in Datanode.
async fn split(&self, requests: InsertRequests) -> Result<HashMap<Peer, InsertRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut inserts: HashMap<Peer, InsertRequests> = HashMap::new();
for req in requests.requests {
let table_id = RegionId::from_u64(req.region_id).table_id();
let req_splits = partition_manager
.split_insert_request(table_id, req)
.await
.context(SplitInsertSnafu)?;
let table_route = partition_manager
.find_table_route(table_id)
.await
.context(FindTableRouteSnafu { table_id })?;
let region_map = table_route.region_map();
for (region_number, insert) in req_splits {
let peer = *region_map.get(&region_number).context(FindDatanodeSnafu {
region: region_number,
})?;
inserts
.entry(peer.clone())
.or_default()
.requests
.push(insert);
}
}
Ok(inserts)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::helper::vectors_to_rows;
use api::v1::region::InsertRequest;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use client::client_manager::DatanodeClients;
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use datatypes::prelude::VectorRef;
use datatypes::vectors::Int32Vector;
use super::*;
use crate::heartbeat::handler::tests::MockKvCacheInvalidator;
use crate::table::test::create_partition_rule_manager;
async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());
catalog_manager
.create(CatalogNameKey::default())
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None)
.await
.unwrap();
backend
}
#[tokio::test]
async fn test_split_inserts() {
let backend = prepare_mocked_backend().await;
create_partition_rule_manager(backend.clone()).await;
let catalog_manager = Arc::new(FrontendCatalogManager::new(
backend,
Arc::new(MockKvCacheInvalidator::default()),
Arc::new(DatanodeClients::default()),
));
let inserter = DistInserter::new(&catalog_manager);
let new_insert_request = |vector: VectorRef| -> InsertRequest {
let row_count = vector.len();
InsertRequest {
region_id: RegionId::new(1, 0).into(),
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Field as i32,
}],
rows: vectors_to_rows([vector].iter(), row_count),
}),
}
};
let requests = InsertRequests {
requests: vec![
new_insert_request(Arc::new(Int32Vector::from(vec![
Some(1),
None,
Some(11),
Some(101),
]))),
new_insert_request(Arc::new(Int32Vector::from(vec![
Some(2),
Some(12),
None,
Some(102),
]))),
],
};
let mut inserts = inserter.split(requests).await.unwrap();
assert_eq!(inserts.len(), 3);
let new_split_insert_request =
|rows: Vec<Option<i32>>, region_id: RegionId| -> InsertRequest {
InsertRequest {
region_id: region_id.into(),
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Field as i32,
}],
rows: rows
.into_iter()
.map(|v| Row {
values: vec![Value {
value_data: v.map(ValueData::I32Value),
}],
})
.collect(),
}),
}
};
// region to datanode placement:
// 1 -> 1
// 2 -> 2
// 3 -> 3
//
// region value ranges:
// 1 -> [50, max)
// 2 -> [10, 50)
// 3 -> (min, 10)
let datanode_inserts = inserts.remove(&Peer::new(1, "")).unwrap().requests;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_split_insert_request(vec![Some(101)], RegionId::new(1, 1))
);
assert_eq!(
datanode_inserts[1],
new_split_insert_request(vec![Some(102)], RegionId::new(1, 1))
);
let datanode_inserts = inserts.remove(&Peer::new(2, "")).unwrap().requests;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_split_insert_request(vec![Some(11)], RegionId::new(1, 2))
);
assert_eq!(
datanode_inserts[1],
new_split_insert_request(vec![Some(12)], RegionId::new(1, 2))
);
let datanode_inserts = inserts.remove(&Peer::new(3, "")).unwrap().requests;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_split_insert_request(vec![Some(1), None], RegionId::new(1, 3))
);
assert_eq!(
datanode_inserts[1],
new_split_insert_request(vec![Some(2), None], RegionId::new(1, 3))
);
}
}

View File

@@ -15,6 +15,7 @@
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_meta::table_name::TableName;
@@ -133,3 +134,41 @@ impl GrpcQueryHandler for Instance {
Ok(output)
}
}
impl Instance {
pub async fn handle_inserts(
&self,
requests: InsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
.await
}
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
.await
}
pub async fn handle_deletes(
&self,
requests: DeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.deleter.handle_column_deletes(requests, ctx).await
}
pub async fn handle_row_deletes(
&self,
requests: RowDeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.deleter.handle_row_deletes(requests, ctx).await
}
}
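
Note: the convenience methods relocated here are thin delegations to the shared Inserter/Deleter. A caller-side sketch, assuming an `instance`, a query context, and decoded row-insert requests are in scope, with error handling elided:

let output = instance.handle_row_inserts(row_insert_requests, ctx).await?;
if let Output::AffectedRows(n) = output {
    println!("inserted {} rows", n);
}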

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::v1::meta::Partition;
use api::v1::region::{region_request, QueryRequest, RegionRequest};
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region::check_response_header;
@@ -31,12 +31,11 @@ use common_meta::sequence::{Sequence, SequenceRef};
use common_recordbatch::SendableRecordBatchStream;
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::error::InvokeRegionServerSnafu;
use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
const TABLE_ID_SEQ: &str = "table_id";
@@ -45,31 +44,24 @@ pub(crate) struct StandaloneRegionRequestHandler {
}
impl StandaloneRegionRequestHandler {
#[allow(dead_code)]
pub fn arc(region_server: RegionServer) -> Arc<Self> {
Arc::new(Self { region_server })
}
async fn handle_inner(&self, request: RegionRequest) -> Result<RegionResponse> {
let body = request.body.with_context(|| InvalidRegionRequestSnafu {
reason: "body not found",
})?;
self.region_server
.handle(body)
.await
.context(InvokeRegionServerSnafu)
}
}
#[async_trait]
impl RegionRequestHandler for StandaloneRegionRequestHandler {
async fn handle(
&self,
request: region_request::Body,
_ctx: QueryContextRef,
) -> ClientResult<AffectedRows> {
let response = self
.region_server
.handle(request)
.await
.context(InvokeRegionServerSnafu)
.map_err(BoxedError::new)
.context(HandleRequestSnafu)?;
check_response_header(response.header)?;
Ok(response.affected_rows)
}
async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
self.region_server
.handle_read(request)
@@ -79,26 +71,22 @@ impl RegionRequestHandler for StandaloneRegionRequestHandler {
}
}
pub(crate) struct StandaloneDatanode(pub(crate) RegionServer);
#[async_trait]
impl Datanode for StandaloneDatanode {
impl Datanode for StandaloneRegionRequestHandler {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
let body = request.body.context(meta_error::UnexpectedSnafu {
err_msg: "body not found",
})?;
let resp = self
.0
.handle(body)
let response = self
.handle_inner(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(resp.affected_rows)
check_response_header(response.header)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(response.affected_rows)
}
async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
self.0
self.region_server
.handle_read(request)
.await
.map_err(BoxedError::new)
@@ -111,7 +99,7 @@ pub struct StandaloneDatanodeManager(pub RegionServer);
#[async_trait]
impl DatanodeManager for StandaloneDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
Arc::new(StandaloneDatanode(self.0.clone()))
StandaloneRegionRequestHandler::arc(self.0.clone())
}
}
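
Note: the standalone path keeps the Datanode/DatanodeManager abstraction but collapses it — every lookup returns the same local handler wrapping the RegionServer. The shape, reduced to stand-in traits:

use std::sync::Arc;
use async_trait::async_trait;

#[async_trait]
trait Datanode: Send + Sync {
    async fn handle(&self, affected_rows: u64) -> u64;
}

// Stands in for StandaloneRegionRequestHandler.
struct LocalHandler;

#[async_trait]
impl Datanode for LocalHandler {
    async fn handle(&self, affected_rows: u64) -> u64 {
        // The real impl dispatches into the RegionServer and checks the
        // response header before returning the affected row count.
        affected_rows
    }
}

// Stands in for StandaloneDatanodeManager: the peer argument is ignored
// because standalone mode has exactly one node.
struct LocalManager(Arc<LocalHandler>);

impl LocalManager {
    async fn datanode(&self) -> Arc<dyn Datanode> {
        self.0.clone()
    }
}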

View File

@@ -24,6 +24,7 @@ pub mod heartbeat;
pub(crate) mod insert;
pub mod instance;
pub(crate) mod metrics;
pub(crate) mod region_req_factory;
pub(crate) mod req_convert;
mod script;
mod server;

View File

@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::region_request::Body;
use api::v1::region::{
DeleteRequests as RegionDeleteRequests, InsertRequests as RegionInsertRequests, RegionRequest,
RegionRequestHeader,
};
pub struct RegionRequestFactory {
header: RegionRequestHeader,
}
impl RegionRequestFactory {
pub fn new(header: RegionRequestHeader) -> Self {
Self { header }
}
pub fn build_insert(&self, requests: RegionInsertRequests) -> RegionRequest {
RegionRequest {
header: Some(self.header.clone()),
body: Some(Body::Inserts(requests)),
}
}
pub fn build_delete(&self, requests: RegionDeleteRequests) -> RegionRequest {
RegionRequest {
header: Some(self.header.clone()),
body: Some(Body::Deletes(requests)),
}
}
}
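
Note: a usage sketch for the new factory — build it once per logical write so every per-peer request carries the same tracing header. The request payloads and trace id are assumed to be in scope:

let factory = RegionRequestFactory::new(RegionRequestHeader {
    trace_id, // assumed: taken from the query context, as in the diffs above
    span_id: 0,
});
// Each call stamps a clone of the shared header onto a fresh RegionRequest.
let insert_request = factory.build_insert(region_insert_requests);
let delete_request = factory.build_delete(region_delete_requests);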

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod partitioner;
use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;

View File

@@ -0,0 +1,69 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{DeleteRequest, InsertRequest};
use api::v1::Rows;
use partition::manager::PartitionRuleManager;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use crate::error::{Result, SplitDeleteSnafu, SplitInsertSnafu};
pub struct Partitioner<'a> {
partition_manager: &'a PartitionRuleManager,
}
impl<'a> Partitioner<'a> {
pub fn new(partition_manager: &'a PartitionRuleManager) -> Self {
Self { partition_manager }
}
pub async fn partition_insert_requests(
&self,
table_id: TableId,
rows: Rows,
) -> Result<Vec<InsertRequest>> {
let requests = self
.partition_manager
.split_rows(table_id, rows)
.await
.context(SplitInsertSnafu)?
.into_iter()
.map(|(region_number, rows)| InsertRequest {
region_id: RegionId::new(table_id, region_number).into(),
rows: Some(rows),
})
.collect();
Ok(requests)
}
pub async fn partition_delete_requests(
&self,
table_id: TableId,
rows: Rows,
) -> Result<Vec<DeleteRequest>> {
let requests = self
.partition_manager
.split_rows(table_id, rows)
.await
.context(SplitDeleteSnafu)?
.into_iter()
.map(|(region_number, rows)| DeleteRequest {
region_id: RegionId::new(table_id, region_number).into(),
rows: Some(rows),
})
.collect();
Ok(requests)
}
}
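
Note: a usage sketch — the Partitioner turns one table's rows into per-region requests by consulting the partition rule, then tagging each split with its RegionId. `partition_manager`, `table_id`, and `rows` are assumed in scope:

let inserts = Partitioner::new(&partition_manager)
    .partition_insert_requests(table_id, rows)
    .await?;
// Every returned InsertRequest targets RegionId::new(table_id, region_number),
// so callers can group them by region leader without re-deriving the table id.
for req in &inserts {
    assert_eq!(RegionId::from_u64(req.region_id).table_id(), table_id);
}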

View File

@@ -35,6 +35,6 @@ fn request_column_to_row(request: DeleteRequest) -> Result<RowDeleteRequest> {
Ok(RowDeleteRequest {
table_name: request.table_name,
rows: Some(rows),
region_number: request.region_number,
region_number: 0, // FIXME(zhongzc): deprecated field
})
}

View File

@@ -12,27 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{
DeleteRequest as RegionDeleteRequest, DeleteRequests as RegionDeleteRequests,
};
use api::v1::region::DeleteRequests as RegionDeleteRequests;
use api::v1::RowDeleteRequests;
use catalog::CatalogManager;
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::TableRef;
use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
use crate::req_convert::common::partitioner::Partitioner;
pub struct RowToRegion<'a> {
catalog_manager: &'a dyn CatalogManager,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
}
impl<'a> RowToRegion<'a> {
pub fn new(catalog_manager: &'a dyn CatalogManager, ctx: &'a QueryContext) -> Self {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
) -> Self {
Self {
catalog_manager,
partition_manager,
ctx,
}
}
@@ -41,13 +46,13 @@ impl<'a> RowToRegion<'a> {
let mut region_request = Vec::with_capacity(requests.deletes.len());
for request in requests.deletes {
let table = self.get_table(&request.table_name).await?;
let table_id = table.table_info().table_id();
let region_id = RegionId::new(table.table_info().table_id(), request.region_number);
let insert_request = RegionDeleteRequest {
region_id: region_id.into(),
rows: request.rows,
};
region_request.push(insert_request);
let requests = Partitioner::new(self.partition_manager)
.partition_delete_requests(table_id, request.rows.unwrap_or_default())
.await?;
region_request.extend(requests);
}
Ok(RegionDeleteRequests {
@@ -63,7 +68,11 @@ impl<'a> RowToRegion<'a> {
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
table_name: common_catalog::format_full_table_name(
catalog_name,
schema_name,
table_name,
),
})
}
}

View File

@@ -12,37 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{
DeleteRequest as RegionDeleteRequest, DeleteRequests as RegionDeleteRequests,
};
use api::v1::region::DeleteRequests as RegionDeleteRequests;
use api::v1::Rows;
use store_api::storage::RegionId;
use partition::manager::PartitionRuleManager;
use table::metadata::TableInfo;
use table::requests::DeleteRequest as TableDeleteRequest;
use crate::error::Result;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::common::{column_schema, row_count};
pub struct TableToRegion<'a> {
table_info: &'a TableInfo,
partition_manager: &'a PartitionRuleManager,
}
impl<'a> TableToRegion<'a> {
pub fn new(table_info: &'a TableInfo) -> Self {
Self { table_info }
pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
Self {
table_info,
partition_manager,
}
}
pub fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
let region_id = RegionId::new(self.table_info.table_id(), 0).into();
pub async fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
let row_count = row_count(&request.key_column_values)?;
let schema = column_schema(self.table_info, &request.key_column_values)?;
let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count);
Ok(RegionDeleteRequests {
requests: vec![RegionDeleteRequest {
region_id,
rows: Some(Rows { schema, rows }),
}],
})
let rows = Rows { schema, rows };
let requests = Partitioner::new(self.partition_manager)
.partition_delete_requests(self.table_info.table_id(), rows)
.await?;
Ok(RegionDeleteRequests { requests })
}
}
@@ -51,114 +53,119 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::region::DeleteRequest as RegionDeleteRequest;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, SemanticType};
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema};
use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder};
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;
use super::*;
use crate::table::test::{create_partition_rule_manager, new_test_table_info};
#[test]
fn test_delete_request_table_to_region() {
let schema = Schema::new(vec![
DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true),
DtColumnSchema::new("id", ConcreteDataType::int16_datatype(), false),
DtColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
]);
async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let table_meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![1, 2])
.next_column_id(3)
.build()
let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());
catalog_manager
.create(CatalogNameKey::default())
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None)
.await
.unwrap();
let table_info = Arc::new(
TableInfoBuilder::default()
.name("demo")
.meta(table_meta)
.table_id(1)
.build()
.unwrap(),
);
let delete_request = mock_delete_request();
let mut request = TableToRegion::new(&table_info)
.convert(delete_request)
.unwrap();
assert_eq!(request.requests.len(), 1);
verify_region_insert_request(request.requests.pop().unwrap());
backend
}
fn mock_delete_request() -> TableDeleteRequest {
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("host1"));
builder.push(None);
builder.push(Some("host3"));
let host = builder.to_vector();
#[tokio::test]
async fn test_delete_request_table_to_region() {
// region to datanode placement:
// 1 -> 1
// 2 -> 2
// 3 -> 3
//
// region value ranges:
// 1 -> [50, max)
// 2 -> [10, 50)
// 3 -> (min, 10)
let mut builder = Int16VectorBuilder::with_capacity(3);
builder.push(Some(1_i16));
builder.push(Some(2_i16));
builder.push(Some(3_i16));
let id = builder.to_vector();
let backend = prepare_mocked_backend().await;
let partition_manager = create_partition_rule_manager(backend.clone()).await;
let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());
let key_column_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]);
let converter = TableToRegion::new(&table_info, &partition_manager);
let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
Some(1),
None,
Some(11),
Some(101),
])));
let region_requests = converter.convert(table_request).await.unwrap();
let mut region_id_to_region_requests = region_requests
.requests
.into_iter()
.map(|r| (r.region_id, r))
.collect::<HashMap<_, _>>();
let region_id = RegionId::new(1, 1).as_u64();
let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
assert_eq!(
region_request,
build_region_request(vec![Some(101)], region_id)
);
let region_id = RegionId::new(1, 2).as_u64();
let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
assert_eq!(
region_request,
build_region_request(vec![Some(11)], region_id)
);
let region_id = RegionId::new(1, 3).as_u64();
let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
assert_eq!(
region_request,
build_region_request(vec![Some(1), None], region_id)
);
}
fn build_table_request(vector: VectorRef) -> TableDeleteRequest {
TableDeleteRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
key_column_values,
table_name: "table_1".to_string(),
key_column_values: HashMap::from([("a".to_string(), vector)]),
}
}
fn verify_region_insert_request(request: RegionDeleteRequest) {
assert_eq!(request.region_id, RegionId::new(1, 0).as_u64());
let rows = request.rows.unwrap();
for (i, column) in rows.schema.iter().enumerate() {
let name = &column.column_name;
if name == "id" {
assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
assert_eq!(SemanticType::Tag as i32, column.semantic_type);
let values = rows
.rows
.iter()
.map(|row| row.values[i].value_data.clone())
.collect::<Vec<_>>();
assert_eq!(
vec![
Some(ValueData::I16Value(1)),
Some(ValueData::I16Value(2)),
Some(ValueData::I16Value(3))
],
values
);
}
if name == "host" {
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(SemanticType::Tag as i32, column.semantic_type);
let values = rows
.rows
.iter()
.map(|row| row.values[i].value_data.clone())
.collect::<Vec<_>>();
assert_eq!(
vec![
Some(ValueData::StringValue("host1".to_string())),
None,
Some(ValueData::StringValue("host3".to_string()))
],
values
);
}
fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionDeleteRequest {
RegionDeleteRequest {
region_id,
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Tag as i32,
}],
rows: rows
.into_iter()
.map(|v| Row {
values: vec![Value {
value_data: v.map(ValueData::I32Value),
}],
})
.collect(),
}),
}
}
}
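The assertions in the rewritten test drain a HashMap keyed by the packed region id, so the test does not depend on the order in which the partitioner emits requests. The same pattern in miniature, with hypothetical values and the same assumed RegionId packing as above:

use std::collections::HashMap;

fn main() {
    // Packed region ids for table 1, regions 1..=3 (assumed layout: table id
    // in the high 32 bits, region number in the low 32 bits).
    let rid = |region: u64| (1u64 << 32) | region;

    // The order of emitted requests is unspecified, so key them by region id.
    let requests = vec![(rid(2), "rows for region 2"), (rid(1), "rows for region 1")];
    let mut by_region: HashMap<u64, &str> = requests.into_iter().collect();

    assert_eq!(by_region.remove(&rid(1)), Some("rows for region 1"));
    assert_eq!(by_region.remove(&rid(2)), Some("rows for region 2"));
    assert!(by_region.is_empty());
}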

View File

@@ -35,6 +35,6 @@ fn request_column_to_row(request: InsertRequest) -> Result<RowInsertRequest> {
Ok(RowInsertRequest {
table_name: request.table_name,
rows: Some(rows),
region_number: request.region_number,
region_number: 0, // FIXME(zhongzc): deprecated field
})
}

View File

@@ -12,27 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{
InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::RowInsertRequests;
use catalog::CatalogManager;
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::TableRef;
use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
use crate::req_convert::common::partitioner::Partitioner;
pub struct RowToRegion<'a> {
catalog_manager: &'a dyn CatalogManager,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
}
impl<'a> RowToRegion<'a> {
pub fn new(catalog_manager: &'a dyn CatalogManager, ctx: &'a QueryContext) -> Self {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
) -> Self {
Self {
catalog_manager,
partition_manager,
ctx,
}
}
@@ -41,13 +46,13 @@ impl<'a> RowToRegion<'a> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let table = self.get_table(&request.table_name).await?;
let table_id = table.table_info().table_id();
let region_id = RegionId::new(table.table_info().table_id(), request.region_number);
let insert_request = RegionInsertRequest {
region_id: region_id.into(),
rows: request.rows,
};
region_request.push(insert_request);
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
region_request.extend(requests);
}
Ok(RegionInsertRequests {
@@ -63,7 +68,11 @@ impl<'a> RowToRegion<'a> {
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
table_name: common_catalog::format_full_table_name(
catalog_name,
schema_name,
table_name,
),
})
}
}

View File

@@ -13,18 +13,16 @@
// limitations under the License.
use api::helper::value_to_grpc_value;
use api::v1::region::{
InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
use catalog::CatalogManager;
use datatypes::schema::{ColumnSchema, SchemaRef};
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use store_api::storage::RegionId;
use table::TableRef;
use super::{data_type, semantic_type};
@@ -32,18 +30,25 @@ use crate::error::{
CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu,
InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result, TableNotFoundSnafu,
};
use crate::req_convert::common::partitioner::Partitioner;
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
pub struct StatementToRegion<'a> {
catalog_manager: &'a dyn CatalogManager,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
}
impl<'a> StatementToRegion<'a> {
pub fn new(catalog_manager: &'a dyn CatalogManager, ctx: &'a QueryContext) -> Self {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
) -> Self {
Self {
catalog_manager,
partition_manager,
ctx,
}
}
@@ -63,7 +68,7 @@ impl<'a> StatementToRegion<'a> {
ensure!(
sql_rows.iter().all(|row| row.len() == column_count),
InvalidSqlSnafu {
err_msg: "The column count of the row is not the same as columns."
err_msg: "column count mismatch"
}
);
@@ -98,12 +103,10 @@ impl<'a> StatementToRegion<'a> {
}
}
Ok(RegionInsertRequests {
requests: vec![RegionInsertRequest {
region_id: RegionId::new(table_info.table_id(), 0).into(),
rows: Some(Rows { schema, rows }),
}],
})
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_info.table_id(), Rows { schema, rows })
.await?;
Ok(RegionInsertRequests { requests })
}
async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
@@ -112,7 +115,7 @@ impl<'a> StatementToRegion<'a> {
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format!("{}.{}.{}", catalog, schema, table),
table_name: common_catalog::format_full_table_name(catalog, schema, table),
})
}
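The ensure! in this hunk is the only shape check the literal-insert path needs: every VALUES row must carry exactly as many entries as the column list. A stand-alone sketch of that check, with simplified types (the real code raises InvalidSqlSnafu instead of a String error):

// Every VALUES row must match the declared column count.
fn check_row_shape(sql_rows: &[Vec<String>], column_count: usize) -> Result<(), String> {
    if sql_rows.iter().all(|row| row.len() == column_count) {
        Ok(())
    } else {
        Err("column count mismatch".to_string())
    }
}

fn main() {
    let ok = vec![vec!["1".into(), "a".into()], vec!["2".into(), "b".into()]];
    assert!(check_row_shape(&ok, 2).is_ok());

    let bad = vec![vec!["1".into()]];
    assert!(check_row_shape(&bad, 2).is_err());
}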

View File

@@ -12,37 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{
InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::Rows;
use store_api::storage::RegionId;
use partition::manager::PartitionRuleManager;
use table::metadata::TableInfo;
use table::requests::InsertRequest as TableInsertRequest;
use crate::error::Result;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::common::{column_schema, row_count};
pub struct TableToRegion<'a> {
table_info: &'a TableInfo,
partition_manager: &'a PartitionRuleManager,
}
impl<'a> TableToRegion<'a> {
pub fn new(table_info: &'a TableInfo) -> Self {
Self { table_info }
pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
Self {
table_info,
partition_manager,
}
}
pub fn convert(&self, request: TableInsertRequest) -> Result<RegionInsertRequests> {
let region_id = RegionId::new(self.table_info.table_id(), request.region_number).into();
pub async fn convert(&self, request: TableInsertRequest) -> Result<RegionInsertRequests> {
let row_count = row_count(&request.columns_values)?;
let schema = column_schema(self.table_info, &request.columns_values)?;
let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count);
Ok(RegionInsertRequests {
requests: vec![RegionInsertRequest {
region_id,
rows: Some(Rows { schema, rows }),
}],
})
let rows = Rows { schema, rows };
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(self.table_info.table_id(), rows)
.await?;
Ok(RegionInsertRequests { requests })
}
}
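Both TableToRegion converters call api::helper::vectors_to_rows to turn the column-oriented request into row-oriented Rows before splitting. A sketch of that transposition over stand-in column types (the real helper works on datatypes vectors and yields api::v1::Row values):

// Transpose column-major data (one Vec per column) into row-major rows.
fn vectors_to_rows(columns: &[Vec<Option<i32>>], row_count: usize) -> Vec<Vec<Option<i32>>> {
    (0..row_count)
        .map(|i| columns.iter().map(|col| col[i]).collect())
        .collect()
}

fn main() {
    let a = vec![Some(1), None, Some(11), Some(101)]; // the test's "a" column
    let rows = vectors_to_rows(&[a], 4);
    assert_eq!(rows.len(), 4);
    assert_eq!(rows[2], vec![Some(11)]);
}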
@@ -51,115 +53,120 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::region::InsertRequest as RegionInsertRequest;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, SemanticType};
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema};
use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder};
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;
use super::*;
use crate::table::test::{create_partition_rule_manager, new_test_table_info};
#[test]
fn test_insert_request_table_to_region() {
let schema = Schema::new(vec![
DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true),
DtColumnSchema::new("id", ConcreteDataType::int16_datatype(), false),
DtColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
]);
async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let table_meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![2])
.next_column_id(3)
.build()
let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());
catalog_manager
.create(CatalogNameKey::default())
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None)
.await
.unwrap();
let table_info = Arc::new(
TableInfoBuilder::default()
.name("demo")
.meta(table_meta)
.table_id(1)
.build()
.unwrap(),
);
let insert_request = mock_insert_request();
let mut request = TableToRegion::new(&table_info)
.convert(insert_request)
.unwrap();
assert_eq!(request.requests.len(), 1);
verify_region_insert_request(request.requests.pop().unwrap());
backend
}
fn mock_insert_request() -> TableInsertRequest {
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("host1"));
builder.push(None);
builder.push(Some("host3"));
let host = builder.to_vector();
#[tokio::test]
async fn test_insert_request_table_to_region() {
// region to datanode placement:
// 1 -> 1
// 2 -> 2
// 3 -> 3
//
// region value ranges:
// 1 -> [50, max)
// 2 -> [10, 50)
// 3 -> (min, 10)
let mut builder = Int16VectorBuilder::with_capacity(3);
builder.push(Some(1_i16));
builder.push(Some(2_i16));
builder.push(Some(3_i16));
let id = builder.to_vector();
let backend = prepare_mocked_backend().await;
let partition_manager = create_partition_rule_manager(backend.clone()).await;
let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());
let columns_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]);
let converter = TableToRegion::new(&table_info, &partition_manager);
let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
Some(1),
None,
Some(11),
Some(101),
])));
let region_requests = converter.convert(table_request).await.unwrap();
let mut region_id_to_region_requests = region_requests
.requests
.into_iter()
.map(|r| (r.region_id, r))
.collect::<HashMap<_, _>>();
let region_id = RegionId::new(1, 1).as_u64();
let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
assert_eq!(
region_request,
build_region_request(vec![Some(101)], region_id)
);
let region_id = RegionId::new(1, 2).as_u64();
let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
assert_eq!(
region_request,
build_region_request(vec![Some(11)], region_id)
);
let region_id = RegionId::new(1, 3).as_u64();
let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
assert_eq!(
region_request,
build_region_request(vec![Some(1), None], region_id)
);
}
fn build_table_request(vector: VectorRef) -> TableInsertRequest {
TableInsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
table_name: "table_1".to_string(),
columns_values: HashMap::from([("a".to_string(), vector)]),
region_number: 0,
}
}
fn verify_region_insert_request(request: RegionInsertRequest) {
assert_eq!(request.region_id, RegionId::new(1, 0).as_u64());
let rows = request.rows.unwrap();
for (i, column) in rows.schema.iter().enumerate() {
let name = &column.column_name;
if name == "id" {
assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
let values = rows
.rows
.iter()
.map(|row| row.values[i].value_data.clone())
.collect::<Vec<_>>();
assert_eq!(
vec![
Some(ValueData::I16Value(1)),
Some(ValueData::I16Value(2)),
Some(ValueData::I16Value(3))
],
values
);
}
if name == "host" {
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(SemanticType::Tag as i32, column.semantic_type);
let values = rows
.rows
.iter()
.map(|row| row.values[i].value_data.clone())
.collect::<Vec<_>>();
assert_eq!(
vec![
Some(ValueData::StringValue("host1".to_string())),
None,
Some(ValueData::StringValue("host3".to_string()))
],
values
);
}
fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionInsertRequest {
RegionInsertRequest {
region_id,
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Tag as i32,
}],
rows: rows
.into_iter()
.map(|v| Row {
values: vec![Value {
value_data: v.map(ValueData::I32Value),
}],
})
.collect(),
}),
}
}
}

View File

@@ -25,9 +25,7 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use api::v1::region::region_request;
use catalog::CatalogManagerRef;
use client::region_handler::RegionRequestHandlerRef;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::DdlTaskExecutorRef;
@@ -47,16 +45,15 @@ use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
use table::engine::TableReference;
use table::requests::{
CopyDatabaseRequest, CopyDirection, CopyTableRequest, DeleteRequest, InsertRequest,
};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::TableRef;
use crate::delete::DeleterRef;
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu,
RequestDatanodeSnafu, Result, TableNotFoundSnafu,
Result, TableNotFoundSnafu,
};
use crate::req_convert::{delete, insert};
use crate::insert::InserterRef;
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
use crate::table::table_idents_to_full_name;
@@ -64,30 +61,33 @@ use crate::table::table_idents_to_full_name;
pub struct StatementExecutor {
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
region_request_handler: RegionRequestHandlerRef,
ddl_executor: DdlTaskExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
partition_manager: PartitionRuleManagerRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
deleter: DeleterRef,
}
impl StatementExecutor {
pub(crate) fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
region_request_handler: RegionRequestHandlerRef,
ddl_task_executor: DdlTaskExecutorRef,
kv_backend: KvBackendRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
deleter: DeleterRef,
) -> Self {
Self {
catalog_manager,
query_engine,
region_request_handler,
ddl_executor: ddl_task_executor,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
cache_invalidator,
inserter,
deleter,
}
}
@@ -224,50 +224,6 @@ impl StatementExecutor {
table_name: table_ref.to_string(),
})
}
async fn handle_table_insert_request(
&self,
request: InsertRequest,
query_ctx: QueryContextRef,
) -> Result<usize> {
let table_ref = TableReference::full(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
let table = self.get_table(&table_ref).await?;
let table_info = table.table_info();
let request = insert::TableToRegion::new(&table_info).convert(request)?;
let affected_rows = self
.region_request_handler
.handle(region_request::Body::Inserts(request), query_ctx)
.await
.context(RequestDatanodeSnafu)?;
Ok(affected_rows as _)
}
async fn handle_table_delete_request(
&self,
request: DeleteRequest,
query_ctx: QueryContextRef,
) -> Result<usize> {
let table_ref = TableReference::full(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
let table = self.get_table(&table_ref).await?;
let table_info = table.table_info();
let request = delete::TableToRegion::new(&table_info).convert(request)?;
let affected_rows = self
.region_request_handler
.handle(region_request::Body::Deletes(request), query_ctx)
.await
.context(RequestDatanodeSnafu)?;
Ok(affected_rows as _)
}
}
fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
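After this hunk, StatementExecutor no longer converts and ships region requests itself; it holds an InserterRef and a DeleterRef and forwards table-level requests to them. A minimal sketch of that shape, with hypothetical simplified traits (the real ones are async and return Result):

use std::sync::Arc;

trait Inserter {
    fn handle_table_insert(&self, rows: usize) -> usize; // returns affected rows
}
trait Deleter {
    fn handle_table_delete(&self, rows: usize) -> usize;
}

struct StatementExecutor {
    inserter: Arc<dyn Inserter>,
    deleter: Arc<dyn Deleter>,
}

impl StatementExecutor {
    // DML is forwarded; partitioning now lives behind the Inserter/Deleter.
    fn insert(&self, rows: usize) -> usize {
        self.inserter.handle_table_insert(rows)
    }
    fn delete(&self, rows: usize) -> usize {
        self.deleter.handle_table_delete(rows)
    }
}

struct Passthrough;
impl Inserter for Passthrough {
    fn handle_table_insert(&self, rows: usize) -> usize { rows }
}
impl Deleter for Passthrough {
    fn handle_table_delete(&self, rows: usize) -> usize { rows }
}

fn main() {
    let exec = StatementExecutor {
        inserter: Arc::new(Passthrough),
        deleter: Arc::new(Passthrough),
    };
    assert_eq!(exec.insert(3) + exec.delete(2), 5);
}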

View File

@@ -327,13 +327,12 @@ impl StatementExecutor {
.zip(vectors)
.collect::<HashMap<_, _>>();
pending.push(self.handle_table_insert_request(
pending.push(self.inserter.handle_table_insert(
InsertRequest {
catalog_name: req.catalog_name.to_string(),
schema_name: req.schema_name.to_string(),
table_name: req.table_name.to_string(),
columns_values,
// TODO: support multi-regions
region_number: 0,
},
query_ctx.clone(),

View File

@@ -14,7 +14,6 @@
use std::collections::HashMap;
use api::v1::region::region_request;
use common_query::Output;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
@@ -35,23 +34,16 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, MissingTimeIndexColumnSnafu,
ReadRecordBatchSnafu, RequestDatanodeSnafu, Result, UnexpectedSnafu,
ReadRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::req_convert::insert::StatementToRegion;
impl StatementExecutor {
pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly
let request = StatementToRegion::new(self.catalog_manager.as_ref(), &query_ctx)
.convert(&insert)
.await?;
let affected_rows = self
.region_request_handler
.handle(region_request::Body::Inserts(request), query_ctx)
self.inserter
.handle_statement_insert(insert.as_ref(), &query_ctx)
.await
.context(RequestDatanodeSnafu)?;
Ok(Output::AffectedRows(affected_rows as _))
} else {
// Slow path: insert with subquery. Execute the subquery first, via query engine. Then
// insert the results by sending insert requests.
@@ -82,7 +74,8 @@ impl StatementExecutor {
let insert_request =
build_insert_request(record_batch, table.schema(), &table_info)?;
affected_rows += self
.handle_table_insert_request(insert_request, query_ctx.clone())
.inserter
.handle_table_insert(insert_request, query_ctx.clone())
.await?;
}
@@ -114,13 +107,14 @@ impl StatementExecutor {
let table_info = table.table_info();
while let Some(batch) = stream.next().await {
let record_batch = batch.context(ReadRecordBatchSnafu)?;
let delete_request = build_delete_request(record_batch, table.schema(), &table_info)?;
let request = build_delete_request(record_batch, table.schema(), &table_info)?;
affected_rows += self
.handle_table_delete_request(delete_request, query_ctx.clone())
.deleter
.handle_table_delete(request, query_ctx.clone())
.await?;
}
Ok(Output::AffectedRows(affected_rows))
Ok(Output::AffectedRows(affected_rows as _))
}
async fn execute_dml_subquery(
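The insert entry point above keeps two paths: literal VALUES go straight through the inserter, while INSERT ... SELECT executes the subquery and feeds each record batch through the same table-insert call. A compact sketch of the dispatch, with stand-in types (batches here are plain Vecs rather than a record-batch stream):

enum Insert {
    Values(Vec<i32>),        // "insert with literal values"
    Subquery(Vec<Vec<i32>>), // stand-in for the subquery's record batches
}

fn handle_insert(insert: Insert) -> usize {
    match insert {
        // Fast path: rows are already known; convert and send once.
        Insert::Values(rows) => rows.len(),
        // Slow path: run the subquery, then insert batch by batch,
        // accumulating affected rows.
        Insert::Subquery(batches) => batches.iter().map(|b| b.len()).sum(),
    }
}

fn main() {
    assert_eq!(handle_insert(Insert::Values(vec![1, 2, 3])), 3);
    assert_eq!(handle_insert(Insert::Subquery(vec![vec![1], vec![2, 3]])), 3);
}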

View File

@@ -116,20 +116,20 @@ pub(crate) mod test {
use super::*;
fn new_test_table_info(
pub fn new_test_table_info(
table_id: u32,
table_name: &str,
region_numbers: impl Iterator<Item = u32>,
) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()

View File

@@ -34,7 +34,7 @@ pub mod procedure;
pub mod pubsub;
pub mod selector;
pub mod service;
pub mod table_creator;
pub mod table_meta_alloc;
pub mod table_routes;
pub use crate::error::Result;

View File

@@ -54,7 +54,7 @@ use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef};
use crate::service::store::memory::MemStore;
use crate::table_creator::MetaSrvTableMetadataAllocator;
use crate::table_meta_alloc::MetaSrvTableMetadataAllocator;
// TODO(fys): try use derive_builder macro
pub struct MetaSrvBuilder {
@@ -366,7 +366,7 @@ fn build_ddl_manager(
},
));
let table_creator = Arc::new(MetaSrvTableMetadataAllocator::new(
let table_meta_allocator = Arc::new(MetaSrvTableMetadataAllocator::new(
selector_ctx.clone(),
selector.clone(),
table_id_sequence.clone(),
@@ -377,7 +377,7 @@ fn build_ddl_manager(
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
table_creator,
table_meta_allocator,
))
}

View File

@@ -15,7 +15,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::region::{DeleteRequest, InsertRequest};
use api::v1::Rows;
use common_meta::key::table_route::TableRouteManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
@@ -31,7 +31,7 @@ use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, Result};
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::splitter::{DeleteRequestSplits, InsertRequestSplits, RowSplitter};
use crate::splitter::RowSplitter;
use crate::{error, PartitionRuleRef};
#[async_trait::async_trait]
@@ -247,26 +247,24 @@ impl PartitionRuleManager {
Ok(regions)
}
/// Split [InsertRequest] into [InsertRequestSplits] according to the partition rule
/// of given table.
pub async fn split_insert_request(
&self,
table: TableId,
req: InsertRequest,
) -> Result<InsertRequestSplits> {
let partition_rule = self.find_table_partition_rule(table).await?;
RowSplitter::new(partition_rule).split_insert(req)
pub async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
let table_route = self.find_table_route(region_id.table_id()).await?;
let peer = table_route
.find_region_leader(region_id.region_number())
.with_context(|| FindLeaderSnafu {
region_id,
table_id: region_id.table_id(),
})?;
Ok(peer.clone())
}
/// Split [DeleteRequest] into [DeleteRequestSplits] according to the partition rule
/// of given table.
pub async fn split_delete_request(
pub async fn split_rows(
&self,
table: TableId,
req: DeleteRequest,
) -> Result<DeleteRequestSplits> {
let partition_rule = self.find_table_partition_rule(table).await?;
RowSplitter::new(partition_rule).split_delete(req)
table_id: TableId,
rows: Rows,
) -> Result<HashMap<RegionNumber, Rows>> {
let partition_rule = self.find_table_partition_rule(table_id).await?;
RowSplitter::new(partition_rule).split(rows)
}
}
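split_rows is now the single partitioning entry point: look up the table's partition rule, then let RowSplitter bucket each row by the region number the rule yields. A reduced sketch in which the "rule" is just a closure over the row's partition key, using the same value ranges as the converter tests above:

use std::collections::HashMap;

type RegionNumber = u32;

// Bucket rows by the region number the partition rule assigns them.
fn split_rows<R>(
    rows: Vec<R>,
    rule: impl Fn(&R) -> RegionNumber,
) -> HashMap<RegionNumber, Vec<R>> {
    let mut splits: HashMap<RegionNumber, Vec<R>> = HashMap::new();
    for row in rows {
        splits.entry(rule(&row)).or_default().push(row);
    }
    splits
}

fn main() {
    // Region value ranges from the tests: 1 -> [50, max), 2 -> [10, 50), 3 -> (min, 10).
    let rule = |v: &i32| {
        if *v >= 50 { 1 } else if *v >= 10 { 2 } else { 3 }
    };
    let splits = split_rows(vec![1, 11, 101], rule);
    assert_eq!(splits[&1], vec![101]);
    assert_eq!(splits[&2], vec![11]);
    assert_eq!(splits[&3], vec![1]);
}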

View File

@@ -15,17 +15,13 @@
use std::collections::HashMap;
use api::helper;
use api::v1::region::{DeleteRequest, InsertRequest};
use api::v1::{ColumnSchema, Row, Rows};
use datatypes::value::Value;
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::RegionNumber;
use crate::error::Result;
use crate::PartitionRuleRef;
pub type InsertRequestSplits = HashMap<RegionNumber, InsertRequest>;
pub type DeleteRequestSplits = HashMap<RegionNumber, DeleteRequest>;
pub struct RowSplitter {
partition_rule: PartitionRuleRef,
}
@@ -35,43 +31,8 @@ impl RowSplitter {
Self { partition_rule }
}
pub fn split_insert(&self, req: InsertRequest) -> Result<InsertRequestSplits> {
let table_id = RegionId::from_u64(req.region_id).table_id();
Ok(self
.split(req.rows)?
.into_iter()
.map(|(region_number, rows)| {
let region_id = RegionId::new(table_id, region_number);
let req = InsertRequest {
rows: Some(rows),
region_id: region_id.into(),
};
(region_number, req)
})
.collect())
}
pub fn split_delete(&self, req: DeleteRequest) -> Result<DeleteRequestSplits> {
let table_id = RegionId::from_u64(req.region_id).table_id();
Ok(self
.split(req.rows)?
.into_iter()
.map(|(region_number, rows)| {
let region_id = RegionId::new(table_id, region_number);
let req = DeleteRequest {
rows: Some(rows),
region_id: region_id.into(),
};
(region_number, req)
})
.collect())
}
fn split(&self, rows: Option<Rows>) -> Result<HashMap<RegionNumber, Rows>> {
pub fn split(&self, rows: Rows) -> Result<HashMap<RegionNumber, Rows>> {
// No data
let Some(rows) = rows else {
return Ok(HashMap::new());
};
if rows.rows.is_empty() {
return Ok(HashMap::new());
}
@@ -177,7 +138,7 @@ mod tests {
use crate::partition::PartitionExpr;
use crate::PartitionRule;
fn mock_insert_request() -> InsertRequest {
fn mock_rows() -> Rows {
let schema = vec![
ColumnSchema {
column_name: "id".to_string(),
@@ -218,10 +179,7 @@ mod tests {
],
},
];
InsertRequest {
rows: Some(Rows { schema, rows }),
region_id: 0,
}
Rows { schema, rows }
}
#[derive(Debug, Serialize, Deserialize)]
@@ -301,53 +259,42 @@ mod tests {
#[test]
fn test_writer_splitter() {
let insert_request = mock_insert_request();
let rows = mock_rows();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split_insert(insert_request).unwrap();
let mut splits = splitter.split(rows).unwrap();
assert_eq!(splits.len(), 2);
let req0 = &splits[&0];
let req1 = &splits[&1];
assert_eq!(req0.region_id, 0);
assert_eq!(req1.region_id, 1);
let rows0 = req0.rows.as_ref().unwrap();
let rows1 = req1.rows.as_ref().unwrap();
assert_eq!(rows0.rows.len(), 1);
assert_eq!(rows1.rows.len(), 2);
let rows0 = splits.remove(&0).unwrap().rows;
let rows1 = splits.remove(&1).unwrap().rows;
assert_eq!(rows0.len(), 1);
assert_eq!(rows1.len(), 2);
}
#[test]
fn test_missed_col_writer_splitter() {
let insert_request = mock_insert_request();
let rows = mock_rows();
let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split_insert(insert_request).unwrap();
let mut splits = splitter.split(rows).unwrap();
assert_eq!(splits.len(), 1);
let req = &splits[&1];
assert_eq!(req.region_id, 1);
let rows = req.rows.as_ref().unwrap();
assert_eq!(rows.rows.len(), 3);
let rows = splits.remove(&1).unwrap().rows;
assert_eq!(rows.len(), 3);
}
#[test]
fn test_empty_partition_rule_writer_splitter() {
let insert_request = mock_insert_request();
let rows = mock_rows();
let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef;
let splitter = RowSplitter::new(rule);
let splits = splitter.split_insert(insert_request).unwrap();
let mut splits = splitter.split(rows).unwrap();
assert_eq!(splits.len(), 1);
let req = &splits[&0];
assert_eq!(req.region_id, 0);
let rows = req.rows.as_ref().unwrap();
assert_eq!(rows.rows.len(), 3);
let rows = splits.remove(&0).unwrap().rows;
assert_eq!(rows.len(), 3);
}
}