From 5d62e193bd2d491517680e4a332ad31731db6d31 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 7 Feb 2023 10:46:18 +0800 Subject: [PATCH] feat: support multi regions on datanode (#653) * wip: fix compile errors * chore: move splitter to partition crate * fix: remove useless variants in frontend errors * chore: move more partition related code to partition manager * fix: license header * wip: move WriteSplitter to PartitionRuleManager * fix: clippy warnings * chore: remove useless error variant and format toml * fix: cr comments * chore: resolve conflicts * chore: rebase develop * fix: cr comments * feat: support multi regions on datanode * chore: rebase onto develop * chore: rebase develop * chore: rebase develop * wip * fix: compile errors * feat: multi region * fix: CR comments * feat: allow stat existing regions without actually open it * fix: use table meta in manifest to recover region info --- Cargo.lock | 1 + src/catalog/src/lib.rs | 11 +- src/catalog/src/local/manager.rs | 9 +- src/catalog/src/remote/manager.rs | 1 - src/catalog/src/system.rs | 2 +- src/common/catalog/src/lib.rs | 6 + src/common/grpc-expr/src/insert.rs | 2 + src/datanode/src/sql/insert.rs | 1 + src/datanode/src/tests/instance_test.rs | 1 + src/frontend/src/sql.rs | 3 +- src/frontend/src/table.rs | 21 ++- src/frontend/src/table/insert.rs | 4 +- src/meta-client/src/rpc/router.rs | 8 +- src/mito/src/engine.rs | 211 +++++++++++++-------- src/mito/src/error.rs | 18 +- src/mito/src/table.rs | 232 ++++++++++++++---------- src/mito/src/table/test_util.rs | 39 ++-- src/partition/Cargo.toml | 7 +- src/partition/src/error.rs | 2 +- src/partition/src/splitter.rs | 15 +- src/script/src/table.rs | 4 +- src/servers/src/line_writer.rs | 1 + src/store-api/src/storage/requests.rs | 4 +- src/table/src/error.rs | 3 + src/table/src/metadata.rs | 2 +- src/table/src/requests.rs | 2 +- 26 files changed, 383 insertions(+), 227 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c8674437a..d19f1ee0d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4724,6 +4724,7 @@ dependencies = [ name = "partition" version = "0.1.0" dependencies = [ + "common-catalog", "common-error", "common-query", "datafusion", diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 72a335e396..349f1b95f9 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -167,11 +167,6 @@ pub struct RegisterSchemaRequest { pub schema: String, } -/// Formats table fully-qualified name -pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String { - format!("{catalog}.{schema}.{table}") -} - pub trait CatalogProviderFactory { fn create(&self, catalog_name: String) -> CatalogProviderRef; } @@ -198,8 +193,10 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>( .create_table(&EngineContext::default(), req.create_table_request.clone()) .await .with_context(|_| CreateTableSnafu { - table_info: format!( - "{catalog_name}.{schema_name}.{table_name}, id: {table_id}", + table_info: common_catalog::format_full_table_name( + catalog_name, + schema_name, + table_name, ), })?; manager diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 939ad6ea8f..09996b4106 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -20,6 +20,7 @@ use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME, }; +use common_catalog::format_full_table_name; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_telemetry::{error, info}; use datatypes::prelude::ScalarVector; @@ -45,10 +46,9 @@ use crate::system::{ }; use crate::tables::SystemCatalog; use crate::{ - format_full_table_name, handle_system_table_request, CatalogList, CatalogManager, - CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, - RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider, - SchemaProviderRef, + handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, + DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, + RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef, }; /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. @@ -252,7 +252,6 @@ impl LocalCatalogManager { schema_name: t.schema_name.clone(), table_name: t.table_name.clone(), table_id: t.table_id, - region_numbers: vec![0], }; let option = self diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index a930c29d32..6868638853 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -324,7 +324,6 @@ impl RemoteCatalogManager { schema_name: schema_name.clone(), table_name: table_name.clone(), table_id, - region_numbers: region_numbers.clone(), }; match self .engine diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index b4fc6e2674..bcc9e44446 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -87,7 +87,6 @@ impl SystemCatalogTable { schema_name: INFORMATION_SCHEMA_NAME.to_string(), table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), table_id: SYSTEM_CATALOG_TABLE_ID, - region_numbers: vec![0], }; let schema = Arc::new(build_system_catalog_schema()); let ctx = EngineContext::default(); @@ -271,6 +270,7 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), columns_values, + region_number: 0, // system catalog table has only one region } } diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs index 2fb83abedc..84c6c68af7 100644 --- a/src/common/catalog/src/lib.rs +++ b/src/common/catalog/src/lib.rs @@ -14,3 +14,9 @@ pub mod consts; pub mod error; + +/// Formats table fully-qualified name +#[inline] +pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String { + format!("{catalog}.{schema}.{table}") +} diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 4295ec6f7c..9d4345d839 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -306,6 +306,7 @@ pub fn to_table_insert_request( schema_name: schema_name.to_string(), table_name: table_name.to_string(), columns_values, + region_number: request.region_number, }) } @@ -439,6 +440,7 @@ fn is_null(null_mask: &BitVec, idx: usize) -> Option { mod tests { use std::any::Any; use std::sync::Arc; + use std::{assert_eq, unimplemented, vec}; use api::helper::ColumnDataTypeWrapper; use api::v1::column::{self, SemanticType, Values}; diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index f1e596ac4f..7ca7137a59 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -120,6 +120,7 @@ impl SqlHandler { .into_iter() .map(|(cs, mut b)| (cs.name.to_string(), b.to_vector())) .collect(), + region_number: 0, })) } } diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index bb2cee88f5..8740770330 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -344,6 +344,7 @@ pub async fn test_execute_create() { #[tokio::test] async fn test_rename_table() { + common_telemetry::init_default_ut_logging(); let instance = MockInstance::new("test_rename_table_local").await; let output = execute_sql(&instance, "create database db").await; diff --git a/src/frontend/src/sql.rs b/src/frontend/src/sql.rs index 1aa41b928c..e6f66331cb 100644 --- a/src/frontend/src/sql.rs +++ b/src/frontend/src/sql.rs @@ -28,7 +28,7 @@ use crate::error::{self, Result}; const DEFAULT_PLACEHOLDER_VALUE: &str = "default"; // TODO(fys): Extract the common logic in datanode and frontend in the future. -#[allow(dead_code)] +// This function convert insert statement to an `InsertRequest` to region 0. pub(crate) fn insert_to_request(table: &TableRef, stmt: Insert) -> Result { let columns = stmt.columns(); let values = stmt.values().context(error::ParseSqlSnafu)?; @@ -86,6 +86,7 @@ pub(crate) fn insert_to_request(table: &TableRef, stmt: Insert) -> Result table::Result { - let split = self + let splits = self .partition_manager .split_insert_request(&self.table_name, request) .await @@ -83,7 +83,7 @@ impl Table for DistTable { .context(TableOperationSnafu)?; let output = self - .dist_insert(split) + .dist_insert(splits) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; @@ -123,7 +123,6 @@ impl Table for DistTable { let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); - // TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table. partition_execs.push(Arc::new(PartitionExec { table_name: table_name.clone(), datanode_instance, @@ -852,14 +851,21 @@ mod test { (2, (30..35).collect::>()), (3, (100..105).collect::>()), ]; - for (region_id, numbers) in regional_numbers { - let datanode_id = *region_to_datanode_mapping.get(®ion_id).unwrap(); + for (region_number, numbers) in regional_numbers { + let datanode_id = *region_to_datanode_mapping.get(®ion_number).unwrap(); let instance = datanode_instances.get(&datanode_id).unwrap().clone(); let start_ts = global_start_ts; global_start_ts += numbers.len() as i64; - insert_testing_data(&table_name, instance.clone(), numbers, start_ts).await; + insert_testing_data( + &table_name, + instance.clone(), + numbers, + start_ts, + region_number, + ) + .await; } let meta = TableMetaBuilder::default() @@ -887,6 +893,7 @@ mod test { dn_instance: Arc, data: Vec, start_ts: i64, + region_number: RegionNumber, ) { let row_count = data.len() as u32; let columns = vec![ @@ -923,7 +930,7 @@ mod test { table_name: table_name.table_name.clone(), columns, row_count, - region_number: 0, + region_number, }; dn_instance .handle_insert(request, QueryContext::arc()) diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index 228e0982e6..9640540957 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -62,8 +62,7 @@ impl DistTable { let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); let instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); - // TODO(fys): a separate runtime should be used here. - let join = tokio::spawn(async move { + let join = common_runtime::spawn_write(async move { instance .grpc_insert(to_grpc_insert_request(region_id, insert)?) .await @@ -186,6 +185,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "demo".to_string(), columns_values, + region_number: 0, } } diff --git a/src/meta-client/src/rpc/router.rs b/src/meta-client/src/rpc/router.rs index 2dfd2e9a38..a5b3d47fd2 100644 --- a/src/meta-client/src/rpc/router.rs +++ b/src/meta-client/src/rpc/router.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use api::v1::meta::{ CreateRequest as PbCreateRequest, DeleteRequest as PbDeleteRequest, Partition as PbPartition, @@ -170,12 +170,12 @@ pub struct TableRoute { } impl TableRoute { - pub fn find_leaders(&self) -> Vec { + pub fn find_leaders(&self) -> HashSet { self.region_routes .iter() .flat_map(|x| &x.leader_peer) .cloned() - .collect::>() + .collect() } pub fn find_leader_regions(&self, datanode: &Peer) -> Vec { @@ -189,7 +189,7 @@ impl TableRoute { } None }) - .collect::>() + .collect() } } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index b297b624e6..f9bde0feb7 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -17,17 +17,21 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_telemetry::logging; +use common_telemetry::tracing::log::info; +use common_telemetry::{debug, logging}; use datatypes::schema::SchemaRef; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, - CreateOptions, EngineContext as StorageEngineContext, OpenOptions, RegionDescriptorBuilder, - RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine, + CreateOptions, EngineContext as StorageEngineContext, OpenOptions, Region, + RegionDescriptorBuilder, RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine, }; use table::engine::{EngineContext, TableEngine, TableReference}; -use table::metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}; +use table::error::TableOperationSnafu; +use table::metadata::{ + TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion, +}; use table::requests::{ AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, }; @@ -38,9 +42,10 @@ use tokio::sync::Mutex; use crate::config::EngineConfig; use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, - BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, MissingTimestampIndexSnafu, Result, - TableExistsSnafu, + BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, MissingTimestampIndexSnafu, + RegionNotFoundSnafu, Result, TableExistsSnafu, }; +use crate::manifest::TableManifest; use crate::table::MitoTable; pub const MITO_ENGINE: &str = "mito"; @@ -331,51 +336,53 @@ impl MitoEngineInner { )?; let table_id = request.id; - // TODO(dennis): supports multi regions; - assert_eq!(1, request.region_numbers.len()); - let region_number = request.region_numbers[0]; - let region_id = region_id(table_id, region_number); - - let region_name = region_name(table_id, region_number); - let region_descriptor = RegionDescriptorBuilder::default() - .id(region_id) - .name(®ion_name) - .row_key(row_key) - .default_cf(default_cf) - .build() - .context(BuildRegionDescriptorSnafu { - table_name, - region_name, - })?; + let table_dir = table_dir(schema_name, table_id); + let mut regions = HashMap::with_capacity(request.region_numbers.len()); let _lock = self.table_mutex.lock().await; // Checks again, read lock should be enough since we are guarded by the mutex. if let Some(table) = self.get_table(&table_ref) { - if request.create_if_not_exists { - return Ok(table); + return if request.create_if_not_exists { + Ok(table) } else { - return TableExistsSnafu { table_name }.fail(); - } + TableExistsSnafu { table_name }.fail() + }; } - let table_dir = table_dir(schema_name, table_id); - let opts = CreateOptions { - parent_dir: table_dir.clone(), - }; + for region_number in &request.region_numbers { + let region_id = region_id(table_id, *region_number); - let region = self - .storage_engine - .create_region(&StorageEngineContext::default(), region_descriptor, &opts) - .await - .map_err(BoxedError::new) - .context(error::CreateRegionSnafu)?; + let region_name = region_name(table_id, *region_number); + let region_descriptor = RegionDescriptorBuilder::default() + .id(region_id) + .name(®ion_name) + .row_key(row_key.clone()) + .default_cf(default_cf.clone()) + .build() + .context(BuildRegionDescriptorSnafu { + table_name, + region_name, + })?; + let opts = CreateOptions { + parent_dir: table_dir.clone(), + }; + + let region = self + .storage_engine + .create_region(&StorageEngineContext::default(), region_descriptor, &opts) + .await + .map_err(BoxedError::new) + .context(error::CreateRegionSnafu)?; + info!("Mito engine created region: {:?}", region.id()); + regions.insert(*region_number, region); + } let table_meta = TableMetaBuilder::default() .schema(request.schema) .engine(MITO_ENGINE) .next_column_id(next_column_id) .primary_key_indices(request.primary_key_indices.clone()) - .region_numbers(vec![region_number]) + .region_numbers(request.region_numbers) .build() .context(error::BuildTableMetaSnafu { table_name })?; @@ -394,7 +401,7 @@ impl MitoEngineInner { table_name, &table_dir, table_info, - region, + regions, self.object_store.clone(), ) .await?, @@ -444,28 +451,37 @@ impl MitoEngineInner { parent_dir: table_dir.to_string(), }; - // TODO(dennis): supports multi regions; - assert_eq!(request.region_numbers.len(), 1); - let region_number = request.region_numbers[0]; - let region_name = region_name(table_id, region_number); + let Some((manifest, table_info)) = self + .recover_table_manifest_and_info(table_name, &table_dir) + .await? else { return Ok(None) }; - let region = match self - .storage_engine - .open_region(&engine_ctx, ®ion_name, &opts) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)? - { - None => return Ok(None), - Some(region) => region, - }; + debug!( + "Opening table {}, table info recovered: {:?}", + table_id, table_info + ); - let table = Arc::new( - MitoTable::open(table_name, &table_dir, region, self.object_store.clone()) + let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len()); + for region_number in &table_info.meta.region_numbers { + let region_name = region_name(table_id, *region_number); + let region = self + .storage_engine + .open_region(&engine_ctx, ®ion_name, &opts) .await .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?, - ); + .context(table_error::TableOperationSnafu)? + .with_context(|| RegionNotFoundSnafu { + table: format!( + "{}.{}.{}", + request.catalog_name, request.schema_name, request.table_name + ), + region: *region_number, + }) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + regions.insert(*region_number, region); + } + + let table = Arc::new(MitoTable::new(table_info, regions, manifest)); self.tables .write() @@ -479,6 +495,24 @@ impl MitoEngineInner { Ok(table) } + async fn recover_table_manifest_and_info( + &self, + table_name: &str, + table_dir: &str, + ) -> TableResult> { + let manifest = MitoTable::<::Region>::build_manifest( + table_dir, + self.object_store.clone(), + ); + let Some(table_info) = + MitoTable::<::Region>::recover_table_info(table_name, &manifest) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)? else { return Ok(None) }; + + Ok(Some((manifest, table_info))) + } + fn get_table(&self, table_ref: &TableReference) -> Option { self.tables .read() @@ -572,6 +606,7 @@ mod tests { }; use log_store::NoopLogStore; use storage::config::EngineConfig as StorageEngineConfig; + use storage::region::RegionImpl; use storage::EngineImpl; use store_api::manifest::Manifest; use store_api::storage::ReadContext; @@ -580,7 +615,9 @@ mod tests { use super::*; use crate::table::test_util; - use crate::table::test_util::{new_insert_request, schema_for_test, MockRegion, TABLE_NAME}; + use crate::table::test_util::{ + new_insert_request, schema_for_test, TestEngineComponents, TABLE_NAME, + }; async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) { let table_name = "test_default_constraint"; @@ -757,10 +794,14 @@ mod tests { #[tokio::test] async fn test_create_table_insert_scan() { - let (_engine, table, schema, _dir) = test_util::setup_test_engine_and_table().await; - + let TestEngineComponents { + table_ref: table, + schema_ref, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table().await; assert_eq!(TableType::Base, table.table_type()); - assert_eq!(schema, table.schema()); + assert_eq!(schema_ref, table.schema()); let insert_req = new_insert_request("demo".to_string(), HashMap::default()); assert_eq!(0, table.insert(insert_req).await.unwrap()); @@ -839,7 +880,11 @@ mod tests { async fn test_create_table_scan_batches() { common_telemetry::init_default_ut_logging(); - let (_engine, table, _schema, _dir) = test_util::setup_test_engine_and_table().await; + let TestEngineComponents { + table_ref: table, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table().await; // TODO(yingwen): Custom batch size once the table support setting batch_size. let default_batch_size = ReadContext::default().batch_size; @@ -933,12 +978,18 @@ mod tests { table_name: test_util::TABLE_NAME.to_string(), // the test table id is 1 table_id: 1, - region_numbers: vec![0], }; - let (engine, table, object_store, _dir) = { - let (engine, table_engine, table, object_store, dir) = - test_util::setup_mock_engine_and_table().await; + let (_engine, storage_engine, table, object_store, _dir) = { + let TestEngineComponents { + table_engine, + storage_engine, + table_ref: table, + object_store, + dir, + .. + } = test_util::setup_test_engine_and_table().await; + assert_eq!(MITO_ENGINE, table_engine.name()); // Now try to open the table again. let reopened = table_engine @@ -948,11 +999,11 @@ mod tests { .unwrap(); assert_eq!(table.schema(), reopened.schema()); - (engine, table, object_store, dir) + (table_engine, storage_engine, table, object_store, dir) }; // Construct a new table engine, and try to open the table. - let table_engine = MitoEngine::new(EngineConfig::default(), engine, object_store); + let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store); let reopened = table_engine .open_table(&ctx, open_req.clone()) .await @@ -962,11 +1013,13 @@ mod tests { let reopened = reopened .as_any() - .downcast_ref::>() + .downcast_ref::>>() .unwrap(); + let left = table.table_info(); // assert recovered table_info is correct - assert_eq!(table.table_info(), reopened.table_info()); + let right = reopened.table_info(); + assert_eq!(left, right); assert_eq!(reopened.manifest().last_version(), 1); } @@ -1092,8 +1145,13 @@ mod tests { #[tokio::test] async fn test_alter_rename_table() { - let (engine, table_engine, _table, object_store, _dir) = - test_util::setup_mock_engine_and_table().await; + let TestEngineComponents { + table_engine, + storage_engine, + object_store, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table().await; let ctx = EngineContext::default(); // register another table @@ -1143,13 +1201,12 @@ mod tests { assert_eq!(table.table_info().name, new_table_name); - let table_engine = MitoEngine::new(EngineConfig::default(), engine, object_store); + let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store); let open_req = OpenTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: new_table_name.to_string(), table_id: 1, - region_numbers: vec![0], }; // test reopen table @@ -1160,7 +1217,7 @@ mod tests { .unwrap(); let reopened = reopened .as_any() - .downcast_ref::>() + .downcast_ref::>>() .unwrap(); assert_eq!(reopened.table_info(), table.table_info()); assert_eq!(reopened.table_info().name, new_table_name); @@ -1234,7 +1291,11 @@ mod tests { #[tokio::test] async fn test_table_delete_rows() { - let (_engine, table, _schema, _dir) = test_util::setup_test_engine_and_table().await; + let TestEngineComponents { + table_ref: table, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table().await; let mut columns_values: HashMap = HashMap::with_capacity(4); let hosts: VectorRef = diff --git a/src/mito/src/error.rs b/src/mito/src/error.rs index 9a93ef729e..ee1aa15265 100644 --- a/src/mito/src/error.rs +++ b/src/mito/src/error.rs @@ -16,6 +16,7 @@ use std::any::Any; use common_error::ext::BoxedError; use common_error::prelude::*; +use store_api::storage::RegionNumber; use table::metadata::{TableInfoBuilderError, TableMetaBuilderError}; #[derive(Debug, Snafu)] @@ -154,7 +155,7 @@ pub enum Error { }, #[snafu(display( - "Projected columnd not found in region, column: {}", + "Projected column not found in region, column: {}", column_qualified_name ))] ProjectedColumnNotFound { @@ -170,6 +171,19 @@ pub enum Error { #[snafu(backtrace)] source: table::metadata::ConvertError, }, + + #[snafu(display("Cannot find region, table: {}, region: {}", table, region))] + RegionNotFound { + table: String, + region: RegionNumber, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid region name: {}", region_name))] + InvalidRegionName { + region_name: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -198,6 +212,8 @@ impl ErrorExt for Error { TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected, ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable, + RegionNotFound { .. } => StatusCode::Internal, + InvalidRegionName { .. } => StatusCode::Internal, } } diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 16fd8ce635..8f1dd5b7d2 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -16,6 +16,7 @@ pub mod test_util; use std::any::Any; +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -27,17 +28,18 @@ use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult}; use common_recordbatch::{RecordBatch, RecordBatchStream}; use common_telemetry::logging; +use datatypes::schema::Schema; use futures::task::{Context, Poll}; use futures::Stream; use object_store::ObjectStore; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ AddColumn, AlterOperation, AlterRequest, ChunkReader, ReadContext, Region, RegionMeta, - ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest, + RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest, }; use table::error as table_error; -use table::error::Result as TableResult; +use table::error::{RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu}; use table::metadata::{ FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, }; @@ -48,8 +50,9 @@ use table::table::scan::SimpleTableScan; use table::table::{AlterContext, Table}; use tokio::sync::Mutex; +use crate::error; use crate::error::{ - self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu, + ProjectedColumnNotFoundSnafu, RegionNotFoundSnafu, Result, ScanTableManifestSnafu, UpdateTableManifestSnafu, }; use crate::manifest::action::*; @@ -65,8 +68,7 @@ pub struct MitoTable { manifest: TableManifest, // guarded by `self.alter_lock` table_info: ArcSwap, - // TODO(dennis): a table contains multi regions - region: R, + regions: HashMap, alter_lock: Mutex<()>, } @@ -85,15 +87,29 @@ impl Table for MitoTable { return Ok(0); } - let mut write_request = self.region.write_request(); + let region = self + .regions + .get(&request.region_number) + .with_context(|| RegionNotFoundSnafu { + table: common_catalog::format_full_table_name( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ), + region: request.region_number, + }) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + let mut write_request = region.write_request(); let columns_values = request.columns_values; // columns_values is not empty, it's safe to unwrap let rows_num = columns_values.values().next().unwrap().len(); logging::trace!( - "Insert into table {} with data: {:?}", + "Insert into table {} region {} with data: {:?}", self.table_info().name, + region.id(), columns_values ); @@ -102,8 +118,7 @@ impl Table for MitoTable { .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; - let _resp = self - .region + let _resp = region .write(&WriteContext::default(), write_request) .await .map_err(BoxedError::new) @@ -127,35 +142,64 @@ impl Table for MitoTable { _limit: Option, ) -> TableResult { let read_ctx = ReadContext::default(); - let snapshot = self - .region - .snapshot(&read_ctx) - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; + let mut readers = Vec::with_capacity(self.regions.len()); + let mut first_schema: Option> = None; - let projection = self - .transform_projection(&self.region, projection.cloned()) - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - let filters = filters.into(); - let scan_request = ScanRequest { - projection, - filters, - ..Default::default() - }; - let mut reader = snapshot - .scan(&read_ctx, scan_request) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)? - .reader; + let table_info = self.table_info.load(); + // TODO(hl): Currently the API between frontend and datanode is under refactoring in + // https://github.com/GreptimeTeam/greptimedb/issues/597 . Once it's finished, query plan + // can carry filtered region info to avoid scanning all regions on datanode. + for region in self.regions.values() { + let snapshot = region + .snapshot(&read_ctx) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + let projection = self + .transform_projection(region, projection.cloned()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + let filters = filters.into(); + let scan_request = ScanRequest { + projection, + filters, + ..Default::default() + }; + let reader = snapshot + .scan(&read_ctx, scan_request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)? + .reader; - let schema = reader.schema().clone(); - let stream_schema = schema.clone(); + let schema = reader.schema().clone(); + if let Some(first_schema) = &first_schema { + // TODO(hl): we assume all regions' schemas are the same, but undergoing table altering + // may make these schemas inconsistent. + ensure!( + first_schema.version() == schema.version(), + RegionSchemaMismatchSnafu { + table: common_catalog::format_full_table_name( + &table_info.catalog_name, + &table_info.schema_name, + &table_info.name + ) + } + ); + } else { + first_schema = Some(schema); + } + readers.push(reader); + } + // TODO(hl): we assume table contains at least one region, but with region migration this + // assumption may become invalid. + let stream_schema = first_schema.unwrap(); + let schema = stream_schema.clone(); let stream = Box::pin(async_stream::try_stream! { - while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? { - yield RecordBatch::new(stream_schema.clone(), chunk.columns)? + for mut reader in readers { + while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? { + yield RecordBatch::new(stream_schema.clone(), chunk.columns)? + } } }); @@ -218,24 +262,26 @@ impl Table for MitoTable { { // TODO(yingwen): Error handling. Maybe the region need to provide a method to // validate the request first. - let region = self.region(); - let region_meta = region.in_memory_metadata(); - let alter_req = AlterRequest { - operation: alter_op, - version: region_meta.version(), - }; - // Alter the region. - logging::debug!( - "start altering region {} of table {}, with request {:?}", - region.name(), - table_name, - alter_req, - ); - region - .alter(alter_req) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; + let regions = self.regions(); + for region in regions.values() { + let region_meta = region.in_memory_metadata(); + let alter_req = AlterRequest { + operation: alter_op.clone(), + version: region_meta.version(), + }; + // Alter the region. + logging::debug!( + "start altering region {} of table {}, with request {:?}", + region.name(), + table_name, + alter_req, + ); + region + .alter(alter_req) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + } } // Update in memory metadata of the table. self.set_table_info(new_info); @@ -247,30 +293,33 @@ impl Table for MitoTable { if request.key_column_values.is_empty() { return Ok(0); } + let mut rows_deleted = 0; + // TODO(hl): Should be tracked by procedure. + // TODO(hl): Parse delete request into region->keys instead of delete in each region + for region in self.regions.values() { + let mut write_request = region.write_request(); + let key_column_values = request.key_column_values.clone(); + // Safety: key_column_values isn't empty. + let rows_num = key_column_values.values().next().unwrap().len(); - let mut write_request = self.region.write_request(); + logging::trace!( + "Delete from table {} where key_columns are: {:?}", + self.table_info().name, + key_column_values + ); - let key_column_values = request.key_column_values; - // Safety: key_column_values isn't empty. - let rows_num = key_column_values.values().next().unwrap().len(); - - logging::trace!( - "Delete from table {} where key_columns are: {:?}", - self.table_info().name, - key_column_values - ); - - write_request - .delete(key_column_values) - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - self.region - .write(&WriteContext::default(), write_request) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - - Ok(rows_num) + write_request + .delete(key_column_values) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + region + .write(&WriteContext::default(), write_request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + rows_deleted += rows_num; + } + Ok(rows_deleted) } } @@ -299,10 +348,14 @@ fn column_qualified_name(table_name: &str, region_name: &str, column_name: &str) } impl MitoTable { - fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self { + pub(crate) fn new( + table_info: TableInfo, + regions: HashMap, + manifest: TableManifest, + ) -> Self { Self { table_info: ArcSwap::new(Arc::new(table_info)), - region, + regions, manifest, alter_lock: Mutex::new(()), } @@ -368,7 +421,7 @@ impl MitoTable { table_name: &str, table_dir: &str, table_info: TableInfo, - region: R, + regions: HashMap, object_store: ObjectStore, ) -> Result> { let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store); @@ -383,25 +436,14 @@ impl MitoTable { .await .context(UpdateTableManifestSnafu { table_name })?; - Ok(MitoTable::new(table_info, region, manifest)) + Ok(MitoTable::new(table_info, regions, manifest)) } - pub async fn open( - table_name: &str, - table_dir: &str, - region: R, - object_store: ObjectStore, - ) -> Result> { - let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store); - - let mut table_info = Self::recover_table_info(table_name, &manifest) - .await? - .context(TableInfoNotFoundSnafu { table_name })?; - table_info.meta.region_numbers = vec![(region.id() & 0xFFFFFFFF) as u32]; - Ok(MitoTable::new(table_info, region, manifest)) + pub(crate) fn build_manifest(table_dir: &str, object_store: ObjectStore) -> TableManifest { + TableManifest::new(&table_manifest_dir(table_dir), object_store) } - async fn recover_table_info( + pub(crate) async fn recover_table_info( table_name: &str, manifest: &TableManifest, ) -> Result> { @@ -449,8 +491,8 @@ impl MitoTable { } #[inline] - pub fn region(&self) -> &R { - &self.region + pub fn regions(&self) -> &HashMap { + &self.regions } pub fn set_table_info(&self, table_info: TableInfo) { diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index ba5434d0dd..35721d8924 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -47,6 +47,7 @@ pub fn new_insert_request( schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name, columns_values, + region_number: 0, } } @@ -115,22 +116,27 @@ fn new_create_request(schema: SchemaRef) -> CreateTableRequest { } } -pub async fn setup_test_engine_and_table() -> ( - MitoEngine>, - TableRef, - SchemaRef, - TempDir, -) { +pub struct TestEngineComponents { + pub table_engine: MitoEngine>, + pub storage_engine: EngineImpl, + pub table_ref: TableRef, + pub schema_ref: SchemaRef, + pub object_store: ObjectStore, + pub dir: TempDir, +} + +pub async fn setup_test_engine_and_table() -> TestEngineComponents { let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await; + let storage_engine = EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(NoopLogStore::default()), + object_store.clone(), + ); let table_engine = MitoEngine::new( EngineConfig::default(), - EngineImpl::new( - StorageEngineConfig::default(), - Arc::new(NoopLogStore::default()), - object_store.clone(), - ), - object_store, + storage_engine.clone(), + object_store.clone(), ); let schema = Arc::new(schema_for_test()); @@ -142,7 +148,14 @@ pub async fn setup_test_engine_and_table() -> ( .await .unwrap(); - (table_engine, table, schema, dir) + TestEngineComponents { + table_engine, + storage_engine, + table_ref: table, + schema_ref: schema, + object_store, + dir, + } } pub async fn setup_mock_engine_and_table( diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index e0f7e8866d..d47c85b0bb 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -7,16 +7,17 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } -datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true +datafusion.workspace = true datatypes = { path = "../datatypes" } meta-client = { path = "../meta-client" } moka = { version = "0.9", features = ["future"] } -snafu.workspace = true -store-api = { path = "../store-api" } serde.workspace = true serde_json = "1.0" +snafu.workspace = true +store-api = { path = "../store-api" } table = { path = "../table" } diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index f3b4a8bdea..f93478f3ed 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -23,7 +23,7 @@ use store_api::storage::RegionId; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to get cache, error: {}", err_msg))] + #[snafu(display("Failed to get meta info from cache, error: {}", err_msg))] GetCache { err_msg: String, backtrace: Backtrace, diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index 0fcccb3f23..c8a83098c1 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -155,18 +155,19 @@ fn split_insert_request( let table_name = &insert.table_name; dist_insert .into_iter() - .map(|(region_id, vector_map)| { + .map(|(region_number, vector_map)| { let columns_values = vector_map .into_iter() .map(|(column_name, mut builder)| (column_name.to_string(), builder.to_vector())) .collect(); ( - region_id, + region_number, InsertRequest { catalog_name: catalog_name.to_string(), schema_name: schema_name.to_string(), table_name: table_name.to_string(), columns_values, + region_number, }, ) }) @@ -396,10 +397,11 @@ mod tests { columns_values.insert("id".to_string(), builder.to_vector()); InsertRequest { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), + catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(), + schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(), table_name: "demo".to_string(), columns_values, + region_number: 0, } } @@ -423,10 +425,11 @@ mod tests { columns_values.insert("id".to_string(), builder.to_vector()); InsertRequest { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), + catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(), + schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(), table_name: "demo".to_string(), columns_values, + region_number: 0, } } diff --git a/src/script/src/table.rs b/src/script/src/table.rs index e0d8865571..e885a9e598 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID}; +use common_catalog::format_full_table_name; use common_query::Output; use common_recordbatch::util as record_util; use common_telemetry::logging; @@ -77,7 +78,7 @@ impl ScriptsTable { Ok(Self { catalog_manager, query_engine, - name: catalog::format_full_table_name( + name: format_full_table_name( DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_NAME, @@ -131,6 +132,7 @@ impl ScriptsTable { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: SCRIPTS_TABLE_NAME.to_string(), columns_values, + region_number: 0, }) .await .context(InsertScriptSnafu { name })?; diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs index 0c34d78a8a..89d6993454 100644 --- a/src/servers/src/line_writer.rs +++ b/src/servers/src/line_writer.rs @@ -153,6 +153,7 @@ impl LineWriter { schema_name: self.db, table_name: self.table_name, columns_values, + region_number: 0, // TODO(hl): Check if assign 0 region is ok? } } } diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 30260aac76..94c56f5378 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -55,7 +55,7 @@ pub struct ScanRequest { pub struct GetRequest {} /// Operation to add a column. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AddColumn { /// Descriptor of the column to add. pub desc: ColumnDescriptor, @@ -64,7 +64,7 @@ pub struct AddColumn { } /// Operation to alter a region. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum AlterOperation { /// Add columns to the region. AddColumns { diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 381b192398..f494f10e08 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -99,6 +99,8 @@ pub enum Error { column_name: String, backtrace: Backtrace, }, + #[snafu(display("Regions schemas mismatch in table: {}", table))] + RegionSchemaMismatch { table: String, backtrace: Backtrace }, #[snafu(display("Failed to operate table, source: {}", source))] TableOperation { source: BoxedError }, @@ -122,6 +124,7 @@ impl ErrorExt for Error { Error::SchemaBuild { source, .. } => source.status_code(), Error::TableOperation { source } => source.status_code(), Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound, + Error::RegionSchemaMismatch { .. } => StatusCode::StorageUnavailable, Error::Unsupported { .. } => StatusCode::Unsupported, } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 268a5dd777..139112cb84 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -416,7 +416,7 @@ impl TryFrom for TableMeta { primary_key_indices: raw.primary_key_indices, value_indices: raw.value_indices, engine: raw.engine, - region_numbers: vec![], + region_numbers: raw.region_numbers, next_column_id: raw.next_column_id, engine_options: raw.engine_options, options: raw.options, diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 9de1f22777..27ff4375ed 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -28,6 +28,7 @@ pub struct InsertRequest { pub schema_name: String, pub table_name: String, pub columns_values: HashMap, + pub region_number: RegionNumber, } #[derive(Debug, Clone)] @@ -58,7 +59,6 @@ pub struct OpenTableRequest { pub schema_name: String, pub table_name: String, pub table_id: TableId, - pub region_numbers: Vec, } /// Alter table request