diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 9237881708..1d66e2b961 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -266,6 +266,7 @@ impl LocalCatalogManager { schema_name: t.schema_name.clone(), table_name: t.table_name.clone(), table_id: t.table_id, + region_numbers: vec![0], }; let engine = self .engine_manager diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 4fc35e7424..41dbec480c 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -427,6 +427,7 @@ async fn open_or_create_table( schema_name: schema_name.clone(), table_name: table_name.clone(), table_id, + region_numbers: region_numbers.clone(), }; let engine = engine_manager diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 7e2c905e53..12a7b81c02 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -93,6 +93,7 @@ impl SystemCatalogTable { schema_name: INFORMATION_SCHEMA_NAME.to_string(), table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), table_id: SYSTEM_CATALOG_TABLE_ID, + region_numbers: vec![0], }; let schema = build_system_catalog_schema(); let ctx = EngineContext::default(); diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 2c24771436..6ef85bcd25 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -14,32 +14,34 @@ use serde::{Deserialize, Serialize}; +#[derive(Debug, Serialize, Deserialize)] +pub struct RegionIdent { + pub catalog: String, + pub schema: String, + pub table: String, + pub table_id: u32, + pub engine: String, + pub region_number: u32, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct SimpleReply { + pub result: bool, + pub error: Option, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Instruction { - OpenRegion { - catalog: String, - schema: String, - table: String, - table_id: u32, - engine: String, - region_number: u32, - }, - CloseRegion { - catalog: String, - schema: String, - table: String, - table_id: u32, - engine: String, - region_number: u32, - }, + OpenRegion(RegionIdent), + CloseRegion(RegionIdent), } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(tag = "type", rename_all = "snake_case")] pub enum InstructionReply { - OpenRegion { result: bool, error: Option }, - CloseRegion { result: bool, error: Option }, + OpenRegion(SimpleReply), + CloseRegion(SimpleReply), } #[cfg(test)] @@ -48,14 +50,14 @@ mod tests { #[test] fn test_serialize_instruction() { - let open_region = Instruction::OpenRegion { + let open_region = Instruction::OpenRegion(RegionIdent { catalog: "foo".to_string(), schema: "bar".to_string(), table: "hi".to_string(), table_id: 1024, engine: "mito".to_string(), region_number: 1, - }; + }); let serialized = serde_json::to_string(&open_region).unwrap(); @@ -64,14 +66,14 @@ mod tests { serialized ); - let close_region = Instruction::CloseRegion { + let close_region = Instruction::CloseRegion(RegionIdent { catalog: "foo".to_string(), schema: "bar".to_string(), table: "hi".to_string(), table_id: 1024, engine: "mito".to_string(), region_number: 1, - }; + }); let serialized = serde_json::to_string(&close_region).unwrap(); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index e0c8e7fcae..0f8198fffe 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -19,12 +19,41 @@ use common_procedure::ProcedureId; use serde_json::error::Error as JsonError; use snafu::Location; use storage::error::Error as StorageError; +use store_api::storage::RegionNumber; use table::error::Error as TableError; /// Business error of datanode. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to check region in table: {}, source: {}", table_name, source))] + CheckRegion { + table_name: String, + #[snafu(backtrace)] + source: TableError, + region_number: RegionNumber, + }, + + #[snafu(display("Failed to access catalog, source: {}", source))] + AccessCatalog { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display("Failed to open table: {}, source: {}", table_name, source))] + OpenTable { + table_name: String, + #[snafu(backtrace)] + source: TableError, + }, + + #[snafu(display("Failed to register table: {}, source: {}", table_name, source))] + RegisterTable { + table_name: String, + #[snafu(backtrace)] + source: catalog::error::Error, + }, + #[snafu(display("Failed to send message: {err_msg}"))] SendMessage { err_msg: String, location: Location }, @@ -467,12 +496,16 @@ impl ErrorExt for Error { | ExecuteLogicalPlan { source } | DescribeStatement { source } => source.status_code(), + OpenTable { source, .. } => source.status_code(), + RegisterTable { source, .. } | AccessCatalog { source, .. } => source.status_code(), + DecodeLogicalPlan { source } => source.status_code(), NewCatalog { source } | RegisterSchema { source } => source.status_code(), FindTable { source, .. } => source.status_code(), - CreateTable { source, .. } | GetTable { source, .. } | AlterTable { source, .. } => { - source.status_code() - } + CreateTable { source, .. } + | GetTable { source, .. } + | AlterTable { source, .. } + | CheckRegion { source, .. } => source.status_code(), DropTable { source, .. } => source.status_code(), FlushTable { source, .. } => source.status_code(), diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 9804316022..34f4724edf 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -20,6 +20,7 @@ use common_telemetry::error; use crate::error::Result; use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef}; +pub mod open_region; pub mod parse_mailbox_message; #[cfg(test)] mod tests; @@ -47,6 +48,10 @@ impl HeartbeatResponseHandlerContext { pub fn is_skip_all(&self) -> bool { self.is_skip_all } + + pub fn finish(&mut self) { + self.is_skip_all = true + } } pub trait HeartbeatResponseHandler: Send + Sync { diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs new file mode 100644 index 0000000000..c48a3f8172 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -0,0 +1,201 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use catalog::{CatalogManagerRef, RegisterTableRequest}; +use common_catalog::format_full_table_name; +use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply}; +use common_telemetry::{error, warn}; +use snafu::ResultExt; +use store_api::storage::RegionNumber; +use table::engine::manager::TableEngineManagerRef; +use table::engine::EngineContext; +use table::requests::OpenTableRequest; + +use crate::error::{self, Result}; +use crate::heartbeat::handler::HeartbeatResponseHandler; +use crate::heartbeat::HeartbeatResponseHandlerContext; + +#[derive(Clone)] +pub struct OpenRegionHandler { + catalog_manager: CatalogManagerRef, + table_engine_manager: TableEngineManagerRef, +} + +impl HeartbeatResponseHandler for OpenRegionHandler { + fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { + matches!( + ctx.incoming_message, + Some((_, Instruction::OpenRegion { .. })) + ) + } + + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()> { + let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else { + unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'"); + }; + + ctx.finish(); + let mailbox = ctx.mailbox.clone(); + let self_ref = Arc::new(self.clone()); + + common_runtime::spawn_bg(async move { + let (engine, request) = OpenRegionHandler::prepare_request(region_ident); + let result = self_ref.open_region_inner(engine, request).await; + if let Err(e) = mailbox + .send((meta, OpenRegionHandler::map_result(result))) + .await + { + error!(e; "Failed to send reply to mailbox"); + } + }); + Ok(()) + } +} + +impl OpenRegionHandler { + fn map_result(result: Result) -> InstructionReply { + result.map_or_else( + |error| { + InstructionReply::OpenRegion(SimpleReply { + result: false, + error: Some(error.to_string()), + }) + }, + |result| { + InstructionReply::OpenRegion(SimpleReply { + result, + error: None, + }) + }, + ) + } + + fn prepare_request(ident: RegionIdent) -> (String, OpenTableRequest) { + let RegionIdent { + catalog, + schema, + table, + table_id, + region_number, + engine, + } = ident; + + ( + engine, + OpenTableRequest { + catalog_name: catalog, + schema_name: schema, + table_name: table, + table_id, + region_numbers: vec![region_number], + }, + ) + } + + /// Returns true if table has been opened. + async fn check_table( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + region_numbers: &[RegionNumber], + ) -> Result { + if let Some(table) = self + .catalog_manager + .table(catalog_name, schema_name, table_name) + .await + .context(error::AccessCatalogSnafu)? + { + for r in region_numbers { + let region_exist = + table + .contain_regions(*r) + .with_context(|_| error::CheckRegionSnafu { + table_name: format_full_table_name( + catalog_name, + schema_name, + table_name, + ), + region_number: *r, + })?; + if !region_exist { + warn!( + "Failed to check table: {}, region: {} does not exist", + format_full_table_name(catalog_name, schema_name, table_name,), + r + ); + return Ok(false); + } + } + } + Ok(true) + } + + async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result { + let OpenTableRequest { + catalog_name, + schema_name, + table_name, + region_numbers, + .. + } = &request; + let engine = + self.table_engine_manager + .engine(&engine) + .context(error::TableEngineNotFoundSnafu { + engine_name: &engine, + })?; + let ctx = EngineContext::default(); + + if self + .check_table(catalog_name, schema_name, table_name, region_numbers) + .await? + { + return Ok(true); + } + + if let Some(table) = engine + .open_table(&ctx, request.clone()) + .await + .with_context(|_| error::OpenTableSnafu { + table_name: format_full_table_name(catalog_name, schema_name, table_name), + })? + { + let request = RegisterTableRequest { + catalog: request.catalog_name.clone(), + schema: request.schema_name.clone(), + table_name: request.table_name.clone(), + table_id: request.table_id, + table, + }; + self.catalog_manager + .register_table(request) + .await + .with_context(|_| error::RegisterTableSnafu { + table_name: format_full_table_name(catalog_name, schema_name, table_name), + })?; + Ok(true) + } else { + // Case 1: + // TODO(weny): Fix/Cleanup the broken table manifest + // The manifest writing operation should be atomic. + // Therefore, we won't meet this case, in theory. + + // Case 2: The target region was not found in table meta + Ok(false) + } + } +} diff --git a/src/datanode/src/heartbeat/handler/tests.rs b/src/datanode/src/heartbeat/handler/tests.rs index f9e56e2f9f..40708f18b1 100644 --- a/src/datanode/src/heartbeat/handler/tests.rs +++ b/src/datanode/src/heartbeat/handler/tests.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta::instruction::InstructionReply; +use common_meta::instruction::{InstructionReply, SimpleReply}; use tokio::sync::mpsc; use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; @@ -24,10 +24,10 @@ async fn test_heartbeat_mailbox() { let mailbox = HeartbeatMailbox::new(tx); let meta = MessageMeta::new_test(1, "test", "foo", "bar"); - let reply = InstructionReply::OpenRegion { + let reply = InstructionReply::OpenRegion(SimpleReply { result: true, error: None, - }; + }); mailbox.send((meta.clone(), reply.clone())).await.unwrap(); let message = rx.recv().await.unwrap(); diff --git a/src/file-table-engine/src/engine/tests.rs b/src/file-table-engine/src/engine/tests.rs index 7584ce573b..eb98d0fdf5 100644 --- a/src/file-table-engine/src/engine/tests.rs +++ b/src/file-table-engine/src/engine/tests.rs @@ -59,6 +59,7 @@ async fn test_open_table() { table_name: test_util::TEST_TABLE_NAME.to_string(), // the test table id is 1 table_id: 1, + region_numbers: vec![0], }; let table_ref = TableReference { diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 4eac7a1da2..d644d73f6b 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -31,13 +31,13 @@ use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, - EngineContext as StorageEngineContext, OpenOptions, RowKeyDescriptor, RowKeyDescriptorBuilder, - StorageEngine, + EngineContext as StorageEngineContext, OpenOptions, RegionNumber, RowKeyDescriptor, + RowKeyDescriptorBuilder, StorageEngine, }; use table::engine::{ region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference, }; -use table::metadata::{TableInfo, TableVersion}; +use table::metadata::{TableId, TableInfo, TableVersion}; use table::requests::{ AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, }; @@ -381,11 +381,177 @@ fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> { Ok(()) } +fn all_regions_open(table: TableRef, regions: &[RegionNumber]) -> TableResult { + for r in regions { + let region_exist = table.contain_regions(*r)?; + if !region_exist { + return Ok(false); + } + } + Ok(true) +} + impl MitoEngineInner { - async fn open_table( + /// Returns Some(table) contains all specific regions + fn check_regions( + &self, + table: TableRef, + regions: &[RegionNumber], + ) -> TableResult> { + if all_regions_open(table.clone(), regions)? { + // If all regions have been opened + Ok(Some(table)) + } else { + Ok(None) + } + } + + /// Builds table from scratch. + /// Returns None if failed to recover manifest. + async fn recover_table( &self, _ctx: &EngineContext, request: OpenTableRequest, + ) -> TableResult>>> { + let catalog_name = &request.catalog_name; + let schema_name = &request.schema_name; + let table_name = &request.table_name; + let table_ref = TableReference { + catalog: catalog_name, + schema: schema_name, + table: table_name, + }; + + let table_id = request.table_id; + let engine_ctx = StorageEngineContext::default(); + let table_dir = table_dir(catalog_name, schema_name, table_id); + + let Some((manifest, table_info)) = self + .recover_table_manifest_and_info(table_name, &table_dir) + .await.map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)? else { return Ok(None) }; + + let opts = OpenOptions { + parent_dir: table_dir.to_string(), + write_buffer_size: table_info + .meta + .options + .write_buffer_size + .map(|s| s.0 as usize), + ttl: table_info.meta.options.ttl, + compaction_time_window: table_info.meta.options.compaction_time_window, + }; + + debug!( + "Opening table {}, table info recovered: {:?}", + table_id, table_info + ); + + for target_region in &request.region_numbers { + if !table_info.meta.region_numbers.contains(target_region) { + table_error::RegionNotFoundSnafu { + table: table_ref.to_string(), + region: *target_region, + } + .fail()? + } + } + + let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len()); + + for region_number in &request.region_numbers { + let region = self + .open_region(&engine_ctx, table_id, *region_number, &table_ref, &opts) + .await?; + regions.insert(*region_number, region); + } + + let table = Arc::new(MitoTable::new(table_info, regions, manifest)); + + Ok(Some(table)) + } + + async fn open_region( + &self, + engine_ctx: &StorageEngineContext, + table_id: TableId, + region_number: RegionNumber, + table_ref: &TableReference<'_>, + opts: &OpenOptions, + ) -> TableResult { + let region_name = region_name(table_id, region_number); + let region = self + .storage_engine + .open_region(engine_ctx, ®ion_name, opts) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)? + .with_context(|| RegionNotFoundSnafu { + table: format!( + "{}.{}.{}", + table_ref.catalog, table_ref.schema, table_ref.table + ), + region: region_number, + }) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + Ok(region) + } + + /// Loads regions + async fn load_missing_regions( + &self, + _ctx: &EngineContext, + table: Arc>, + region_numbers: &[RegionNumber], + ) -> TableResult<()> { + let table_info = table.table_info(); + let catalog = &table_info.catalog_name; + let schema = &table_info.schema_name; + let name = &table_info.name; + let table_id = table_info.ident.table_id; + + let table_dir = table_dir(catalog, schema, table_id); + let table_ref = TableReference { + catalog, + schema, + table: name, + }; + + let opts = OpenOptions { + parent_dir: table_dir.to_string(), + write_buffer_size: table_info + .meta + .options + .write_buffer_size + .map(|s| s.0 as usize), + ttl: table_info.meta.options.ttl, + compaction_time_window: table_info.meta.options.compaction_time_window, + }; + + // TODO(weny): Returns an error earlier if the target region does not exist in the meta. + for region_number in region_numbers { + if table.contain_regions(*region_number)? { + continue; + } + + let engine_ctx = StorageEngineContext::default(); + + let region = self + .open_region(&engine_ctx, table_id, *region_number, &table_ref, &opts) + .await?; + + table.load_region(*region_number, region).await?; + } + + Ok(()) + } + + async fn open_table( + &self, + ctx: &EngineContext, + request: OpenTableRequest, ) -> TableResult> { let catalog_name = &request.catalog_name; let schema_name = &request.schema_name; @@ -397,8 +563,9 @@ impl MitoEngineInner { }; if let Some(table) = self.get_table(&table_ref) { - // Table has already been opened. - return Ok(Some(table)); + if let Some(table) = self.check_regions(table, &request.region_numbers)? { + return Ok(Some(table)); + } } // Acquires the mutex before opening a new table. @@ -407,61 +574,30 @@ impl MitoEngineInner { let _lock = self.table_mutex.lock(table_name_key.clone()).await; // Checks again, read lock should be enough since we are guarded by the mutex. - if let Some(table) = self.get_table(&table_ref) { - return Ok(Some(table)); + if let Some(table) = self.get_mito_table(&table_ref) { + // Contains all regions or target region + if let Some(table) = self.check_regions(table.clone(), &request.region_numbers)? { + Some(table) + } else { + // Loads missing regions + // TODO(weny): Supports to load regions + self.load_missing_regions(ctx, table.clone(), &request.region_numbers) + .await?; + + Some(table as _) + } + } else { + // Builds table from scratch + let table = self.recover_table(ctx, request.clone()).await?; + if let Some(table) = table { + // already locked + self.tables.insert(table_ref.to_string(), table.clone()); + + Some(table as _) + } else { + None + } } - - let table_id = request.table_id; - let engine_ctx = StorageEngineContext::default(); - let table_dir = table_dir(catalog_name, schema_name, table_id); - - let Some((manifest, table_info)) = self - .recover_table_manifest_and_info(table_name, &table_dir) - .await.map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)? else { return Ok(None) }; - - let opts = OpenOptions { - parent_dir: table_dir.to_string(), - write_buffer_size: table_info - .meta - .options - .write_buffer_size - .map(|s| s.0 as usize), - ttl: table_info.meta.options.ttl, - compaction_time_window: table_info.meta.options.compaction_time_window, - }; - - debug!( - "Opening table {}, table info recovered: {:?}", - table_id, table_info - ); - - let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len()); - for region_number in &table_info.meta.region_numbers { - let region_name = region_name(table_id, *region_number); - let region = self - .storage_engine - .open_region(&engine_ctx, ®ion_name, &opts) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)? - .with_context(|| RegionNotFoundSnafu { - table: format!( - "{}.{}.{}", - request.catalog_name, request.schema_name, request.table_name - ), - region: *region_number, - }) - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - regions.insert(*region_number, region); - } - - let table = Arc::new(MitoTable::new(table_info, regions, manifest)); - - // already locked - self.tables.insert(table_ref.to_string(), table.clone()); - Some(table as _) }; logging::info!( diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 369317a304..7ff3fa3223 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -405,6 +405,70 @@ async fn test_create_if_not_exists() { assert!(matches!(result, Err(e) if format!("{e:?}").contains("Table already exists"))); } +#[tokio::test] +async fn test_open_table_with_region_number() { + common_telemetry::init_default_ut_logging(); + + let ctx = EngineContext::default(); + let open_req = OpenTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: test_util::TABLE_NAME.to_string(), + // the test table id is 1 + table_id: 1, + region_numbers: vec![0], + }; + + let invalid_open_req = OpenTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: test_util::TABLE_NAME.to_string(), + // the test table id is 1 + table_id: 1, + region_numbers: vec![1], + }; + + let (_engine, storage_engine, table, object_store, _dir) = { + let TestEngineComponents { + table_engine, + storage_engine, + table_ref: table, + object_store, + dir, + .. + } = test_util::setup_test_engine_and_table().await; + + assert_eq!(MITO_ENGINE, table_engine.name()); + // Now try to open the table again. + let reopened = table_engine + .open_table(&ctx, open_req.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(table.schema(), reopened.schema()); + + (table_engine, storage_engine, table, object_store, dir) + }; + + // Construct a new table engine, and try to open the table. + let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store); + + let region_not_found = table_engine + .open_table(&ctx, invalid_open_req.clone()) + .await + .err() + .unwrap(); + + assert_eq!(region_not_found.to_string(), "Failed to operate table, source: Cannot find region, table: greptime.public.demo, region: 1"); + + let reopened = table_engine + .open_table(&ctx, open_req.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(table.schema(), reopened.schema()); +} + #[tokio::test] async fn test_open_table() { common_telemetry::init_default_ut_logging(); @@ -416,6 +480,7 @@ async fn test_open_table() { table_name: test_util::TABLE_NAME.to_string(), // the test table id is 1 table_id: 1, + region_numbers: vec![0], }; let (_engine, storage_engine, table, object_store, _dir) = { @@ -648,6 +713,7 @@ async fn test_alter_rename_table() { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: new_table_name.to_string(), table_id: 1, + region_numbers: vec![0], }; // test reopen table diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 620807b43d..bd655ea029 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -27,7 +27,7 @@ use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult}; use common_recordbatch::{RecordBatch, RecordBatchStream}; -use common_telemetry::logging; +use common_telemetry::{logging, warn}; use datatypes::schema::Schema; use futures::task::{Context, Poll}; use futures::Stream; @@ -334,6 +334,10 @@ impl Table for MitoTable { }) .collect()) } + + fn contain_regions(&self, region: RegionNumber) -> TableResult { + Ok(self.regions.contains_key(®ion)) + } } struct ChunkStream { @@ -560,6 +564,18 @@ impl MitoTable { Ok(()) } + pub async fn load_region(&self, region_number: RegionNumber, _region: R) -> TableResult<()> { + let info = self.table_info.load_full(); + + // TODO(weny): Supports to load the region + warn!( + "MitoTable try to load region: {} in table: {}", + region_number, + format!("{}.{}.{}", info.catalog_name, info.schema_name, info.name) + ); + Ok(()) + } + pub(crate) fn info_and_op_for_alter( &self, current_info: &TableInfo, diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 70c3758ec7..c4f1dc2e2b 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -19,6 +19,7 @@ use common_recordbatch::error::Error as RecordBatchError; use datafusion::error::DataFusionError; use datatypes::arrow::error::ArrowError; use snafu::Location; +use store_api::storage::RegionNumber; use crate::metadata::TableId; @@ -28,6 +29,9 @@ pub type Result = std::result::Result; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to downcast mito table"))] + DowncastMitoTable { location: Location }, + #[snafu(display("Datafusion error: {}", source))] Datafusion { source: DataFusionError, @@ -115,6 +119,13 @@ pub enum Error { #[snafu(display("Failed to operate table, source: {}", source))] TableOperation { source: BoxedError }, + #[snafu(display("Cannot find region, table: {}, region: {}", table, region))] + RegionNotFound { + table: String, + region: RegionNumber, + location: Location, + }, + #[snafu(display("Unsupported operation: {}", operation))] Unsupported { operation: String }, @@ -159,9 +170,10 @@ impl ErrorExt for Error { | Error::EngineNotFound { .. } | Error::EngineExist { .. } => StatusCode::InvalidArguments, - Error::InvalidTable { .. } | Error::MissingTimeIndexColumn { .. } => { - StatusCode::Internal - } + Error::InvalidTable { .. } + | Error::MissingTimeIndexColumn { .. } + | Error::RegionNotFound { .. } + | Error::DowncastMitoTable { .. } => StatusCode::Internal, } } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 8845284bd6..a383148a79 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -176,6 +176,7 @@ pub struct OpenTableRequest { pub schema_name: String, pub table_name: String, pub table_id: TableId, + pub region_numbers: Vec, } /// Alter table request diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 110113630e..5e96832078 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -120,6 +120,14 @@ pub trait Table: Send + Sync { .fail()? } + /// Return true if contains the region + fn contain_regions(&self, _region: RegionNumber) -> Result { + UnsupportedSnafu { + operation: "contain_region", + } + .fail()? + } + /// Get statistics for this table, if available fn statistics(&self) -> Option { None