mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-18 14:00:39 +00:00
feat: implement OpenTableHandler (#1567)
* feat: implement OpenTableHandler * chore: apply suggestion from CR * chore: apply suggestion from CR
This commit is contained in:
@@ -266,6 +266,7 @@ impl LocalCatalogManager {
|
||||
schema_name: t.schema_name.clone(),
|
||||
table_name: t.table_name.clone(),
|
||||
table_id: t.table_id,
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
let engine = self
|
||||
.engine_manager
|
||||
|
||||
@@ -427,6 +427,7 @@ async fn open_or_create_table(
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
table_id,
|
||||
region_numbers: region_numbers.clone(),
|
||||
};
|
||||
let engine =
|
||||
engine_manager
|
||||
|
||||
@@ -93,6 +93,7 @@ impl SystemCatalogTable {
|
||||
schema_name: INFORMATION_SCHEMA_NAME.to_string(),
|
||||
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
|
||||
table_id: SYSTEM_CATALOG_TABLE_ID,
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
let schema = build_system_catalog_schema();
|
||||
let ctx = EngineContext::default();
|
||||
|
||||
@@ -14,32 +14,34 @@
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct RegionIdent {
|
||||
pub catalog: String,
|
||||
pub schema: String,
|
||||
pub table: String,
|
||||
pub table_id: u32,
|
||||
pub engine: String,
|
||||
pub region_number: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct SimpleReply {
|
||||
pub result: bool,
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum Instruction {
|
||||
OpenRegion {
|
||||
catalog: String,
|
||||
schema: String,
|
||||
table: String,
|
||||
table_id: u32,
|
||||
engine: String,
|
||||
region_number: u32,
|
||||
},
|
||||
CloseRegion {
|
||||
catalog: String,
|
||||
schema: String,
|
||||
table: String,
|
||||
table_id: u32,
|
||||
engine: String,
|
||||
region_number: u32,
|
||||
},
|
||||
OpenRegion(RegionIdent),
|
||||
CloseRegion(RegionIdent),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum InstructionReply {
|
||||
OpenRegion { result: bool, error: Option<String> },
|
||||
CloseRegion { result: bool, error: Option<String> },
|
||||
OpenRegion(SimpleReply),
|
||||
CloseRegion(SimpleReply),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -48,14 +50,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_serialize_instruction() {
|
||||
let open_region = Instruction::OpenRegion {
|
||||
let open_region = Instruction::OpenRegion(RegionIdent {
|
||||
catalog: "foo".to_string(),
|
||||
schema: "bar".to_string(),
|
||||
table: "hi".to_string(),
|
||||
table_id: 1024,
|
||||
engine: "mito".to_string(),
|
||||
region_number: 1,
|
||||
};
|
||||
});
|
||||
|
||||
let serialized = serde_json::to_string(&open_region).unwrap();
|
||||
|
||||
@@ -64,14 +66,14 @@ mod tests {
|
||||
serialized
|
||||
);
|
||||
|
||||
let close_region = Instruction::CloseRegion {
|
||||
let close_region = Instruction::CloseRegion(RegionIdent {
|
||||
catalog: "foo".to_string(),
|
||||
schema: "bar".to_string(),
|
||||
table: "hi".to_string(),
|
||||
table_id: 1024,
|
||||
engine: "mito".to_string(),
|
||||
region_number: 1,
|
||||
};
|
||||
});
|
||||
|
||||
let serialized = serde_json::to_string(&close_region).unwrap();
|
||||
|
||||
|
||||
@@ -19,12 +19,41 @@ use common_procedure::ProcedureId;
|
||||
use serde_json::error::Error as JsonError;
|
||||
use snafu::Location;
|
||||
use storage::error::Error as StorageError;
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::error::Error as TableError;
|
||||
|
||||
/// Business error of datanode.
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
#[snafu(display("Failed to check region in table: {}, source: {}", table_name, source))]
|
||||
CheckRegion {
|
||||
table_name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: TableError,
|
||||
region_number: RegionNumber,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to access catalog, source: {}", source))]
|
||||
AccessCatalog {
|
||||
#[snafu(backtrace)]
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to open table: {}, source: {}", table_name, source))]
|
||||
OpenTable {
|
||||
table_name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: TableError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to register table: {}, source: {}", table_name, source))]
|
||||
RegisterTable {
|
||||
table_name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to send message: {err_msg}"))]
|
||||
SendMessage { err_msg: String, location: Location },
|
||||
|
||||
@@ -467,12 +496,16 @@ impl ErrorExt for Error {
|
||||
| ExecuteLogicalPlan { source }
|
||||
| DescribeStatement { source } => source.status_code(),
|
||||
|
||||
OpenTable { source, .. } => source.status_code(),
|
||||
RegisterTable { source, .. } | AccessCatalog { source, .. } => source.status_code(),
|
||||
|
||||
DecodeLogicalPlan { source } => source.status_code(),
|
||||
NewCatalog { source } | RegisterSchema { source } => source.status_code(),
|
||||
FindTable { source, .. } => source.status_code(),
|
||||
CreateTable { source, .. } | GetTable { source, .. } | AlterTable { source, .. } => {
|
||||
source.status_code()
|
||||
}
|
||||
CreateTable { source, .. }
|
||||
| GetTable { source, .. }
|
||||
| AlterTable { source, .. }
|
||||
| CheckRegion { source, .. } => source.status_code(),
|
||||
DropTable { source, .. } => source.status_code(),
|
||||
FlushTable { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ use common_telemetry::error;
|
||||
use crate::error::Result;
|
||||
use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
|
||||
|
||||
pub mod open_region;
|
||||
pub mod parse_mailbox_message;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@@ -47,6 +48,10 @@ impl HeartbeatResponseHandlerContext {
|
||||
pub fn is_skip_all(&self) -> bool {
|
||||
self.is_skip_all
|
||||
}
|
||||
|
||||
pub fn finish(&mut self) {
|
||||
self.is_skip_all = true
|
||||
}
|
||||
}
|
||||
|
||||
pub trait HeartbeatResponseHandler: Send + Sync {
|
||||
|
||||
201
src/datanode/src/heartbeat/handler/open_region.rs
Normal file
201
src/datanode/src/heartbeat/handler/open_region.rs
Normal file
@@ -0,0 +1,201 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::{CatalogManagerRef, RegisterTableRequest};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply};
|
||||
use common_telemetry::{error, warn};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::engine::manager::TableEngineManagerRef;
|
||||
use table::engine::EngineContext;
|
||||
use table::requests::OpenTableRequest;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::heartbeat::handler::HeartbeatResponseHandler;
|
||||
use crate::heartbeat::HeartbeatResponseHandlerContext;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OpenRegionHandler {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
table_engine_manager: TableEngineManagerRef,
|
||||
}
|
||||
|
||||
impl HeartbeatResponseHandler for OpenRegionHandler {
|
||||
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
|
||||
matches!(
|
||||
ctx.incoming_message,
|
||||
Some((_, Instruction::OpenRegion { .. }))
|
||||
)
|
||||
}
|
||||
|
||||
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()> {
|
||||
let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else {
|
||||
unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'");
|
||||
};
|
||||
|
||||
ctx.finish();
|
||||
let mailbox = ctx.mailbox.clone();
|
||||
let self_ref = Arc::new(self.clone());
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let (engine, request) = OpenRegionHandler::prepare_request(region_ident);
|
||||
let result = self_ref.open_region_inner(engine, request).await;
|
||||
if let Err(e) = mailbox
|
||||
.send((meta, OpenRegionHandler::map_result(result)))
|
||||
.await
|
||||
{
|
||||
error!(e; "Failed to send reply to mailbox");
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenRegionHandler {
|
||||
fn map_result(result: Result<bool>) -> InstructionReply {
|
||||
result.map_or_else(
|
||||
|error| {
|
||||
InstructionReply::OpenRegion(SimpleReply {
|
||||
result: false,
|
||||
error: Some(error.to_string()),
|
||||
})
|
||||
},
|
||||
|result| {
|
||||
InstructionReply::OpenRegion(SimpleReply {
|
||||
result,
|
||||
error: None,
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn prepare_request(ident: RegionIdent) -> (String, OpenTableRequest) {
|
||||
let RegionIdent {
|
||||
catalog,
|
||||
schema,
|
||||
table,
|
||||
table_id,
|
||||
region_number,
|
||||
engine,
|
||||
} = ident;
|
||||
|
||||
(
|
||||
engine,
|
||||
OpenTableRequest {
|
||||
catalog_name: catalog,
|
||||
schema_name: schema,
|
||||
table_name: table,
|
||||
table_id,
|
||||
region_numbers: vec![region_number],
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns true if table has been opened.
|
||||
async fn check_table(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
region_numbers: &[RegionNumber],
|
||||
) -> Result<bool> {
|
||||
if let Some(table) = self
|
||||
.catalog_manager
|
||||
.table(catalog_name, schema_name, table_name)
|
||||
.await
|
||||
.context(error::AccessCatalogSnafu)?
|
||||
{
|
||||
for r in region_numbers {
|
||||
let region_exist =
|
||||
table
|
||||
.contain_regions(*r)
|
||||
.with_context(|_| error::CheckRegionSnafu {
|
||||
table_name: format_full_table_name(
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
),
|
||||
region_number: *r,
|
||||
})?;
|
||||
if !region_exist {
|
||||
warn!(
|
||||
"Failed to check table: {}, region: {} does not exist",
|
||||
format_full_table_name(catalog_name, schema_name, table_name,),
|
||||
r
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result<bool> {
|
||||
let OpenTableRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
region_numbers,
|
||||
..
|
||||
} = &request;
|
||||
let engine =
|
||||
self.table_engine_manager
|
||||
.engine(&engine)
|
||||
.context(error::TableEngineNotFoundSnafu {
|
||||
engine_name: &engine,
|
||||
})?;
|
||||
let ctx = EngineContext::default();
|
||||
|
||||
if self
|
||||
.check_table(catalog_name, schema_name, table_name, region_numbers)
|
||||
.await?
|
||||
{
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
if let Some(table) = engine
|
||||
.open_table(&ctx, request.clone())
|
||||
.await
|
||||
.with_context(|_| error::OpenTableSnafu {
|
||||
table_name: format_full_table_name(catalog_name, schema_name, table_name),
|
||||
})?
|
||||
{
|
||||
let request = RegisterTableRequest {
|
||||
catalog: request.catalog_name.clone(),
|
||||
schema: request.schema_name.clone(),
|
||||
table_name: request.table_name.clone(),
|
||||
table_id: request.table_id,
|
||||
table,
|
||||
};
|
||||
self.catalog_manager
|
||||
.register_table(request)
|
||||
.await
|
||||
.with_context(|_| error::RegisterTableSnafu {
|
||||
table_name: format_full_table_name(catalog_name, schema_name, table_name),
|
||||
})?;
|
||||
Ok(true)
|
||||
} else {
|
||||
// Case 1:
|
||||
// TODO(weny): Fix/Cleanup the broken table manifest
|
||||
// The manifest writing operation should be atomic.
|
||||
// Therefore, we won't meet this case, in theory.
|
||||
|
||||
// Case 2: The target region was not found in table meta
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::instruction::InstructionReply;
|
||||
use common_meta::instruction::{InstructionReply, SimpleReply};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
|
||||
@@ -24,10 +24,10 @@ async fn test_heartbeat_mailbox() {
|
||||
let mailbox = HeartbeatMailbox::new(tx);
|
||||
|
||||
let meta = MessageMeta::new_test(1, "test", "foo", "bar");
|
||||
let reply = InstructionReply::OpenRegion {
|
||||
let reply = InstructionReply::OpenRegion(SimpleReply {
|
||||
result: true,
|
||||
error: None,
|
||||
};
|
||||
});
|
||||
mailbox.send((meta.clone(), reply.clone())).await.unwrap();
|
||||
|
||||
let message = rx.recv().await.unwrap();
|
||||
|
||||
@@ -59,6 +59,7 @@ async fn test_open_table() {
|
||||
table_name: test_util::TEST_TABLE_NAME.to_string(),
|
||||
// the test table id is 1
|
||||
table_id: 1,
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
|
||||
let table_ref = TableReference {
|
||||
|
||||
@@ -31,13 +31,13 @@ use object_store::ObjectStore;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{
|
||||
ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
|
||||
EngineContext as StorageEngineContext, OpenOptions, RowKeyDescriptor, RowKeyDescriptorBuilder,
|
||||
StorageEngine,
|
||||
EngineContext as StorageEngineContext, OpenOptions, RegionNumber, RowKeyDescriptor,
|
||||
RowKeyDescriptorBuilder, StorageEngine,
|
||||
};
|
||||
use table::engine::{
|
||||
region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference,
|
||||
};
|
||||
use table::metadata::{TableInfo, TableVersion};
|
||||
use table::metadata::{TableId, TableInfo, TableVersion};
|
||||
use table::requests::{
|
||||
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
|
||||
};
|
||||
@@ -381,11 +381,177 @@ fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn all_regions_open(table: TableRef, regions: &[RegionNumber]) -> TableResult<bool> {
|
||||
for r in regions {
|
||||
let region_exist = table.contain_regions(*r)?;
|
||||
if !region_exist {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
async fn open_table(
|
||||
/// Returns Some(table) contains all specific regions
|
||||
fn check_regions(
|
||||
&self,
|
||||
table: TableRef,
|
||||
regions: &[RegionNumber],
|
||||
) -> TableResult<Option<TableRef>> {
|
||||
if all_regions_open(table.clone(), regions)? {
|
||||
// If all regions have been opened
|
||||
Ok(Some(table))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds table from scratch.
|
||||
/// Returns None if failed to recover manifest.
|
||||
async fn recover_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
request: OpenTableRequest,
|
||||
) -> TableResult<Option<Arc<MitoTable<S::Region>>>> {
|
||||
let catalog_name = &request.catalog_name;
|
||||
let schema_name = &request.schema_name;
|
||||
let table_name = &request.table_name;
|
||||
let table_ref = TableReference {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table: table_name,
|
||||
};
|
||||
|
||||
let table_id = request.table_id;
|
||||
let engine_ctx = StorageEngineContext::default();
|
||||
let table_dir = table_dir(catalog_name, schema_name, table_id);
|
||||
|
||||
let Some((manifest, table_info)) = self
|
||||
.recover_table_manifest_and_info(table_name, &table_dir)
|
||||
.await.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)? else { return Ok(None) };
|
||||
|
||||
let opts = OpenOptions {
|
||||
parent_dir: table_dir.to_string(),
|
||||
write_buffer_size: table_info
|
||||
.meta
|
||||
.options
|
||||
.write_buffer_size
|
||||
.map(|s| s.0 as usize),
|
||||
ttl: table_info.meta.options.ttl,
|
||||
compaction_time_window: table_info.meta.options.compaction_time_window,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Opening table {}, table info recovered: {:?}",
|
||||
table_id, table_info
|
||||
);
|
||||
|
||||
for target_region in &request.region_numbers {
|
||||
if !table_info.meta.region_numbers.contains(target_region) {
|
||||
table_error::RegionNotFoundSnafu {
|
||||
table: table_ref.to_string(),
|
||||
region: *target_region,
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
}
|
||||
|
||||
let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len());
|
||||
|
||||
for region_number in &request.region_numbers {
|
||||
let region = self
|
||||
.open_region(&engine_ctx, table_id, *region_number, &table_ref, &opts)
|
||||
.await?;
|
||||
regions.insert(*region_number, region);
|
||||
}
|
||||
|
||||
let table = Arc::new(MitoTable::new(table_info, regions, manifest));
|
||||
|
||||
Ok(Some(table))
|
||||
}
|
||||
|
||||
async fn open_region(
|
||||
&self,
|
||||
engine_ctx: &StorageEngineContext,
|
||||
table_id: TableId,
|
||||
region_number: RegionNumber,
|
||||
table_ref: &TableReference<'_>,
|
||||
opts: &OpenOptions,
|
||||
) -> TableResult<S::Region> {
|
||||
let region_name = region_name(table_id, region_number);
|
||||
let region = self
|
||||
.storage_engine
|
||||
.open_region(engine_ctx, ®ion_name, opts)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?
|
||||
.with_context(|| RegionNotFoundSnafu {
|
||||
table: format!(
|
||||
"{}.{}.{}",
|
||||
table_ref.catalog, table_ref.schema, table_ref.table
|
||||
),
|
||||
region: region_number,
|
||||
})
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
|
||||
Ok(region)
|
||||
}
|
||||
|
||||
/// Loads regions
|
||||
async fn load_missing_regions(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
table: Arc<MitoTable<S::Region>>,
|
||||
region_numbers: &[RegionNumber],
|
||||
) -> TableResult<()> {
|
||||
let table_info = table.table_info();
|
||||
let catalog = &table_info.catalog_name;
|
||||
let schema = &table_info.schema_name;
|
||||
let name = &table_info.name;
|
||||
let table_id = table_info.ident.table_id;
|
||||
|
||||
let table_dir = table_dir(catalog, schema, table_id);
|
||||
let table_ref = TableReference {
|
||||
catalog,
|
||||
schema,
|
||||
table: name,
|
||||
};
|
||||
|
||||
let opts = OpenOptions {
|
||||
parent_dir: table_dir.to_string(),
|
||||
write_buffer_size: table_info
|
||||
.meta
|
||||
.options
|
||||
.write_buffer_size
|
||||
.map(|s| s.0 as usize),
|
||||
ttl: table_info.meta.options.ttl,
|
||||
compaction_time_window: table_info.meta.options.compaction_time_window,
|
||||
};
|
||||
|
||||
// TODO(weny): Returns an error earlier if the target region does not exist in the meta.
|
||||
for region_number in region_numbers {
|
||||
if table.contain_regions(*region_number)? {
|
||||
continue;
|
||||
}
|
||||
|
||||
let engine_ctx = StorageEngineContext::default();
|
||||
|
||||
let region = self
|
||||
.open_region(&engine_ctx, table_id, *region_number, &table_ref, &opts)
|
||||
.await?;
|
||||
|
||||
table.load_region(*region_number, region).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn open_table(
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
request: OpenTableRequest,
|
||||
) -> TableResult<Option<TableRef>> {
|
||||
let catalog_name = &request.catalog_name;
|
||||
let schema_name = &request.schema_name;
|
||||
@@ -397,8 +563,9 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
};
|
||||
|
||||
if let Some(table) = self.get_table(&table_ref) {
|
||||
// Table has already been opened.
|
||||
return Ok(Some(table));
|
||||
if let Some(table) = self.check_regions(table, &request.region_numbers)? {
|
||||
return Ok(Some(table));
|
||||
}
|
||||
}
|
||||
|
||||
// Acquires the mutex before opening a new table.
|
||||
@@ -407,61 +574,30 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
let _lock = self.table_mutex.lock(table_name_key.clone()).await;
|
||||
|
||||
// Checks again, read lock should be enough since we are guarded by the mutex.
|
||||
if let Some(table) = self.get_table(&table_ref) {
|
||||
return Ok(Some(table));
|
||||
if let Some(table) = self.get_mito_table(&table_ref) {
|
||||
// Contains all regions or target region
|
||||
if let Some(table) = self.check_regions(table.clone(), &request.region_numbers)? {
|
||||
Some(table)
|
||||
} else {
|
||||
// Loads missing regions
|
||||
// TODO(weny): Supports to load regions
|
||||
self.load_missing_regions(ctx, table.clone(), &request.region_numbers)
|
||||
.await?;
|
||||
|
||||
Some(table as _)
|
||||
}
|
||||
} else {
|
||||
// Builds table from scratch
|
||||
let table = self.recover_table(ctx, request.clone()).await?;
|
||||
if let Some(table) = table {
|
||||
// already locked
|
||||
self.tables.insert(table_ref.to_string(), table.clone());
|
||||
|
||||
Some(table as _)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
let table_id = request.table_id;
|
||||
let engine_ctx = StorageEngineContext::default();
|
||||
let table_dir = table_dir(catalog_name, schema_name, table_id);
|
||||
|
||||
let Some((manifest, table_info)) = self
|
||||
.recover_table_manifest_and_info(table_name, &table_dir)
|
||||
.await.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)? else { return Ok(None) };
|
||||
|
||||
let opts = OpenOptions {
|
||||
parent_dir: table_dir.to_string(),
|
||||
write_buffer_size: table_info
|
||||
.meta
|
||||
.options
|
||||
.write_buffer_size
|
||||
.map(|s| s.0 as usize),
|
||||
ttl: table_info.meta.options.ttl,
|
||||
compaction_time_window: table_info.meta.options.compaction_time_window,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Opening table {}, table info recovered: {:?}",
|
||||
table_id, table_info
|
||||
);
|
||||
|
||||
let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len());
|
||||
for region_number in &table_info.meta.region_numbers {
|
||||
let region_name = region_name(table_id, *region_number);
|
||||
let region = self
|
||||
.storage_engine
|
||||
.open_region(&engine_ctx, ®ion_name, &opts)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?
|
||||
.with_context(|| RegionNotFoundSnafu {
|
||||
table: format!(
|
||||
"{}.{}.{}",
|
||||
request.catalog_name, request.schema_name, request.table_name
|
||||
),
|
||||
region: *region_number,
|
||||
})
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
regions.insert(*region_number, region);
|
||||
}
|
||||
|
||||
let table = Arc::new(MitoTable::new(table_info, regions, manifest));
|
||||
|
||||
// already locked
|
||||
self.tables.insert(table_ref.to_string(), table.clone());
|
||||
Some(table as _)
|
||||
};
|
||||
|
||||
logging::info!(
|
||||
|
||||
@@ -405,6 +405,70 @@ async fn test_create_if_not_exists() {
|
||||
assert!(matches!(result, Err(e) if format!("{e:?}").contains("Table already exists")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_table_with_region_number() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let ctx = EngineContext::default();
|
||||
let open_req = OpenTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: test_util::TABLE_NAME.to_string(),
|
||||
// the test table id is 1
|
||||
table_id: 1,
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
|
||||
let invalid_open_req = OpenTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: test_util::TABLE_NAME.to_string(),
|
||||
// the test table id is 1
|
||||
table_id: 1,
|
||||
region_numbers: vec![1],
|
||||
};
|
||||
|
||||
let (_engine, storage_engine, table, object_store, _dir) = {
|
||||
let TestEngineComponents {
|
||||
table_engine,
|
||||
storage_engine,
|
||||
table_ref: table,
|
||||
object_store,
|
||||
dir,
|
||||
..
|
||||
} = test_util::setup_test_engine_and_table().await;
|
||||
|
||||
assert_eq!(MITO_ENGINE, table_engine.name());
|
||||
// Now try to open the table again.
|
||||
let reopened = table_engine
|
||||
.open_table(&ctx, open_req.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(table.schema(), reopened.schema());
|
||||
|
||||
(table_engine, storage_engine, table, object_store, dir)
|
||||
};
|
||||
|
||||
// Construct a new table engine, and try to open the table.
|
||||
let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store);
|
||||
|
||||
let region_not_found = table_engine
|
||||
.open_table(&ctx, invalid_open_req.clone())
|
||||
.await
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(region_not_found.to_string(), "Failed to operate table, source: Cannot find region, table: greptime.public.demo, region: 1");
|
||||
|
||||
let reopened = table_engine
|
||||
.open_table(&ctx, open_req.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(table.schema(), reopened.schema());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -416,6 +480,7 @@ async fn test_open_table() {
|
||||
table_name: test_util::TABLE_NAME.to_string(),
|
||||
// the test table id is 1
|
||||
table_id: 1,
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
|
||||
let (_engine, storage_engine, table, object_store, _dir) = {
|
||||
@@ -648,6 +713,7 @@ async fn test_alter_rename_table() {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: new_table_name.to_string(),
|
||||
table_id: 1,
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
|
||||
// test reopen table
|
||||
|
||||
@@ -27,7 +27,7 @@ use common_query::logical_plan::Expr;
|
||||
use common_query::physical_plan::PhysicalPlanRef;
|
||||
use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult};
|
||||
use common_recordbatch::{RecordBatch, RecordBatchStream};
|
||||
use common_telemetry::logging;
|
||||
use common_telemetry::{logging, warn};
|
||||
use datatypes::schema::Schema;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::Stream;
|
||||
@@ -334,6 +334,10 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn contain_regions(&self, region: RegionNumber) -> TableResult<bool> {
|
||||
Ok(self.regions.contains_key(®ion))
|
||||
}
|
||||
}
|
||||
|
||||
struct ChunkStream {
|
||||
@@ -560,6 +564,18 @@ impl<R: Region> MitoTable<R> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn load_region(&self, region_number: RegionNumber, _region: R) -> TableResult<()> {
|
||||
let info = self.table_info.load_full();
|
||||
|
||||
// TODO(weny): Supports to load the region
|
||||
warn!(
|
||||
"MitoTable try to load region: {} in table: {}",
|
||||
region_number,
|
||||
format!("{}.{}.{}", info.catalog_name, info.schema_name, info.name)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn info_and_op_for_alter(
|
||||
&self,
|
||||
current_info: &TableInfo,
|
||||
|
||||
@@ -19,6 +19,7 @@ use common_recordbatch::error::Error as RecordBatchError;
|
||||
use datafusion::error::DataFusionError;
|
||||
use datatypes::arrow::error::ArrowError;
|
||||
use snafu::Location;
|
||||
use store_api::storage::RegionNumber;
|
||||
|
||||
use crate::metadata::TableId;
|
||||
|
||||
@@ -28,6 +29,9 @@ pub type Result<T> = std::result::Result<T, Error>;
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
#[snafu(display("Failed to downcast mito table"))]
|
||||
DowncastMitoTable { location: Location },
|
||||
|
||||
#[snafu(display("Datafusion error: {}", source))]
|
||||
Datafusion {
|
||||
source: DataFusionError,
|
||||
@@ -115,6 +119,13 @@ pub enum Error {
|
||||
#[snafu(display("Failed to operate table, source: {}", source))]
|
||||
TableOperation { source: BoxedError },
|
||||
|
||||
#[snafu(display("Cannot find region, table: {}, region: {}", table, region))]
|
||||
RegionNotFound {
|
||||
table: String,
|
||||
region: RegionNumber,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Unsupported operation: {}", operation))]
|
||||
Unsupported { operation: String },
|
||||
|
||||
@@ -159,9 +170,10 @@ impl ErrorExt for Error {
|
||||
| Error::EngineNotFound { .. }
|
||||
| Error::EngineExist { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::InvalidTable { .. } | Error::MissingTimeIndexColumn { .. } => {
|
||||
StatusCode::Internal
|
||||
}
|
||||
Error::InvalidTable { .. }
|
||||
| Error::MissingTimeIndexColumn { .. }
|
||||
| Error::RegionNotFound { .. }
|
||||
| Error::DowncastMitoTable { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -176,6 +176,7 @@ pub struct OpenTableRequest {
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub table_id: TableId,
|
||||
pub region_numbers: Vec<RegionNumber>,
|
||||
}
|
||||
|
||||
/// Alter table request
|
||||
|
||||
@@ -120,6 +120,14 @@ pub trait Table: Send + Sync {
|
||||
.fail()?
|
||||
}
|
||||
|
||||
/// Return true if contains the region
|
||||
fn contain_regions(&self, _region: RegionNumber) -> Result<bool> {
|
||||
UnsupportedSnafu {
|
||||
operation: "contain_region",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
|
||||
/// Get statistics for this table, if available
|
||||
fn statistics(&self) -> Option<TableStatistics> {
|
||||
None
|
||||
|
||||
Reference in New Issue
Block a user