feat: impl recover table info from manifest

This commit is contained in:
Dennis Zhuang
2022-08-10 11:46:45 +08:00
parent 012360676d
commit 3c34392d48
4 changed files with 153 additions and 70 deletions

View File

@@ -8,7 +8,6 @@ use common_error::ext::BoxedError;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::manifest::{action::ProtocolAction, Manifest};
use store_api::storage::{
self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta,
@@ -18,7 +17,7 @@ use table::engine::{EngineContext, TableEngine};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::Result as TableResult;
use table::{
metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType},
metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion},
table::TableRef,
};
use tokio::sync::Mutex;
@@ -28,12 +27,11 @@ use crate::error::{
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result,
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
use crate::table::MitoTable;
pub const MITO_ENGINE: &str = "mito";
const INIT_COLUMN_ID: ColumnId = 0;
const INIT_TABLE_VERSION: TableVersion = 0;
#[inline]
fn region_name(id: RegionId) -> String {
@@ -45,11 +43,6 @@ fn table_dir(table_name: &str) -> String {
format!("{}/", table_name)
}
#[inline]
fn table_manifest_dir(table_name: &str) -> String {
format!("{}/manifest/", table_name)
}
/// [TableEngine] implementation.
///
/// About mito <https://en.wikipedia.org/wiki/Alfa_Romeo_MiTo>.
@@ -129,22 +122,6 @@ struct MitoEngineInner<S: StorageEngine> {
table_mutex: Mutex<()>,
}
impl<S: StorageEngine> MitoEngineInner<S> {
fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
Self {
tables: RwLock::new(HashMap::default()),
storage_engine,
object_store,
next_table_id: AtomicU64::new(0),
table_mutex: Mutex::new(()),
}
}
fn next_table_id(&self) -> TableId {
self.next_table_id.fetch_add(1, Ordering::Relaxed)
}
}
fn build_row_key_desc_from_schema(
mut column_id: ColumnId,
request: &CreateTableRequest,
@@ -310,28 +287,17 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table_info = TableInfoBuilder::new(table_name.clone(), table_meta)
.ident(self.next_table_id())
.table_version(0u64)
.table_version(INIT_TABLE_VERSION)
.table_type(TableType::Base)
.desc(request.desc)
.build()
.context(error::BuildTableInfoSnafu { table_name })?;
let manifest =
TableManifest::new(&table_manifest_dir(table_name), self.object_store.clone());
let table = Arc::new(
MitoTable::create(table_name, table_info, region, self.object_store.clone()).await?,
);
manifest
.update(TableMetaActionList::new(vec![
TableMetaAction::Protocol(ProtocolAction::new()),
TableMetaAction::Change(Box::new(TableChange {
table_info: table_info.clone(),
})),
]))
.await
.context(error::UpdateTableManifestSnafu { table_name })?;
logging::info!("Mito engine created table: {:?}.", table_info);
let table = Arc::new(MitoTable::new(table_info, region, manifest));
logging::info!("Mito engine created table: {:?}.", table.table_info());
self.tables
.write()
@@ -381,25 +347,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
Some(region) => region,
};
//FIXME(boyan): recover table meta from table manifest
let table_meta = TableMetaBuilder::default()
.schema(region.in_memory_metadata().schema().clone())
.engine(MITO_ENGINE)
.next_column_id(INIT_COLUMN_ID)
.primary_key_indices(Vec::default())
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
let table_info = TableInfoBuilder::new(table_name.clone(), table_meta)
.ident(request.table_id)
.table_version(0u64)
.table_type(TableType::Base)
.build()
.context(error::BuildTableInfoSnafu { table_name })?;
let manifest =
TableManifest::new(&table_manifest_dir(table_name), self.object_store.clone());
let table = Arc::new(MitoTable::new(table_info, region, manifest));
let table =
Arc::new(MitoTable::open(table_name, region, self.object_store.clone()).await?);
self.tables
.write()
@@ -418,6 +367,22 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}
}
impl<S: StorageEngine> MitoEngineInner<S> {
fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
Self {
tables: RwLock::new(HashMap::default()),
storage_engine,
object_store,
next_table_id: AtomicU64::new(0),
table_mutex: Mutex::new(()),
}
}
fn next_table_id(&self) -> TableId {
self.next_table_id.fetch_add(1, Ordering::Relaxed)
}
}
#[cfg(test)]
mod tests {
use common_recordbatch::util;

View File

@@ -106,6 +106,23 @@ pub enum Error {
source: storage::error::Error,
table_name: String,
},
#[snafu(display(
"Failed to scan table metadata from manifest, table: {}, source: {}",
table_name,
source,
))]
ScanTableManifest {
#[snafu(backtrace)]
source: storage::error::Error,
table_name: String,
},
#[snafu(display("Table info not found in manifest, table: {}", table_name))]
TableInfoNotFound {
backtrace: Backtrace,
table_name: String,
},
}
impl From<Error> for table::error::Error {
@@ -132,6 +149,10 @@ impl ErrorExt for Error {
| MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
UpdateTableManifest { .. } => StatusCode::StorageUnavailable,
TableInfoNotFound { .. } => StatusCode::Unexpected,
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -39,13 +39,6 @@ pub struct TableMetaActionList {
}
impl TableMetaActionList {
pub fn with_action(action: TableMetaAction) -> Self {
Self {
actions: vec![action],
prev_version: 0,
}
}
pub fn new(actions: Vec<TableMetaAction>) -> Self {
Self {
actions,

View File

@@ -8,9 +8,13 @@ use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::logging;
use futures::task::{Context, Poll};
use futures::Stream;
use snafu::OptionExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
ChunkReader, PutOperation, ReadContext, Region, ScanRequest, SchemaRef, Snapshot, WriteContext,
WriteRequest,
@@ -22,13 +26,22 @@ use table::{
table::Table,
};
use crate::error::{
Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu, UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
#[inline]
fn table_manifest_dir(table_name: &str) -> String {
format!("{}/manifest/", table_name)
}
/// [Table] implementation.
pub struct MitoTable<R: Region> {
_manifest: TableManifest,
table_info: TableInfo,
//TODO(dennis): a table contains multi regions
// TODO(dennis): a table contains multi regions
region: R,
}
@@ -142,11 +155,102 @@ impl Stream for ChunkStream {
}
impl<R: Region> MitoTable<R> {
pub fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
Self {
table_info,
region,
_manifest: manifest,
}
}
pub async fn create(
table_name: &str,
table_info: TableInfo,
region: R,
object_store: ObjectStore,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store);
// TODO(dennis): save manifest version into catalog?
let _manifest_version = manifest
.update(TableMetaActionList::new(vec![
TableMetaAction::Protocol(ProtocolAction::new()),
TableMetaAction::Change(Box::new(TableChange {
table_info: table_info.clone(),
})),
]))
.await
.context(UpdateTableManifestSnafu { table_name })?;
Ok(MitoTable::new(table_info, region, manifest))
}
pub async fn open(
table_name: &str,
region: R,
object_store: ObjectStore,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store);
let table_info = Self::recover_table_info(table_name, &manifest)
.await?
.context(TableInfoNotFoundSnafu { table_name })?;
Ok(MitoTable::new(table_info, region, manifest))
}
async fn recover_table_info(
table_name: &str,
manifest: &TableManifest,
) -> Result<Option<TableInfo>> {
let (start, end) = Self::manifest_scan_range();
let mut iter = manifest
.scan(start, end)
.await
.context(ScanTableManifestSnafu { table_name })?;
let mut last_manifest_version = manifest::MIN_VERSION;
let mut table_info = None;
while let Some((manifest_version, action_list)) = iter
.next_action()
.await
.context(ScanTableManifestSnafu { table_name })?
{
last_manifest_version = manifest_version;
for action in action_list.actions {
match action {
TableMetaAction::Change(c) => {
table_info = Some(c.table_info);
}
TableMetaAction::Protocol(_) => {}
_ => unimplemented!(),
}
}
}
if table_info.is_some() {
// update manifest state after recovering
let protocol = iter.last_protocol();
manifest.update_state(last_manifest_version + 1, protocol.clone());
}
logging::debug!(
"Recovered table info {:?} for table: {}",
table_info,
table_name
);
Ok(table_info)
}
#[inline]
pub fn table_info(&self) -> &TableInfo {
&self.table_info
}
fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) {
// TODO(dennis): use manifest version in catalog ?
(manifest::MIN_VERSION, manifest::MAX_VERSION)
}
}