feat: batch create ddl (#3194)

* feat: batch ddl to region request
* feat: return table ids
* chore: by comment
* chore: remove wal_options
* chore: create logical tables lock key
* feat: get metadata in procedure
* chore: by comment
Author: JeremyHi
Date: 2024-01-26 10:43:57 +08:00 (committed by GitHub)
Parent: 3fa070a0cc
Commit: 1fab7ab75a
21 changed files with 1438 additions and 263 deletions

Cargo.lock (generated)

@@ -3651,7 +3651,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=65b008f018395f8fa917a7d3c7883b82f309cb74#65b008f018395f8fa917a7d3c7883b82f309cb74"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=519b1d0757404c8ff1eeb2a68d29f5ade54a1752#519b1d0757404c8ff1eeb2a68d29f5ade54a1752"
dependencies = [
"prost 0.12.3",
"serde",


@@ -95,7 +95,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "65b008f018395f8fa917a7d3c7883b82f309cb74" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "519b1d0757404c8ff1eeb2a68d29f5ade54a1752" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"


@@ -27,7 +27,9 @@ use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
pub mod alter_table;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub mod drop_table;
pub mod table_meta;
pub mod truncate_table;


@@ -0,0 +1,397 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader};
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures_util::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{Result, TableAlreadyExistsSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::{TableLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::{metrics, ClusterId};
pub struct CreateLogicalTablesProcedure {
pub context: DdlContext,
pub creator: TablesCreator,
}
impl CreateLogicalTablesProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
pub fn new(
cluster_id: ClusterId,
tasks: Vec<CreateTableTask>,
physical_table_id: TableId,
context: DdlContext,
) -> Self {
let creator = TablesCreator::new(cluster_id, tasks, physical_table_id);
Self { context, creator }
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
let creator = TablesCreator { data };
Ok(Self { context, creator })
}
async fn on_prepare(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
// Sets physical region numbers
let physical_table_id = self.creator.data.physical_table_id();
let physical_region_numbers = manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;
self.creator
.data
.set_physical_region_numbers(physical_region_numbers);
// Checks whether the tables already exist
let table_name_keys = self
.creator
.data
.all_create_table_exprs()
.iter()
.map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name))
.collect::<Vec<_>>();
let already_exists_tables_ids = manager
.table_name_manager()
.batch_get(table_name_keys)
.await?
.iter()
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();
// Records the table ids that already exist
self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);
// If none of the tables exists, we can create them all directly.
if self.creator.data.is_all_tables_not_exists() {
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
return Ok(Status::executing(true));
}
// Filter out the tables that already exist.
let tasks = &self.creator.data.tasks;
let mut filtered_tasks = Vec::with_capacity(tasks.len());
for (task, table_id) in tasks
.iter()
.zip(self.creator.data.table_ids_already_exists().iter())
{
if table_id.is_some() {
// A table that already exists is skipped, but only if `create_if_not_exists` is set; otherwise it is an error.
ensure!(
task.create_table.create_if_not_exists,
TableAlreadyExistsSnafu {
table_name: task.create_table.table_name.to_string(),
}
);
continue;
}
filtered_tasks.push(task.clone());
}
// Resets tasks
self.creator.data.tasks = filtered_tasks;
if self.creator.data.tasks.is_empty() {
// If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
self.creator.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(true));
}
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
Ok(Status::executing(true))
}
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let physical_table_id = self.creator.data.physical_table_id();
let (_, physical_table_route) = self
.context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await?;
let region_routes = &physical_table_route.region_routes;
self.create_regions(region_routes).await
}
pub async fn on_create_metadata(&self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
let physical_table_id = self.creator.data.physical_table_id();
let tables_data = self.creator.data.all_tables_data();
let num_tables = tables_data.len();
if num_tables > 0 {
manager.create_logic_tables_metadata(tables_data).await?;
}
info!("Created {num_tables} tables metadata for physical table {physical_table_id}");
Ok(Status::done_with_output(self.creator.data.real_table_ids()))
}
fn create_region_request_builder(
&self,
physical_table_id: TableId,
task: &CreateTableTask,
) -> Result<CreateRequestBuilder> {
let create_expr = &task.create_table;
let template = build_template(create_expr)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}
fn one_datanode_region_requests(
&self,
datanode: &Peer,
region_routes: &[RegionRoute],
) -> Result<CreateRequests> {
let create_tables_data = &self.creator.data;
let tasks = &create_tables_data.tasks;
let physical_table_id = create_tables_data.physical_table_id();
let regions = find_leader_regions(region_routes, datanode);
let mut requests = Vec::with_capacity(tasks.len() * regions.len());
for task in tasks {
let create_table_expr = &task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let logical_table_id = task.table_info.ident.table_id;
let storage_path = region_storage_path(catalog, schema);
let request_builder = self.create_region_request_builder(physical_table_id, task)?;
for region_number in &regions {
let region_id = RegionId::new(logical_table_id, *region_number);
let create_region_request =
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?;
requests.push(create_region_request);
}
}
Ok(CreateRequests { requests })
}
async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let creates = self.one_datanode_region_requests(&datanode, region_routes)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(PbRegionRequest::Creates(creates)),
};
create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}
Ok(())
});
}
join_all(create_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
self.creator.data.state = CreateTablesState::CreateMetadata;
// Not persisting here ensures that a procedure recovering from a crash restarts from the `DatanodeCreateRegions` stage.
Ok(Status::executing(false))
}
}
#[async_trait]
impl Procedure for CreateLogicalTablesProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
.with_label_values(&[state.as_ref()])
.start_timer();
match state {
CreateTablesState::Prepare => self.on_prepare().await,
CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTablesState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let mut lock_key = Vec::with_capacity(1 + self.creator.data.tasks.len());
lock_key.push(TableLock::Write(self.creator.data.physical_table_id()).into());
for task in &self.creator.data.tasks {
lock_key.push(
TableNameLock::new(
&task.create_table.catalog_name,
&task.create_table.schema_name,
&task.create_table.table_name,
)
.into(),
);
}
LockKey::new(lock_key)
}
}
pub struct TablesCreator {
/// The serializable data.
pub data: CreateTablesData,
}
impl TablesCreator {
pub fn new(
cluster_id: ClusterId,
tasks: Vec<CreateTableTask>,
physical_table_id: TableId,
) -> Self {
let table_ids_from_tasks = tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
let len = table_ids_from_tasks.len();
Self {
data: CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_from_tasks,
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
},
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTablesData {
cluster_id: ClusterId,
state: CreateTablesState,
tasks: Vec<CreateTableTask>,
table_ids_from_tasks: Vec<TableId>,
// Because the table_id is allocated before the distributed lock is acquired,
// we must recheck whether the table exists when creating it.
// If it does, the allocated table_id is replaced with the existing one.
table_ids_already_exists: Vec<Option<TableId>>,
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
}
impl CreateTablesData {
pub fn state(&self) -> &CreateTablesState {
&self.state
}
fn physical_table_id(&self) -> TableId {
self.physical_table_id
}
fn set_physical_region_numbers(&mut self, physical_region_numbers: Vec<RegionNumber>) {
self.physical_region_numbers = physical_region_numbers;
}
fn set_table_ids_already_exists(&mut self, table_ids_already_exists: Vec<Option<TableId>>) {
self.table_ids_already_exists = table_ids_already_exists;
}
fn table_ids_already_exists(&self) -> &[Option<TableId>] {
&self.table_ids_already_exists
}
fn is_all_tables_not_exists(&self) -> bool {
self.table_ids_already_exists.iter().all(Option::is_none)
}
pub fn real_table_ids(&self) -> Vec<TableId> {
self.table_ids_from_tasks
.iter()
.zip(self.table_ids_already_exists.iter())
.map(|(table_id_from_task, table_id_already_exists)| {
table_id_already_exists.unwrap_or(*table_id_from_task)
})
.collect::<Vec<_>>()
}
fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
self.tasks
.iter()
.map(|task| &task.create_table)
.collect::<Vec<_>>()
}
fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
self.tasks
.iter()
.map(|task| {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| RegionId::new(table_info.ident.table_id, *region_number))
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
(table_info, table_route)
})
.collect::<Vec<_>>()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
pub enum CreateTablesState {
/// Prepares to create the tables
Prepare,
/// Creates regions on the Datanode
DatanodeCreateRegions,
/// Creates metadata
CreateMetadata,
}
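
For orientation, here is a minimal, self-contained sketch of the state flow above and its persistence flags, as driven by `on_prepare`, `create_regions`, and `on_create_metadata`. It ignores the shortcut where `on_prepare` jumps straight to `CreateMetadata` when every table already exists, and the driver loop is illustrative, not the actual procedure framework.

#[derive(Debug, Clone, Copy)]
enum State {
    Prepare,
    DatanodeCreateRegions,
    CreateMetadata,
}

/// Returns the next state and whether to persist before entering it.
/// `DatanodeCreateRegions -> CreateMetadata` is not persisted, so a
/// procedure recovering from a crash replays region creation.
fn step(state: State) -> (Option<State>, bool) {
    match state {
        State::Prepare => (Some(State::DatanodeCreateRegions), true),
        State::DatanodeCreateRegions => (Some(State::CreateMetadata), false),
        State::CreateMetadata => (None, true),
    }
}

fn main() {
    let mut state = State::Prepare;
    while let (Some(next), persist) = step(state) {
        println!("{state:?} -> {next:?} (persist = {persist})");
        state = next;
    }
}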


@@ -15,10 +15,7 @@
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{ColumnDef, SemanticType};
use api::v1::region::{RegionRequest, RegionRequestHeader};
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{
@@ -30,25 +27,24 @@ use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::TableNameLock;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::wal_options_allocator::prepare_wal_options;
use crate::{metrics, ClusterId};
pub struct CreateTableProcedure {
pub context: DdlContext,
@@ -59,7 +55,7 @@ impl CreateTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
pub fn new(
cluster_id: u64,
cluster_id: ClusterId,
task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
@@ -117,7 +113,7 @@ impl CreateTableProcedure {
if let Some(value) = table_name_value {
ensure!(
self.creator.data.task.create_table.create_if_not_exists,
expr.create_if_not_exists,
error::TableAlreadyExistsSnafu {
table_name: self.creator.data.table_ref().to_string(),
}
@@ -137,67 +133,8 @@ impl CreateTableProcedure {
physical_table_id: Option<TableId>,
) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;
let column_defs = create_table_expr
.column_defs
.iter()
.enumerate()
.map(|(i, c)| {
let semantic_type = if create_table_expr.time_index == c.name {
SemanticType::Timestamp
} else if create_table_expr.primary_keys.contains(&c.name) {
SemanticType::Tag
} else {
SemanticType::Field
};
RegionColumnDef {
column_def: Some(ColumnDef {
name: c.name.clone(),
data_type: c.data_type,
is_nullable: c.is_nullable,
default_constraint: c.default_constraint.clone(),
semantic_type: semantic_type as i32,
comment: String::new(),
datatype_extension: c.datatype_extension.clone(),
}),
column_id: i as u32,
}
})
.collect::<Vec<_>>();
let primary_key = create_table_expr
.primary_keys
.iter()
.map(|key| {
column_defs
.iter()
.find_map(|c| {
c.column_def.as_ref().and_then(|x| {
if &x.name == key {
Some(c.column_id)
} else {
None
}
})
})
.context(error::PrimaryKeyNotFoundSnafu { key })
})
.collect::<Result<_>>()?;
let template = PbCreateRegionRequest {
region_id: 0,
engine: create_table_expr.engine.to_string(),
column_defs,
primary_key,
path: String::new(),
options: create_table_expr.table_options.clone(),
};
Ok(CreateRequestBuilder {
template,
physical_table_id,
})
let template = build_template(create_table_expr)?;
Ok(CreateRequestBuilder::new(template, physical_table_id))
}
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
@@ -261,9 +198,11 @@ impl CreateTableProcedure {
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder
.build_one(region_id, storage_path.clone(), region_wal_options)
.await?;
let create_region_request = request_builder.build_one(
region_id,
storage_path.clone(),
region_wal_options,
)?;
requests.push(PbRegionRequest::Create(create_region_request));
}
@@ -426,7 +365,7 @@ pub struct CreateTableData {
pub task: CreateTableTask,
table_route: TableRouteValue,
pub region_wal_options: HashMap<RegionNumber, String>,
pub cluster_id: u64,
pub cluster_id: ClusterId,
}
impl CreateTableData {
@@ -434,44 +373,3 @@ impl CreateTableData {
self.task.table_ref()
}
}
/// Builder for [PbCreateRegionRequest].
pub struct CreateRequestBuilder {
template: PbCreateRegionRequest,
/// Optional. Only for metric engine.
physical_table_id: Option<TableId>,
}
impl CreateRequestBuilder {
pub fn template(&self) -> &PbCreateRegionRequest {
&self.template
}
async fn build_one(
&self,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<PbCreateRegionRequest> {
let mut request = self.template.clone();
request.region_id = region_id.as_u64();
request.path = storage_path;
// Stores the encoded wal options into the request options.
prepare_wal_options(&mut request.options, region_id, region_wal_options);
if let Some(physical_table_id) = self.physical_table_id {
// Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
// For example, region 0 of logical table must resides with region 0 of physical table. So here we can
// simply concat the physical table id and the logical region number to get the physical region id.
let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
request.options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_region_id.as_u64().to_string(),
);
}
Ok(request)
}
}


@@ -0,0 +1,134 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use snafu::OptionExt;
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error;
use crate::error::Result;
use crate::wal_options_allocator::prepare_wal_options;
pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
let column_defs = create_table_expr
.column_defs
.iter()
.enumerate()
.map(|(i, c)| {
let semantic_type = if create_table_expr.time_index == c.name {
SemanticType::Timestamp
} else if create_table_expr.primary_keys.contains(&c.name) {
SemanticType::Tag
} else {
SemanticType::Field
};
RegionColumnDef {
column_def: Some(ColumnDef {
name: c.name.clone(),
data_type: c.data_type,
is_nullable: c.is_nullable,
default_constraint: c.default_constraint.clone(),
semantic_type: semantic_type as i32,
comment: String::new(),
datatype_extension: c.datatype_extension.clone(),
}),
column_id: i as u32,
}
})
.collect::<Vec<_>>();
let primary_key = create_table_expr
.primary_keys
.iter()
.map(|key| {
column_defs
.iter()
.find_map(|c| {
c.column_def.as_ref().and_then(|x| {
if &x.name == key {
Some(c.column_id)
} else {
None
}
})
})
.context(error::PrimaryKeyNotFoundSnafu { key })
})
.collect::<Result<_>>()?;
let template = CreateRequest {
region_id: 0,
engine: create_table_expr.engine.to_string(),
column_defs,
primary_key,
path: String::new(),
options: create_table_expr.table_options.clone(),
};
Ok(template)
}
/// Builder for [CreateRequest].
pub struct CreateRequestBuilder {
template: CreateRequest,
/// Optional. Only for metric engine.
physical_table_id: Option<TableId>,
}
impl CreateRequestBuilder {
pub(crate) fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
Self {
template,
physical_table_id,
}
}
pub fn template(&self) -> &CreateRequest {
&self.template
}
pub(crate) fn build_one(
&self,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<CreateRequest> {
let mut request = self.template.clone();
request.region_id = region_id.as_u64();
request.path = storage_path;
// Stores the encoded wal options into the request options.
prepare_wal_options(&mut request.options, region_id, region_wal_options);
if let Some(physical_table_id) = self.physical_table_id {
// A logical table has the same region numbers as its physical table, and they have a
// one-to-one mapping. For example, region 0 of a logical table must reside with region 0
// of its physical table. So here we can simply combine the physical table id and the
// logical region number to get the physical region id.
let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
request.options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_region_id.as_u64().to_string(),
);
}
Ok(request)
}
}
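
As a usage sketch, this is how `one_datanode_region_requests` in create_logical_tables.rs exercises the builder above; the wrapper function and its parameters are illustrative placeholders, not part of this commit.

use std::collections::HashMap;

use api::v1::region::CreateRequest;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::error::Result;
use crate::rpc::ddl::CreateTableTask;

/// Illustrative wrapper: builds one create-region request for a logical table.
/// Logical region N is co-located with physical region N.
fn build_logical_region_request(
    task: &CreateTableTask,
    physical_table_id: TableId,
    logical_table_id: TableId,
    region_number: RegionNumber,
    storage_path: String,
) -> Result<CreateRequest> {
    let template = build_template(&task.create_table)?;
    let builder = CreateRequestBuilder::new(template, Some(physical_table_id));
    // Logical tables pass no per-region WAL options in this path.
    builder.build_one(
        RegionId::new(logical_table_id, region_number),
        storage_path,
        &HashMap::new(),
    )
}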


@@ -197,6 +197,15 @@ impl TableMetadataAllocator {
region_wal_options,
})
}
/// Allocates and sets table ids for all tasks.
pub async fn set_table_ids_on_logic_create(&self, tasks: &mut [CreateTableTask]) -> Result<()> {
for task in tasks {
let table_id = self.allocate_table_id(task).await?;
task.table_info.ident.table_id = table_id;
}
Ok(())
}
}
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;


@@ -12,21 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_procedure::error::Error as ProcedureError;
use snafu::{location, Location};
use snafu::{ensure, location, Location, OptionExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use table::metadata::TableId;
use crate::error::{self, Error};
use crate::error::{
EmptyCreateTableTasksSnafu, Error, Result, TableNotFoundSnafu, UnsupportedSnafu,
};
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(crate::error::Error) -> Error {
pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(Error) -> Error {
move |err| {
if matches!(err, crate::error::Error::RetryLater { .. }) {
error::Error::RetryLater {
if matches!(err, Error::RetryLater { .. }) {
Error::RetryLater {
source: BoxedError::new(err),
}
} else {
error::Error::OperateDatanode {
Error::OperateDatanode {
location: location!(),
peer: datanode,
source: BoxedError::new(err),
@@ -47,3 +55,58 @@ pub fn handle_retry_error(e: Error) -> ProcedureError {
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
format!("{}/{}", catalog, schema)
}
pub async fn check_and_get_physical_table_id(
table_metadata_manager: &TableMetadataManagerRef,
tasks: &[CreateTableTask],
) -> Result<TableId> {
let mut physical_table_name = None;
for task in tasks {
ensure!(
task.create_table.engine == METRIC_ENGINE,
UnsupportedSnafu {
operation: format!("create table with engine {}", task.create_table.engine)
}
);
let current_physical_table_name = task
.create_table
.table_options
.get(LOGICAL_TABLE_METADATA_KEY)
.context(UnsupportedSnafu {
operation: format!(
"create table without table options {}",
LOGICAL_TABLE_METADATA_KEY,
),
})?;
let current_physical_table_name = TableNameKey::new(
&task.create_table.catalog_name,
&task.create_table.schema_name,
current_physical_table_name,
);
physical_table_name = match physical_table_name {
Some(name) => {
ensure!(
name == current_physical_table_name,
UnsupportedSnafu {
operation: format!(
"create table with different physical table name {} and {}",
name, current_physical_table_name
)
}
);
Some(name)
}
None => Some(current_physical_table_name),
};
}
let physical_table_name = physical_table_name.context(EmptyCreateTableTasksSnafu)?;
table_metadata_manager
.table_name_manager()
.get(physical_table_name)
.await?
.context(TableNotFoundSnafu {
table_name: physical_table_name.to_string(),
})
.map(|table| table.table_id())
}
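
To make the contract concrete, here is a hedged sketch of a task expr that would pass these checks. `METRIC_ENGINE` and `LOGICAL_TABLE_METADATA_KEY` come from the imports above, while the table names are made up.

use api::v1::CreateTableExpr;
use common_catalog::consts::METRIC_ENGINE;
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;

/// Illustrative only: a logical-table expr must target the metric engine and
/// name its physical table in the options, or `check_and_get_physical_table_id`
/// rejects the batch (mixed engines, a missing key, or differing physical
/// table names are all errors).
fn sample_logical_expr() -> CreateTableExpr {
    let mut expr = CreateTableExpr {
        catalog_name: "greptime".to_string(),
        schema_name: "public".to_string(),
        table_name: "logical_t1".to_string(), // made-up logical table name
        engine: METRIC_ENGINE.to_string(),
        ..Default::default()
    };
    expr.table_options.insert(
        LOGICAL_TABLE_METADATA_KEY.to_string(),
        "physical_t".to_string(), // made-up physical table name
    );
    expr
}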


@@ -18,35 +18,41 @@ use std::sync::Arc;
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, tracing};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext,
utils, DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata,
TableMetadataAllocatorContext,
};
use crate::error::{
self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu,
WaitProcedureSnafu,
self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable};
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateLogicalTables, CreateTable, DropLogicalTables, DropTable,
TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
use crate::table_name::TableName;
use crate::ClusterId;
pub type DdlManagerRef = Arc<DdlManager>;
@@ -114,6 +120,20 @@ impl DdlManager {
let context = self.create_context();
self.procedure_manager
.register_loader(
CreateLogicalTablesProcedure::TYPE_NAME,
Box::new(move |json| {
let context = context.clone();
CreateLogicalTablesProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(RegisterProcedureLoaderSnafu {
type_name: CreateLogicalTablesProcedure::TYPE_NAME,
})?;
let context = self.create_context();
self.procedure_manager
.register_loader(
DropTableProcedure::TYPE_NAME,
@@ -159,7 +179,7 @@ impl DdlManager {
/// Submits and executes an alter table task.
pub async fn submit_alter_table_task(
&self,
cluster_id: u64,
cluster_id: ClusterId,
alter_table_task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_info: Option<(TableId, TableName)>,
@@ -183,7 +203,7 @@ impl DdlManager {
/// Submits and executes a create table task.
pub async fn submit_create_table_task(
&self,
cluster_id: u64,
cluster_id: ClusterId,
create_table_task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
@@ -203,11 +223,33 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes create logical table tasks.
pub async fn submit_create_logical_table_tasks(
&self,
cluster_id: ClusterId,
create_table_tasks: Vec<CreateTableTask>,
physical_table_id: TableId,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CreateLogicalTablesProcedure::new(
cluster_id,
create_table_tasks,
physical_table_id,
context,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
cluster_id: ClusterId,
drop_table_task: DropTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
@@ -231,7 +273,7 @@ impl DdlManager {
/// Submits and executes a truncate table task.
pub async fn submit_truncate_table_task(
&self,
cluster_id: u64,
cluster_id: ClusterId,
truncate_table_task: TruncateTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
@@ -272,7 +314,7 @@ impl DdlManager {
async fn handle_truncate_table_task(
ddl_manager: &DdlManager,
cluster_id: u64,
cluster_id: ClusterId,
truncate_table_task: TruncateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_id = truncate_table_task.table_id;
@@ -310,7 +352,7 @@ async fn handle_truncate_table_task(
async fn handle_alter_table_task(
ddl_manager: &DdlManager,
cluster_id: u64,
cluster_id: ClusterId,
alter_table_task: AlterTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_ref = alter_table_task.table_ref();
@@ -385,7 +427,7 @@ async fn handle_alter_table_task(
async fn handle_drop_table_task(
ddl_manager: &DdlManager,
cluster_id: u64,
cluster_id: ClusterId,
drop_table_task: DropTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_id = drop_table_task.table_id;
@@ -427,7 +469,7 @@ async fn handle_drop_table_task(
async fn handle_create_table_task(
ddl_manager: &DdlManager,
cluster_id: u64,
cluster_id: ClusterId,
mut create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_meta = ddl_manager
@@ -454,14 +496,66 @@ async fn handle_create_table_task(
region_wal_options,
)
.await?;
let output = output.context(error::ProcedureOutputSnafu)?;
let table_id = *(output.downcast_ref::<u32>().unwrap());
let procedure_id = id.to_string();
let output = output.context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "empty output",
})?;
let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "downcast to `u32`",
})?);
info!("Table: {table_id} is created via procedure_id {id:?}");
Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
key: procedure_id.into(),
table_id: Some(table_id),
..Default::default()
})
}
async fn handle_create_logical_table_tasks(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
mut create_table_tasks: Vec<CreateTableTask>,
) -> Result<SubmitDdlTaskResponse> {
ensure!(!create_table_tasks.is_empty(), EmptyCreateTableTasksSnafu);
let physical_table_id = utils::check_and_get_physical_table_id(
&ddl_manager.table_metadata_manager,
&create_table_tasks,
)
.await?;
// Sets table_ids on create_table_tasks
ddl_manager
.table_metadata_allocator
.set_table_ids_on_logic_create(&mut create_table_tasks)
.await?;
let num_logical_tables = create_table_tasks.len();
let (id, output) = ddl_manager
.submit_create_logical_table_tasks(cluster_id, create_table_tasks, physical_table_id)
.await?;
info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
let procedure_id = id.to_string();
let output = output.context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "empty output",
})?;
let table_ids = output
.downcast_ref::<Vec<TableId>>()
.context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "downcast to `Vec<TableId>`",
})?
.clone();
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
table_ids,
..Default::default()
})
}
@@ -494,6 +588,11 @@ impl DdlTaskExecutor for DdlManager {
TruncateTable(truncate_table_task) => {
handle_truncate_table_task(self, cluster_id, truncate_table_task).await
}
CreateLogicalTables(create_table_tasks) => {
handle_create_logical_table_tasks(self, cluster_id, create_table_tasks).await
}
DropLogicalTables(_) => todo!(),
AlterLogicalTables(_) => todo!(),
}
}
.trace(span)


@@ -112,15 +112,21 @@ pub enum Error {
source: common_procedure::Error,
},
#[snafu(display(
"Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
))]
ProcedureOutput {
procedure_id: String,
err_msg: String,
location: Location,
},
#[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
ConvertRawTableInfo {
location: Location,
source: datatypes::Error,
},
#[snafu(display("Failed to get procedure output"))]
ProcedureOutput { location: Location },
#[snafu(display("Primary key '{key}' not found when creating region request"))]
PrimaryKeyNotFound { key: String, location: Location },
@@ -357,6 +363,9 @@ pub enum Error {
#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedLogicalRouteTable { location: Location, err_msg: String },
#[snafu(display("The tasks of create tables cannot be empty"))]
EmptyCreateTableTasks { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -426,7 +435,7 @@ impl ErrorExt for Error {
InvalidCatalogValue { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),
InvalidNumTopics { .. } => StatusCode::InvalidArguments,
InvalidNumTopics { .. } | EmptyCreateTableTasks { .. } => StatusCode::InvalidArguments,
}
}


@@ -84,7 +84,7 @@ use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use crate::ddl::utils::region_storage_path;
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Txn, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
use crate::DatanodeId;
@@ -457,6 +457,86 @@ impl TableMetadataManager {
Ok(())
}
/// Creates metadata for multiple logical tables and returns an error if conflicting metadata already exists.
pub async fn create_logic_tables_metadata(
&self,
tables_data: Vec<(RawTableInfo, TableRouteValue)>,
) -> Result<()> {
let len = tables_data.len();
let mut txns = Vec::with_capacity(3 * len);
struct OnFailure<F1, R1, F2, R2>
where
F1: FnOnce(&Vec<TxnOpResponse>) -> R1,
F2: FnOnce(&Vec<TxnOpResponse>) -> R2,
{
table_info_value: TableInfoValue,
on_create_table_info_failure: F1,
table_route_value: TableRouteValue,
on_create_table_route_failure: F2,
}
let mut on_failures = Vec::with_capacity(len);
for (mut table_info, table_route_value) in tables_data {
table_info.meta.region_numbers = table_route_value.region_numbers();
let table_id = table_info.ident.table_id;
// Creates table name.
let table_name = TableNameKey::new(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name,
);
let create_table_name_txn = self
.table_name_manager()
.build_create_txn(&table_name, table_id)?;
txns.push(create_table_name_txn);
// Creates table info.
let table_info_value = TableInfoValue::new(table_info);
let (create_table_info_txn, on_create_table_info_failure) =
self.table_info_manager()
.build_create_txn(table_id, &table_info_value)?;
txns.push(create_table_info_txn);
let (create_table_route_txn, on_create_table_route_failure) = self
.table_route_manager()
.build_create_txn(table_id, &table_route_value)?;
txns.push(create_table_route_txn);
on_failures.push(OnFailure {
table_info_value,
on_create_table_info_failure,
table_route_value,
on_create_table_route_failure,
});
}
let txn = Txn::merge_all(txns);
let r = self.kv_backend.txn(txn).await?;
// Checks whether metadata was already created.
if !r.succeeded {
for on_failure in on_failures {
let remote_table_info = (on_failure.on_create_table_info_failure)(&r.responses)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
})?
.into_inner();
let remote_table_route = (on_failure.on_create_table_route_failure)(&r.responses)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the create table metadata",
})?
.into_inner();
let op_name = "creating logical tables metadata";
ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
}
}
Ok(())
}
/// Deletes metadata for table.
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn delete_table_metadata(
@@ -907,6 +987,59 @@ mod tests {
);
}
#[tokio::test]
async fn test_create_logic_tables_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let table_route_value = TableRouteValue::physical(region_routes.clone());
let tables_data = vec![(table_info.clone(), table_route_value.clone())];
// Creates metadata.
table_metadata_manager
.create_logic_tables_metadata(tables_data.clone())
.await
.unwrap();
// If identical metadata was already created, creating it again should be ok.
assert!(table_metadata_manager
.create_logic_tables_metadata(tables_data)
.await
.is_ok());
let mut modified_region_routes = region_routes.clone();
modified_region_routes.push(new_region_route(2, 3));
let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
// If conflicting remote metadata already exists, it should return an error.
assert!(table_metadata_manager
.create_logic_tables_metadata(modified_tables_data)
.await
.is_err());
let (remote_table_info, remote_table_route) = table_metadata_manager
.get_full_table_info(table_id)
.await
.unwrap();
assert_eq!(
remote_table_info.unwrap().into_inner().table_info,
table_info
);
assert_eq!(
remote_table_route
.unwrap()
.into_inner()
.region_routes()
.unwrap(),
&region_routes
);
}
#[tokio::test]
async fn test_delete_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use futures_util::stream::BoxStream;
@@ -26,11 +27,11 @@ use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::store::{BatchGetRequest, RangeRequest};
use crate::rpc::KeyValue;
use crate::table_name::TableName;
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TableNameKey<'a> {
pub catalog: &'a str,
pub schema: &'a str,
@@ -220,6 +221,31 @@ impl TableNameManager {
.transpose()
}
pub async fn batch_get(
&self,
keys: Vec<TableNameKey<'_>>,
) -> Result<Vec<Option<TableNameValue>>> {
let raw_keys = keys
.into_iter()
.map(|key| key.as_raw_key())
.collect::<Vec<_>>();
let req = BatchGetRequest::new().with_keys(raw_keys.clone());
let res = self.kv_backend.batch_get(req).await?;
let kvs = res
.kvs
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect::<HashMap<_, _>>();
let mut array = vec![None; raw_keys.len()];
for (i, key) in raw_keys.into_iter().enumerate() {
let v = kvs.get(&key);
array[i] = v
.map(|v| TableNameValue::try_from_raw_value(v))
.transpose()?;
}
Ok(array)
}
pub async fn exists(&self, key: TableNameKey<'_>) -> Result<bool> {
let raw_key = key.as_raw_key();
self.kv_backend.exists(&raw_key).await
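
A short usage sketch of `batch_get` above (the table names and the wrapper function are illustrative): results are positional, so `values[i]` answers `keys[i]`, with `None` for a table that does not exist.

use table::metadata::TableId;

/// Illustrative: positional batch lookup of table ids by name, assuming this
/// module's `TableNameManager`, `TableNameKey`, and `Result`.
async fn lookup_ids(manager: &TableNameManager) -> Result<Vec<Option<TableId>>> {
    let keys = vec![
        TableNameKey::new("greptime", "public", "t_exists"),  // made-up
        TableNameKey::new("greptime", "public", "t_missing"), // made-up
    ];
    let values = manager.batch_get(keys).await?;
    // Same length and order as `keys`; a missing table yields None.
    Ok(values
        .into_iter()
        .map(|v| v.map(|v| v.table_id()))
        .collect())
}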


@@ -64,6 +64,10 @@ impl TableRouteValue {
Self::Physical(PhysicalTableRouteValue::new(region_routes))
}
pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
}
/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
ensure!(
@@ -231,7 +235,7 @@ impl TableRouteManager {
}
/// Builds a create table route transaction. It expects that the `__table_route/{table_id}` key is not occupied.
pub(crate) fn build_create_txn(
pub fn build_create_txn(
&self,
table_id: TableId,
table_route_value: &TableRouteValue,


@@ -39,6 +39,12 @@ lazy_static! {
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_create_tables",
"meta procedure create tables",
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_DROP_TABLE: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_drop_table",
"meta procedure drop table",


@@ -16,8 +16,10 @@ use std::result;
use api::v1::meta::submit_ddl_task_request::Task;
use api::v1::meta::{
AlterTableTask as PbAlterTableTask, CreateTableTask as PbCreateTableTask,
DropTableTask as PbDropTableTask, Partition, SubmitDdlTaskRequest as PbSubmitDdlTaskRequest,
AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks,
CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition,
SubmitDdlTaskRequest as PbSubmitDdlTaskRequest,
SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr};
@@ -38,6 +40,9 @@ pub enum DdlTask {
DropTable(DropTableTask),
AlterTable(AlterTableTask),
TruncateTable(TruncateTableTask),
CreateLogicalTables(Vec<CreateTableTask>),
DropLogicalTables(Vec<DropTableTask>),
AlterLogicalTables(Vec<AlterTableTask>),
}
impl DdlTask {
@@ -49,6 +54,15 @@ impl DdlTask {
DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
}
pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
DdlTask::CreateLogicalTables(
table_data
.into_iter()
.map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
.collect(),
)
}
pub fn new_drop_table(
catalog: String,
schema: String,
@@ -96,6 +110,33 @@ impl TryFrom<Task> for DdlTask {
Task::TruncateTableTask(truncate_table) => {
Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
}
Task::CreateTableTasks(create_tables) => {
let tasks = create_tables
.tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;
Ok(DdlTask::CreateLogicalTables(tasks))
}
Task::DropTableTasks(drop_tables) => {
let tasks = drop_tables
.tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;
Ok(DdlTask::DropLogicalTables(tasks))
}
Task::AlterTableTasks(alter_tables) => {
let tasks = alter_tables
.tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;
Ok(DdlTask::AlterLogicalTables(tasks))
}
}
}
}
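
For example, a caller batches several `(expr, table_info)` pairs into one request through the `new_create_logical_tables` constructor above. A minimal sketch, assuming only the `task` field on `SubmitDdlTaskRequest`, matching its construction later in this commit:

/// Illustrative: bundles several logical-table creations into one DDL request,
/// mirroring `create_logical_tables_procedure` in the statement executor.
fn build_batch_create_request(
    pairs: Vec<(CreateTableExpr, RawTableInfo)>,
) -> SubmitDdlTaskRequest {
    SubmitDdlTaskRequest {
        task: DdlTask::new_create_logical_tables(pairs),
    }
}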
@@ -110,31 +151,34 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
let task = match request.task {
DdlTask::CreateTable(task) => Task::CreateTableTask(PbCreateTableTask {
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
create_table: Some(task.create_table),
partitions: task.partitions,
}),
DdlTask::DropTable(task) => Task::DropTableTask(PbDropTableTask {
drop_table: Some(DropTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
drop_if_exists: task.drop_if_exists,
}),
}),
DdlTask::AlterTable(task) => Task::AlterTableTask(PbAlterTableTask {
alter_table: Some(task.alter_table),
}),
DdlTask::TruncateTable(task) => Task::TruncateTableTask(PbTruncateTableTask {
truncate_table: Some(TruncateTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
}),
}),
DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
DdlTask::DropTable(task) => Task::DropTableTask(task.try_into()?),
DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
DdlTask::CreateLogicalTables(tasks) => {
let tasks = tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;
Task::CreateTableTasks(PbCreateTableTasks { tasks })
}
DdlTask::DropLogicalTables(tasks) => {
let tasks = tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;
Task::DropTableTasks(PbDropTableTasks { tasks })
}
DdlTask::AlterLogicalTables(tasks) => {
let tasks = tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;
Task::AlterTableTasks(PbAlterTableTasks { tasks })
}
};
Ok(Self {
@@ -147,7 +191,11 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
pub key: Vec<u8>,
// For creating a physical table.
// TODO(jeremy): remove it?
pub table_id: Option<TableId>,
// For creating multiple logical tables.
pub table_ids: Vec<TableId>,
}
impl TryFrom<PbSubmitDdlTaskResponse> for SubmitDdlTaskResponse {
@@ -155,9 +203,11 @@ impl TryFrom<PbSubmitDdlTaskResponse> for SubmitDdlTaskResponse {
fn try_from(resp: PbSubmitDdlTaskResponse) -> Result<Self> {
let table_id = resp.table_id.map(|t| t.id);
let table_ids = resp.table_ids.iter().map(|t| t.id).collect();
Ok(Self {
key: resp.key,
table_id,
table_ids,
})
}
}
@@ -225,6 +275,22 @@ impl TryFrom<PbDropTableTask> for DropTableTask {
}
}
impl TryFrom<DropTableTask> for PbDropTableTask {
type Error = error::Error;
fn try_from(task: DropTableTask) -> Result<Self> {
Ok(PbDropTableTask {
drop_table: Some(DropTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
drop_if_exists: task.drop_if_exists,
}),
})
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
pub create_table: CreateTableExpr,
@@ -248,6 +314,18 @@ impl TryFrom<PbCreateTableTask> for CreateTableTask {
}
}
impl TryFrom<CreateTableTask> for PbCreateTableTask {
type Error = error::Error;
fn try_from(task: CreateTableTask) -> Result<Self> {
Ok(PbCreateTableTask {
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
create_table: Some(task.create_table),
partitions: task.partitions,
})
}
}
impl CreateTableTask {
pub fn new(
expr: CreateTableExpr,
@@ -357,6 +435,16 @@ impl TryFrom<PbAlterTableTask> for AlterTableTask {
}
}
impl TryFrom<AlterTableTask> for PbAlterTableTask {
type Error = error::Error;
fn try_from(task: AlterTableTask) -> Result<Self> {
Ok(PbAlterTableTask {
alter_table: Some(task.alter_table),
})
}
}
impl Serialize for AlterTableTask {
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
where
@@ -438,6 +526,21 @@ impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
}
}
impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
type Error = error::Error;
fn try_from(task: TruncateTableTask) -> Result<Self> {
Ok(PbTruncateTableTask {
truncate_table: Some(TruncateTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
}),
})
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;


@@ -335,6 +335,12 @@ impl BatchGetRequest {
Self { keys: vec![] }
}
#[inline]
pub fn with_keys(mut self, keys: Vec<Vec<u8>>) -> Self {
self.keys = keys;
self
}
#[inline]
pub fn add_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.keys.push(key.into());


@@ -27,6 +27,7 @@ use client::client_manager::DatanodeClients;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::alter_table::AlterTableProcedure;
use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState};
use common_meta::ddl::create_table::*;
use common_meta::ddl::drop_table::DropTableProcedure;
use common_meta::key::table_info::TableInfoValue;
@@ -40,11 +41,11 @@ use store_api::storage::RegionId;
use crate::procedure::utils::mock::EchoRegionServer;
use crate::procedure::utils::test_data;
fn create_table_task() -> CreateTableTask {
fn create_table_task(table_name: Option<&str>) -> CreateTableTask {
let create_table_expr = CreateTableExpr {
catalog_name: "my_catalog".to_string(),
schema_name: "my_schema".to_string(),
table_name: "my_table".to_string(),
table_name: table_name.unwrap_or("my_table").to_string(),
desc: "blabla".to_string(),
column_defs: vec![
PbColumnDef {
@@ -99,7 +100,7 @@ fn create_table_task() -> CreateTableTask {
fn test_region_request_builder() {
let procedure = CreateTableProcedure::new(
1,
create_table_task(),
create_table_task(None),
TableRouteValue::physical(test_data::new_region_routes()),
HashMap::default(),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
@@ -190,7 +191,7 @@ async fn test_on_datanode_create_regions() {
let mut procedure = CreateTableProcedure::new(
1,
create_table_task(),
create_table_task(None),
TableRouteValue::physical(region_routes),
HashMap::default(),
test_data::new_ddl_context(datanode_manager),
@@ -230,6 +231,74 @@ async fn test_on_datanode_create_regions() {
assert!(expected_created_regions.lock().unwrap().is_empty());
}
#[tokio::test]
async fn test_on_datanode_create_logical_regions() {
let (region_server, mut rx) = EchoRegionServer::new();
let region_routes = test_data::new_region_routes();
let datanode_manager = new_datanode_manager(&region_server, &region_routes).await;
let physical_table_route = TableRouteValue::physical(region_routes);
let physical_table_id = 111;
let task1 = create_table_task(Some("my_table1"));
let task2 = create_table_task(Some("my_table2"));
let task3 = create_table_task(Some("my_table3"));
let ctx = test_data::new_ddl_context(datanode_manager);
let kv_backend = ctx.table_metadata_manager.kv_backend();
let physical_route_txn = ctx
.table_metadata_manager
.table_route_manager()
.build_create_txn(physical_table_id, &physical_table_route)
.unwrap()
.0;
let _ = kv_backend.txn(physical_route_txn).await.unwrap();
let mut procedure =
CreateLogicalTablesProcedure::new(1, vec![task1, task2, task3], physical_table_id, ctx);
let expected_created_regions = Arc::new(Mutex::new(HashMap::from([(1, 3), (2, 3), (3, 3)])));
let handle = tokio::spawn({
let expected_created_regions = expected_created_regions.clone();
let mut max_recv = expected_created_regions.lock().unwrap().len() * 3;
async move {
while let Some(PbRegionRequest::Creates(requests)) = rx.recv().await {
for request in requests.requests {
let region_number = RegionId::from_u64(request.region_id).region_number();
let mut map = expected_created_regions.lock().unwrap();
let v = map.get_mut(&region_number).unwrap();
*v -= 1;
if *v == 0 {
map.remove(&region_number);
}
max_recv -= 1;
if max_recv == 0 {
break;
}
}
if max_recv == 0 {
break;
}
}
}
});
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: false }));
assert!(matches!(
procedure.creator.data.state(),
&CreateTablesState::CreateMetadata
));
handle.await.unwrap();
assert!(expected_created_regions.lock().unwrap().is_empty());
let status = procedure.on_create_metadata().await.unwrap();
assert!(status.is_done());
}
#[tokio::test]
async fn test_on_datanode_drop_regions() {
let drop_table_task = DropTableTask {


@@ -492,6 +492,27 @@ pub enum Error {
table_name: String,
location: Location,
},
#[snafu(display(
"Do not support creating tables in multiple catalogs: {}",
catalog_names
))]
CreateTableWithMultiCatalogs {
catalog_names: String,
location: Location,
},
#[snafu(display("Do not support creating tables in multiple schemas: {}", schema_names))]
CreateTableWithMultiSchemas {
schema_names: String,
location: Location,
},
#[snafu(display("Empty creating table expr"))]
EmptyCreateTableExpr { location: Location },
#[snafu(display("Failed to create logical tables: {}", reason))]
CreateLogicalTables { reason: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -608,6 +629,12 @@ impl ErrorExt for Error {
}
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::CreateTableWithMultiCatalogs { .. }
| Error::CreateTableWithMultiSchemas { .. }
| Error::EmptyCreateTableExpr { .. } => StatusCode::InvalidArguments,
Error::CreateLogicalTables { .. } => StatusCode::Unexpected,
}
}


@@ -21,6 +21,11 @@ lazy_static! {
"table operator create table"
)
.unwrap();
pub static ref DIST_CREATE_TABLES: Histogram = register_histogram!(
"greptime_table_operator_create_tables",
"table operator create table"
)
.unwrap();
pub static ref DIST_INGEST_ROW_COUNT: IntCounter = register_int_counter!(
"greptime_table_operator_ingest_rows",
"table operator ingest rows"


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
@@ -50,8 +50,10 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, ParseSqlSnafu,
Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
CreateLogicalTablesSnafu, CreateTableInfoSnafu, CreateTableWithMultiCatalogsSnafu,
CreateTableWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyCreateTableExprSnafu,
InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, ParseSqlSnafu, Result,
SchemaNotFoundSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
@@ -113,12 +115,12 @@ impl StatementExecutor {
&create_table.table_name,
)
.await
.context(error::CatalogSnafu)?
.context(CatalogSnafu)?
{
return if create_table.create_if_not_exists {
Ok(table)
} else {
error::TableAlreadyExistsSnafu {
TableAlreadyExistsSnafu {
table: format_full_table_name(
&create_table.catalog_name,
&create_table.schema_name,
@@ -149,7 +151,7 @@ impl StatementExecutor {
let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;
let resp = self
.create_table_procedure(create_table, partitions, table_info.clone())
.create_table_procedure(create_table.clone(), partitions, table_info.clone())
.await?;
let table_id = resp.table_id.context(error::UnexpectedSnafu {
@@ -167,6 +169,99 @@ impl StatementExecutor {
Ok(table)
}
#[tracing::instrument(skip_all)]
pub async fn create_logical_tables(
&self,
create_table_exprs: &[CreateTableExpr],
) -> Result<Vec<TableRef>> {
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
ensure!(!create_table_exprs.is_empty(), EmptyCreateTableExprSnafu);
ensure!(
create_table_exprs
.windows(2)
.all(|expr| expr[0].catalog_name == expr[1].catalog_name),
CreateTableWithMultiCatalogsSnafu {
catalog_names: create_table_exprs
.iter()
.map(|x| x.catalog_name.as_str())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>()
.join(",")
.to_string()
}
);
let catalog_name = create_table_exprs[0].catalog_name.to_string();
ensure!(
create_table_exprs
.windows(2)
.all(|expr| expr[0].schema_name == expr[1].schema_name),
CreateTableWithMultiSchemasSnafu {
schema_names: create_table_exprs
.iter()
.map(|x| x.schema_name.as_str())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>()
.join(",")
.to_string()
}
);
let schema_name = create_table_exprs[0].schema_name.to_string();
// Check table names
for create_table in create_table_exprs {
ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: create_table.table_name.clone(),
}
);
}
let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(&catalog_name, &schema_name))
.await
.context(TableMetadataManagerSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: &schema_name,
})?;
let mut raw_tables_info = create_table_exprs
.iter()
.map(|create| create_table_info(create, vec![], schema.clone()))
.collect::<Result<Vec<_>>>()?;
let tables_data = create_table_exprs
.iter()
.cloned()
.zip(raw_tables_info.iter().cloned())
.collect::<Vec<_>>();
let resp = self.create_logical_tables_procedure(tables_data).await?;
let table_ids = resp.table_ids;
ensure!(
    table_ids.len() == raw_tables_info.len(),
    CreateLogicalTablesSnafu {
        reason: format!(
            "The number of returned table ids is inconsistent with the number of tables to create, expected: {}, actual: {}",
            raw_tables_info.len(),
            table_ids.len()
        )
    }
);
info!("Successfully created logical tables, table ids: {:?}", table_ids);
for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
table_info.ident.table_id = table_ids[i];
}
let tables_info = raw_tables_info
.into_iter()
.map(|x| x.try_into().context(CreateTableInfoSnafu))
.collect::<Result<Vec<_>>>()?;
Ok(tables_info
.into_iter()
.map(|x| DistTable::table(Arc::new(x)))
.collect())
}
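// Illustrative call site (not part of this patch): `exprs` is a hypothetical
// Vec<CreateTableExpr> whose entries share one catalog and one schema, as the
// checks above require; all logical tables are then created in one DDL round trip.
//
//     let tables = statement_executor.create_logical_tables(&exprs).await?;
//     assert_eq!(tables.len(), exprs.len()); // one TableRef per submitted expr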
#[tracing::instrument(skip_all)]
pub async fn drop_table(&self, table_name: TableName, drop_if_exists: bool) -> Result<Output> {
if let Some(table) = self
@@ -331,14 +426,28 @@ impl StatementExecutor {
async fn create_table_procedure(
&self,
create_table: &CreateTableExpr,
create_table: CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect();
let request = SubmitDdlTaskRequest {
task: DdlTask::new_create_table(create_table.clone(), partitions, table_info),
task: DdlTask::new_create_table(create_table, partitions, table_info),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn create_logical_tables_procedure(
&self,
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_create_logical_tables(tables_data),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}

View File

@@ -16,7 +16,11 @@ use std::collections::HashMap;
use std::fmt::{self};
use api::v1::add_column_location::LocationType;
use api::v1::region::{alter_request, region_request, AlterRequest};
use api::v1::region::{
alter_request, region_request, AlterRequest, AlterRequests, CloseRequest, CompactRequest,
CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest,
InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{self, Rows, SemanticType};
use snafu::{ensure, OptionExt};
use strum::IntoStaticStr;
@@ -69,82 +73,19 @@ impl RegionRequest {
/// Inserts/Deletes and the batched Creates/Drops/Alters may expand into multiple
/// requests. Others are one-to-one.
pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
match body {
region_request::Body::Inserts(inserts) => Ok(inserts
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, Self::Put(RegionPutRequest { rows })))
})
.collect()),
region_request::Body::Deletes(deletes) => Ok(deletes
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, Self::Delete(RegionDeleteRequest { rows })))
})
.collect()),
region_request::Body::Create(create) => {
let column_metadatas = create
.column_defs
.into_iter()
.map(ColumnMetadata::try_from_column_def)
.collect::<Result<Vec<_>>>()?;
let region_id = create.region_id.into();
let region_dir = region_dir(&create.path, region_id);
Ok(vec![(
region_id,
Self::Create(RegionCreateRequest {
engine: create.engine,
column_metadatas,
primary_key: create.primary_key,
options: create.options,
region_dir,
}),
)])
}
region_request::Body::Drop(drop) => Ok(vec![(
drop.region_id.into(),
Self::Drop(RegionDropRequest {}),
)]),
region_request::Body::Open(open) => {
let region_id = open.region_id.into();
let region_dir = region_dir(&open.path, region_id);
Ok(vec![(
region_id,
Self::Open(RegionOpenRequest {
engine: open.engine,
region_dir,
options: open.options,
skip_wal_replay: false,
}),
)])
}
region_request::Body::Close(close) => Ok(vec![(
close.region_id.into(),
Self::Close(RegionCloseRequest {}),
)]),
region_request::Body::Alter(alter) => Ok(vec![(
alter.region_id.into(),
Self::Alter(RegionAlterRequest::try_from(alter)?),
)]),
region_request::Body::Flush(flush) => Ok(vec![(
flush.region_id.into(),
Self::Flush(RegionFlushRequest {
row_group_size: None,
}),
)]),
region_request::Body::Compact(compact) => Ok(vec![(
compact.region_id.into(),
Self::Compact(RegionCompactRequest {}),
)]),
region_request::Body::Truncate(truncate) => Ok(vec![(
truncate.region_id.into(),
Self::Truncate(RegionTruncateRequest {}),
)]),
region_request::Body::Inserts(inserts) => make_region_puts(inserts),
region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
region_request::Body::Create(create) => make_region_create(create),
region_request::Body::Drop(drop) => make_region_drop(drop),
region_request::Body::Open(open) => make_region_open(open),
region_request::Body::Close(close) => make_region_close(close),
region_request::Body::Alter(alter) => make_region_alter(alter),
region_request::Body::Flush(flush) => make_region_flush(flush),
region_request::Body::Compact(compact) => make_region_compact(compact),
region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
region_request::Body::Creates(creates) => make_region_creates(creates),
region_request::Body::Drops(drops) => make_region_drops(drops),
region_request::Body::Alters(alters) => make_region_alters(alters),
}
}
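// Illustrative sketch (not part of this patch): a batched `Creates` body converts in a
// single call, yielding one `(RegionId, RegionRequest::Create)` pair per inner request;
// `r0` and `r1` stand for hypothetical `CreateRequest`s:
//
//     let body = region_request::Body::Creates(CreateRequests { requests: vec![r0, r1] });
//     let pairs = RegionRequest::try_from_request_body(body)?;
//     assert_eq!(pairs.len(), 2);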
@@ -154,6 +95,141 @@ impl RegionRequest {
}
}
fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let requests = inserts
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, RegionRequest::Put(RegionPutRequest { rows })))
})
.collect();
Ok(requests)
}
fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let requests = deletes
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows.map(|rows| {
(
region_id,
RegionRequest::Delete(RegionDeleteRequest { rows }),
)
})
})
.collect();
Ok(requests)
}
fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let column_metadatas = create
.column_defs
.into_iter()
.map(ColumnMetadata::try_from_column_def)
.collect::<Result<Vec<_>>>()?;
let region_id = create.region_id.into();
let region_dir = region_dir(&create.path, region_id);
Ok(vec![(
region_id,
RegionRequest::Create(RegionCreateRequest {
engine: create.engine,
column_metadatas,
primary_key: create.primary_key,
options: create.options,
region_dir,
}),
)])
}
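/// Expands a batched `CreateRequests` into one create request per region, reusing the
/// singular `make_region_create` so column-metadata validation stays in one place.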
fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let mut requests = Vec::with_capacity(creates.requests.len());
for create in creates.requests {
requests.extend(make_region_create(create)?);
}
Ok(requests)
}
fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = drop.region_id.into();
Ok(vec![(region_id, RegionRequest::Drop(RegionDropRequest {}))])
}
fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let mut requests = Vec::with_capacity(drops.requests.len());
for drop in drops.requests {
requests.extend(make_region_drop(drop)?);
}
Ok(requests)
}
fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = open.region_id.into();
let region_dir = region_dir(&open.path, region_id);
Ok(vec![(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: open.engine,
region_dir,
options: open.options,
skip_wal_replay: false,
}),
)])
}
fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = close.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Close(RegionCloseRequest {}),
)])
}
fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = alter.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Alter(RegionAlterRequest::try_from(alter)?),
)])
}
fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let mut requests = Vec::with_capacity(alters.requests.len());
for alter in alters.requests {
requests.extend(make_region_alter(alter)?);
}
Ok(requests)
}
fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = flush.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)])
}
fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = compact.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Compact(RegionCompactRequest {}),
)])
}
fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = truncate.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Truncate(RegionTruncateRequest {}),
)])
}
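// A minimal test sketch (not in the patch) of the batched create fan-out, assuming the
// prost-generated `CreateRequest` implements `Default` as usual; region ids and the
// module name are illustrative.
#[cfg(test)]
mod batch_request_tests {
    use super::*;

    #[test]
    fn creates_fan_out_one_pair_per_inner_request() {
        let creates = CreateRequests {
            requests: vec![
                CreateRequest {
                    region_id: RegionId::new(1024, 0).as_u64(),
                    ..Default::default()
                },
                CreateRequest {
                    region_id: RegionId::new(1024, 1).as_u64(),
                    ..Default::default()
                },
            ],
        };
        // Each inner request converts independently via `make_region_create`.
        let pairs = make_region_creates(creates).unwrap();
        assert_eq!(pairs.len(), 2);
        assert_eq!(pairs[0].0, RegionId::new(1024, 0));
    }
}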
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {