feat(frontend): migrate insert to region server (#2318)

* feat(frontend): migrate insert to region server

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: move converter to Inserter

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: rename convert function

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: add span id

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: compilation

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* retrigger action

* retrigger action

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi
2023-09-05 11:23:54 +08:00
committed by Ruihang Xia
parent 3eccb36047
commit 7dde9ce3ce
22 changed files with 837 additions and 609 deletions

View File

@@ -33,17 +33,16 @@ use datatypes::vectors::{
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
UInt64Vector, VectorRef,
};
use greptime_proto::v1;
use greptime_proto::v1::ddl_request::Expr;
use greptime_proto::v1::greptime_request::Request;
use greptime_proto::v1::query_request::Query;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{DdlRequest, IntervalMonthDayNano, QueryRequest, SemanticType};
use greptime_proto::v1::{self, DdlRequest, IntervalMonthDayNano, QueryRequest, Row, SemanticType};
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::v1::column::Values;
use crate::v1::{Column, ColumnDataType};
use crate::v1::{Column, ColumnDataType, Value as GrpcValue};
#[derive(Debug, PartialEq, Eq)]
pub struct ColumnDataTypeWrapper(ColumnDataType);
@@ -804,6 +803,59 @@ pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataTyp
Some(column_data_type)
}
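/// Converts columnar `VectorRef`s into row-oriented protobuf [Row]s.
/// Every column contributes one value per row, so each of the `row_count`
/// returned rows holds as many values as there are input columns.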
pub fn vectors_to_rows<'a>(
columns: impl Iterator<Item = &'a VectorRef>,
row_count: usize,
) -> Vec<Row> {
let mut rows = vec![Row { values: vec![] }; row_count];
for column in columns {
for (row_index, row) in rows.iter_mut().enumerate() {
row.values.push(GrpcValue {
value_data: match column.get(row_index) {
Value::Null => None,
Value::Boolean(v) => Some(ValueData::BoolValue(v)),
Value::UInt8(v) => Some(ValueData::U8Value(v as _)),
Value::UInt16(v) => Some(ValueData::U16Value(v as _)),
Value::UInt32(v) => Some(ValueData::U32Value(v)),
Value::UInt64(v) => Some(ValueData::U64Value(v)),
Value::Int8(v) => Some(ValueData::I8Value(v as _)),
Value::Int16(v) => Some(ValueData::I16Value(v as _)),
Value::Int32(v) => Some(ValueData::I32Value(v)),
Value::Int64(v) => Some(ValueData::I64Value(v)),
Value::Float32(v) => Some(ValueData::F32Value(*v)),
Value::Float64(v) => Some(ValueData::F64Value(*v)),
Value::String(v) => Some(ValueData::StringValue(v.as_utf8().to_string())),
Value::Binary(v) => Some(ValueData::BinaryValue(v.to_vec())),
Value::Date(v) => Some(ValueData::DateValue(v.val())),
Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())),
// Timestamps map to the Timestamp* variants so they stay distinct from the
// Time* variants used for Value::Time below.
Value::Timestamp(v) => Some(match v.unit() {
TimeUnit::Second => ValueData::TimestampSecondValue(v.value()),
TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v.value()),
TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v.value()),
TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v.value()),
}),
Value::Time(v) => Some(match v.unit() {
TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
TimeUnit::Millisecond => ValueData::TimeMillisecondValue(v.value()),
TimeUnit::Microsecond => ValueData::TimeMicrosecondValue(v.value()),
TimeUnit::Nanosecond => ValueData::TimeNanosecondValue(v.value()),
}),
Value::Interval(v) => Some(match v.unit() {
IntervalUnit::YearMonth => ValueData::IntervalYearMonthValues(v.to_i32()),
IntervalUnit::DayTime => ValueData::IntervalDayTimeValues(v.to_i64()),
IntervalUnit::MonthDayNano => ValueData::IntervalMonthDayNanoValues(
convert_i128_to_interval(v.to_i128()),
),
}),
Value::List(_) => unreachable!(),
},
})
}
}
rows
}
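// A minimal usage sketch (hypothetical single-column input; see the tests
// below for broader type coverage):
//
//     let cols: Vec<VectorRef> = vec![Arc::new(Int32Vector::from_vec(vec![1, 2]))];
//     let rows = vectors_to_rows(cols.iter(), 2);
//     assert_eq!(rows.len(), 2);
//     assert_eq!(rows[0].values.len(), 1);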
/// Returns true if the column type is equal to expected type.
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
if let Some(expect) = to_column_data_type(expect_type) {
@@ -818,8 +870,9 @@ mod tests {
use std::sync::Arc;
use datatypes::types::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, TimeMillisecondType,
TimeSecondType, TimestampMillisecondType, TimestampSecondType,
Int32Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType,
UInt32Type,
};
use datatypes::vectors::{
BooleanVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector,
@@ -1522,4 +1575,76 @@ mod tests {
Value::DateTime(3.into())
]
);
#[test]
fn test_vectors_to_rows_for_different_types() {
let boolean_vec = BooleanVector::from_vec(vec![true, false, true]);
let int8_vec = PrimitiveVector::<Int8Type>::from_iter_values(vec![1, 2, 3]);
let int32_vec = PrimitiveVector::<Int32Type>::from_iter_values(vec![100, 200, 300]);
let uint8_vec = PrimitiveVector::<UInt8Type>::from_iter_values(vec![10, 20, 30]);
let uint32_vec = PrimitiveVector::<UInt32Type>::from_iter_values(vec![1000, 2000, 3000]);
let float32_vec = Float32Vector::from_vec(vec![1.1, 2.2, 3.3]);
let date_vec = DateVector::from_vec(vec![10, 20, 30]);
let string_vec = StringVector::from_vec(vec!["a", "b", "c"]);
let vector_refs: Vec<VectorRef> = vec![
Arc::new(boolean_vec),
Arc::new(int8_vec),
Arc::new(int32_vec),
Arc::new(uint8_vec),
Arc::new(uint32_vec),
Arc::new(float32_vec),
Arc::new(date_vec),
Arc::new(string_vec),
];
let result = vectors_to_rows(vector_refs.iter(), 3);
assert_eq!(result.len(), 3);
assert_eq!(result[0].values.len(), 8);
let values = result[0]
.values
.iter()
.map(|v| v.value_data.clone().unwrap())
.collect::<Vec<_>>();
assert_eq!(values[0], ValueData::BoolValue(true));
assert_eq!(values[1], ValueData::I8Value(1));
assert_eq!(values[2], ValueData::I32Value(100));
assert_eq!(values[3], ValueData::U8Value(10));
assert_eq!(values[4], ValueData::U32Value(1000));
assert_eq!(values[5], ValueData::F32Value(1.1));
assert_eq!(values[6], ValueData::DateValue(10));
assert_eq!(values[7], ValueData::StringValue("a".to_string()));
assert_eq!(result[1].values.len(), 8);
let values = result[1]
.values
.iter()
.map(|v| v.value_data.clone().unwrap())
.collect::<Vec<_>>();
assert_eq!(values[0], ValueData::BoolValue(false));
assert_eq!(values[1], ValueData::I8Value(2));
assert_eq!(values[2], ValueData::I32Value(200));
assert_eq!(values[3], ValueData::U8Value(20));
assert_eq!(values[4], ValueData::U32Value(2000));
assert_eq!(values[5], ValueData::F32Value(2.2));
assert_eq!(values[6], ValueData::DateValue(20));
assert_eq!(values[7], ValueData::StringValue("b".to_string()));
assert_eq!(result[2].values.len(), 8);
let values = result[2]
.values
.iter()
.map(|v| v.value_data.clone().unwrap())
.collect::<Vec<_>>();
assert_eq!(values[0], ValueData::BoolValue(true));
assert_eq!(values[1], ValueData::I8Value(3));
assert_eq!(values[2], ValueData::I32Value(300));
assert_eq!(values[3], ValueData::U8Value(30));
assert_eq!(values[4], ValueData::U32Value(3000));
assert_eq!(values[5], ValueData::F32Value(3.3));
assert_eq!(values[6], ValueData::DateValue(30));
assert_eq!(values[7], ValueData::StringValue("c".to_string()));
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{region_request, RegionRequest, RegionRequestHeader, RegionResponse};
use api::v1::region::{RegionRequest, RegionResponse};
use api::v1::ResponseHeader;
use async_trait::async_trait;
use common_error::ext::BoxedError;
@@ -23,19 +23,17 @@ use common_telemetry::timer;
use snafu::{location, Location, OptionExt};
use crate::error::Error::FlightGet;
use crate::error::{IllegalDatabaseResponseSnafu, Result, ServerSnafu};
use crate::error::{IllegalDatabaseResponseSnafu, MissingFieldSnafu, Result, ServerSnafu};
use crate::{metrics, Client};
#[derive(Debug)]
pub struct RegionRequester {
trace_id: u64,
span_id: u64,
client: Client,
}
#[async_trait]
impl Datanode for RegionRequester {
async fn handle(&self, request: region_request::Body) -> MetaResult<AffectedRows> {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
self.handle_inner(request).await.map_err(|err| {
if matches!(err, FlightGet { .. }) {
meta_error::Error::RetryLater {
@@ -53,24 +51,16 @@ impl Datanode for RegionRequester {
impl RegionRequester {
pub fn new(client: Client) -> Self {
// TODO(LFC): Pass in trace_id and span_id from some context when we have it.
Self {
trace_id: 0,
span_id: 0,
client,
}
Self { client }
}
async fn handle_inner(&self, request: region_request::Body) -> Result<AffectedRows> {
let request_type = request.as_ref().to_string();
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: self.trace_id,
span_id: self.span_id,
}),
body: Some(request),
};
async fn handle_inner(&self, request: RegionRequest) -> Result<AffectedRows> {
let request_type = request
.body
.as_ref()
.with_context(|| MissingFieldSnafu { field: "body" })?
.as_ref()
.to_string();
let _timer = timer!(
metrics::METRIC_REGION_REQUEST_GRPC,
@@ -89,7 +79,7 @@ impl RegionRequester {
Ok(affected_rows)
}
pub async fn handle(&self, request: region_request::Body) -> Result<AffectedRows> {
pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
self.handle_inner(request).await
}
}
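// Call-site sketch: with the header now supplied by callers, a full request is
// built and dispatched like so (mirroring the DDL procedures below; `requester`
// and `inserts` assumed in scope):
//
//     let request = RegionRequest {
//         header: Some(RegionRequestHeader { trace_id: 0, span_id: 0 }),
//         body: Some(region_request::Body::Inserts(inserts)),
//     };
//     requester.handle(request).await?;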

View File

@@ -21,6 +21,7 @@ use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
use datanode::instance::InstanceRef;
use datanode::region_server::RegionServer;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::service_config::{
@@ -293,7 +294,8 @@ impl StartCommand {
.context(StartDatanodeSnafu)?;
// TODO: build frontend instance like in distributed mode
let mut frontend = build_frontend(plugins.clone(), todo!()).await?;
let mut frontend =
build_frontend(plugins.clone(), todo!(), datanode.region_server()).await?;
frontend
.build_servers(&fe_opts)
@@ -308,8 +310,9 @@ impl StartCommand {
async fn build_frontend(
plugins: Arc<Plugins>,
datanode_instance: InstanceRef,
region_server: RegionServer,
) -> Result<FeInstance> {
let mut frontend_instance = FeInstance::try_new_standalone(datanode_instance.clone())
let mut frontend_instance = FeInstance::try_new_standalone(datanode_instance, region_server)
.await
.context(StartFrontendSnafu)?;
frontend_instance.set_plugins(plugins.clone());

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::v1::region::region_request;
use api::v1::region::RegionRequest;
use crate::error::Result;
use crate::peer::Peer;
@@ -24,7 +24,7 @@ pub type AffectedRows = u64;
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
/// Handles DML and DDL requests.
async fn handle(&self, request: region_request::Body) -> Result<AffectedRows>;
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;
}
pub type DatanodeRef = Arc<dyn Datanode>;

View File

@@ -13,7 +13,9 @@
// limitations under the License.
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest};
use api::v1::region::{
ColumnDef, CreateRequest as PbCreateRegionRequest, RegionRequest, RegionRequestHeader,
};
use api::v1::SemanticType;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
@@ -197,6 +199,13 @@ impl CreateTableProcedure {
for request in requests {
let requester = manager.datanode(&datanode).await;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}

View File

@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{region_request, DropRequest as PbDropRegionRequest};
use api::v1::region::{
region_request, DropRequest as PbDropRegionRequest, RegionRequest, RegionRequestHeader,
};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -162,13 +164,17 @@ impl DropTableProcedure {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");
let request = region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
});
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};
let requester = clients.datanode(&datanode).await;
if let Err(err) = requester.handle(request).await {
if let Err(err) = clients.datanode(&datanode).await.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_operate_region_error(datanode)(err));
}

View File

@@ -398,6 +398,7 @@ pub struct Datanode {
opts: DatanodeOptions,
services: Option<Services>,
heartbeat_task: Option<HeartbeatTask>,
region_server: RegionServer,
}
impl Datanode {
@@ -434,7 +435,9 @@ impl Datanode {
Mode::Standalone => None,
};
let heartbeat_task = match opts.mode {
Mode::Distributed => Some(HeartbeatTask::try_new(&opts, Some(region_server)).await?),
Mode::Distributed => {
Some(HeartbeatTask::try_new(&opts, Some(region_server.clone())).await?)
}
Mode::Standalone => None,
};
@@ -442,6 +445,7 @@ impl Datanode {
opts,
services,
heartbeat_task,
region_server,
})
}
@@ -483,6 +487,10 @@ impl Datanode {
Ok(())
}
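/// Returns a clone of this datanode's [RegionServer], so the frontend can
/// drive it directly in standalone mode.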
pub fn region_server(&self) -> RegionServer {
self.region_server.clone()
}
// internal utils
/// Build [RaftEngineLogStore]

View File

@@ -57,6 +57,12 @@ pub enum Error {
source: client::Error,
},
#[snafu(display("Failed to insert data, source: {}", source))]
RequestInserts {
#[snafu(backtrace)]
source: common_meta::error::Error,
},
#[snafu(display("Runtime resource error, source: {}", source))]
RuntimeResource {
#[snafu(backtrace)]
@@ -189,12 +195,12 @@ pub enum Error {
},
#[snafu(display(
"Failed to find table route for table {}, source: {}",
table_name,
"Failed to find table route for table id {}, source: {}",
table_id,
source
))]
FindTableRoute {
table_name: String,
table_id: u32,
#[snafu(backtrace)]
source: partition::error::Error,
},
@@ -325,6 +331,12 @@ pub enum Error {
source: datanode::error::Error,
},
#[snafu(display("{source}"))]
InvokeRegionServer {
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display("Missing meta_client_options section in config"))]
MissingMetasrvOpts { location: Location },
@@ -674,6 +686,7 @@ impl ErrorExt for Error {
| Error::IntoVectors { source } => source.status_code(),
Error::RequestDatanode { source } => source.status_code(),
Error::RequestInserts { source } => source.status_code(),
Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
source.status_code()
@@ -725,6 +738,7 @@ impl ErrorExt for Error {
Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
Error::InvokeDatanode { source } => source.status_code(),
Error::InvokeRegionServer { source } => source.status_code(),
Error::External { source } => source.status_code(),
Error::DeserializePartition { source, .. }

View File

@@ -12,14 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::region::{
region_request, InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
};
use api::v1::value::ValueData;
use api::v1::{
AlterExpr, Column, ColumnDataType, ColumnSchema, DdlRequest, InsertRequest, InsertRequests,
Row, RowInsertRequest, RowInsertRequests, Rows, Value,
Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
};
use catalog::CatalogManagerRef;
use common_base::BitVec;
@@ -28,22 +33,29 @@ use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_query::Output;
use common_telemetry::info;
use datatypes::schema::Schema;
use datatypes::vectors::VectorRef;
use servers::query_handler::grpc::GrpcQueryHandlerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use store_api::storage::RegionId;
use table::engine::TableReference;
use table::metadata::TableInfoRef;
use table::requests::InsertRequest as TableInsertRequest;
use table::TableRef;
use crate::error::{
CatalogSnafu, ColumnDataTypeSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu,
InvalidInsertRequestSnafu, Result,
CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, EmptyDataSnafu, Error,
FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, MissingTimeIndexColumnSnafu, Result,
TableNotFoundSnafu,
};
use crate::expr_factory::CreateExprFactory;
use crate::instance::region_handler::RegionRequestHandlerRef;
pub(crate) struct Inserter<'a> {
catalog_manager: &'a CatalogManagerRef,
create_expr_factory: &'a CreateExprFactory,
grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
region_request_handler: &'a RegionRequestHandlerRef,
}
impl<'a> Inserter<'a> {
@@ -51,11 +63,13 @@ impl<'a> Inserter<'a> {
catalog_manager: &'a CatalogManagerRef,
create_expr_factory: &'a CreateExprFactory,
grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
region_request_handler: &'a RegionRequestHandlerRef,
) -> Self {
Self {
catalog_manager,
create_expr_factory,
grpc_query_handler,
region_request_handler,
}
}
@@ -81,10 +95,30 @@ impl<'a> Inserter<'a> {
})
})?;
self.create_or_alter_tables_on_demand(&requests, ctx.clone())
self.create_or_alter_tables_on_demand(&requests, &ctx)
.await?;
let query = Request::RowInserts(requests);
self.grpc_query_handler.do_query(query, ctx).await
let region_request = self.convert_req_row_to_region(requests, &ctx).await?;
let response = self
.region_request_handler
.handle(region_request, ctx)
.await?;
Ok(Output::AffectedRows(response.affected_rows as _))
}
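/// Converts a table [TableInsertRequest] into region [RegionInsertRequests]:
/// the (table id, region number) pair becomes the [RegionId], and the column
/// values are re-encoded as rows with a schema derived from the table metadata.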
pub fn convert_req_table_to_region(
table_info: &TableInfoRef,
insert: TableInsertRequest,
) -> Result<RegionInsertRequests> {
let region_id = RegionId::new(table_info.table_id(), insert.region_number).into();
let row_count = row_count(&insert.columns_values)?;
let schema = column_schema(table_info, &insert.columns_values)?;
let rows = api::helper::vectors_to_rows(insert.columns_values.values(), row_count);
Ok(RegionInsertRequests {
requests: vec![RegionInsertRequest {
region_id,
rows: Some(Rows { schema, rows }),
}],
})
}
}
@@ -95,16 +129,16 @@ impl<'a> Inserter<'a> {
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
ctx: QueryContextRef,
ctx: &QueryContextRef,
) -> Result<()> {
// TODO(jeremy): create and alter in batch?
for req in &requests.inserts {
match self.get_table(req, &ctx).await? {
match self.get_table(req, ctx).await? {
Some(table) => {
validate_request_with_table(req, &table)?;
self.alter_table_on_demand(req, table, &ctx).await?
self.alter_table_on_demand(req, table, ctx).await?
}
None => self.create_table(req, &ctx).await?,
None => self.create_table(req, ctx).await?,
}
}
@@ -192,6 +226,31 @@ impl<'a> Inserter<'a> {
Ok(())
}
async fn convert_req_row_to_region(
&self,
requests: RowInsertRequests,
ctx: &QueryContextRef,
) -> Result<region_request::Body> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let table = self.get_table(&request, ctx).await?;
let table = table.with_context(|| TableNotFoundSnafu {
table_name: request.table_name.clone(),
})?;
let region_id = RegionId::new(table.table_info().table_id(), request.region_number);
let insert_request = RegionInsertRequest {
region_id: region_id.into(),
rows: request.rows,
};
region_request.push(insert_request);
}
Ok(region_request::Body::Inserts(RegionInsertRequests {
requests: region_request,
}))
}
}
fn requests_column_to_row(requests: InsertRequests) -> Result<RowInsertRequests> {
@@ -363,13 +422,81 @@ fn validate_required_columns(request_schema: &[ColumnSchema], table_schema: &Sch
Ok(())
}
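/// Returns the row count shared by all columns, or an error if any two
/// columns disagree.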
fn row_count(columns: &HashMap<String, VectorRef>) -> Result<usize> {
let mut columns_iter = columns.values();
let len = columns_iter
.next()
.map(|column| column.len())
.unwrap_or_default();
ensure!(
columns_iter.all(|column| column.len() == len),
InvalidInsertRequestSnafu {
reason: "The row count of columns is not the same."
}
);
Ok(len)
}
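/// Builds the gRPC [ColumnSchema] for each column, classifying it as
/// timestamp, tag, or field based on the table metadata.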
fn column_schema(
table_info: &TableInfoRef,
columns: &HashMap<String, VectorRef>,
) -> Result<Vec<ColumnSchema>> {
let table_meta = &table_info.meta;
let mut schema = vec![];
for (column_name, vector) in columns {
let time_index_column = &table_meta
.schema
.timestamp_column()
.with_context(|| table::error::MissingTimeIndexColumnSnafu {
table_name: table_info.name.to_string(),
})
.context(MissingTimeIndexColumnSnafu)?
.name;
let semantic_type = if column_name == time_index_column {
SemanticType::Timestamp
} else {
let column_index = table_meta
.schema
.column_index_by_name(column_name)
.context(ColumnNotFoundSnafu {
msg: format!("unable to find column {column_name} in table schema"),
})?;
if table_meta.primary_key_indices.contains(&column_index) {
SemanticType::Tag
} else {
SemanticType::Field
}
};
let datatype: ColumnDataTypeWrapper =
vector.data_type().try_into().context(ColumnDataTypeSnafu)?;
schema.push(ColumnSchema {
column_name: column_name.clone(),
datatype: datatype.datatype().into(),
semantic_type: semantic_type.into(),
});
}
Ok(schema)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::column::Values;
use api::v1::SemanticType;
use common_base::bit_vec::prelude::*;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::{ConcreteDataType, Value as DtValue};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema};
use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder};
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use super::*;
@@ -551,4 +678,104 @@ mod tests {
};
assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err());
}
#[test]
fn test_insert_request_table_to_region() {
let schema = Schema::new(vec![
DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true),
DtColumnSchema::new("id", ConcreteDataType::int16_datatype(), false),
DtColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
]);
let table_meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![2])
.next_column_id(3)
.build()
.unwrap();
let table_info = Arc::new(
TableInfoBuilder::default()
.name("demo")
.meta(table_meta)
.table_id(1)
.build()
.unwrap(),
);
let insert_request = mock_insert_request();
let mut request =
Inserter::convert_req_table_to_region(&table_info, insert_request).unwrap();
assert_eq!(request.requests.len(), 1);
verify_region_insert_request(request.requests.pop().unwrap());
}
fn mock_insert_request() -> TableInsertRequest {
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("host1"));
builder.push(None);
builder.push(Some("host3"));
let host = builder.to_vector();
let mut builder = Int16VectorBuilder::with_capacity(3);
builder.push(Some(1_i16));
builder.push(Some(2_i16));
builder.push(Some(3_i16));
let id = builder.to_vector();
let columns_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]);
TableInsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
region_number: 0,
}
}
fn verify_region_insert_request(request: RegionInsertRequest) {
assert_eq!(request.region_id, RegionId::new(1, 0).as_u64());
let rows = request.rows.unwrap();
for (i, column) in rows.schema.iter().enumerate() {
let name = &column.column_name;
if name == "id" {
assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
assert_eq!(SemanticType::Field as i32, column.semantic_type);
let values = rows
.rows
.iter()
.map(|row| row.values[i].value_data.clone())
.collect::<Vec<_>>();
assert_eq!(
vec![
Some(ValueData::I16Value(1)),
Some(ValueData::I16Value(2)),
Some(ValueData::I16Value(3))
],
values
);
}
if name == "host" {
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(SemanticType::Tag as i32, column.semantic_type);
let values = rows
.rows
.iter()
.map(|row| row.values[i].value_data.clone())
.collect::<Vec<_>>();
assert_eq!(
vec![
Some(ValueData::StringValue("host1".to_string())),
None,
Some(ValueData::StringValue("host3".to_string()))
],
values
);
}
}
}
}

View File

@@ -18,6 +18,7 @@ mod influxdb;
mod opentsdb;
mod otlp;
mod prom_store;
pub mod region_handler;
mod script;
mod standalone;
@@ -43,6 +44,7 @@ use common_telemetry::logging::info;
use common_telemetry::{error, timer};
use datanode::instance::sql::table_idents_to_full_name;
use datanode::instance::InstanceRef as DnInstanceRef;
use datanode::region_server::RegionServer;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
use partition::manager::PartitionRuleManager;
@@ -72,6 +74,9 @@ use sql::statements::copy::CopyTable;
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
use self::distributed::DistRegionRequestHandler;
use self::region_handler::RegionRequestHandlerRef;
use self::standalone::StandaloneRegionRequestHandler;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
@@ -115,6 +120,7 @@ pub struct Instance {
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
region_request_handler: RegionRequestHandlerRef,
create_expr_factory: CreateExprFactory,
/// plugins: this map holds extensions to customize query or auth
/// behaviours.
@@ -165,7 +171,7 @@ impl Instance {
catalog_manager.clone(),
true,
Some(partition_manager.clone()),
Some(datanode_clients),
Some(datanode_clients.clone()),
plugins.clone(),
)
.query_engine();
@@ -173,10 +179,13 @@ impl Instance {
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
dist_instance.clone(),
region_request_handler.clone(),
));
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
@@ -205,6 +214,7 @@ impl Instance {
create_expr_factory,
statement_executor,
query_engine,
region_request_handler,
grpc_query_handler: dist_instance,
plugins: plugins.clone(),
servers: Arc::new(HashMap::new()),
@@ -249,16 +259,21 @@ impl Instance {
Ok(Arc::new(meta_client))
}
pub async fn try_new_standalone(dn_instance: DnInstanceRef) -> Result<Self> {
pub async fn try_new_standalone(
dn_instance: DnInstanceRef,
region_server: RegionServer,
) -> Result<Self> {
let catalog_manager = dn_instance.catalog_manager();
let query_engine = dn_instance.query_engine();
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
let region_request_handler = StandaloneRegionRequestHandler::arc(region_server);
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
dn_instance.clone(),
region_request_handler.clone(),
));
let create_expr_factory = CreateExprFactory;
@@ -271,6 +286,7 @@ impl Instance {
statement_executor,
query_engine,
grpc_query_handler,
region_request_handler,
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
@@ -298,6 +314,7 @@ impl Instance {
&self.catalog_manager,
&self.create_expr_factory,
&self.grpc_query_handler,
&self.region_request_handler,
);
inserter.handle_row_inserts(requests, ctx).await
}
@@ -312,6 +329,7 @@ impl Instance {
&self.catalog_manager,
&self.create_expr_factory,
&self.grpc_query_handler,
&self.region_request_handler,
);
inserter.handle_column_inserts(requests, ctx).await
}

View File

@@ -14,7 +14,6 @@
pub mod deleter;
pub(crate) mod inserter;
pub(crate) mod row_inserter;
use std::collections::HashMap;
use std::sync::Arc;
@@ -22,9 +21,9 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::region::{region_request, RegionResponse};
use api::v1::{
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, InsertRequests,
RowInsertRequests, TruncateTableExpr,
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, TruncateTableExpr,
};
use async_trait::async_trait;
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
@@ -54,22 +53,21 @@ use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use table::error::TableOperationSnafu;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
use super::region_handler::RegionRequestHandler;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu, Result,
SchemaExistsSnafu, TableAlreadyExistSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
TableSnafu, UnrecognizedTableOptionSnafu,
UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::instance::distributed::row_inserter::RowDistInserter;
use crate::table::DistTable;
const MAX_VALUE: &str = "MAXVALUE";
@@ -266,23 +264,13 @@ impl DistInstance {
self.drop_table(table_name).await
}
Statement::Insert(insert) => {
let (catalog, schema, _) =
table_idents_to_full_name(insert.table_name(), query_ctx.clone())
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let insert_request =
SqlHandler::insert_to_request(self.catalog_manager.clone(), &insert, query_ctx)
.await
.context(InvokeDatanodeSnafu)?;
let inserter = DistInserter::new(catalog, schema, self.catalog_manager.clone());
let affected_rows = inserter
.insert(vec![insert_request])
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)
.context(TableSnafu)?;
let inserter = DistInserter::new(&self.catalog_manager);
let affected_rows = inserter.insert_table_request(insert_request).await?;
Ok(Output::AffectedRows(affected_rows as usize))
}
Statement::ShowCreateTable(show) => {
@@ -517,34 +505,6 @@ impl DistInstance {
.context(error::ExecuteDdlSnafu)
}
async fn handle_dist_insert(
&self,
requests: InsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let inserter = DistInserter::new(
ctx.current_catalog().to_owned(),
ctx.current_schema().to_owned(),
self.catalog_manager.clone(),
);
let affected_rows = inserter.grpc_insert(requests).await?;
Ok(Output::AffectedRows(affected_rows as usize))
}
async fn handle_row_dist_insert(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let inserter = RowDistInserter::new(
ctx.current_catalog().to_owned(),
ctx.current_schema().to_owned(),
self.catalog_manager.clone(),
);
let affected_rows = inserter.insert(requests).await?;
Ok(Output::AffectedRows(affected_rows as usize))
}
async fn handle_dist_delete(
&self,
request: DeleteRequests,
@@ -584,8 +544,11 @@ impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
Request::RowInserts(requests) => self.handle_row_dist_insert(requests, ctx).await,
Request::Inserts(_) => NotSupportedSnafu { feat: "inserts" }.fail(),
Request::RowInserts(_) => NotSupportedSnafu {
feat: "row inserts",
}
.fail(),
Request::RowDeletes(_) => NotSupportedSnafu {
feat: "row deletes",
}
@@ -621,6 +584,69 @@ impl GrpcQueryHandler for DistInstance {
}
}
pub(crate) struct DistRegionRequestHandler {
catalog_manager: Arc<FrontendCatalogManager>,
}
impl DistRegionRequestHandler {
pub fn arc(catalog_manager: Arc<FrontendCatalogManager>) -> Arc<Self> {
Arc::new(Self { catalog_manager })
}
}
#[async_trait]
impl RegionRequestHandler for DistRegionRequestHandler {
async fn handle(
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<RegionResponse> {
match request {
region_request::Body::Inserts(inserts) => {
let inserter =
DistInserter::new(&self.catalog_manager).with_trace_id(ctx.trace_id());
let affected_rows = inserter.insert_region_requests(inserts).await? as _;
Ok(RegionResponse {
header: Some(Default::default()),
affected_rows,
})
}
region_request::Body::Deletes(_) => NotSupportedSnafu {
feat: "region deletes",
}
.fail(),
region_request::Body::Create(_) => NotSupportedSnafu {
feat: "region create",
}
.fail(),
region_request::Body::Drop(_) => NotSupportedSnafu {
feat: "region drop",
}
.fail(),
region_request::Body::Open(_) => NotSupportedSnafu {
feat: "region open",
}
.fail(),
region_request::Body::Close(_) => NotSupportedSnafu {
feat: "region close",
}
.fail(),
region_request::Body::Alter(_) => NotSupportedSnafu {
feat: "region alter",
}
.fail(),
region_request::Body::Flush(_) => NotSupportedSnafu {
feat: "region flush",
}
.fail(),
region_request::Body::Compact(_) => NotSupportedSnafu {
feat: "region compact",
}
.fail(),
}
}
}
fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
if partitions.is_empty() {
return Ok(None);

View File

@@ -125,9 +125,7 @@ impl DistDeleter {
let table_route = partition_manager
.find_table_route(table_id)
.await
.with_context(|_| FindTableRouteSnafu {
table_name: table_name.to_string(),
})?;
.with_context(|_| FindTableRouteSnafu { table_id })?;
for (region_number, delete) in split {
let datanode =

View File

@@ -13,172 +13,149 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::InsertRequests;
use api::v1::region::{region_request, InsertRequests, RegionRequest, RegionRequestHeader};
use catalog::CatalogManager;
use client::Database;
use common_grpc_expr::insert::to_table_insert_request;
use common_meta::datanode_manager::DatanodeManager;
use common_meta::peer::Peer;
use common_meta::table_name::TableName;
use futures::future;
use futures_util::future;
use metrics::counter;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableInfoRef;
use table::meter_insert_request;
use table::requests::InsertRequest;
use store_api::storage::RegionId;
use table::requests::InsertRequest as TableInsertRequest;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDatanodeSnafu,
Result, SplitInsertSnafu, TableNotFoundSnafu, ToTableInsertRequestSnafu,
CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestInsertsSnafu,
Result, SplitInsertSnafu, TableNotFoundSnafu,
};
use crate::table::insert::to_grpc_insert_request;
use crate::inserter::Inserter;
/// A distributed inserter. It ingests GRPC [InsertRequests] or table [InsertRequest] (so it can be
/// used in protocol handlers or table insertion API).
/// A distributed inserter. It ingests gRPC [InsertRequests].
///
/// Table data partitioning and the batching of per-Datanode requests are handled internally.
///
/// Note that the inserter is confined to a single catalog and schema. I.e., it cannot handle
/// multiple insert requests with different catalog or schema (will throw "NotSupported" error).
/// This is because we currently do not have this kind of requirements. Let's keep it simple for now.
pub(crate) struct DistInserter {
catalog: String,
schema: String,
catalog_manager: Arc<FrontendCatalogManager>,
pub struct DistInserter<'a> {
catalog_manager: &'a FrontendCatalogManager,
trace_id: Option<u64>,
span_id: Option<u64>,
}
impl DistInserter {
pub(crate) fn new(
catalog: String,
schema: String,
catalog_manager: Arc<FrontendCatalogManager>,
) -> Self {
impl<'a> DistInserter<'a> {
pub fn new(catalog_manager: &'a FrontendCatalogManager) -> Self {
Self {
catalog,
schema,
catalog_manager,
trace_id: None,
span_id: None,
}
}
pub(crate) async fn grpc_insert(&self, requests: InsertRequests) -> Result<u32> {
let inserts = requests
.inserts
.into_iter()
.map(|x| {
to_table_insert_request(&self.catalog, &self.schema, x)
.context(ToTableInsertRequestSnafu)
})
.collect::<Result<Vec<_>>>()?;
self.insert(inserts).await
pub fn with_trace_id(mut self, trace_id: u64) -> Self {
self.trace_id = Some(trace_id);
self
}
pub(crate) async fn insert(&self, requests: Vec<InsertRequest>) -> Result<u32> {
debug_assert!(requests
.iter()
.all(|x| x.catalog_name == self.catalog && x.schema_name == self.schema));
let inserts = self.split_inserts(requests).await?;
self.request_datanodes(inserts).await
#[allow(dead_code)]
pub fn with_span_id(mut self, span_id: u64) -> Self {
self.span_id = Some(span_id);
self
}
/// Splits multiple table [InsertRequest]s into multiple GRPC [InsertRequests]s, each of which
/// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write
/// method in Datanode.
async fn split_inserts(
&self,
requests: Vec<InsertRequest>,
) -> Result<HashMap<Peer, InsertRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut inserts = HashMap::new();
for request in requests {
meter_insert_request!(request);
let table_name = TableName::new(&self.catalog, &self.schema, &request.table_name);
let table_info = self.find_table_info(&request.table_name).await?;
let table_meta = &table_info.meta;
let table_id = table_info.table_id();
let split = partition_manager
.split_insert_request(table_id, request, table_meta.schema.as_ref())
.await
.context(SplitInsertSnafu)?;
let table_route = partition_manager
.find_table_route(table_id)
.await
.with_context(|_| FindTableRouteSnafu {
table_name: table_name.to_string(),
})?;
for (region_number, insert) in split {
let datanode =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
let insert = to_grpc_insert_request(table_meta, region_number, insert)?;
inserts
.entry(datanode.clone())
.or_insert_with(|| InsertRequests { inserts: vec![] })
.inserts
.push(insert);
}
}
Ok(inserts)
}
async fn find_table_info(&self, table_name: &str) -> Result<TableInfoRef> {
let table = self
.catalog_manager
.table(&self.catalog, &self.schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
&self.catalog,
&self.schema,
table_name,
),
})?;
Ok(table.table_info())
}
async fn request_datanodes(&self, inserts: HashMap<Peer, InsertRequests>) -> Result<u32> {
let results = future::try_join_all(inserts.into_iter().map(|(peer, inserts)| {
pub(crate) async fn insert_region_requests(&self, requests: InsertRequests) -> Result<u64> {
let requests = self.split(requests).await?;
let trace_id = self.trace_id.unwrap_or_default();
let span_id = self.span_id.unwrap_or_default();
let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| {
let datanode_clients = self.catalog_manager.datanode_clients();
let catalog = self.catalog.clone();
let schema = self.schema.clone();
common_runtime::spawn_write(async move {
let client = datanode_clients.get_client(&peer).await;
let database = Database::new(&catalog, &schema, client);
database.insert(inserts).await.context(RequestDatanodeSnafu)
let request = RegionRequest {
header: Some(RegionRequestHeader { trace_id, span_id }),
body: Some(region_request::Body::Inserts(inserts)),
};
datanode_clients
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestInsertsSnafu)
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64);
let affected_rows = results.into_iter().sum::<Result<u64>>()?;
counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows);
Ok(affected_rows)
}
pub(crate) async fn insert_table_request(&self, request: TableInsertRequest) -> Result<u64> {
let table = self
.catalog_manager
.table(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format!(
"{}.{}.{}",
request.catalog_name, request.schema_name, request.table_name
),
})?;
let table_info = table.table_info();
let request = Inserter::convert_req_table_to_region(&table_info, request)?;
self.insert_region_requests(request).await
}
/// Splits gRPC [InsertRequests] into multiple gRPC [InsertRequests]s, each of which
/// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write
/// method in Datanode.
async fn split(&self, requests: InsertRequests) -> Result<HashMap<Peer, InsertRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut inserts: HashMap<Peer, InsertRequests> = HashMap::new();
for req in requests.requests {
let table_id = RegionId::from_u64(req.region_id).table_id();
let req_splits = partition_manager
.split_insert_request(table_id, req)
.await
.context(SplitInsertSnafu)?;
let table_route = partition_manager
.find_table_route(table_id)
.await
.context(FindTableRouteSnafu { table_id })?;
for (region_number, insert) in req_splits {
let peer =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
inserts
.entry(peer.clone())
.or_default()
.requests
.push(insert);
}
}
Ok(inserts)
}
}
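// Call-site sketch (mirroring DistRegionRequestHandler above; `catalog_manager`,
// `trace_id`, and `inserts` assumed in scope):
//
//     let inserter = DistInserter::new(&catalog_manager).with_trace_id(trace_id);
//     let affected_rows = inserter.insert_region_requests(inserts).await?;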
#[cfg(test)]
mod tests {
use api::v1::column::Values;
use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest, SemanticType};
use std::sync::Arc;
use api::helper::vectors_to_rows;
use api::v1::region::InsertRequest;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
@@ -186,7 +163,7 @@ mod tests {
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::router::{Region, RegionRoute};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder};
@@ -217,13 +194,13 @@ mod tests {
table_metadata_manager: &TableMetadataManagerRef,
) {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"current_timestamp()".to_string(),
)))
.unwrap(),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
]));
let table_meta = TableMetaBuilder::default()
@@ -279,61 +256,65 @@ mod tests {
table_metadata_manager,
));
let inserter = DistInserter::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
catalog_manager,
);
let inserter = DistInserter::new(&catalog_manager);
let new_insert_request = |vector: VectorRef| -> InsertRequest {
let row_count = vector.len();
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
columns_values: HashMap::from([("a".to_string(), vector)]),
region_number: 0,
region_id: RegionId::new(1, 0).into(),
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Field as i32,
}],
rows: vectors_to_rows([vector].iter(), row_count),
}),
}
};
let requests = vec![
new_insert_request(Arc::new(Int32Vector::from(vec![
Some(1),
None,
Some(11),
Some(101),
]))),
new_insert_request(Arc::new(Int32Vector::from(vec![
Some(2),
Some(12),
None,
Some(102),
]))),
];
let mut inserts = inserter.split_inserts(requests).await.unwrap();
let requests = InsertRequests {
requests: vec![
new_insert_request(Arc::new(Int32Vector::from(vec![
Some(1),
None,
Some(11),
Some(101),
]))),
new_insert_request(Arc::new(Int32Vector::from(vec![
Some(2),
Some(12),
None,
Some(102),
]))),
],
};
let mut inserts = inserter.split(requests).await.unwrap();
assert_eq!(inserts.len(), 3);
let new_grpc_insert_request = |column_values: Vec<i32>,
null_mask: Vec<u8>,
row_count: u32,
region_number: u32|
-> GrpcInsertRequest {
GrpcInsertRequest {
table_name: table_name.to_string(),
columns: vec![Column {
column_name: "a".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
i32_values: column_values,
..Default::default()
let new_split_insert_request =
|rows: Vec<Option<i32>>, region_id: RegionId| -> InsertRequest {
InsertRequest {
region_id: region_id.into(),
rows: Some(Rows {
schema: vec![ColumnSchema {
column_name: "a".to_string(),
datatype: ColumnDataType::Int32 as i32,
semantic_type: SemanticType::Field as i32,
}],
rows: rows
.into_iter()
.map(|v| Row {
values: vec![Value {
value_data: v.map(ValueData::I32Value),
}],
})
.collect(),
}),
null_mask,
datatype: ColumnDataType::Int32 as i32,
}],
row_count,
region_number,
}
};
}
};
// region to datanode placement:
// 1 -> 1
@@ -345,37 +326,37 @@ mod tests {
// 2 -> [10, 50)
// 3 -> (min, 10)
let datanode_inserts = inserts.remove(&Peer::new(1, "")).unwrap().inserts;
let datanode_inserts = inserts.remove(&Peer::new(1, "")).unwrap().requests;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_grpc_insert_request(vec![101], vec![0], 1, 1)
new_split_insert_request(vec![Some(101)], RegionId::new(1, 1))
);
assert_eq!(
datanode_inserts[1],
new_grpc_insert_request(vec![102], vec![0], 1, 1)
new_split_insert_request(vec![Some(102)], RegionId::new(1, 1))
);
let datanode_inserts = inserts.remove(&Peer::new(2, "")).unwrap().inserts;
let datanode_inserts = inserts.remove(&Peer::new(2, "")).unwrap().requests;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_grpc_insert_request(vec![11], vec![0], 1, 2)
new_split_insert_request(vec![Some(11)], RegionId::new(1, 2))
);
assert_eq!(
datanode_inserts[1],
new_grpc_insert_request(vec![12], vec![0], 1, 2)
new_split_insert_request(vec![Some(12)], RegionId::new(1, 2))
);
let datanode_inserts = inserts.remove(&Peer::new(3, "")).unwrap().inserts;
let datanode_inserts = inserts.remove(&Peer::new(3, "")).unwrap().requests;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_grpc_insert_request(vec![1], vec![2], 2, 3)
new_split_insert_request(vec![Some(1), None], RegionId::new(1, 3))
);
assert_eq!(
datanode_inserts[1],
new_grpc_insert_request(vec![2], vec![2], 2, 3)
new_split_insert_request(vec![Some(2), None], RegionId::new(1, 3))
);
}
}

View File

@@ -1,125 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::RowInsertRequests;
use catalog::CatalogManager;
use client::Database;
use common_meta::peer::Peer;
use futures_util::future;
use metrics::counter;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDatanodeSnafu,
Result, SplitInsertSnafu, TableNotFoundSnafu,
};
pub struct RowDistInserter {
catalog_name: String,
schema_name: String,
catalog_manager: Arc<FrontendCatalogManager>,
}
impl RowDistInserter {
pub fn new(
catalog_name: String,
schema_name: String,
catalog_manager: Arc<FrontendCatalogManager>,
) -> Self {
Self {
catalog_name,
schema_name,
catalog_manager,
}
}
pub(crate) async fn insert(&self, requests: RowInsertRequests) -> Result<u32> {
let requests = self.split(requests).await?;
let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| {
let datanode_clients = self.catalog_manager.datanode_clients();
let catalog = self.catalog_name.clone();
let schema = self.schema_name.clone();
common_runtime::spawn_write(async move {
let client = datanode_clients.get_client(&peer).await;
let database = Database::new(catalog, schema, client);
database
.row_insert(inserts)
.await
.context(RequestDatanodeSnafu)
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64);
Ok(affected_rows)
}
async fn split(&self, requests: RowInsertRequests) -> Result<HashMap<Peer, RowInsertRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut inserts: HashMap<Peer, RowInsertRequests> = HashMap::new();
for req in requests.inserts {
let table_name = req.table_name.clone();
let table_id = self.get_table_id(table_name.as_str()).await?;
let req_splits = partition_manager
.split_row_insert_request(table_id, req)
.await
.context(SplitInsertSnafu)?;
let table_route = partition_manager
.find_table_route(table_id)
.await
.context(FindTableRouteSnafu { table_name })?;
for (region_number, insert) in req_splits {
let peer =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
inserts
.entry(peer.clone())
.or_default()
.inserts
.push(insert);
}
}
Ok(inserts)
}
async fn get_table_id(&self, table_name: &str) -> Result<TableId> {
self.catalog_manager
.table(&self.catalog_name, &self.schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
&self.catalog_name,
&self.schema_name,
table_name,
),
})
.map(|table| table.table_info().table_id())
}
}

View File

@@ -0,0 +1,32 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::region::{region_request, RegionResponse};
use async_trait::async_trait;
use session::context::QueryContextRef;
use crate::error::Result;
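/// Dispatches region requests issued by the frontend: in distributed mode to
/// the remote Datanodes that own the regions, in standalone mode to the local
/// region server.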
#[async_trait]
pub trait RegionRequestHandler: Send + Sync {
async fn handle(
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<RegionResponse>;
}
pub type RegionRequestHandlerRef = Arc<dyn RegionRequestHandler>;

View File

@@ -15,14 +15,18 @@
use std::sync::Arc;
use api::v1::greptime_request::Request;
use api::v1::region::{region_request, RegionResponse};
use async_trait::async_trait;
use common_query::Output;
use datanode::error::Error as DatanodeError;
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::error::{self, Result};
use super::region_handler::RegionRequestHandler;
use crate::error::{Error, InvokeDatanodeSnafu, InvokeRegionServerSnafu, Result};
pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef<DatanodeError>);
@@ -34,12 +38,36 @@ impl StandaloneGrpcQueryHandler {
#[async_trait]
impl GrpcQueryHandler for StandaloneGrpcQueryHandler {
type Error = error::Error;
type Error = Error;
async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output> {
self.0
.do_query(query, ctx)
.await
.context(error::InvokeDatanodeSnafu)
.context(InvokeDatanodeSnafu)
}
}
pub(crate) struct StandaloneRegionRequestHandler {
region_server: RegionServer,
}
impl StandaloneRegionRequestHandler {
pub fn arc(region_server: RegionServer) -> Arc<Self> {
Arc::new(Self { region_server })
}
}
#[async_trait]
impl RegionRequestHandler for StandaloneRegionRequestHandler {
async fn handle(
&self,
request: region_request::Body,
_ctx: QueryContextRef,
) -> Result<RegionResponse> {
self.region_server
.handle(request)
.await
.context(InvokeRegionServerSnafu)
}
}

View File

@@ -24,6 +24,7 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use api::v1::region::region_request;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::Output;
@@ -50,8 +51,9 @@ use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
};
use crate::inserter::Inserter;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::instance::region_handler::RegionRequestHandlerRef;
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
#[derive(Clone)]
@@ -59,6 +61,7 @@ pub struct StatementExecutor {
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
sql_stmt_executor: SqlStatementExecutorRef,
region_request_handler: RegionRequestHandlerRef,
}
impl StatementExecutor {
@@ -66,11 +69,13 @@ impl StatementExecutor {
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
sql_stmt_executor: SqlStatementExecutorRef,
region_request_handler: RegionRequestHandlerRef,
) -> Self {
Self {
catalog_manager,
query_engine,
sql_stmt_executor,
region_request_handler,
}
}
@@ -110,9 +115,10 @@ impl StatementExecutor {
.copy_table_to(req, query_ctx)
.await
.map(Output::AffectedRows),
CopyDirection::Import => {
self.copy_table_from(req).await.map(Output::AffectedRows)
}
CopyDirection::Import => self
.copy_table_from(req, query_ctx)
.await
.map(Output::AffectedRows),
}
}
@@ -165,45 +171,26 @@ impl StatementExecutor {
})
}
// TODO(zhongzc): An intermediate state on the way to eliminating calls to table.insert:
// for DistTable, insert is no longer invoked; for MitoTable, it is still called but will
// eventually be eliminated as well.
async fn send_insert_request(&self, request: InsertRequest) -> Result<usize> {
let frontend_catalog_manager = self
.catalog_manager
.as_any()
.downcast_ref::<FrontendCatalogManager>();
async fn handle_table_insert_request(
&self,
request: InsertRequest,
query_ctx: QueryContextRef,
) -> Result<usize> {
let table_ref = TableReference::full(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
let table = self.get_table(&table_ref).await?;
let table_info = table.table_info();
let table_name = request.table_name.clone();
match frontend_catalog_manager {
Some(frontend_catalog_manager) => {
let inserter = DistInserter::new(
request.catalog_name.clone(),
request.schema_name.clone(),
Arc::new(frontend_catalog_manager.clone()),
);
let affected_rows = inserter
.insert(vec![request])
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)
.context(InsertSnafu { table_name })?;
Ok(affected_rows as usize)
}
None => {
let table_ref = TableReference::full(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
let affected_rows = self
.get_table(&table_ref)
.await?
.insert(request)
.await
.context(InsertSnafu { table_name })?;
Ok(affected_rows)
}
}
let request = Inserter::convert_req_table_to_region(&table_info, request)?;
let region_response = self
.region_request_handler
.handle(region_request::Body::Inserts(request), query_ctx)
.await?;
Ok(region_response.affected_rows as _)
}
// TODO(zhongzc): A middle state that eliminates calls to table.delete,
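
The insert path is now two steps: convert the column-oriented table request into the row-oriented region protocol, then dispatch it through the handler. The conversion broadly pairs api::helper::vectors_to_rows with a region id derived from the table. A rough sketch of that shape (the function name and the empty schema are illustrative, not Inserter's exact internals):

// Illustrative: turn per-column vectors into the row-oriented Rows payload.
fn to_region_rows(columns_values: &HashMap<String, VectorRef>) -> Rows {
    let row_count = columns_values
        .values()
        .next()
        .map(|v| v.len())
        .unwrap_or(0);
    Rows {
        // The real converter derives ColumnSchema entries from table metadata.
        schema: vec![],
        rows: vectors_to_rows(columns_values.values(), row_count),
    }
}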

View File

@@ -40,6 +40,7 @@ use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, Metakey, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::{CopyTableRequest, InsertRequest};
@@ -235,7 +236,11 @@ impl StatementExecutor {
}
}
pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result<usize> {
pub async fn copy_table_from(
&self,
req: CopyTableRequest,
query_ctx: QueryContextRef,
) -> Result<usize> {
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
@@ -322,14 +327,17 @@ impl StatementExecutor {
.zip(vectors)
.collect::<HashMap<_, _>>();
pending.push(self.send_insert_request(InsertRequest {
catalog_name: req.catalog_name.to_string(),
schema_name: req.schema_name.to_string(),
table_name: req.table_name.to_string(),
columns_values,
//TODO: support multi-regions
region_number: 0,
}));
pending.push(self.handle_table_insert_request(
InsertRequest {
catalog_name: req.catalog_name.to_string(),
schema_name: req.schema_name.to_string(),
table_name: req.table_name.to_string(),
columns_values,
// TODO: support multi-regions
region_number: 0,
},
query_ctx.clone(),
));
if pending_mem_size as u64 >= pending_mem_threshold {
rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
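
The COPY FROM loop bounds memory by batching: each converted chunk is pushed as a pending insert future, and once pending_mem_size crosses the threshold the whole batch is awaited and the counters reset. A generic sketch of the flush step (the real batch_insert's signature may differ; Result is assumed to be the crate's alias):

use std::future::Future;
use futures_util::future::try_join_all;

// Await every pending insert, sum the affected rows, and reset the byte
// estimate so the next batch starts from zero.
async fn batch_insert<F>(pending: &mut Vec<F>, pending_mem_size: &mut usize) -> Result<usize>
where
    F: Future<Output = Result<usize>>,
{
    let affected = try_join_all(pending.drain(..)).await?;
    *pending_mem_size = 0;
    Ok(affected.into_iter().sum())
}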

View File

@@ -74,7 +74,9 @@ impl StatementExecutor {
let record_batch = batch.context(ReadRecordBatchSnafu)?;
let insert_request =
build_insert_request(record_batch, table.schema(), &table_info)?;
affected_rows += self.send_insert_request(insert_request).await?;
affected_rows += self
.handle_table_insert_request(insert_request, query_ctx.clone())
.await?;
}
Ok(Output::AffectedRows(affected_rows))

View File

@@ -16,12 +16,10 @@ use std::collections::HashMap;
use api::helper::{push_vals, ColumnDataTypeWrapper};
use api::v1::column::Values;
use api::v1::{Column, InsertRequest as GrpcInsertRequest, SemanticType};
use api::v1::{Column, SemanticType};
use datatypes::prelude::*;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::metadata::TableMeta;
use table::requests::InsertRequest;
use crate::error::{self, ColumnDataTypeSnafu, NotSupportedSnafu, Result, VectorToGrpcColumnSnafu};
@@ -55,21 +53,6 @@ pub(crate) fn to_grpc_columns(
Ok((columns, row_count))
}
pub(crate) fn to_grpc_insert_request(
table_meta: &TableMeta,
region_number: RegionNumber,
insert: InsertRequest,
) -> Result<GrpcInsertRequest> {
let table_name = insert.table_name.clone();
let (columns, row_count) = to_grpc_columns(table_meta, &insert.columns_values)?;
Ok(GrpcInsertRequest {
table_name,
region_number,
columns,
row_count,
})
}
fn vector_to_grpc_column(
table_meta: &TableMeta,
column_name: &str,
@@ -114,19 +97,12 @@ fn vector_to_grpc_column(
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::ColumnDataType;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{
Int16VectorBuilder, Int32Vector, Int64Vector, MutableVector, StringVector,
StringVectorBuilder,
};
use datatypes::vectors::{Int32Vector, Int64Vector, StringVector};
use table::metadata::TableMetaBuilder;
use table::requests::InsertRequest;
use super::*;
@@ -189,75 +165,4 @@ mod tests {
assert_eq!(column.null_mask, vec![2]);
assert_eq!(column.datatype, ColumnDataType::String as i32);
}
#[test]
fn test_to_grpc_insert_request() {
let schema = Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true),
ColumnSchema::new("id", ConcreteDataType::int16_datatype(), false),
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
]);
let table_meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![])
.next_column_id(3)
.build()
.unwrap();
let insert_request = mock_insert_request();
let request = to_grpc_insert_request(&table_meta, 12, insert_request).unwrap();
verify_grpc_insert_request(request);
}
fn mock_insert_request() -> InsertRequest {
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("host1"));
builder.push(None);
builder.push(Some("host3"));
let host = builder.to_vector();
let mut builder = Int16VectorBuilder::with_capacity(3);
builder.push(Some(1_i16));
builder.push(Some(2_i16));
builder.push(Some(3_i16));
let id = builder.to_vector();
let columns_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]);
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
region_number: 0,
}
}
fn verify_grpc_insert_request(request: GrpcInsertRequest) {
let table_name = request.table_name;
assert_eq!("demo", table_name);
for column in request.columns {
let name = column.column_name;
if name == "id" {
assert_eq!(0, column.null_mask[0]);
assert_eq!(ColumnDataType::Int16 as i32, column.datatype);
assert_eq!(vec![1, 2, 3], column.values.as_ref().unwrap().i16_values);
}
if name == "host" {
assert_eq!(2, column.null_mask[0]);
assert_eq!(ColumnDataType::String as i32, column.datatype);
assert_eq!(
vec!["host1", "host3"],
column.values.as_ref().unwrap().string_values
);
}
}
let region_number = request.region_number;
assert_eq!(12, region_number);
}
}

View File

@@ -15,25 +15,24 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::RowInsertRequest;
use api::v1::region::InsertRequest;
use common_meta::peer::Peer;
use common_meta::rpc::router::TableRoute;
use common_query::prelude::Expr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::prelude::Value;
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use table::requests::{DeleteRequest, InsertRequest};
use table::requests::DeleteRequest;
use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, Result};
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::route::TableRoutes;
use crate::row_splitter::{RowInsertRequestSplits, RowSplitter};
use crate::splitter::{DeleteRequestSplit, InsertRequestSplit, WriteSplitter};
use crate::row_splitter::{InsertRequestSplits, RowSplitter};
use crate::splitter::{DeleteRequestSplit, WriteSplitter};
use crate::{error, PartitionRuleRef};
#[async_trait::async_trait]
@@ -236,26 +235,13 @@ impl PartitionRuleManager {
Ok(regions)
}
/// Split [InsertRequest] into [InsertRequestSplit] according to the partition rule
/// Split [InsertRequest] into [InsertRequestSplits] according to the partition rule
/// of given table.
pub async fn split_insert_request(
&self,
table: TableId,
req: InsertRequest,
schema: &Schema,
) -> Result<InsertRequestSplit> {
let partition_rule = self.find_table_partition_rule(table).await?;
let splitter = WriteSplitter::with_partition_rule(partition_rule);
splitter.split_insert(req, schema)
}
/// Split [RowInsertRequest] into [RowInsertRequestSplits] according to the partition rule
/// of given table.
pub async fn split_row_insert_request(
&self,
table: TableId,
req: RowInsertRequest,
) -> Result<RowInsertRequestSplits> {
) -> Result<InsertRequestSplits> {
let partition_rule = self.find_table_partition_rule(table).await?;
RowSplitter::new(partition_rule).split(req)
}
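
With the column-based path gone, the partition manager exposes a single split entry point for row-based inserts. A hedged usage sketch (`send_to_datanode` is a hypothetical stand-in for the real dispatch):

// Sketch: split one region insert across partitions, then fan out.
async fn fan_out(
    manager: &PartitionRuleManager,
    table_id: TableId,
    req: InsertRequest,
) -> Result<()> {
    for (region_number, per_region) in manager.split_insert_request(table_id, req).await? {
        // per_region.region_id already encodes (table_id, region_number).
        send_to_datanode(region_number, per_region).await?;
    }
    Ok(())
}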

View File

@@ -15,14 +15,15 @@
use std::collections::HashMap;
use api::helper;
use api::v1::{ColumnSchema, Row, RowInsertRequest, Rows};
use api::v1::region::InsertRequest;
use api::v1::{ColumnSchema, Row, Rows};
use datatypes::value::Value;
use store_api::storage::RegionNumber;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::Result;
use crate::PartitionRuleRef;
pub type RowInsertRequestSplits = HashMap<RegionNumber, RowInsertRequest>;
pub type InsertRequestSplits = HashMap<RegionNumber, InsertRequest>;
pub struct RowSplitter {
partition_rule: PartitionRuleRef,
@@ -33,7 +34,7 @@ impl RowSplitter {
Self { partition_rule }
}
pub fn split(&self, req: RowInsertRequest) -> Result<RowInsertRequestSplits> {
pub fn split(&self, req: InsertRequest) -> Result<InsertRequestSplits> {
// No partition
let partition_columns = self.partition_rule.partition_columns();
if partition_columns.is_empty() {
@@ -45,12 +46,13 @@ impl RowSplitter {
return Ok(HashMap::new());
};
SplitReadRowHelper::new(req.table_name, rows, &self.partition_rule).split_to_requests()
let table_id = RegionId::from_u64(req.region_id).table_id();
SplitReadRowHelper::new(table_id, rows, &self.partition_rule).split_to_requests()
}
}
struct SplitReadRowHelper<'a> {
table_name: String,
table_id: TableId,
schema: Vec<ColumnSchema>,
rows: Vec<Row>,
partition_rule: &'a PartitionRuleRef,
@@ -59,7 +61,7 @@ struct SplitReadRowHelper<'a> {
}
impl<'a> SplitReadRowHelper<'a> {
fn new(table_name: String, rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self {
fn new(table_id: TableId, rows: Rows, partition_rule: &'a PartitionRuleRef) -> Self {
let col_name_to_idx = rows
.schema
.iter()
@@ -73,7 +75,7 @@ impl<'a> SplitReadRowHelper<'a> {
.collect::<Vec<_>>();
Self {
table_name,
table_id,
schema: rows.schema,
rows: rows.rows,
partition_rule,
@@ -81,7 +83,7 @@ impl<'a> SplitReadRowHelper<'a> {
}
}
fn split_to_requests(mut self) -> Result<RowInsertRequestSplits> {
fn split_to_requests(mut self) -> Result<InsertRequestSplits> {
let request_splits = self
.split_to_regions()?
.into_iter()
@@ -90,13 +92,12 @@ impl<'a> SplitReadRowHelper<'a> {
.into_iter()
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
.collect();
let req = RowInsertRequest {
table_name: self.table_name.clone(),
let req = InsertRequest {
rows: Some(Rows {
schema: self.schema.clone(),
rows,
}),
region_number,
region_id: RegionId::new(self.table_id, region_number).into(),
};
(region_number, req)
})
@@ -145,7 +146,7 @@ mod tests {
use crate::partition::PartitionExpr;
use crate::PartitionRule;
fn mock_insert_request() -> RowInsertRequest {
fn mock_insert_request() -> InsertRequest {
let schema = vec![
ColumnSchema {
column_name: "id".to_string(),
@@ -186,10 +187,9 @@ mod tests {
],
},
];
RowInsertRequest {
table_name: "t".to_string(),
InsertRequest {
rows: Some(Rows { schema, rows }),
region_number: 0,
region_id: 0,
}
}
@@ -279,8 +279,8 @@ mod tests {
let req0 = &splits[&0];
let req1 = &splits[&1];
assert_eq!(req0.region_number, 0);
assert_eq!(req1.region_number, 1);
assert_eq!(req0.region_id, 0);
assert_eq!(req1.region_id, 1);
let rows0 = req0.rows.as_ref().unwrap();
let rows1 = req1.rows.as_ref().unwrap();
@@ -298,7 +298,7 @@ mod tests {
assert_eq!(splits.len(), 1);
let req = &splits[&1];
assert_eq!(req.region_number, 1);
assert_eq!(req.region_id, 1);
let rows = req.rows.as_ref().unwrap();
assert_eq!(rows.rows.len(), 3);
@@ -314,7 +314,7 @@ mod tests {
assert_eq!(splits.len(), 1);
let req = &splits[&0];
assert_eq!(req.region_number, 0);
assert_eq!(req.region_id, 0);
let rows = req.rows.as_ref().unwrap();
assert_eq!(rows.rows.len(), 3);
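
The region_id values asserted above follow the RegionId packing: table id in the high 32 bits, region number in the low 32, so table 0 with regions 0 and 1 yields ids 0 and 1. A standalone sketch of that encoding, assuming store_api keeps this layout:

// Assumed layout: high 32 bits = table id, low 32 bits = region number.
fn region_id(table_id: u32, region_number: u32) -> u64 {
    ((table_id as u64) << 32) | region_number as u64
}

fn table_id_of(region_id: u64) -> u32 {
    (region_id >> 32) as u32
}

fn main() {
    assert_eq!(region_id(0, 1), 1); // matches the test expectations above
    assert_eq!(table_id_of(region_id(42, 7)), 42);
}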