feat: support "delete" in distributed mode (#1441)

* feat: support "delete" in distributed mode

* fix: resolve PR comments
This commit is contained in:
LFC
2023-04-24 12:07:50 +08:00
committed by GitHub
parent 7c6754d03e
commit 17daf4cdff
33 changed files with 965 additions and 286 deletions

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0bebe5f69c91cdfbce85cb8f45f9fcd28185261c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "57e6a032db70264ec394cbb8a2f327ad33705d76" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -18,8 +18,8 @@ use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
greptime_response, AffectedRows, AlterExpr, AuthHeader, CreateTableExpr, DdlRequest,
DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequest, PromRangeQuery, QueryRequest,
RequestHeader,
DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequest, PromRangeQuery,
QueryRequest, RequestHeader,
};
use arrow_flight::{FlightData, Ticket};
use common_error::prelude::*;
@@ -108,6 +108,15 @@ impl Database {
pub async fn insert(&self, request: InsertRequest) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_INSERT);
self.handle(Request::Insert(request)).await
}
pub async fn delete(&self, request: DeleteRequest) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_DELETE);
self.handle(Request::Delete(request)).await
}
async fn handle(&self, request: Request) -> Result<u32> {
let mut client = self.client.make_database_client()?.inner;
let request = GreptimeRequest {
header: Some(RequestHeader {
@@ -116,7 +125,7 @@ impl Database {
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
}),
request: Some(Request::Insert(request)),
request: Some(request),
};
let response = client
.handle(request)

View File

@@ -16,6 +16,7 @@
pub const METRIC_GRPC_CREATE_TABLE: &str = "grpc.create_table";
pub const METRIC_GRPC_PROMQL_RANGE_QUERY: &str = "grpc.promql.range_query";
pub const METRIC_GRPC_INSERT: &str = "grpc.insert";
pub const METRIC_GRPC_DELETE: &str = "grpc.delete";
pub const METRIC_GRPC_SQL: &str = "grpc.sql";
pub const METRIC_GRPC_LOGICAL_PLAN: &str = "grpc.logical_plan";
pub const METRIC_GRPC_ALTER: &str = "grpc.alter";

View File

@@ -0,0 +1,113 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{Column, DeleteRequest as GrpcDeleteRequest};
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use snafu::{ensure, ResultExt};
use table::requests::DeleteRequest;
use crate::error::{ColumnDataTypeSnafu, IllegalDeleteRequestSnafu, Result};
use crate::insert::add_values_to_builder;
pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result<DeleteRequest> {
let row_count = request.row_count as usize;
let mut key_column_values = HashMap::with_capacity(request.key_columns.len());
for Column {
column_name,
values,
null_mask,
datatype,
..
} in request.key_columns
{
let Some(values) = values else { continue };
let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype)
.context(ColumnDataTypeSnafu)?
.into();
let vector_builder = &mut datatype.create_mutable_vector(row_count);
add_values_to_builder(vector_builder, values, row_count, null_mask)?;
ensure!(
key_column_values
.insert(column_name.clone(), vector_builder.to_vector())
.is_none(),
IllegalDeleteRequestSnafu {
reason: format!("Duplicated column '{column_name}' in delete request.")
}
);
}
Ok(DeleteRequest { key_column_values })
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::column::Values;
use api::v1::ColumnDataType;
use datatypes::prelude::{ScalarVector, VectorRef};
use datatypes::vectors::{Int32Vector, StringVector};
use super::*;
#[test]
fn test_to_table_delete_request() {
let grpc_request = GrpcDeleteRequest {
table_name: "foo".to_string(),
region_number: 0,
key_columns: vec![
Column {
column_name: "id".to_string(),
values: Some(Values {
i32_values: vec![1, 2, 3],
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "name".to_string(),
values: Some(Values {
string_values: vec!["a".to_string(), "b".to_string(), "c".to_string()],
..Default::default()
}),
datatype: ColumnDataType::String as i32,
..Default::default()
},
],
row_count: 3,
};
let mut request = to_table_delete_request(grpc_request).unwrap();
assert_eq!(
Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef,
request.key_column_values.remove("id").unwrap()
);
assert_eq!(
Arc::new(StringVector::from_slice(&["a", "b", "c"])) as VectorRef,
request.key_column_values.remove("name").unwrap()
);
assert!(request.key_column_values.is_empty());
}
}

View File

@@ -34,6 +34,9 @@ pub enum Error {
#[snafu(display("Illegal insert data"))]
IllegalInsertData { location: Location },
#[snafu(display("Illegal delete request, reason: {reason}"))]
IllegalDeleteRequest { reason: String, location: Location },
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(backtrace)]
@@ -95,9 +98,11 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::DecodeInsert { .. } | Error::IllegalInsertData { .. } => {
StatusCode::InvalidArguments
}
Error::DecodeInsert { .. }
| Error::IllegalInsertData { .. }
| Error::IllegalDeleteRequest { .. } => StatusCode::InvalidArguments,
Error::ColumnDataType { .. } => StatusCode::Internal,
Error::DuplicatedTimestampColumn { .. } | Error::MissingTimestampColumn { .. } => {
StatusCode::InvalidArguments

View File

@@ -307,7 +307,7 @@ pub fn to_table_insert_request(
})
}
fn add_values_to_builder(
pub(crate) fn add_values_to_builder(
builder: &mut Box<dyn MutableVector>,
values: Values,
row_count: usize,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod alter;
pub mod delete;
pub mod error;
pub mod insert;

View File

@@ -282,6 +282,12 @@ pub enum Error {
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to convert delete expr to request: {}", source))]
DeleteExprToRequest {
#[snafu(backtrace)]
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to parse SQL, source: {}", source))]
ParseSql {
#[snafu(backtrace)]
@@ -453,6 +459,7 @@ impl ErrorExt for Error {
AlterExprToRequest { source, .. }
| CreateExprToRequest { source }
| DeleteExprToRequest { source }
| InsertData { source } => source.status_code(),
ConvertSchema { source, .. } | VectorComputation { source } => source.status_code(),

View File

@@ -15,7 +15,7 @@
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::query_request::Query;
use api::v1::{CreateDatabaseExpr, DdlRequest, InsertRequest};
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequest};
use async_trait::async_trait;
use common_query::Output;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
@@ -26,11 +26,13 @@ use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::engine::TableReference;
use table::requests::CreateDatabaseRequest;
use crate::error::{
self, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, PlanStatementSnafu,
Result,
self, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, DeleteSnafu,
ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, PlanStatementSnafu, Result,
TableNotFoundSnafu,
};
use crate::instance::Instance;
@@ -104,20 +106,47 @@ impl Instance {
let catalog = &ctx.current_catalog();
let schema = &ctx.current_schema();
let table_name = &request.table_name.clone();
let table_ref = TableReference::full(catalog, schema, table_name);
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu { table_name })?;
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request)
.context(error::InsertDataSnafu)?;
let affected_rows = table
.insert(request)
let affected_rows = table.insert(request).await.with_context(|_| InsertSnafu {
table_name: table_ref.to_string(),
})?;
Ok(Output::AffectedRows(affected_rows))
}
async fn handle_delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<Output> {
let catalog = &ctx.current_catalog();
let schema = &ctx.current_schema();
let table_name = &request.table_name.clone();
let table_ref = TableReference::full(catalog, schema, table_name);
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(error::InsertSnafu { table_name })?;
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let request = common_grpc_expr::delete::to_table_delete_request(request)
.context(DeleteExprToRequestSnafu)?;
let affected_rows = table.delete(request).await.with_context(|_| DeleteSnafu {
table_name: table_ref.to_string(),
})?;
Ok(Output::AffectedRows(affected_rows))
}
@@ -142,6 +171,7 @@ impl GrpcQueryHandler for Instance {
async fn do_query(&self, request: GrpcRequest, ctx: QueryContextRef) -> Result<Output> {
match request {
GrpcRequest::Insert(request) => self.handle_insert(request, ctx).await,
GrpcRequest::Delete(request) => self.handle_delete(request, ctx).await,
GrpcRequest::Query(query_request) => {
let query = query_request
.query
@@ -334,6 +364,72 @@ mod test {
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_delete() {
let instance = MockInstance::new("test_handle_delete").await;
let instance = instance.inner();
test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype())
.await
.unwrap();
let query = GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"INSERT INTO demo(host, cpu, memory, ts) VALUES \
('host1', 66.6, 1024, 1672201025000),\
('host2', 88.8, 333.3, 1672201026000),\
('host3', 88.8, 333.3, 1672201026000)"
.to_string(),
)),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(3)));
let request = DeleteRequest {
table_name: "demo".to_string(),
region_number: 0,
key_columns: vec![
Column {
column_name: "host".to_string(),
values: Some(Values {
string_values: vec!["host2".to_string()],
..Default::default()
}),
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672201026000],
..Default::default()
}),
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 1,
};
let request = GrpcRequest::Delete(request);
let output = instance
.do_query(request, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+-------+------+
| ts | host | cpu |
+---------------------+-------+------+
| 2022-12-28T04:17:05 | host1 | 66.6 |
| 2022-12-28T04:17:06 | host3 | 88.8 |
+---------------------+-------+------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_query() {
let instance = MockInstance::new("test_handle_query").await;

View File

@@ -224,6 +224,15 @@ pub enum Error {
source: common_grpc_expr::error::Error,
},
#[snafu(display(
"Failed to convert GRPC DeleteRequest to table DeleteRequest, source: {}",
source
))]
ToTableDeleteRequest {
#[snafu(backtrace)]
source: common_grpc_expr::error::Error,
},
#[snafu(display("Failed to find catalog by name: {}", catalog_name))]
CatalogNotFound {
catalog_name: String,
@@ -546,6 +555,7 @@ impl ErrorExt for Error {
}
Error::BuildCreateExprOnInsertion { source }
| Error::ToTableInsertRequest { source }
| Error::ToTableDeleteRequest { source }
| Error::FindNewColumnsOnInsertion { source } => source.status_code(),
Error::ExecuteStatement { source, .. }

View File

@@ -19,8 +19,8 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, FlushTableExpr,
InsertRequest, TableId,
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, TableId,
};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue};
@@ -52,6 +52,7 @@ use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use table::table::AlterContext;
@@ -63,8 +64,8 @@ use crate::error::{
self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ColumnDataTypeSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, StartMetaClientSnafu,
TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableInsertRequestSnafu,
UnrecognizedTableOptionSnafu,
TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu,
ToTableInsertRequestSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::table::DistTable;
@@ -516,10 +517,10 @@ impl DistInstance {
.context(RequestMetaSnafu)
}
// TODO(LFC): Refactor insertion implementation for DistTable,
// GRPC InsertRequest to Table InsertRequest, than split Table InsertRequest, than assemble each GRPC InsertRequest, is rather inefficient,
// should operate on GRPC InsertRequest directly.
// Also remember to check the "region_number" carried in InsertRequest, too.
// TODO(LFC): Refactor GRPC insertion and deletion implementation here,
// Take insertion as an example. GRPC insertion is converted to Table InsertRequest here,
// than split the Table InsertRequest in DistTable, than assemble each GRPC InsertRequest there.
// Rather inefficient, should operate on GRPC InsertRequest directly.
async fn handle_dist_insert(
&self,
request: InsertRequest,
@@ -528,12 +529,16 @@ impl DistInstance {
let catalog = &ctx.current_catalog();
let schema = &ctx.current_schema();
let table_name = &request.table_name;
let table_ref = TableReference::full(catalog, schema, table_name);
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name })?;
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request)
.context(ToTableInsertRequestSnafu)?;
@@ -542,6 +547,32 @@ impl DistInstance {
Ok(Output::AffectedRows(affected_rows))
}
async fn handle_dist_delete(
&self,
request: DeleteRequest,
ctx: QueryContextRef,
) -> Result<Output> {
let catalog = &ctx.current_catalog();
let schema = &ctx.current_schema();
let table_name = &request.table_name;
let table_ref = TableReference::full(catalog, schema, table_name);
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let request = common_grpc_expr::delete::to_table_delete_request(request)
.context(ToTableDeleteRequestSnafu)?;
let affected_rows = table.delete(request).await.context(TableSnafu)?;
Ok(Output::AffectedRows(affected_rows))
}
#[cfg(test)]
pub(crate) fn catalog_manager(&self) -> Arc<FrontendCatalogManager> {
self.catalog_manager.clone()

View File

@@ -37,6 +37,7 @@ impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Insert(request) => self.handle_dist_insert(request, ctx).await,
Request::Delete(request) => self.handle_dist_delete(request, ctx).await,
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}

View File

@@ -74,9 +74,8 @@ impl GrpcQueryHandler for Instance {
}
}
}
Request::Ddl(request) => {
let query = Request::Ddl(request);
GrpcQueryHandler::do_query(&*self.grpc_query_handler, query, ctx).await?
Request::Ddl(_) | Request::Delete(_) => {
GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx).await?
}
};
Ok(output)
@@ -91,8 +90,8 @@ mod test {
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DropTableExpr, FlushTableExpr,
InsertRequest, QueryRequest,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, QueryRequest,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_catalog::consts::MITO_ENGINE;
@@ -241,11 +240,11 @@ mod test {
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_insert_and_query() {
async fn test_distributed_insert_delete_and_query() {
common_telemetry::init_default_ut_logging();
let instance =
tests::create_distributed_instance("test_distributed_insert_and_query").await;
tests::create_distributed_instance("test_distributed_insert_delete_and_query").await;
let frontend = instance.frontend.as_ref();
let table_name = "my_dist_table";
@@ -253,6 +252,7 @@ mod test {
r"
CREATE TABLE {table_name} (
a INT,
b STRING PRIMARY KEY,
ts TIMESTAMP,
TIME INDEX (ts)
) PARTITION BY RANGE COLUMNS(a) (
@@ -264,7 +264,7 @@ CREATE TABLE {table_name} (
);
create_table(frontend, sql).await;
test_insert_and_query_on_existing_table(frontend, table_name).await;
test_insert_delete_and_query_on_existing_table(frontend, table_name).await;
verify_data_distribution(
&instance,
@@ -273,48 +273,52 @@ CREATE TABLE {table_name} (
(
0u32,
"\
+---------------------+---+
| ts | a |
+---------------------+---+
| 2023-01-01T07:26:12 | 1 |
| 2023-01-01T07:26:14 | |
+---------------------+---+",
+---------------------+---+-------------------+
| ts | a | b |
+---------------------+---+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
+---------------------+---+-------------------+",
),
(
1u32,
"\
+---------------------+----+
| ts | a |
+---------------------+----+
| 2023-01-01T07:26:13 | 11 |
+---------------------+----+",
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
+---------------------+----+-------------------+",
),
(
2u32,
"\
+---------------------+----+
| ts | a |
+---------------------+----+
| 2023-01-01T07:26:15 | 20 |
| 2023-01-01T07:26:16 | 22 |
+---------------------+----+",
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
+---------------------+----+-------------------+",
),
(
3u32,
"\
+---------------------+----+
| ts | a |
+---------------------+----+
| 2023-01-01T07:26:17 | 50 |
| 2023-01-01T07:26:18 | 55 |
| 2023-01-01T07:26:19 | 99 |
+---------------------+----+",
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+",
),
]),
)
.await;
test_insert_and_query_on_auto_created_table(frontend).await;
test_insert_delete_and_query_on_auto_created_table(frontend).await;
// Auto created table has only one region.
verify_data_distribution(
@@ -323,16 +327,14 @@ CREATE TABLE {table_name} (
HashMap::from([(
0u32,
"\
+---------------------+---+
| ts | a |
+---------------------+---+
| 2023-01-01T07:26:15 | 4 |
| 2023-01-01T07:26:16 | |
| 2023-01-01T07:26:17 | 6 |
| 2023-01-01T07:26:18 | |
| 2023-01-01T07:26:19 | |
| 2023-01-01T07:26:20 | |
+---------------------+---+",
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-01T07:26:16 | | |
| 2023-01-01T07:26:17 | 6 | |
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+",
)]),
)
.await;
@@ -347,12 +349,12 @@ CREATE TABLE {table_name} (
let instance = &standalone.instance;
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, ts TIMESTAMP, TIME INDEX (ts))");
let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))");
create_table(instance, sql).await;
test_insert_and_query_on_existing_table(instance, table_name).await;
test_insert_delete_and_query_on_existing_table(instance, table_name).await;
test_insert_and_query_on_auto_created_table(instance).await
test_insert_delete_and_query_on_auto_created_table(instance).await
}
#[tokio::test(flavor = "multi_thread")]
@@ -379,7 +381,7 @@ CREATE TABLE {table_name} (
);
create_table(frontend, sql).await;
test_insert_and_query_on_existing_table(frontend, table_name).await;
test_insert_delete_and_query_on_existing_table(frontend, table_name).await;
flush_table(frontend, "greptime", "public", table_name, None).await;
// Wait for previous task finished
@@ -434,11 +436,11 @@ CREATE TABLE {table_name} (
let data_tmp_dir = standalone.data_tmp_dir();
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, ts TIMESTAMP, TIME INDEX (ts))");
let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))");
create_table(instance, sql).await;
test_insert_and_query_on_existing_table(instance, table_name).await;
test_insert_delete_and_query_on_existing_table(instance, table_name).await;
let table_id = 1024;
let region_id = 0;
@@ -486,33 +488,55 @@ CREATE TABLE {table_name} (
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn test_insert_and_query_on_existing_table(instance: &Instance, table_name: &str) {
async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) {
let ts_millisecond_values = vec![
1672557972000,
1672557973000,
1672557974000,
1672557975000,
1672557976000,
1672557977000,
1672557978000,
1672557979000,
1672557980000,
1672557981000,
1672557982000,
1672557983000,
1672557984000,
1672557985000,
1672557986000,
1672557987000,
];
let insert = InsertRequest {
table_name: table_name.to_string(),
columns: vec![
Column {
column_name: "a".to_string(),
values: Some(Values {
i32_values: vec![1, 11, 20, 22, 50, 55, 99],
i32_values: vec![1, 2, 3, 4, 5, 11, 12, 20, 21, 22, 23, 50, 51, 52, 53],
..Default::default()
}),
null_mask: vec![4],
null_mask: vec![32, 0],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Int32 as i32,
},
Column {
column_name: "b".to_string(),
values: Some(Values {
string_values: ts_millisecond_values
.iter()
.map(|x| format!("ts: {x}"))
.collect(),
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![
1672557972000,
1672557973000,
1672557974000,
1672557975000,
1672557976000,
1672557977000,
1672557978000,
1672557979000,
],
ts_millisecond_values,
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
@@ -520,35 +544,113 @@ CREATE TABLE {table_name} (
..Default::default()
},
],
row_count: 8,
row_count: 16,
..Default::default()
};
let request = Request::Insert(insert);
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(8)));
let output = query(instance, Request::Insert(insert)).await;
assert!(matches!(output, Output::AffectedRows(16)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(format!(
"SELECT ts, a FROM {table_name} ORDER BY ts"
"SELECT ts, a, b FROM {table_name} ORDER BY ts"
))),
});
let output = query(instance, request.clone()).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 |
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let delete = DeleteRequest {
table_name: table_name.to_string(),
region_number: 0,
key_columns: vec![
Column {
column_name: "a".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
i32_values: vec![2, 12, 22, 52],
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "b".to_string(),
semantic_type: SemanticType::Tag as i32,
values: Some(Values {
string_values: vec![
"ts: 1672557973000".to_string(),
"ts: 1672557979000".to_string(),
"ts: 1672557982000".to_string(),
"ts: 1672557986000".to_string(),
],
..Default::default()
}),
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
semantic_type: SemanticType::Timestamp as i32,
values: Some(Values {
ts_millisecond_values: vec![
1672557973000,
1672557979000,
1672557982000,
1672557986000,
],
..Default::default()
}),
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 4,
};
let output = query(instance, Request::Delete(delete)).await;
assert!(matches!(output, Output::AffectedRows(4)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+----+
| ts | a |
+---------------------+----+
| 2023-01-01T07:26:12 | 1 |
| 2023-01-01T07:26:13 | 11 |
| 2023-01-01T07:26:14 | |
| 2023-01-01T07:26:15 | 20 |
| 2023-01-01T07:26:16 | 22 |
| 2023-01-01T07:26:17 | 50 |
| 2023-01-01T07:26:18 | 55 |
| 2023-01-01T07:26:19 | 99 |
+---------------------+----+";
+---------------------+----+-------------------+
| ts | a | b |
+---------------------+----+-------------------+
| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 |
| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 |
| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 |
| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 |
| 2023-01-01T07:26:17 | | ts: 1672557977000 |
| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 |
| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 |
| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 |
| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 |
| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 |
| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 |
| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 |
+---------------------+----+-------------------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
@@ -583,7 +685,7 @@ CREATE TABLE {table_name} (
for (region, dn) in region_to_dn_map.iter() {
let stmt = QueryLanguageParser::parse_sql(&format!(
"SELECT ts, a FROM {table_name} ORDER BY ts"
"SELECT ts, a, b FROM {table_name} ORDER BY ts"
))
.unwrap();
let dn = instance.datanodes.get(dn).unwrap();
@@ -603,7 +705,7 @@ CREATE TABLE {table_name} (
}
}
async fn test_insert_and_query_on_auto_created_table(instance: &Instance) {
async fn test_insert_delete_and_query_on_auto_created_table(instance: &Instance) {
let insert = InsertRequest {
table_name: "auto_created_table".to_string(),
columns: vec![
@@ -675,7 +777,7 @@ CREATE TABLE {table_name} (
"SELECT ts, a, b FROM auto_created_table".to_string(),
)),
});
let output = query(instance, request).await;
let output = query(instance, request.clone()).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
@@ -688,6 +790,39 @@ CREATE TABLE {table_name} (
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:19 | | |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let delete = DeleteRequest {
table_name: "auto_created_table".to_string(),
region_number: 0,
key_columns: vec![Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672557975000, 1672557979000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
}],
row_count: 2,
};
let output = query(instance, Request::Delete(delete)).await;
assert!(matches!(output, Output::AffectedRows(2)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-01T07:26:16 | | |
| 2023-01-01T07:26:17 | 6 | |
| 2023-01-01T07:26:18 | | x |
| 2023-01-01T07:26:20 | | z |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::iter;
use std::sync::Arc;
use api::v1::AlterExpr;
@@ -36,18 +37,23 @@ use datafusion_common::DataFusionError;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use meta_client::rpc::TableName;
use partition::manager::PartitionRuleManagerRef;
use partition::splitter::WriteSplitter;
use snafu::prelude::*;
use store_api::storage::RegionNumber;
use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef};
use table::requests::{AlterTableRequest, InsertRequest};
use table::requests::{AlterTableRequest, DeleteRequest, InsertRequest};
use table::table::AlterContext;
use table::{meter_insert_request, Table};
use tokio::sync::RwLock;
use crate::datanode::DatanodeClients;
use crate::error::{self, Result};
use crate::error::{self, FindDatanodeSnafu, FindTableRouteSnafu, Result};
use crate::table::delete::to_grpc_delete_request;
use crate::table::insert::to_grpc_insert_request;
use crate::table::scan::{DatanodeInstance, TableScanPlan};
mod delete;
pub mod insert;
pub(crate) mod scan;
@@ -84,8 +90,15 @@ impl Table for DistTable {
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let inserts = splits
.into_iter()
.map(|(region_number, insert)| to_grpc_insert_request(region_number, insert))
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let output = self
.dist_insert(splits)
.dist_insert(inserts)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
@@ -155,6 +168,52 @@ impl Table for DistTable {
.map_err(BoxedError::new)
.context(TableOperationSnafu)
}
async fn delete(&self, request: DeleteRequest) -> table::Result<usize> {
let partition_rule = self
.partition_manager
.find_table_partition_rule(&self.table_name)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let schema = self.schema();
let time_index = &schema
.timestamp_column()
.with_context(|| table::error::MissingTimeIndexColumnSnafu {
table_name: self.table_name.to_string(),
})?
.name;
let table_info = self.table_info();
let key_column_names = table_info
.meta
.row_key_column_names()
.chain(iter::once(time_index))
.collect::<Vec<_>>();
let requests = WriteSplitter::with_partition_rule(partition_rule)
.split_delete(request, key_column_names)
.map_err(BoxedError::new)
.and_then(|requests| {
requests
.into_iter()
.map(|(region_number, request)| {
to_grpc_delete_request(&self.table_name, region_number, request)
})
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)
})
.context(TableOperationSnafu)?;
let output = self
.dist_delete(requests)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let Output::AffectedRows(rows) = output else { unreachable!() };
Ok(rows)
}
}
impl DistTable {
@@ -274,6 +333,46 @@ impl DistTable {
}
Ok(())
}
async fn find_datanode_instances(
&self,
regions: &[RegionNumber],
) -> Result<Vec<DatanodeInstance>> {
let table_name = &self.table_name;
let route = self
.partition_manager
.find_table_route(table_name)
.await
.with_context(|_| FindTableRouteSnafu {
table_name: table_name.to_string(),
})?;
let datanodes = regions
.iter()
.map(|&n| {
let region_id = n as u64;
route
.region_routes
.iter()
.find_map(|x| {
if x.region.id == region_id {
x.leader_peer.clone()
} else {
None
}
})
.context(FindDatanodeSnafu { region: region_id })
})
.collect::<Result<Vec<_>>>()?;
let mut instances = Vec::with_capacity(datanodes.len());
for datanode in datanodes {
let client = self.datanode_clients.get_client(&datanode).await;
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
instances.push(DatanodeInstance::new(Arc::new(self.clone()) as _, db));
}
Ok(instances)
}
}
fn project_schema(table_schema: SchemaRef, projection: Option<&Vec<usize>>) -> SchemaRef {

View File

@@ -0,0 +1,110 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::DeleteRequest as GrpcDeleteRequest;
use common_query::Output;
use futures::future;
use meta_client::rpc::TableName;
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::requests::DeleteRequest;
use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result};
use crate::table::insert::to_grpc_columns;
use crate::table::DistTable;
impl DistTable {
pub(super) async fn dist_delete(&self, requests: Vec<GrpcDeleteRequest>) -> Result<Output> {
let regions = requests.iter().map(|x| x.region_number).collect::<Vec<_>>();
let instances = self.find_datanode_instances(&regions).await?;
let results = future::try_join_all(instances.into_iter().zip(requests.into_iter()).map(
|(instance, request)| {
common_runtime::spawn_write(async move {
instance
.grpc_delete(request)
.await
.context(RequestDatanodeSnafu)
})
},
))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
Ok(Output::AffectedRows(affected_rows as _))
}
}
pub(super) fn to_grpc_delete_request(
table_name: &TableName,
region_number: RegionNumber,
request: DeleteRequest,
) -> Result<GrpcDeleteRequest> {
let (key_columns, row_count) = to_grpc_columns(&request.key_column_values)?;
Ok(GrpcDeleteRequest {
table_name: table_name.table_name.clone(),
region_number,
key_columns,
row_count,
})
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ColumnDataType};
use datatypes::prelude::VectorRef;
use datatypes::vectors::Int32Vector;
use super::*;
#[test]
fn test_to_grpc_delete_request() {
let table_name = TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "foo".to_string(),
};
let region_number = 1;
let key_column_values = HashMap::from([(
"id".to_string(),
Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef,
)]);
let request = DeleteRequest { key_column_values };
let result = to_grpc_delete_request(&table_name, region_number, request).unwrap();
assert_eq!(result.table_name, "foo");
assert_eq!(result.region_number, region_number);
assert_eq!(
result.key_columns,
vec![Column {
column_name: "id".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
i32_values: vec![1, 2, 3],
..Default::default()
}),
null_mask: vec![0],
datatype: ColumnDataType::Int32 as i32,
}]
);
assert_eq!(result.row_count, 3);
}
}

View File

@@ -13,79 +13,54 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::{push_vals, ColumnDataTypeWrapper};
use api::v1::column::SemanticType;
use api::v1::{Column, InsertRequest as GrpcInsertRequest};
use client::Database;
use common_query::Output;
use datatypes::prelude::ConcreteDataType;
use snafu::{ensure, OptionExt, ResultExt};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use futures::future;
use snafu::{ensure, ResultExt};
use store_api::storage::RegionNumber;
use table::requests::InsertRequest;
use super::DistTable;
use crate::error;
use crate::error::{FindTableRouteSnafu, Result};
use crate::table::scan::DatanodeInstance;
use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result};
impl DistTable {
pub async fn dist_insert(
&self,
inserts: HashMap<RegionNumber, InsertRequest>,
) -> Result<Output> {
let table_name = &self.table_name;
let route = self
.partition_manager
.find_table_route(&self.table_name)
.await
.with_context(|_| FindTableRouteSnafu {
table_name: table_name.to_string(),
})?;
pub async fn dist_insert(&self, inserts: Vec<GrpcInsertRequest>) -> Result<Output> {
let regions = inserts.iter().map(|x| x.region_number).collect::<Vec<_>>();
let instances = self.find_datanode_instances(&regions).await?;
let mut joins = Vec::with_capacity(inserts.len());
for (region_id, insert) in inserts {
let datanode = route
.region_routes
.iter()
.find_map(|x| {
if x.region.id == region_id as u64 {
x.leader_peer.clone()
} else {
None
}
let results = future::try_join_all(instances.into_iter().zip(inserts.into_iter()).map(
|(instance, request)| {
common_runtime::spawn_write(async move {
instance
.grpc_insert(request)
.await
.context(RequestDatanodeSnafu)
})
.context(error::FindDatanodeSnafu { region: region_id })?;
},
))
.await
.context(JoinTaskSnafu)?;
let client = self.datanode_clients.get_client(&datanode).await;
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
let instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
let join = common_runtime::spawn_write(async move {
instance
.grpc_insert(to_grpc_insert_request(region_id, insert)?)
.await
.context(error::RequestDatanodeSnafu)
});
joins.push(join);
}
let mut success = 0;
for join in joins {
let rows = join.await.context(error::JoinTaskSnafu)?? as usize;
success += rows;
}
Ok(Output::AffectedRows(success))
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
Ok(Output::AffectedRows(affected_rows as _))
}
}
pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec<Column>, u32)> {
to_grpc_columns(&insert.columns_values)
}
pub(crate) fn to_grpc_columns(
columns_values: &HashMap<String, VectorRef>,
) -> Result<(Vec<Column>, u32)> {
let mut row_count = None;
let columns = insert
.columns_values
let columns = columns_values
.iter()
.map(|(column_name, vector)| {
match row_count {
@@ -129,7 +104,7 @@ pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec<Col
Ok((columns, row_count))
}
fn to_grpc_insert_request(
pub(crate) fn to_grpc_insert_request(
region_number: RegionNumber,
insert: InsertRequest,
) -> Result<GrpcInsertRequest> {

View File

@@ -15,7 +15,7 @@
use std::fmt::Formatter;
use std::sync::Arc;
use api::v1::InsertRequest;
use api::v1::{DeleteRequest, InsertRequest};
use client::Database;
use common_query::prelude::Expr;
use common_query::Output;
@@ -51,6 +51,10 @@ impl DatanodeInstance {
self.db.insert(request).await
}
pub(crate) async fn grpc_delete(&self, request: DeleteRequest) -> client::Result<u32> {
self.db.delete(request).await
}
pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<RecordBatches> {
let logical_plan = self.build_logical_plan(&plan)?;

View File

@@ -776,8 +776,7 @@ async fn test_use_database(instance: Arc<dyn MockInstance>) {
check_output_stream(output, expected).await;
}
// should apply to both instances. tracked in #755
#[apply(standalone_instance_case)]
#[apply(both_instances_cases)]
async fn test_delete(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

View File

@@ -19,7 +19,7 @@ use datatypes::value::Value;
use snafu::ensure;
use store_api::storage::RegionNumber;
use crate::error::{self, Error};
use crate::error::{self, Result};
use crate::partition::{PartitionBound, PartitionExpr, PartitionRule};
/// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule] except that it allows
@@ -145,7 +145,7 @@ impl PartitionRule for RangeColumnsPartitionRule {
self.column_list.clone()
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error> {
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
ensure!(
values.len() == self.column_list.len(),
error::RegionKeysSizeSnafu {
@@ -166,7 +166,7 @@ impl PartitionRule for RangeColumnsPartitionRule {
})
}
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
let regions = if exprs.iter().all(|x| self.column_list.contains(&x.column)) {
let PartitionExpr {
column: _,
@@ -269,7 +269,7 @@ mod tests {
value: value.into(),
},
];
let regions = rule.find_regions(&exprs).unwrap();
let regions = rule.find_regions_by_exprs(&exprs).unwrap();
assert_eq!(
regions,
expected_regions.into_iter().collect::<Vec<RegionNumber>>()
@@ -332,7 +332,7 @@ mod tests {
value: "hz".into(),
},
];
let regions = rule.find_regions(&exprs).unwrap();
let regions = rule.find_regions_by_exprs(&exprs).unwrap();
assert_eq!(regions, vec![1, 2, 3, 4, 5, 6]);
}

View File

@@ -93,6 +93,9 @@ pub enum Error {
#[snafu(display("Invalid InsertRequest, reason: {}", reason))]
InvalidInsertRequest { reason: String, location: Location },
#[snafu(display("Invalid DeleteRequest, reason: {}", reason))]
InvalidDeleteRequest { reason: String, location: Location },
#[snafu(display(
"Invalid table route data in meta, table name: {}, msg: {}",
table_name,
@@ -127,6 +130,7 @@ impl ErrorExt for Error {
| Error::FindRegions { .. }
| Error::RegionKeysSize { .. }
| Error::InvalidInsertRequest { .. }
| Error::InvalidDeleteRequest { .. }
| Error::FindPartitionColumn { .. } => StatusCode::InvalidArguments,
Error::SerializeJson { .. } | Error::DeserializeJson { .. } => StatusCode::Internal,
Error::InvalidTableRouteData { .. } => StatusCode::Internal,

View File

@@ -209,7 +209,7 @@ impl PartitionRuleManager {
}
target.into_iter().collect::<Vec<_>>()
} else {
partition_rule.find_regions(&[])?
partition_rule.find_regions_by_exprs(&[])?
};
ensure!(
!regions.is_empty(),
@@ -249,7 +249,7 @@ fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<Hash
}
})?;
return Ok(partition_rule
.find_regions(&[PartitionExpr::new(column, op, value)])?
.find_regions_by_exprs(&[PartitionExpr::new(column, op, value)])?
.into_iter()
.collect::<HashSet<RegionNumber>>());
}
@@ -277,7 +277,7 @@ fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<Hash
// Returns all regions for not supported partition expr as a safety hatch.
Ok(partition_rule
.find_regions(&[])?
.find_regions_by_exprs(&[])?
.into_iter()
.collect::<HashSet<RegionNumber>>())
}

View File

@@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use crate::error::{self, Error};
use crate::error::{self, Error, Result};
pub type PartitionRuleRef = Arc<dyn PartitionRule>;
@@ -32,11 +32,15 @@ pub trait PartitionRule: Sync + Send {
fn partition_columns(&self) -> Vec<String>;
// TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop.
// Or find better names since one is mainly for writes and the other is for reads.
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error>;
/// Finds the target region by the partition values.
///
/// Note that the `values` should have the same length as the `partition_columns`.
fn find_region(&self, values: &[Value]) -> Result<RegionNumber>;
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error>;
/// Finds the target regions by the partition expressions.
///
/// Note that the `exprs` should have the same length as the `partition_columns`.
fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>>;
}
/// The right bound(exclusive) of partition range.
@@ -72,7 +76,7 @@ impl PartitionDef {
impl TryFrom<MetaPartition> for PartitionDef {
type Error = Error;
fn try_from(partition: MetaPartition) -> Result<Self, Self::Error> {
fn try_from(partition: MetaPartition) -> Result<Self> {
let MetaPartition {
column_list,
value_list,
@@ -86,7 +90,7 @@ impl TryFrom<MetaPartition> for PartitionDef {
let partition_bounds = value_list
.into_iter()
.map(|x| serde_json::from_str(&String::from_utf8_lossy(&x)))
.collect::<Result<Vec<PartitionBound>, serde_json::Error>>()
.collect::<std::result::Result<Vec<PartitionBound>, serde_json::Error>>()
.context(error::DeserializeJsonSnafu)?;
Ok(PartitionDef {
@@ -99,7 +103,7 @@ impl TryFrom<MetaPartition> for PartitionDef {
impl TryFrom<PartitionDef> for MetaPartition {
type Error = Error;
fn try_from(partition: PartitionDef) -> Result<Self, Self::Error> {
fn try_from(partition: PartitionDef) -> Result<Self> {
let PartitionDef {
partition_columns: columns,
partition_bounds: bounds,
@@ -113,7 +117,7 @@ impl TryFrom<PartitionDef> for MetaPartition {
let value_list = bounds
.into_iter()
.map(|x| serde_json::to_string(&x).map(|s| s.into_bytes()))
.collect::<Result<Vec<Vec<u8>>, serde_json::Error>>()
.collect::<std::result::Result<Vec<Vec<u8>>, serde_json::Error>>()
.context(error::SerializeJsonSnafu)?;
Ok(MetaPartition {

View File

@@ -120,7 +120,7 @@ impl PartitionRule for RangePartitionRule {
})
}
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
if exprs.is_empty() {
return Ok(self.regions.clone());
}
@@ -197,7 +197,7 @@ mod test {
op,
value: value.into(),
};
let regions = rule.find_regions(&[expr]).unwrap();
let regions = rule.find_regions_by_exprs(&[expr]).unwrap();
assert_eq!(
regions,
expected_regions.into_iter().collect::<Vec<RegionNumber>>()

View File

@@ -22,7 +22,10 @@ use snafu::{ensure, OptionExt};
use store_api::storage::RegionNumber;
use table::requests::{DeleteRequest, InsertRequest};
use crate::error::{FindPartitionColumnSnafu, FindRegionSnafu, InvalidInsertRequestSnafu, Result};
use crate::error::{
FindPartitionColumnSnafu, FindRegionSnafu, InvalidDeleteRequestSnafu,
InvalidInsertRequestSnafu, Result,
};
use crate::PartitionRuleRef;
pub type InsertRequestSplit = HashMap<RegionNumber, InsertRequest>;
@@ -43,12 +46,77 @@ impl WriteSplitter {
check_req(&insert)?;
let column_names = self.partition_rule.partition_columns();
let partition_columns = find_partitioning_values(&insert, &column_names)?;
let values = &insert.columns_values;
let partition_columns = find_partitioning_values(values, &column_names)?;
let region_map = self.split_partitioning_values(&partition_columns)?;
Ok(split_insert_request(&insert, region_map))
}
pub fn split_delete(
&self,
request: DeleteRequest,
key_column_names: Vec<&String>,
) -> Result<DeleteRequestSplit> {
Self::validate_delete_request(&request)?;
let partition_columns = self.partition_rule.partition_columns();
let values = find_partitioning_values(&request.key_column_values, &partition_columns)?;
let regional_value_indexes = self.split_partitioning_values(&values)?;
let requests = regional_value_indexes
.into_iter()
.map(|(region_id, value_indexes)| {
let key_column_values = request
.key_column_values
.iter()
.filter(|(column_name, _)| key_column_names.contains(column_name))
.map(|(column_name, vector)| {
let mut builder = vector
.data_type()
.create_mutable_vector(value_indexes.len());
value_indexes.iter().for_each(|&index| {
builder.push_value_ref(vector.get(index).as_value_ref());
});
(column_name.to_string(), builder.to_vector())
})
.collect();
(region_id, DeleteRequest { key_column_values })
})
.collect();
Ok(requests)
}
fn validate_delete_request(request: &DeleteRequest) -> Result<()> {
let rows = request
.key_column_values
.values()
.next()
.map(|x| x.len())
.context(InvalidDeleteRequestSnafu {
reason: "no key column values",
})?;
ensure!(
rows > 0,
InvalidDeleteRequestSnafu {
reason: "no rows in delete request"
}
);
ensure!(
request
.key_column_values
.values()
.map(|x| x.len())
.all(|x| x == rows),
InvalidDeleteRequestSnafu {
reason: "the lengths of key column values are not the same"
}
);
Ok(())
}
fn split_partitioning_values(
&self,
values: &[VectorRef],
@@ -95,11 +163,9 @@ fn check_req(insert: &InsertRequest) -> Result<()> {
}
fn find_partitioning_values(
insert: &InsertRequest,
values: &HashMap<String, VectorRef>,
partition_columns: &[String],
) -> Result<Vec<VectorRef>> {
let values = &insert.columns_values;
partition_columns
.iter()
.map(|column_name| {
@@ -190,9 +256,7 @@ mod tests {
use store_api::storage::RegionNumber;
use table::requests::InsertRequest;
use super::{
check_req, find_partitioning_values, partition_values, split_insert_request, WriteSplitter,
};
use super::*;
use crate::error::Error;
use crate::partition::{PartitionExpr, PartitionRule};
use crate::PartitionRuleRef;
@@ -331,7 +395,8 @@ mod tests {
let insert = mock_insert_request();
let partition_column_names = vec!["host".to_string(), "id".to_string()];
let columns = find_partitioning_values(&insert, &partition_column_names).unwrap();
let columns =
find_partitioning_values(&insert.columns_values, &partition_column_names).unwrap();
let host_column = columns[0].clone();
assert_eq!(
@@ -461,7 +526,7 @@ mod tests {
unreachable!()
}
fn find_regions(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
unimplemented!()
}
}

View File

@@ -155,7 +155,7 @@ impl GrpcQueryHandler for DummyInstance {
ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
let output = match request {
Request::Insert(_) => unimplemented!(),
Request::Insert(_) | Request::Delete(_) => unimplemented!(),
Request::Query(query_request) => {
let query = query_request.query.unwrap();
match query {

View File

@@ -130,6 +130,12 @@ pub enum Error {
table_id: TableId,
location: Location,
},
#[snafu(display("Missing time index column in table: {}", table_name))]
MissingTimeIndexColumn {
table_name: String,
location: Location,
},
}
impl ErrorExt for Error {
@@ -152,7 +158,10 @@ impl ErrorExt for Error {
Error::ParseTableOption { .. }
| Error::EngineNotFound { .. }
| Error::EngineExist { .. } => StatusCode::InvalidArguments,
Error::InvalidTable { .. } => StatusCode::Internal,
Error::InvalidTable { .. } | Error::MissingTimeIndexColumn { .. } => {
StatusCode::Internal
}
}
}