mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 09:20:40 +00:00
test: add open table test and table manifest test
This commit is contained in:
@@ -25,7 +25,7 @@ use tokio::sync::Mutex;
|
||||
use crate::config::EngineConfig;
|
||||
use crate::error::{
|
||||
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
|
||||
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result,
|
||||
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, TableExistsSnafu
|
||||
};
|
||||
use crate::table::MitoTable;
|
||||
|
||||
@@ -238,7 +238,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
let table_name = &request.name;
|
||||
|
||||
if let Some(table) = self.get_table(table_name) {
|
||||
return Ok(table);
|
||||
if request.create_if_not_exists {
|
||||
return Ok(table);
|
||||
} else {
|
||||
return TableExistsSnafu { table_name }.fail();
|
||||
}
|
||||
}
|
||||
|
||||
let (next_column_id, default_cf) =
|
||||
@@ -262,7 +266,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
let _lock = self.table_mutex.lock().await;
|
||||
// Checks again, read lock should be enough since we are guarded by the mutex.
|
||||
if let Some(table) = self.get_table(table_name) {
|
||||
return Ok(table);
|
||||
if request.create_if_not_exists {
|
||||
return Ok(table);
|
||||
} else {
|
||||
return TableExistsSnafu { table_name }.fail();
|
||||
}
|
||||
}
|
||||
|
||||
let opts = CreateOptions {
|
||||
@@ -389,10 +397,26 @@ mod tests {
|
||||
use datafusion_common::field_util::FieldExt;
|
||||
use datafusion_common::field_util::SchemaExt;
|
||||
use datatypes::vectors::*;
|
||||
use store_api::manifest::Manifest;
|
||||
use table::requests::InsertRequest;
|
||||
|
||||
use super::*;
|
||||
use crate::table::test_util;
|
||||
use crate::table::test_util::MockRegion;
|
||||
|
||||
#[test]
|
||||
fn test_region_name() {
|
||||
assert_eq!("0000000000", region_name(0));
|
||||
assert_eq!("0000000001", region_name(1));
|
||||
assert_eq!("0000000100", region_name(100));
|
||||
assert_eq!("0000009999", region_name(9999));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_table_dir() {
|
||||
assert_eq!("test_table/", table_dir("test_table"));
|
||||
assert_eq!("demo/", table_dir("demo"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_table_insert_scan() {
|
||||
@@ -444,6 +468,53 @@ mod tests {
|
||||
assert_eq!(memories.to_arrow_array(), columns[3]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_if_not_exists() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let ctx = EngineContext::default();
|
||||
|
||||
let (_engine, table_engine, table, _object_store, _dir) =
|
||||
test_util::setup_mock_engine_and_table().await;
|
||||
|
||||
let table = table
|
||||
.as_any()
|
||||
.downcast_ref::<MitoTable<MockRegion>>()
|
||||
.unwrap();
|
||||
let table_info = table.table_info();
|
||||
|
||||
let request = CreateTableRequest {
|
||||
name: table_info.name.to_string(),
|
||||
schema: table_info.meta.schema.clone(),
|
||||
create_if_not_exists: true,
|
||||
desc: None,
|
||||
primary_key_indices: Vec::default(),
|
||||
};
|
||||
|
||||
let created_table = table_engine.create_table(&ctx, request).await.unwrap();
|
||||
assert_eq!(
|
||||
table_info,
|
||||
created_table
|
||||
.as_any()
|
||||
.downcast_ref::<MitoTable<MockRegion>>()
|
||||
.unwrap()
|
||||
.table_info()
|
||||
);
|
||||
|
||||
// test create_if_not_exists=false
|
||||
let request = CreateTableRequest {
|
||||
name: table_info.name.to_string(),
|
||||
schema: table_info.meta.schema.clone(),
|
||||
create_if_not_exists: false,
|
||||
desc: None,
|
||||
primary_key_indices: Vec::default(),
|
||||
};
|
||||
|
||||
let result = table_engine.create_table(&ctx, request).await;
|
||||
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(result, Err(e) if format!("{:?}", e).contains("Table already exists")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -457,8 +528,8 @@ mod tests {
|
||||
table_id: 0,
|
||||
};
|
||||
|
||||
let (engine, table, object_store) = {
|
||||
let (engine, table_engine, table, object_store) =
|
||||
let (engine, table, object_store, _dir) = {
|
||||
let (engine, table_engine, table, object_store, dir) =
|
||||
test_util::setup_mock_engine_and_table().await;
|
||||
assert_eq!(MITO_ENGINE, table_engine.name());
|
||||
// Now try to open the table again.
|
||||
@@ -469,7 +540,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(table.schema(), reopened.schema());
|
||||
|
||||
(engine, table, object_store)
|
||||
(engine, table, object_store, dir)
|
||||
};
|
||||
|
||||
// Construct a new table engine, and try to open the table.
|
||||
@@ -480,5 +551,17 @@ mod tests {
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(table.schema(), reopened.schema());
|
||||
|
||||
let table = table
|
||||
.as_any()
|
||||
.downcast_ref::<MitoTable<MockRegion>>()
|
||||
.unwrap();
|
||||
let reopened = reopened
|
||||
.as_any()
|
||||
.downcast_ref::<MitoTable<MockRegion>>()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(table.table_info(), reopened.table_info());
|
||||
assert_eq!(reopened.manifest().last_version(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,6 +123,12 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
table_name: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Table already exists: {}", table_name))]
|
||||
TableExists {
|
||||
backtrace: Backtrace,
|
||||
table_name: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<Error> for table::error::Error {
|
||||
@@ -146,6 +152,7 @@ impl ErrorExt for Error {
|
||||
| BuildTableMeta { .. }
|
||||
| BuildTableInfo { .. }
|
||||
| BuildRegionDescriptor { .. }
|
||||
| TableExists { .. }
|
||||
| MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
UpdateTableManifest { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
@@ -8,4 +8,77 @@ use crate::manifest::action::*;
|
||||
pub type TableManifest = ManifestImpl<TableMetaActionList>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
||||
mod tests {
|
||||
use storage::manifest::MetaActionIteratorImpl;
|
||||
use store_api::manifest::action::ProtocolAction;
|
||||
use store_api::manifest::{Manifest, MetaActionIterator};
|
||||
use table::metadata::TableInfo;
|
||||
|
||||
use super::*;
|
||||
use crate::table::test_util;
|
||||
type TableManifestActionIter = MetaActionIteratorImpl<TableMetaActionList>;
|
||||
|
||||
async fn assert_actions(
|
||||
iter: &mut TableManifestActionIter,
|
||||
protocol: &ProtocolAction,
|
||||
table_info: &TableInfo,
|
||||
) {
|
||||
match iter.next_action().await.unwrap() {
|
||||
Some((v, action_list)) => {
|
||||
assert_eq!(v, 0);
|
||||
assert_eq!(2, action_list.actions.len());
|
||||
assert!(
|
||||
matches!(&action_list.actions[0], TableMetaAction::Protocol(p) if *p == *protocol)
|
||||
);
|
||||
assert!(
|
||||
matches!(&action_list.actions[1], TableMetaAction::Change(c) if c.table_info == *table_info)
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_manifest() {
|
||||
let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await;
|
||||
|
||||
let manifest = TableManifest::new("manifest/", object_store);
|
||||
|
||||
let mut iter = manifest.scan(0, 100).await.unwrap();
|
||||
assert!(iter.next_action().await.unwrap().is_none());
|
||||
|
||||
let protocol = ProtocolAction::new();
|
||||
let table_info = test_util::build_test_table_info();
|
||||
let action_list = TableMetaActionList::new(vec![
|
||||
TableMetaAction::Protocol(protocol.clone()),
|
||||
TableMetaAction::Change(Box::new(TableChange {
|
||||
table_info: table_info.clone(),
|
||||
})),
|
||||
]);
|
||||
|
||||
assert_eq!(0, manifest.update(action_list).await.unwrap());
|
||||
|
||||
let mut iter = manifest.scan(0, 100).await.unwrap();
|
||||
assert_actions(&mut iter, &protocol, &table_info).await;
|
||||
assert!(iter.next_action().await.unwrap().is_none());
|
||||
|
||||
// update another action
|
||||
let action_list = TableMetaActionList::new(vec![TableMetaAction::Remove(TableRemove {
|
||||
table_name: table_info.name.clone(),
|
||||
table_ident: table_info.ident.clone(),
|
||||
})]);
|
||||
assert_eq!(1, manifest.update(action_list).await.unwrap());
|
||||
let mut iter = manifest.scan(0, 100).await.unwrap();
|
||||
assert_actions(&mut iter, &protocol, &table_info).await;
|
||||
|
||||
match iter.next_action().await.unwrap() {
|
||||
Some((v, action_list)) => {
|
||||
assert_eq!(v, 1);
|
||||
assert_eq!(1, action_list.actions.len());
|
||||
assert!(matches!(&action_list.actions[0],
|
||||
TableMetaAction::Remove(r) if r.table_name == table_info.name && r.table_ident == table_info.ident));
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,4 +113,42 @@ impl MetaAction for TableMetaActionList {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
||||
mod tests {
|
||||
use common_telemetry::logging;
|
||||
|
||||
use super::*;
|
||||
use crate::table::test_util;
|
||||
|
||||
#[test]
|
||||
fn test_encode_decode_action_list() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut protocol = ProtocolAction::new();
|
||||
protocol.min_reader_version = 1;
|
||||
|
||||
let table_info = test_util::build_test_table_info();
|
||||
|
||||
let mut action_list = TableMetaActionList::new(vec![
|
||||
TableMetaAction::Protocol(protocol.clone()),
|
||||
TableMetaAction::Change(Box::new(TableChange { table_info })),
|
||||
]);
|
||||
action_list.set_prev_version(3);
|
||||
|
||||
let bs = action_list.encode().unwrap();
|
||||
|
||||
logging::debug!(
|
||||
"Encoded action list: \r\n{}",
|
||||
String::from_utf8(bs.clone()).unwrap()
|
||||
);
|
||||
|
||||
let e = TableMetaActionList::decode(&bs, 0);
|
||||
assert!(e.is_err());
|
||||
assert_eq!(
|
||||
"Manifest protocol forbid to read, min_version: 1, supported_version: 0",
|
||||
format!("{}", e.err().unwrap())
|
||||
);
|
||||
|
||||
let (decode_list, p) = TableMetaActionList::decode(&bs, 1).unwrap();
|
||||
assert_eq!(decode_list, action_list);
|
||||
assert_eq!(p.unwrap(), protocol);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ fn table_manifest_dir(table_name: &str) -> String {
|
||||
|
||||
/// [Table] implementation.
|
||||
pub struct MitoTable<R: Region> {
|
||||
_manifest: TableManifest,
|
||||
manifest: TableManifest,
|
||||
table_info: TableInfo,
|
||||
// TODO(dennis): a table contains multi regions
|
||||
region: R,
|
||||
@@ -159,7 +159,7 @@ impl<R: Region> MitoTable<R> {
|
||||
Self {
|
||||
table_info,
|
||||
region,
|
||||
_manifest: manifest,
|
||||
manifest,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -249,8 +249,24 @@ impl<R: Region> MitoTable<R> {
|
||||
&self.table_info
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn manifest(&self) -> &TableManifest {
|
||||
&self.manifest
|
||||
}
|
||||
|
||||
fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) {
|
||||
// TODO(dennis): use manifest version in catalog ?
|
||||
(manifest::MIN_VERSION, manifest::MAX_VERSION)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_table_manifest_dir() {
|
||||
assert_eq!("demo/manifest/", table_manifest_dir("demo"));
|
||||
assert_eq!("numbers/manifest/", table_manifest_dir("numbers"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,17 +11,20 @@ use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use table::engine::EngineContext;
|
||||
use table::engine::TableEngine;
|
||||
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::requests::CreateTableRequest;
|
||||
use table::TableRef;
|
||||
use tempdir::TempDir;
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine::MitoEngine;
|
||||
use crate::table::test_util::mock_engine::MockEngine;
|
||||
use crate::engine::DEFAULT_ENGINE;
|
||||
pub use crate::table::test_util::mock_engine::MockEngine;
|
||||
pub use crate::table::test_util::mock_engine::MockRegion;
|
||||
|
||||
pub const TABLE_NAME: &str = "demo";
|
||||
|
||||
fn schema_for_test() -> Schema {
|
||||
pub fn schema_for_test() -> Schema {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
|
||||
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
|
||||
@@ -34,6 +37,23 @@ fn schema_for_test() -> Schema {
|
||||
|
||||
pub type MockMitoEngine = MitoEngine<MockEngine>;
|
||||
|
||||
pub fn build_test_table_info() -> TableInfo {
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(Arc::new(schema_for_test()))
|
||||
.engine(DEFAULT_ENGINE)
|
||||
.next_column_id(1)
|
||||
.primary_key_indices(vec![0, 1])
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
TableInfoBuilder::new(TABLE_NAME.to_string(), table_meta)
|
||||
.ident(0)
|
||||
.table_version(0u64)
|
||||
.table_type(TableType::Base)
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
|
||||
let dir = TempDir::new(prefix).unwrap();
|
||||
let store_dir = dir.path().to_string_lossy();
|
||||
@@ -78,9 +98,10 @@ pub async fn setup_test_engine_and_table() -> (
|
||||
(table_engine, table, schema, dir)
|
||||
}
|
||||
|
||||
pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef, ObjectStore) {
|
||||
pub async fn setup_mock_engine_and_table(
|
||||
) -> (MockEngine, MockMitoEngine, TableRef, ObjectStore, TempDir) {
|
||||
let mock_engine = MockEngine::default();
|
||||
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
|
||||
let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
|
||||
let table_engine = MitoEngine::new(
|
||||
EngineConfig::default(),
|
||||
mock_engine.clone(),
|
||||
@@ -102,5 +123,5 @@ pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, Table
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
(mock_engine, table_engine, table, object_store)
|
||||
(mock_engine, table_engine, table, object_store, dir)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user