feat: impl table manifest (#157)

* feat: impl TableManifest and refactor table engine, object store etc.

* feat: persist table metadata when creating it

* fix: remove unused file src/storage/src/manifest/impl.rs

* feat: impl recover table info from manifest

* test: add open table test and table manifest test

* fix: resolve CR problems

* fix: compile error and remove region id

* doc: describe parent_dir

* fix: address CR problems

* fix: typo

* Revert "fix: compile error and remove region id"

This reverts commit c14c250f8a.

* fix: compile error and generate region id by table_id and region number
This commit is contained in:
dennis zhuang
2022-08-12 10:47:33 +08:00
committed by GitHub
parent ea40616cfe
commit 41ffbe82f8
40 changed files with 1106 additions and 451 deletions

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
arc-swap = "1.0"
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
@@ -15,6 +16,9 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b
datatypes = { path = "../datatypes" }
futures = "0.3"
log-store = { path = "../log-store" }
object-store = { path = "../object-store" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
storage ={ path = "../storage" }
store-api ={ path = "../store-api" }

View File

@@ -0,0 +1,4 @@
//! Table Engine config
#[derive(Debug, Clone, Default)]
pub struct EngineConfig {}

View File

@@ -1,34 +1,52 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{
self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
OpenOptions, Region, RegionDescriptorBuilder, RegionMeta, RowKeyDescriptor,
RowKeyDescriptorBuilder, StorageEngine,
CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta,
RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{EngineContext, TableEngine};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::Result as TableResult;
use table::{
metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType},
metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion},
table::TableRef,
};
use tokio::sync::Mutex;
use crate::config::EngineConfig;
use crate::error::{
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result,
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, TableExistsSnafu,
};
use crate::table::MitoTable;
pub const MITO_ENGINE: &str = "mito";
const INIT_COLUMN_ID: ColumnId = 0;
const INIT_TABLE_VERSION: TableVersion = 0;
/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}"
#[inline]
fn region_name(table_id: TableId, n: u32) -> String {
format!("{}_{:010}", table_id, n)
}
#[inline]
fn region_id(table_id: TableId, n: u32) -> RegionId {
(u64::from(table_id) << 32) | u64::from(n)
}
#[inline]
fn table_dir(table_name: &str) -> String {
format!("{}/", table_name)
}
/// [TableEngine] implementation.
///
@@ -40,9 +58,9 @@ pub struct MitoEngine<S: StorageEngine> {
}
impl<S: StorageEngine> MitoEngine<S> {
pub fn new(storage_engine: S) -> Self {
pub fn new(config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
Self {
inner: Arc::new(MitoEngineInner::new(storage_engine)),
inner: Arc::new(MitoEngineInner::new(config, storage_engine, object_store)),
}
}
}
@@ -100,29 +118,13 @@ struct MitoEngineInner<S: StorageEngine> {
///
/// Writing to `tables` should also hold the `table_mutex`.
tables: RwLock<HashMap<String, TableRef>>,
object_store: ObjectStore,
storage_engine: S,
// FIXME(yingwen): Remove `next_table_id`. Table id should be assigned by other module (maybe catalog).
next_table_id: AtomicU64,
/// Table mutex is used to protect the operations such as creating/opening/closing
/// a table, to avoid things like opening the same table simultaneously.
table_mutex: Mutex<()>,
}
impl<S: StorageEngine> MitoEngineInner<S> {
fn new(storage_engine: S) -> Self {
Self {
tables: RwLock::new(HashMap::default()),
storage_engine,
next_table_id: AtomicU64::new(0),
table_mutex: Mutex::new(()),
}
}
fn next_table_id(&self) -> TableId {
self.next_table_id.fetch_add(1, Ordering::Relaxed)
}
}
fn build_row_key_desc_from_schema(
mut column_id: ColumnId,
request: &CreateTableRequest,
@@ -239,17 +241,25 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table_name = &request.name;
if let Some(table) = self.get_table(table_name) {
return Ok(table);
if request.create_if_not_exists {
return Ok(table);
} else {
return TableExistsSnafu { table_name }.fail();
}
}
let (next_column_id, default_cf) =
build_column_family_from_request(INIT_COLUMN_ID, &request)?;
let (next_column_id, row_key) = build_row_key_desc_from_schema(next_column_id, &request)?;
// Now we just use table name as region name. TODO(yingwen): Naming pattern of region.
let region_name = table_name.clone();
let table_id = request.id;
// TODO(dennis): supports multi regions;
let region_number = 0;
let region_id = region_id(table_id, region_number);
let region_name = region_name(table_id, region_number);
let region_descriptor = RegionDescriptorBuilder::default()
.id(0)
.id(region_id)
.name(&region_name)
.row_key(row_key)
.default_cf(default_cf)
@@ -262,12 +272,20 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table(table_name) {
return Ok(table);
if request.create_if_not_exists {
return Ok(table);
} else {
return TableExistsSnafu { table_name }.fail();
}
}
let opts = CreateOptions {
parent_dir: table_dir(table_name),
};
let region = self
.storage_engine
.create_region(&storage::EngineContext::default(), region_descriptor)
.create_region(&storage::EngineContext::default(), region_descriptor, &opts)
.await
.map_err(BoxedError::new)
.context(error::CreateRegionSnafu)?;
@@ -282,17 +300,18 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.context(error::BuildTableMetaSnafu { table_name })?;
let table_info = TableInfoBuilder::new(table_name.clone(), table_meta)
.ident(self.next_table_id())
.table_version(0u64)
.ident(table_id)
.table_version(INIT_TABLE_VERSION)
.table_type(TableType::Base)
.desc(request.desc)
.build()
.context(error::BuildTableInfoSnafu { table_name })?;
//TODO(dennis): persist table info to table manifest service.
logging::info!("Mito engine created table: {:?}.", table_info);
let table = Arc::new(
MitoTable::create(table_name, table_info, region, self.object_store.clone()).await?,
);
let table = Arc::new(MitoTable::new(table_info, region));
logging::info!("Mito engine created table: {:?}.", table.table_info());
self.tables
.write()
@@ -323,13 +342,18 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}
let engine_ctx = storage::EngineContext::default();
let opts = OpenOptions::default();
let region_name = table_name;
// Now we just use table name as region name. TODO(yingwen): Naming pattern of region.
let opts = OpenOptions {
parent_dir: table_dir(table_name),
};
let table_id = request.table_id;
// TODO(dennis): supports multi regions;
let region_number = 0;
let region_name = region_name(table_id, region_number);
let region = match self
.storage_engine
.open_region(&engine_ctx, region_name, &opts)
.open_region(&engine_ctx, &region_name, &opts)
.await
.map_err(BoxedError::new)
.context(error::OpenRegionSnafu { region_name })?
@@ -338,23 +362,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
Some(region) => region,
};
//FIXME(boyan): recover table meta from table manifest
let table_meta = TableMetaBuilder::default()
.schema(region.in_memory_metadata().schema().clone())
.engine(MITO_ENGINE)
.next_column_id(INIT_COLUMN_ID)
.primary_key_indices(Vec::default())
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
let table_info = TableInfoBuilder::new(table_name.clone(), table_meta)
.ident(request.table_id)
.table_version(0u64)
.table_type(TableType::Base)
.build()
.context(error::BuildTableInfoSnafu { table_name })?;
let table = Arc::new(MitoTable::new(table_info, region));
let table =
Arc::new(MitoTable::open(table_name, region, self.object_store.clone()).await?);
self.tables
.write()
@@ -373,16 +382,43 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}
}
impl<S: StorageEngine> MitoEngineInner<S> {
fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
Self {
tables: RwLock::new(HashMap::default()),
storage_engine,
object_store,
table_mutex: Mutex::new(()),
}
}
}
#[cfg(test)]
mod tests {
use common_recordbatch::util;
use datafusion_common::field_util::FieldExt;
use datafusion_common::field_util::SchemaExt;
use datatypes::vectors::*;
use store_api::manifest::Manifest;
use table::requests::InsertRequest;
use super::*;
use crate::table::test_util;
use crate::table::test_util::MockRegion;
#[test]
fn test_region_name() {
assert_eq!("1_0000000000", region_name(1, 0));
assert_eq!("1_0000000001", region_name(1, 1));
assert_eq!("99_0000000100", region_name(99, 100));
assert_eq!("1000_0000009999", region_name(1000, 9999));
}
#[test]
fn test_table_dir() {
assert_eq!("test_table/", table_dir("test_table"));
assert_eq!("demo/", table_dir("demo"));
}
#[tokio::test]
async fn test_create_table_insert_scan() {
@@ -434,6 +470,55 @@ mod tests {
assert_eq!(memories.to_arrow_array(), columns[3]);
}
#[tokio::test]
async fn test_create_if_not_exists() {
common_telemetry::init_default_ut_logging();
let ctx = EngineContext::default();
let (_engine, table_engine, table, _object_store, _dir) =
test_util::setup_mock_engine_and_table().await;
let table = table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
let table_info = table.table_info();
let request = CreateTableRequest {
id: 1,
name: table_info.name.to_string(),
schema: table_info.meta.schema.clone(),
create_if_not_exists: true,
desc: None,
primary_key_indices: Vec::default(),
};
let created_table = table_engine.create_table(&ctx, request).await.unwrap();
assert_eq!(
table_info,
created_table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap()
.table_info()
);
// test create_if_not_exists=false
let request = CreateTableRequest {
id: 1,
name: table_info.name.to_string(),
schema: table_info.meta.schema.clone(),
create_if_not_exists: false,
desc: None,
primary_key_indices: Vec::default(),
};
let result = table_engine.create_table(&ctx, request).await;
assert!(result.is_err());
assert!(matches!(result, Err(e) if format!("{:?}", e).contains("Table already exists")));
}
#[tokio::test]
async fn test_open_table() {
common_telemetry::init_default_ut_logging();
@@ -443,12 +528,13 @@ mod tests {
catalog_name: String::new(),
schema_name: String::new(),
table_name: test_util::TABLE_NAME.to_string(),
// Currently the first table has id 0.
table_id: 0,
// the test table id is 1
table_id: 1,
};
let (engine, table) = {
let (engine, table_engine, table) = test_util::setup_mock_engine_and_table().await;
let (engine, table, object_store, _dir) = {
let (engine, table_engine, table, object_store, dir) =
test_util::setup_mock_engine_and_table().await;
assert_eq!(MITO_ENGINE, table_engine.name());
// Now try to open the table again.
let reopened = table_engine
@@ -458,16 +544,39 @@ mod tests {
.unwrap();
assert_eq!(table.schema(), reopened.schema());
(engine, table)
(engine, table, object_store, dir)
};
// Construct a new table engine, and try to open the table.
let table_engine = MitoEngine::new(engine);
let table_engine = MitoEngine::new(EngineConfig::default(), engine, object_store);
let reopened = table_engine
.open_table(&ctx, open_req.clone())
.await
.unwrap()
.unwrap();
assert_eq!(table.schema(), reopened.schema());
let table = table
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
let reopened = reopened
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.unwrap();
// assert recovered table_info is correct
assert_eq!(table.table_info(), reopened.table_info());
assert_eq!(reopened.manifest().last_version(), 1);
}
#[test]
fn test_region_id() {
assert_eq!(1, region_id(0, 1));
assert_eq!(4294967296, region_id(1, 0));
assert_eq!(4294967297, region_id(1, 1));
assert_eq!(4294967396, region_id(1, 100));
assert_eq!(8589934602, region_id(2, 10));
assert_eq!(18446744069414584330, region_id(u32::MAX, 10));
}
}

View File

@@ -95,6 +95,40 @@ pub enum Error {
region_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to update table metadata to manifest, table: {}, source: {}",
table_name,
source,
))]
UpdateTableManifest {
#[snafu(backtrace)]
source: storage::error::Error,
table_name: String,
},
#[snafu(display(
"Failed to scan table metadata from manifest, table: {}, source: {}",
table_name,
source,
))]
ScanTableManifest {
#[snafu(backtrace)]
source: storage::error::Error,
table_name: String,
},
#[snafu(display("Table info not found in manifest, table: {}", table_name))]
TableInfoNotFound {
backtrace: Backtrace,
table_name: String,
},
#[snafu(display("Table already exists: {}", table_name))]
TableExists {
backtrace: Backtrace,
table_name: String,
},
}
impl From<Error> for table::error::Error {
@@ -118,7 +152,12 @@ impl ErrorExt for Error {
| BuildTableMeta { .. }
| BuildTableInfo { .. }
| BuildRegionDescriptor { .. }
| TableExists { .. }
| MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
TableInfoNotFound { .. } => StatusCode::Unexpected,
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -1,3 +1,5 @@
pub mod config;
pub mod engine;
pub mod error;
mod manifest;
pub mod table;

View File

@@ -0,0 +1,85 @@
//! Table manifest service
pub mod action;
use storage::manifest::ManifestImpl;
use crate::manifest::action::TableMetaActionList;
pub type TableManifest = ManifestImpl<TableMetaActionList>;
#[cfg(test)]
mod tests {
use storage::manifest::MetaActionIteratorImpl;
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{Manifest, MetaActionIterator};
use table::metadata::TableInfo;
use super::*;
use crate::manifest::action::{TableChange, TableMetaAction, TableRemove};
use crate::table::test_util;
type TableManifestActionIter = MetaActionIteratorImpl<TableMetaActionList>;
async fn assert_actions(
iter: &mut TableManifestActionIter,
protocol: &ProtocolAction,
table_info: &TableInfo,
) {
match iter.next_action().await.unwrap() {
Some((v, action_list)) => {
assert_eq!(v, 0);
assert_eq!(2, action_list.actions.len());
assert!(
matches!(&action_list.actions[0], TableMetaAction::Protocol(p) if *p == *protocol)
);
assert!(
matches!(&action_list.actions[1], TableMetaAction::Change(c) if c.table_info == *table_info)
);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_table_manifest() {
let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await;
let manifest = TableManifest::new("manifest/", object_store);
let mut iter = manifest.scan(0, 100).await.unwrap();
assert!(iter.next_action().await.unwrap().is_none());
let protocol = ProtocolAction::new();
let table_info = test_util::build_test_table_info();
let action_list = TableMetaActionList::new(vec![
TableMetaAction::Protocol(protocol.clone()),
TableMetaAction::Change(Box::new(TableChange {
table_info: table_info.clone(),
})),
]);
assert_eq!(0, manifest.update(action_list).await.unwrap());
let mut iter = manifest.scan(0, 100).await.unwrap();
assert_actions(&mut iter, &protocol, &table_info).await;
assert!(iter.next_action().await.unwrap().is_none());
// update another action
let action_list = TableMetaActionList::new(vec![TableMetaAction::Remove(TableRemove {
table_name: table_info.name.clone(),
table_ident: table_info.ident.clone(),
})]);
assert_eq!(1, manifest.update(action_list).await.unwrap());
let mut iter = manifest.scan(0, 100).await.unwrap();
assert_actions(&mut iter, &protocol, &table_info).await;
match iter.next_action().await.unwrap() {
Some((v, action_list)) => {
assert_eq!(v, 1);
assert_eq!(1, action_list.actions.len());
assert!(matches!(&action_list.actions[0],
TableMetaAction::Remove(r) if r.table_name == table_info.name && r.table_ident == table_info.ident));
}
_ => unreachable!(),
}
}
}

View File

@@ -0,0 +1,154 @@
use std::io::{BufRead, BufReader};
use serde::{Deserialize, Serialize};
use serde_json as json;
use snafu::{ensure, OptionExt, ResultExt};
use storage::error::{
DecodeJsonSnafu, DecodeMetaActionListSnafu, Error as StorageError,
ManifestProtocolForbidReadSnafu, ReadlineSnafu,
};
use storage::manifest::helper;
use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader};
use store_api::manifest::ManifestVersion;
use store_api::manifest::MetaAction;
use table::metadata::{TableIdent, TableInfo};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct TableChange {
pub table_info: TableInfo,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct TableRemove {
pub table_ident: TableIdent,
pub table_name: String,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum TableMetaAction {
Protocol(ProtocolAction),
// Boxed TableChange to reduce the total size of enum
Change(Box<TableChange>),
Remove(TableRemove),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct TableMetaActionList {
pub actions: Vec<TableMetaAction>,
pub prev_version: ManifestVersion,
}
impl TableMetaActionList {
pub fn new(actions: Vec<TableMetaAction>) -> Self {
Self {
actions,
prev_version: 0,
}
}
}
impl MetaAction for TableMetaActionList {
type Error = StorageError;
fn set_prev_version(&mut self, version: ManifestVersion) {
self.prev_version = version;
}
fn encode(&self) -> Result<Vec<u8>, Self::Error> {
helper::encode_actions(self.prev_version, &self.actions)
}
/// TODO(dennis): duplicated code with RegionMetaActionList::decode, try to refactor it.
fn decode(
bs: &[u8],
reader_version: ProtocolVersion,
) -> Result<(Self, Option<ProtocolAction>), Self::Error> {
let mut lines = BufReader::new(bs).lines();
let mut action_list = TableMetaActionList {
actions: Vec::default(),
prev_version: 0,
};
{
let first_line = lines
.next()
.with_context(|| DecodeMetaActionListSnafu {
msg: format!(
"Invalid content in manifest: {}",
std::str::from_utf8(bs).unwrap_or("**invalid bytes**")
),
})?
.context(ReadlineSnafu)?;
// Decode prev_version
let v: VersionHeader = json::from_str(&first_line).context(DecodeJsonSnafu)?;
action_list.prev_version = v.prev_version;
}
// Decode actions
let mut protocol_action = None;
let mut actions = Vec::default();
for line in lines {
let line = &line.context(ReadlineSnafu)?;
let action: TableMetaAction = json::from_str(line).context(DecodeJsonSnafu)?;
if let TableMetaAction::Protocol(p) = &action {
ensure!(
p.is_readable(reader_version),
ManifestProtocolForbidReadSnafu {
min_version: p.min_reader_version,
supported_version: reader_version,
}
);
protocol_action = Some(p.clone());
}
actions.push(action);
}
action_list.actions = actions;
Ok((action_list, protocol_action))
}
}
#[cfg(test)]
mod tests {
use common_telemetry::logging;
use super::*;
use crate::table::test_util;
#[test]
fn test_encode_decode_action_list() {
common_telemetry::init_default_ut_logging();
let mut protocol = ProtocolAction::new();
protocol.min_reader_version = 1;
let table_info = test_util::build_test_table_info();
let mut action_list = TableMetaActionList::new(vec![
TableMetaAction::Protocol(protocol.clone()),
TableMetaAction::Change(Box::new(TableChange { table_info })),
]);
action_list.set_prev_version(3);
let bs = action_list.encode().unwrap();
logging::debug!(
"Encoded action list: \r\n{}",
String::from_utf8(bs.clone()).unwrap()
);
let e = TableMetaActionList::decode(&bs, 0);
assert!(e.is_err());
assert_eq!(
"Manifest protocol forbid to read, min_version: 1, supported_version: 0",
format!("{}", e.err().unwrap())
);
let (decode_list, p) = TableMetaActionList::decode(&bs, 1).unwrap();
assert_eq!(decode_list, action_list);
assert_eq!(p.unwrap(), protocol);
}
}

View File

@@ -8,9 +8,13 @@ use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::logging;
use futures::task::{Context, Poll};
use futures::Stream;
use snafu::OptionExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
ChunkReader, PutOperation, ReadContext, Region, ScanRequest, SchemaRef, Snapshot, WriteContext,
WriteRequest,
@@ -22,10 +26,22 @@ use table::{
table::Table,
};
use crate::error::{
Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu, UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
#[inline]
fn table_manifest_dir(table_name: &str) -> String {
format!("{}/manifest/", table_name)
}
/// [Table] implementation.
pub struct MitoTable<R: Region> {
manifest: TableManifest,
table_info: TableInfo,
//TODO(dennis): a table contains multi regions
// TODO(dennis): a table contains multi regions
region: R,
}
@@ -139,7 +155,118 @@ impl Stream for ChunkStream {
}
impl<R: Region> MitoTable<R> {
pub fn new(table_info: TableInfo, region: R) -> Self {
Self { table_info, region }
fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
Self {
table_info,
region,
manifest,
}
}
pub async fn create(
table_name: &str,
table_info: TableInfo,
region: R,
object_store: ObjectStore,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store);
// TODO(dennis): save manifest version into catalog?
let _manifest_version = manifest
.update(TableMetaActionList::new(vec![
TableMetaAction::Protocol(ProtocolAction::new()),
TableMetaAction::Change(Box::new(TableChange {
table_info: table_info.clone(),
})),
]))
.await
.context(UpdateTableManifestSnafu { table_name })?;
Ok(MitoTable::new(table_info, region, manifest))
}
pub async fn open(
table_name: &str,
region: R,
object_store: ObjectStore,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store);
let table_info = Self::recover_table_info(table_name, &manifest)
.await?
.context(TableInfoNotFoundSnafu { table_name })?;
Ok(MitoTable::new(table_info, region, manifest))
}
async fn recover_table_info(
table_name: &str,
manifest: &TableManifest,
) -> Result<Option<TableInfo>> {
let (start, end) = Self::manifest_scan_range();
let mut iter = manifest
.scan(start, end)
.await
.context(ScanTableManifestSnafu { table_name })?;
let mut last_manifest_version = manifest::MIN_VERSION;
let mut table_info = None;
while let Some((manifest_version, action_list)) = iter
.next_action()
.await
.context(ScanTableManifestSnafu { table_name })?
{
last_manifest_version = manifest_version;
for action in action_list.actions {
match action {
TableMetaAction::Change(c) => {
table_info = Some(c.table_info);
}
TableMetaAction::Protocol(_) => {}
_ => unimplemented!(),
}
}
}
if table_info.is_some() {
// update manifest state after recovering
let protocol = iter.last_protocol();
manifest.update_state(last_manifest_version + 1, protocol.clone());
}
logging::debug!(
"Recovered table info {:?} for table: {}",
table_info,
table_name
);
Ok(table_info)
}
#[inline]
pub fn table_info(&self) -> &TableInfo {
&self.table_info
}
#[inline]
pub fn manifest(&self) -> &TableManifest {
&self.manifest
}
fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) {
// TODO(dennis): use manifest version in catalog ?
(manifest::MIN_VERSION, manifest::MAX_VERSION)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_table_manifest_dir() {
assert_eq!("demo/manifest/", table_manifest_dir("demo"));
assert_eq!("numbers/manifest/", table_manifest_dir("numbers"));
}
}

View File

@@ -6,20 +6,25 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use datatypes::schema::{ColumnSchema, Schema};
use log_store::fs::noop::NoopLogStore;
use storage::config::EngineConfig;
use object_store::{backend::fs::Backend, ObjectStore};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::EngineContext;
use table::engine::TableEngine;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::CreateTableRequest;
use table::TableRef;
use tempdir::TempDir;
use crate::config::EngineConfig;
use crate::engine::MitoEngine;
use crate::table::test_util::mock_engine::MockEngine;
use crate::engine::MITO_ENGINE;
pub use crate::table::test_util::mock_engine::MockEngine;
pub use crate::table::test_util::mock_engine::MockRegion;
pub const TABLE_NAME: &str = "demo";
fn schema_for_test() -> Schema {
pub fn schema_for_test() -> Schema {
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
@@ -32,22 +37,47 @@ fn schema_for_test() -> Schema {
pub type MockMitoEngine = MitoEngine<MockEngine>;
pub fn build_test_table_info() -> TableInfo {
let table_meta = TableMetaBuilder::default()
.schema(Arc::new(schema_for_test()))
.engine(MITO_ENGINE)
.next_column_id(1)
.primary_key_indices(vec![0, 1])
.build()
.unwrap();
TableInfoBuilder::new(TABLE_NAME.to_string(), table_meta)
.ident(0)
.table_version(0u64)
.table_type(TableType::Base)
.build()
.unwrap()
}
pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
let dir = TempDir::new(prefix).unwrap();
let store_dir = dir.path().to_string_lossy();
let accessor = Backend::build().root(&store_dir).finish().await.unwrap();
(dir, ObjectStore::new(accessor))
}
pub async fn setup_test_engine_and_table() -> (
MitoEngine<EngineImpl<NoopLogStore>>,
TableRef,
SchemaRef,
TempDir,
) {
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
let store_dir = dir.path().to_string_lossy();
let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await;
let table_engine = MitoEngine::new(
EngineConfig::default(),
EngineImpl::new(
EngineConfig::with_store_dir(&store_dir),
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
)
.await
.unwrap(),
object_store.clone(),
),
object_store,
);
let schema = Arc::new(schema_for_test());
@@ -55,6 +85,7 @@ pub async fn setup_test_engine_and_table() -> (
.create_table(
&EngineContext::default(),
CreateTableRequest {
id: 1,
name: TABLE_NAME.to_string(),
desc: Some("a test table".to_string()),
schema: schema.clone(),
@@ -68,15 +99,22 @@ pub async fn setup_test_engine_and_table() -> (
(table_engine, table, schema, dir)
}
pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef) {
pub async fn setup_mock_engine_and_table(
) -> (MockEngine, MockMitoEngine, TableRef, ObjectStore, TempDir) {
let mock_engine = MockEngine::default();
let table_engine = MitoEngine::new(mock_engine.clone());
let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let table_engine = MitoEngine::new(
EngineConfig::default(),
mock_engine.clone(),
object_store.clone(),
);
let schema = Arc::new(schema_for_test());
let table = table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
id: 1,
name: TABLE_NAME.to_string(),
desc: None,
schema: schema.clone(),
@@ -87,5 +125,5 @@ pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, Table
.await
.unwrap();
(mock_engine, table_engine, table)
(mock_engine, table_engine, table, object_store, dir)
}

View File

@@ -10,9 +10,9 @@ use common_telemetry::logging;
use storage::metadata::{RegionMetaImpl, RegionMetadataRef};
use storage::write_batch::WriteBatch;
use store_api::storage::{
Chunk, ChunkReader, EngineContext, GetRequest, GetResponse, OpenOptions, ReadContext, Region,
RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext,
WriteResponse,
Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions,
ReadContext, Region, RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot,
StorageEngine, WriteContext, WriteResponse,
};
pub type Result<T> = std::result::Result<T, MockError>;
@@ -153,6 +153,7 @@ impl StorageEngine for MockEngine {
&self,
_ctx: &EngineContext,
descriptor: RegionDescriptor,
_opts: &CreateOptions,
) -> Result<MockRegion> {
logging::info!("Mock engine create region, descriptor: {:?}", descriptor);