diff --git a/Cargo.lock b/Cargo.lock index 2e2ba3c2fa..5b758f5f4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2827,6 +2827,7 @@ dependencies = [ "common-query", "common-telemetry", "common-test-util", + "common-time", "datatypes", "futures", "object-store", diff --git a/src/file-table-engine/Cargo.toml b/src/file-table-engine/Cargo.toml index 4781018159..92d72a986c 100644 --- a/src/file-table-engine/Cargo.toml +++ b/src/file-table-engine/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [features] default = [] +test = ["common-test-util"] [dependencies] async-trait = "0.1" @@ -14,6 +15,7 @@ common-error = { path = "../common/error" } common-procedure = { path = "../common/procedure" } common-query = { path = "../common/query" } common-telemetry = { path = "../common/telemetry" } +common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } futures.workspace = true object-store = { path = "../object-store" } @@ -25,3 +27,6 @@ store-api = { path = "../store-api" } table = { path = "../table" } common-test-util = { path = "../common/test-util", optional = true } tokio.workspace = true + +[dev-dependencies] +common-test-util = { path = "../common/test-util" } diff --git a/src/file-table-engine/src/config.rs b/src/file-table-engine/src/config.rs new file mode 100644 index 0000000000..8c7cd78a55 --- /dev/null +++ b/src/file-table-engine/src/config.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[derive(Debug, Clone, Default)] +pub struct EngineConfig {} diff --git a/src/file-table-engine/src/engine.rs b/src/file-table-engine/src/engine.rs index 1879a87156..80bf1928a3 100644 --- a/src/file-table-engine/src/engine.rs +++ b/src/file-table-engine/src/engine.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod immutable; +#[cfg(test)] +mod tests; + use table::metadata::TableVersion; -pub mod immutable; const INIT_TABLE_VERSION: TableVersion = 0; diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index 4d8479307a..da97a8f3e0 100644 --- a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -29,6 +29,7 @@ use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, O use table::{error as table_error, Result as TableResult, Table, TableRef}; use tokio::sync::Mutex; +use crate::config::EngineConfig; use crate::engine::INIT_TABLE_VERSION; use crate::error::{ BuildTableInfoSnafu, BuildTableMetaSnafu, DropTableSnafu, InvalidRawSchemaSnafu, Result, @@ -114,6 +115,21 @@ impl TableEngine for ImmutableFileTableEngine { } } +#[cfg(test)] +impl ImmutableFileTableEngine { + pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> { + self.inner.close_table(table_ref).await + } +} + +impl ImmutableFileTableEngine { + pub fn new(config: EngineConfig, object_store: ObjectStore) -> Self { + ImmutableFileTableEngine { + inner: Arc::new(EngineInner::new(config, object_store)), + } + } +} + struct EngineInner { /// All tables opened by the engine. Map key is formatted [TableReference]. /// @@ -127,6 +143,14 @@ struct EngineInner { } impl EngineInner { + pub fn new(_config: EngineConfig, object_store: ObjectStore) -> Self { + EngineInner { + tables: RwLock::new(HashMap::default()), + object_store, + table_mutex: Mutex::new(()), + } + } + async fn create_table( &self, _ctx: &EngineContext, @@ -309,7 +333,7 @@ impl EngineInner { delete_table_manifest( &table_full_name, &table_manifest_dir(&table_dir), - self.object_store.clone(), + &self.object_store, ) .await .map_err(BoxedError::new) @@ -327,7 +351,7 @@ impl EngineInner { async fn close(&self) -> TableResult<()> { let _lock = self.table_mutex.lock().await; - let mut tables = self.tables.write().unwrap().clone(); + let tables = self.tables.read().unwrap().clone(); futures::future::try_join_all(tables.values().map(|t| t.close())) .await @@ -335,7 +359,7 @@ impl EngineInner { .context(table_error::TableOperationSnafu)?; // Releases all closed table - tables.clear(); + self.tables.write().unwrap().clear(); Ok(()) } @@ -353,3 +377,24 @@ impl EngineInner { .await } } + +#[cfg(test)] +impl EngineInner { + pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> { + let full_name = table_ref.to_string(); + + let _lock = self.table_mutex.lock().await; + + if let Some(table) = self.get_table_by_full_name(&full_name) { + table + .close() + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + } + + self.tables.write().unwrap().remove(&full_name); + + Ok(()) + } +} diff --git a/src/file-table-engine/src/engine/tests.rs b/src/file-table-engine/src/engine/tests.rs new file mode 100644 index 0000000000..a254561a84 --- /dev/null +++ b/src/file-table-engine/src/engine/tests.rs @@ -0,0 +1,189 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; + +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE}; +use table::engine::{EngineContext, TableEngine, TableReference}; +use table::requests::{AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest}; +use table::{error as table_error, Table}; + +use crate::manifest::immutable::manifest_path; +use crate::table::immutable::ImmutableFileTable; +use crate::test_util::{self, TestEngineComponents, TEST_TABLE_NAME}; + +#[tokio::test] +async fn test_get_table() { + let TestEngineComponents { + table_engine, + table_ref: table, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table("test_get_table").await; + let table_info = table.table_info(); + let table_ref = TableReference { + catalog: &table_info.catalog_name, + schema: &table_info.schema_name, + table: &table_info.name, + }; + + let got = table_engine + .get_table(&EngineContext::default(), &table_ref) + .unwrap() + .unwrap(); + + assert_eq!(table.schema(), got.schema()); +} + +#[tokio::test] +async fn test_open_table() { + common_telemetry::init_default_ut_logging(); + let ctx = EngineContext::default(); + let open_req = OpenTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: test_util::TEST_TABLE_NAME.to_string(), + // the test table id is 1 + table_id: 1, + }; + + let table_ref = TableReference { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_SCHEMA_NAME, + table: test_util::TEST_TABLE_NAME, + }; + + let TestEngineComponents { + table_engine, + table_ref: table, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table("test_open_table").await; + + assert_eq!(IMMUTABLE_FILE_ENGINE, table_engine.name()); + + table_engine.close_table(&table_ref).await.unwrap(); + + let reopened = table_engine + .open_table(&ctx, open_req.clone()) + .await + .unwrap() + .unwrap(); + + let reopened = reopened + .as_any() + .downcast_ref::() + .unwrap(); + + let left = table.table_info(); + let right = reopened.table_info(); + + // assert recovered table_info is correct + assert_eq!(left, right); +} + +#[tokio::test] +async fn test_close_all_table() { + common_telemetry::init_default_ut_logging(); + + let table_ref = TableReference { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_SCHEMA_NAME, + table: test_util::TEST_TABLE_NAME, + }; + + let TestEngineComponents { + table_engine, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table("test_close_all_table").await; + + table_engine.close().await.unwrap(); + + let exist = table_engine.table_exists(&EngineContext::default(), &table_ref); + + assert!(!exist); +} + +#[tokio::test] +async fn test_alter_table() { + common_telemetry::init_default_ut_logging(); + let TestEngineComponents { + table_engine, + dir: _dir, + .. + } = test_util::setup_test_engine_and_table("test_alter_table").await; + + let alter_req = AlterTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TEST_TABLE_NAME.to_string(), + alter_kind: AlterKind::RenameTable { + new_table_name: "foo".to_string(), + }, + }; + + let unsupported = table_engine + .alter_table(&EngineContext::default(), alter_req) + .await + .err() + .unwrap(); + + assert_matches!(unsupported, table_error::Error::Unsupported { .. }) +} + +#[tokio::test] +async fn test_drop_table() { + common_telemetry::init_default_ut_logging(); + + let drop_req = DropTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TEST_TABLE_NAME.to_string(), + }; + + let TestEngineComponents { + table_engine, + object_store, + dir: _dir, + table_dir, + table_ref: table, + .. + } = test_util::setup_test_engine_and_table("test_drop_table").await; + + let table_info = table.table_info(); + let table_ref = TableReference { + catalog: &table_info.catalog_name, + schema: &table_info.schema_name, + table: &table_info.name, + }; + + let dropped = table_engine + .drop_table(&EngineContext::default(), drop_req) + .await + .unwrap(); + + assert!(dropped); + + let exist = table_engine.table_exists(&EngineContext::default(), &table_ref); + assert!(!exist); + + // check table_dir manifest + let exist = object_store + .is_exist(&manifest_path(&table_dir)) + .await + .unwrap(); + + assert!(!exist); +} diff --git a/src/file-table-engine/src/lib.rs b/src/file-table-engine/src/lib.rs index 8129a144fd..a7ca27665e 100644 --- a/src/file-table-engine/src/lib.rs +++ b/src/file-table-engine/src/lib.rs @@ -12,7 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(assert_matches)] + +pub mod config; pub mod engine; pub mod error; pub mod manifest; pub mod table; +#[cfg(any(test, feature = "test"))] +pub(crate) mod test_util; diff --git a/src/file-table-engine/src/manifest/immutable.rs b/src/file-table-engine/src/manifest/immutable.rs index 7be21c90f9..00b91f7e17 100644 --- a/src/file-table-engine/src/manifest/immutable.rs +++ b/src/file-table-engine/src/manifest/immutable.rs @@ -41,14 +41,14 @@ fn decode_metadata(src: &[u8]) -> Result { serde_json::from_slice(src).context(DecodeJsonSnafu) } -fn manifest_path(dir: &str) -> String { +pub fn manifest_path(dir: &str) -> String { format!("{}{}", dir, IMMUTABLE_MANIFEST_FILE) } pub(crate) async fn delete_table_manifest( table_name: &str, dir: &str, - object_store: ObjectStore, + object_store: &ObjectStore, ) -> Result<()> { object_store .delete(&manifest_path(dir)) @@ -91,3 +91,102 @@ pub(crate) async fn read_table_manifest( decode_metadata(&bs) } + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use super::*; + use crate::error::Error; + use crate::manifest::table_manifest_dir; + use crate::test_util::{build_test_table_metadata, new_test_object_store, TEST_TABLE_NAME}; + + #[tokio::test] + async fn test_write_table_manifest() { + let (_dir, store) = new_test_object_store("test_write_table_manifest"); + let metadata = build_test_table_metadata(); + + write_table_manifest( + TEST_TABLE_NAME, + &table_manifest_dir(TEST_TABLE_NAME), + &store, + &metadata, + ) + .await + .unwrap(); + + // try to overwrite immutable manifest + let write_immutable = write_table_manifest( + TEST_TABLE_NAME, + &table_manifest_dir(TEST_TABLE_NAME), + &store, + &metadata, + ) + .await + .unwrap_err(); + + assert_matches!(write_immutable, Error::WriteImmutableManifest { .. }) + } + + #[tokio::test] + async fn test_read_table_manifest() { + let (_dir, store) = new_test_object_store("test_read_table_manifest"); + let metadata = build_test_table_metadata(); + + write_table_manifest( + TEST_TABLE_NAME, + &table_manifest_dir(TEST_TABLE_NAME), + &store, + &metadata, + ) + .await + .unwrap(); + + let read = read_table_manifest( + TEST_TABLE_NAME, + &table_manifest_dir(TEST_TABLE_NAME), + &store, + ) + .await + .unwrap(); + + assert_eq!(read, metadata); + } + + #[tokio::test] + async fn test_read_non_exist_table_manifest() { + let (_dir, store) = new_test_object_store("test_read_non_exist_table_manifest"); + let not_fount = read_table_manifest( + TEST_TABLE_NAME, + &table_manifest_dir(TEST_TABLE_NAME), + &store, + ) + .await + .unwrap_err(); + + assert_matches!(not_fount, Error::ReadTableManifest { .. }) + } + + #[tokio::test] + async fn test_delete_table_manifest() { + let (_dir, store) = new_test_object_store("test_delete_table_manifest"); + + let metadata = build_test_table_metadata(); + let table_dir = &table_manifest_dir(TEST_TABLE_NAME); + write_table_manifest(TEST_TABLE_NAME, table_dir, &store, &metadata) + .await + .unwrap(); + + let exist = store.is_exist(&manifest_path(table_dir)).await.unwrap(); + + assert!(exist); + + delete_table_manifest(TEST_TABLE_NAME, table_dir, &store) + .await + .unwrap(); + + let exist = store.is_exist(&manifest_path(table_dir)).await.unwrap(); + + assert!(!exist); + } +} diff --git a/src/file-table-engine/src/test_util.rs b/src/file-table-engine/src/test_util.rs new file mode 100644 index 0000000000..e568763d70 --- /dev/null +++ b/src/file-table-engine/src/test_util.rs @@ -0,0 +1,144 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_catalog::consts::IMMUTABLE_FILE_ENGINE; +use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; +use object_store::services::Fs; +use object_store::ObjectStore; +use table::engine::{table_dir, EngineContext, TableEngine}; +use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; +use table::requests::{CreateTableRequest, TableOptions}; +use table::TableRef; + +use crate::config::EngineConfig; +use crate::engine::immutable::ImmutableFileTableEngine; +use crate::manifest::immutable::ImmutableMetadata; + +pub const TEST_TABLE_NAME: &str = "demo"; + +pub fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) { + let dir = create_temp_dir(prefix); + let store_dir = dir.path().to_string_lossy(); + let mut builder = Fs::default(); + builder.root(&store_dir); + (dir, ObjectStore::new(builder).unwrap().finish()) +} + +pub fn test_schema() -> Schema { + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + // Nullable value column: cpu + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + // Non-null value column: memory + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), + true, + ) + .with_time_index(true), + ]; + + SchemaBuilder::try_from(column_schemas) + .unwrap() + .build() + .expect("ts must be timestamp column") +} + +pub fn build_test_table_info() -> TableInfo { + let schema = test_schema(); + let table_meta = TableMetaBuilder::new_external_table() + .schema(Arc::new(schema)) + .engine(IMMUTABLE_FILE_ENGINE) + .build() + .unwrap(); + + TableInfoBuilder::new(TEST_TABLE_NAME, table_meta) + .table_version(0) + .table_type(TableType::Base) + .catalog_name("greptime".to_string()) + .schema_name("public".to_string()) + .build() + .unwrap() +} + +pub fn build_test_table_metadata() -> ImmutableMetadata { + let table_info = build_test_table_info(); + ImmutableMetadata { + table_info: RawTableInfo::from(table_info), + version: 0, + } +} + +pub struct TestEngineComponents { + pub table_engine: ImmutableFileTableEngine, + pub table_ref: TableRef, + pub schema_ref: SchemaRef, + pub object_store: ObjectStore, + pub table_dir: String, + pub dir: TempDir, +} + +pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { + CreateTableRequest { + id: 1, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: TEST_TABLE_NAME.to_string(), + desc: Some("a test table".to_string()), + schema: RawSchema::from(&*schema), + region_numbers: vec![0], + create_if_not_exists: true, + primary_key_indices: vec![0], + table_options: TableOptions::default(), + engine: IMMUTABLE_FILE_ENGINE.to_string(), + } +} + +pub async fn setup_test_engine_and_table(prefix: &str) -> TestEngineComponents { + let (dir, object_store) = new_test_object_store(prefix); + + let table_engine = ImmutableFileTableEngine::new(EngineConfig::default(), object_store.clone()); + + let schema_ref = Arc::new(test_schema()); + + let table_ref = table_engine + .create_table( + &EngineContext::default(), + new_create_request(schema_ref.clone()), + ) + .await + .unwrap(); + + let table_info = table_ref.table_info(); + + let table_dir = table_dir( + &table_info.catalog_name, + &table_info.schema_name, + table_info.ident.table_id, + ); + + TestEngineComponents { + table_engine, + table_ref, + schema_ref, + object_store, + table_dir, + dir, + } +}