From 3c34392d48431c8d2f61ec89d48aaef1f8090ffb Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 10 Aug 2022 11:46:45 +0800 Subject: [PATCH] feat: impl recover table info from manifest --- src/table-engine/src/engine.rs | 85 ++++++------------ src/table-engine/src/error.rs | 21 +++++ src/table-engine/src/manifest/action.rs | 7 -- src/table-engine/src/table.rs | 110 +++++++++++++++++++++++- 4 files changed, 153 insertions(+), 70 deletions(-) diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index dbc050cce2..6157fb839e 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -8,7 +8,6 @@ use common_error::ext::BoxedError; use common_telemetry::logging; use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; -use store_api::manifest::{action::ProtocolAction, Manifest}; use store_api::storage::{ self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta, @@ -18,7 +17,7 @@ use table::engine::{EngineContext, TableEngine}; use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; use table::Result as TableResult; use table::{ - metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType}, + metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}, table::TableRef, }; use tokio::sync::Mutex; @@ -28,12 +27,11 @@ use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, }; -use crate::manifest::action::*; -use crate::manifest::TableManifest; use crate::table::MitoTable; pub const MITO_ENGINE: &str = "mito"; const INIT_COLUMN_ID: ColumnId = 0; +const INIT_TABLE_VERSION: TableVersion = 0; #[inline] fn region_name(id: RegionId) -> String { @@ -45,11 +43,6 @@ fn table_dir(table_name: &str) -> String { format!("{}/", table_name) } -#[inline] -fn table_manifest_dir(table_name: &str) -> String { - format!("{}/manifest/", table_name) -} - /// [TableEngine] implementation. /// /// About mito . @@ -129,22 +122,6 @@ struct MitoEngineInner { table_mutex: Mutex<()>, } -impl MitoEngineInner { - fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { - Self { - tables: RwLock::new(HashMap::default()), - storage_engine, - object_store, - next_table_id: AtomicU64::new(0), - table_mutex: Mutex::new(()), - } - } - - fn next_table_id(&self) -> TableId { - self.next_table_id.fetch_add(1, Ordering::Relaxed) - } -} - fn build_row_key_desc_from_schema( mut column_id: ColumnId, request: &CreateTableRequest, @@ -310,28 +287,17 @@ impl MitoEngineInner { let table_info = TableInfoBuilder::new(table_name.clone(), table_meta) .ident(self.next_table_id()) - .table_version(0u64) + .table_version(INIT_TABLE_VERSION) .table_type(TableType::Base) .desc(request.desc) .build() .context(error::BuildTableInfoSnafu { table_name })?; - let manifest = - TableManifest::new(&table_manifest_dir(table_name), self.object_store.clone()); + let table = Arc::new( + MitoTable::create(table_name, table_info, region, self.object_store.clone()).await?, + ); - manifest - .update(TableMetaActionList::new(vec![ - TableMetaAction::Protocol(ProtocolAction::new()), - TableMetaAction::Change(Box::new(TableChange { - table_info: table_info.clone(), - })), - ])) - .await - .context(error::UpdateTableManifestSnafu { table_name })?; - - logging::info!("Mito engine created table: {:?}.", table_info); - - let table = Arc::new(MitoTable::new(table_info, region, manifest)); + logging::info!("Mito engine created table: {:?}.", table.table_info()); self.tables .write() @@ -381,25 +347,8 @@ impl MitoEngineInner { Some(region) => region, }; - //FIXME(boyan): recover table meta from table manifest - let table_meta = TableMetaBuilder::default() - .schema(region.in_memory_metadata().schema().clone()) - .engine(MITO_ENGINE) - .next_column_id(INIT_COLUMN_ID) - .primary_key_indices(Vec::default()) - .build() - .context(error::BuildTableMetaSnafu { table_name })?; - - let table_info = TableInfoBuilder::new(table_name.clone(), table_meta) - .ident(request.table_id) - .table_version(0u64) - .table_type(TableType::Base) - .build() - .context(error::BuildTableInfoSnafu { table_name })?; - let manifest = - TableManifest::new(&table_manifest_dir(table_name), self.object_store.clone()); - - let table = Arc::new(MitoTable::new(table_info, region, manifest)); + let table = + Arc::new(MitoTable::open(table_name, region, self.object_store.clone()).await?); self.tables .write() @@ -418,6 +367,22 @@ impl MitoEngineInner { } } +impl MitoEngineInner { + fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { + Self { + tables: RwLock::new(HashMap::default()), + storage_engine, + object_store, + next_table_id: AtomicU64::new(0), + table_mutex: Mutex::new(()), + } + } + + fn next_table_id(&self) -> TableId { + self.next_table_id.fetch_add(1, Ordering::Relaxed) + } +} + #[cfg(test)] mod tests { use common_recordbatch::util; diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index bdcd7aa84f..d15fbcdfff 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -106,6 +106,23 @@ pub enum Error { source: storage::error::Error, table_name: String, }, + + #[snafu(display( + "Failed to scan table metadata from manifest, table: {}, source: {}", + table_name, + source, + ))] + ScanTableManifest { + #[snafu(backtrace)] + source: storage::error::Error, + table_name: String, + }, + + #[snafu(display("Table info not found in manifest, table: {}", table_name))] + TableInfoNotFound { + backtrace: Backtrace, + table_name: String, + }, } impl From for table::error::Error { @@ -132,6 +149,10 @@ impl ErrorExt for Error { | MissingTimestampIndex { .. } => StatusCode::InvalidArguments, UpdateTableManifest { .. } => StatusCode::StorageUnavailable, + + TableInfoNotFound { .. } => StatusCode::Unexpected, + + ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/table-engine/src/manifest/action.rs b/src/table-engine/src/manifest/action.rs index 4de99a4b22..6434c99e1a 100644 --- a/src/table-engine/src/manifest/action.rs +++ b/src/table-engine/src/manifest/action.rs @@ -39,13 +39,6 @@ pub struct TableMetaActionList { } impl TableMetaActionList { - pub fn with_action(action: TableMetaAction) -> Self { - Self { - actions: vec![action], - prev_version: 0, - } - } - pub fn new(actions: Vec) -> Self { Self { actions, diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 3e0daca73b..a44d9f1d18 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -8,9 +8,13 @@ use async_trait::async_trait; use common_query::logical_plan::Expr; use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult}; use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_telemetry::logging; use futures::task::{Context, Poll}; use futures::Stream; -use snafu::OptionExt; +use object_store::ObjectStore; +use snafu::{OptionExt, ResultExt}; +use store_api::manifest::action::ProtocolAction; +use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ ChunkReader, PutOperation, ReadContext, Region, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest, @@ -22,13 +26,22 @@ use table::{ table::Table, }; +use crate::error::{ + Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu, UpdateTableManifestSnafu, +}; +use crate::manifest::action::*; use crate::manifest::TableManifest; +#[inline] +fn table_manifest_dir(table_name: &str) -> String { + format!("{}/manifest/", table_name) +} + /// [Table] implementation. pub struct MitoTable { _manifest: TableManifest, table_info: TableInfo, - //TODO(dennis): a table contains multi regions + // TODO(dennis): a table contains multi regions region: R, } @@ -142,11 +155,102 @@ impl Stream for ChunkStream { } impl MitoTable { - pub fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self { + fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self { Self { table_info, region, _manifest: manifest, } } + + pub async fn create( + table_name: &str, + table_info: TableInfo, + region: R, + object_store: ObjectStore, + ) -> Result> { + let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store); + + // TODO(dennis): save manifest version into catalog? + let _manifest_version = manifest + .update(TableMetaActionList::new(vec![ + TableMetaAction::Protocol(ProtocolAction::new()), + TableMetaAction::Change(Box::new(TableChange { + table_info: table_info.clone(), + })), + ])) + .await + .context(UpdateTableManifestSnafu { table_name })?; + + Ok(MitoTable::new(table_info, region, manifest)) + } + + pub async fn open( + table_name: &str, + region: R, + object_store: ObjectStore, + ) -> Result> { + let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store); + + let table_info = Self::recover_table_info(table_name, &manifest) + .await? + .context(TableInfoNotFoundSnafu { table_name })?; + + Ok(MitoTable::new(table_info, region, manifest)) + } + + async fn recover_table_info( + table_name: &str, + manifest: &TableManifest, + ) -> Result> { + let (start, end) = Self::manifest_scan_range(); + let mut iter = manifest + .scan(start, end) + .await + .context(ScanTableManifestSnafu { table_name })?; + + let mut last_manifest_version = manifest::MIN_VERSION; + let mut table_info = None; + while let Some((manifest_version, action_list)) = iter + .next_action() + .await + .context(ScanTableManifestSnafu { table_name })? + { + last_manifest_version = manifest_version; + + for action in action_list.actions { + match action { + TableMetaAction::Change(c) => { + table_info = Some(c.table_info); + } + TableMetaAction::Protocol(_) => {} + _ => unimplemented!(), + } + } + } + + if table_info.is_some() { + // update manifest state after recovering + let protocol = iter.last_protocol(); + manifest.update_state(last_manifest_version + 1, protocol.clone()); + } + + logging::debug!( + "Recovered table info {:?} for table: {}", + table_info, + table_name + ); + + Ok(table_info) + } + + #[inline] + pub fn table_info(&self) -> &TableInfo { + &self.table_info + } + + fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) { + // TODO(dennis): use manifest version in catalog ? + (manifest::MIN_VERSION, manifest::MAX_VERSION) + } }