diff --git a/Cargo.lock b/Cargo.lock index cb58cbd242..2e2ba3c2fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2816,6 +2816,29 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "file-table-engine" +version = "0.1.1" +dependencies = [ + "async-trait", + "common-catalog", + "common-error", + "common-procedure", + "common-query", + "common-telemetry", + "common-test-util", + "datatypes", + "futures", + "object-store", + "serde", + "serde_json", + "snafu", + "storage", + "store-api", + "table", + "tokio", +] + [[package]] name = "filetime" version = "0.2.20" diff --git a/Cargo.toml b/Cargo.toml index b10bc70733..662ad5f50a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "src/common/time", "src/datanode", "src/datatypes", + "src/file-table-engine", "src/frontend", "src/log-store", "src/meta-client", diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 2a126a0e36..0a5f9c7d60 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -27,3 +27,4 @@ pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0; pub const SCRIPTS_TABLE_ID: u32 = 1; pub const MITO_ENGINE: &str = "mito"; +pub const IMMUTABLE_FILE_ENGINE: &str = "file"; diff --git a/src/file-table-engine/Cargo.toml b/src/file-table-engine/Cargo.toml new file mode 100644 index 0000000000..4781018159 --- /dev/null +++ b/src/file-table-engine/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "file-table-engine" +version.workspace = true +edition.workspace = true +license.workspace = true + +[features] +default = [] + +[dependencies] +async-trait = "0.1" +common-catalog = { path = "../common/catalog" } +common-error = { path = "../common/error" } +common-procedure = { path = "../common/procedure" } +common-query = { path = "../common/query" } +common-telemetry = { path = "../common/telemetry" } +datatypes = { path = "../datatypes" } +futures.workspace = true +object-store = { path = "../object-store" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +snafu.workspace = true +storage = { path = "../storage" } +store-api = { path = "../store-api" } +table = { path = "../table" } +common-test-util = { path = "../common/test-util", optional = true } +tokio.workspace = true diff --git a/src/file-table-engine/src/engine.rs b/src/file-table-engine/src/engine.rs new file mode 100644 index 0000000000..1879a87156 --- /dev/null +++ b/src/file-table-engine/src/engine.rs @@ -0,0 +1,18 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use table::metadata::TableVersion; + +pub mod immutable; +const INIT_TABLE_VERSION: TableVersion = 0; diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs new file mode 100644 index 0000000000..4d8479307a --- /dev/null +++ b/src/file-table-engine/src/engine/immutable.rs @@ -0,0 +1,355 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use async_trait::async_trait; +use common_catalog::consts::IMMUTABLE_FILE_ENGINE; +use common_error::prelude::BoxedError; +use common_telemetry::{debug, logging}; +use datatypes::schema::Schema; +use object_store::ObjectStore; +use snafu::ResultExt; +use table::engine::{table_dir, EngineContext, TableEngine, TableReference}; +use table::error::TableOperationSnafu; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; +use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; +use table::{error as table_error, Result as TableResult, Table, TableRef}; +use tokio::sync::Mutex; + +use crate::engine::INIT_TABLE_VERSION; +use crate::error::{ + BuildTableInfoSnafu, BuildTableMetaSnafu, DropTableSnafu, InvalidRawSchemaSnafu, Result, + TableExistsSnafu, +}; +use crate::manifest::immutable::{delete_table_manifest, ImmutableMetadata}; +use crate::manifest::table_manifest_dir; +use crate::table::immutable::{ImmutableFileTable, ImmutableFileTableRef}; + +/// [TableEngine] implementation. +#[derive(Clone)] +pub struct ImmutableFileTableEngine { + inner: Arc, +} + +#[async_trait] +impl TableEngine for ImmutableFileTableEngine { + fn name(&self) -> &str { + IMMUTABLE_FILE_ENGINE + } + + async fn create_table( + &self, + ctx: &EngineContext, + request: CreateTableRequest, + ) -> TableResult { + self.inner + .create_table(ctx, request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) + } + + async fn open_table( + &self, + ctx: &EngineContext, + request: OpenTableRequest, + ) -> TableResult> { + self.inner + .open_table(ctx, request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) + } + + async fn alter_table( + &self, + _ctx: &EngineContext, + _req: AlterTableRequest, + ) -> TableResult { + table_error::UnsupportedSnafu { + operation: "ALTER TABLE", + } + .fail() + } + + fn get_table( + &self, + _ctx: &EngineContext, + table_ref: &TableReference, + ) -> TableResult> { + Ok(self.inner.get_table(table_ref)) + } + + fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool { + self.inner.get_table(table_ref).is_some() + } + + async fn drop_table( + &self, + _ctx: &EngineContext, + request: DropTableRequest, + ) -> TableResult { + self.inner + .drop_table(request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) + } + + async fn close(&self) -> TableResult<()> { + self.inner.close().await + } +} + +struct EngineInner { + /// All tables opened by the engine. Map key is formatted [TableReference]. + /// + /// Writing to `tables` should also hold the `table_mutex`. + tables: RwLock>, + object_store: ObjectStore, + + /// Table mutex is used to protect the operations such as creating/opening/closing + /// a table, to avoid things like opening the same table simultaneously. + table_mutex: Mutex<()>, +} + +impl EngineInner { + async fn create_table( + &self, + _ctx: &EngineContext, + request: CreateTableRequest, + ) -> Result { + let CreateTableRequest { + catalog_name, + schema_name, + table_name, + create_if_not_exists, + table_options, + .. + } = request; + let table_ref = TableReference { + catalog: &catalog_name, + schema: &schema_name, + table: &table_name, + }; + + if let Some(table) = self.get_table(&table_ref) { + return if create_if_not_exists { + Ok(table) + } else { + TableExistsSnafu { table_name }.fail() + }; + } + + let table_schema = + Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?); + + let table_id = request.id; + let table_dir = table_dir(&catalog_name, &schema_name, table_id); + + let table_full_name = table_ref.to_string(); + + let _lock = self.table_mutex.lock().await; + // Checks again, read lock should be enough since we are guarded by the mutex. + if let Some(table) = self.get_table_by_full_name(&table_full_name) { + return if request.create_if_not_exists { + Ok(table) + } else { + TableExistsSnafu { table_name }.fail() + }; + } + + let table_meta = TableMetaBuilder::new_external_table() + .schema(table_schema) + .engine(IMMUTABLE_FILE_ENGINE) + .options(table_options) + .build() + .context(BuildTableMetaSnafu { + table_name: &table_full_name, + })?; + + let table_info = TableInfoBuilder::new(&table_name, table_meta) + .ident(table_id) + .table_version(INIT_TABLE_VERSION) + .table_type(TableType::Base) + .catalog_name(catalog_name.to_string()) + .schema_name(schema_name.to_string()) + .desc(request.desc) + .build() + .context(BuildTableInfoSnafu { + table_name: &table_full_name, + })?; + + let table = Arc::new( + ImmutableFileTable::create( + &table_full_name, + &table_dir, + table_info, + self.object_store.clone(), + ) + .await?, + ); + + logging::info!( + "Immutable file engine created table: {} in schema: {}, table_id: {}.", + table_name, + schema_name, + table_id + ); + + self.tables + .write() + .unwrap() + .insert(table_full_name, table.clone()); + + Ok(table) + } + + fn get_table_by_full_name(&self, full_name: &str) -> Option { + self.tables + .read() + .unwrap() + .get(full_name) + .cloned() + .map(|table| table as _) + } + + fn get_table(&self, table_ref: &TableReference) -> Option { + self.get_table_by_full_name(&table_ref.to_string()) + } + + async fn open_table( + &self, + _ctx: &EngineContext, + request: OpenTableRequest, + ) -> TableResult> { + let OpenTableRequest { + catalog_name, + schema_name, + table_name, + .. + } = request; + let table_ref = TableReference { + catalog: &catalog_name, + schema: &schema_name, + table: &table_name, + }; + + let table_full_name = table_ref.to_string(); + + if let Some(table) = self.get_table_by_full_name(&table_full_name) { + return Ok(Some(table)); + } + + let table = { + let _lock = self.table_mutex.lock().await; + // Checks again, read lock should be enough since we are guarded by the mutex. + if let Some(table) = self.get_table_by_full_name(&table_full_name) { + return Ok(Some(table)); + } + + let table_id = request.table_id; + let table_dir = table_dir(&catalog_name, &schema_name, table_id); + + let (metadata, table_info) = self + .recover_table_manifest_and_info(&table_full_name, &table_dir) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + + debug!( + "Opening table {}, table info recovered: {:?}", + table_id, table_info + ); + + let table = Arc::new(ImmutableFileTable::new(table_info, metadata)); + + self.tables + .write() + .unwrap() + .insert(table_full_name, table.clone()); + Some(table as _) + }; + + logging::info!( + "Immutable file engine opened table: {} in schema: {}", + table_name, + schema_name + ); + + Ok(table) + } + + async fn drop_table(&self, req: DropTableRequest) -> Result { + let table_ref = TableReference { + catalog: &req.catalog_name, + schema: &req.schema_name, + table: &req.table_name, + }; + + let table_full_name = table_ref.to_string(); + let _lock = self.table_mutex.lock().await; + if let Some(table) = self.get_table_by_full_name(&table_full_name) { + let table_id = table.table_info().ident.table_id; + let table_dir = table_dir(&req.catalog_name, &req.schema_name, table_id); + + delete_table_manifest( + &table_full_name, + &table_manifest_dir(&table_dir), + self.object_store.clone(), + ) + .await + .map_err(BoxedError::new) + .context(DropTableSnafu { + table_name: &table_full_name, + })?; + self.tables.write().unwrap().remove(&table_full_name); + + Ok(true) + } else { + Ok(false) + } + } + + async fn close(&self) -> TableResult<()> { + let _lock = self.table_mutex.lock().await; + + let mut tables = self.tables.write().unwrap().clone(); + + futures::future::try_join_all(tables.values().map(|t| t.close())) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + // Releases all closed table + tables.clear(); + + Ok(()) + } + + async fn recover_table_manifest_and_info( + &self, + table_name: &str, + table_dir: &str, + ) -> Result<(ImmutableMetadata, TableInfo)> { + ImmutableFileTable::recover_table_info( + table_name, + &table_manifest_dir(table_dir), + &self.object_store, + ) + .await + } +} diff --git a/src/file-table-engine/src/error.rs b/src/file-table-engine/src/error.rs new file mode 100644 index 0000000000..c9dbfd4ddb --- /dev/null +++ b/src/file-table-engine/src/error.rs @@ -0,0 +1,161 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::prelude::*; +use serde_json::error::Error as JsonError; +use snafu::Location; +use table::metadata::{TableInfoBuilderError, TableMetaBuilderError}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to check object from path: {}, source: {}", path, source))] + CheckObject { + path: String, + location: Location, + source: object_store::Error, + }, + + #[snafu(display("Fail to encode object into json, source: {}", source))] + EncodeJson { + location: Location, + source: JsonError, + }, + + #[snafu(display("Fail to decode object from json, source: {}", source))] + DecodeJson { + location: Location, + source: JsonError, + }, + + #[snafu(display("Failed to drop table, table: {}, source: {}", table_name, source))] + DropTable { + source: BoxedError, + table_name: String, + location: Location, + }, + + #[snafu(display( + "Failed to write table manifest, table: {}, source: {}", + table_name, + source, + ))] + WriteTableManifest { + source: object_store::Error, + table_name: String, + location: Location, + }, + + #[snafu(display("Failed to write immutable manifest, path: {}", path))] + WriteImmutableManifest { path: String, location: Location }, + + #[snafu(display("Failed to delete table table manifest, source: {}", source,))] + DeleteTableManifest { + source: object_store::Error, + table_name: String, + location: Location, + }, + + #[snafu(display( + "Failed to read table manifest, table: {}, source: {}", + table_name, + source, + ))] + ReadTableManifest { + source: object_store::Error, + table_name: String, + location: Location, + }, + + #[snafu(display( + "Failed to build table meta for table: {}, source: {}", + table_name, + source + ))] + BuildTableMeta { + source: TableMetaBuilderError, + table_name: String, + location: Location, + }, + + #[snafu(display( + "Failed to build table info for table: {}, source: {}", + table_name, + source + ))] + BuildTableInfo { + source: TableInfoBuilderError, + table_name: String, + location: Location, + }, + + #[snafu(display("Table already exists: {}", table_name))] + TableExists { + location: Location, + table_name: String, + }, + + #[snafu(display( + "Failed to convert metadata from deserialized data, source: {}", + source + ))] + ConvertRaw { + #[snafu(backtrace)] + source: table::metadata::ConvertError, + }, + + #[snafu(display("Invalid schema, source: {}", source))] + InvalidRawSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + TableExists { .. } + | BuildTableMeta { .. } + | BuildTableInfo { .. } + | InvalidRawSchema { .. } => StatusCode::InvalidArguments, + + WriteTableManifest { .. } + | DeleteTableManifest { .. } + | ReadTableManifest { .. } + | CheckObject { .. } => StatusCode::StorageUnavailable, + + EncodeJson { .. } + | DecodeJson { .. } + | ConvertRaw { .. } + | DropTable { .. } + | WriteImmutableManifest { .. } => StatusCode::Unexpected, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for common_procedure::Error { + fn from(e: Error) -> common_procedure::Error { + common_procedure::Error::from_error_ext(e) + } +} diff --git a/src/file-table-engine/src/lib.rs b/src/file-table-engine/src/lib.rs new file mode 100644 index 0000000000..8129a144fd --- /dev/null +++ b/src/file-table-engine/src/lib.rs @@ -0,0 +1,18 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod engine; +pub mod error; +pub mod manifest; +pub mod table; diff --git a/src/file-table-engine/src/manifest.rs b/src/file-table-engine/src/manifest.rs new file mode 100644 index 0000000000..63d9acd692 --- /dev/null +++ b/src/file-table-engine/src/manifest.rs @@ -0,0 +1,20 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod immutable; + +#[inline] +pub fn table_manifest_dir(table_dir: &str) -> String { + format!("{table_dir}/manifest/") +} diff --git a/src/file-table-engine/src/manifest/immutable.rs b/src/file-table-engine/src/manifest/immutable.rs new file mode 100644 index 0000000000..7be21c90f9 --- /dev/null +++ b/src/file-table-engine/src/manifest/immutable.rs @@ -0,0 +1,93 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use object_store::ObjectStore; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use table::metadata::RawTableInfo; + +use crate::error::{ + CheckObjectSnafu, DecodeJsonSnafu, DeleteTableManifestSnafu, EncodeJsonSnafu, + ReadTableManifestSnafu, Result, WriteImmutableManifestSnafu, WriteTableManifestSnafu, +}; + +pub type MetadataVersion = u32; +pub const INIT_META_VERSION: MetadataVersion = 0; + +const IMMUTABLE_MANIFEST_FILE: &str = "_immutable_manifest"; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct ImmutableMetadata { + pub table_info: RawTableInfo, + pub version: MetadataVersion, +} + +fn encode_metadata(item: &ImmutableMetadata) -> Result> { + serde_json::to_vec(&item).context(EncodeJsonSnafu) +} + +fn decode_metadata(src: &[u8]) -> Result { + serde_json::from_slice(src).context(DecodeJsonSnafu) +} + +fn manifest_path(dir: &str) -> String { + format!("{}{}", dir, IMMUTABLE_MANIFEST_FILE) +} + +pub(crate) async fn delete_table_manifest( + table_name: &str, + dir: &str, + object_store: ObjectStore, +) -> Result<()> { + object_store + .delete(&manifest_path(dir)) + .await + .context(DeleteTableManifestSnafu { table_name }) +} + +pub(crate) async fn write_table_manifest( + table_name: &str, + dir: &str, + object_store: &ObjectStore, + metadata: &ImmutableMetadata, +) -> Result<()> { + let path = &manifest_path(dir); + let exist = object_store + .is_exist(path) + .await + .context(CheckObjectSnafu { path })?; + + ensure!(!exist, WriteImmutableManifestSnafu { path }); + + let bs = encode_metadata(metadata)?; + + object_store + .write(path, bs) + .await + .context(WriteTableManifestSnafu { table_name }) +} + +pub(crate) async fn read_table_manifest( + table_name: &str, + dir: &str, + object_store: &ObjectStore, +) -> Result { + let path = manifest_path(dir); + let bs = object_store + .read(&path) + .await + .context(ReadTableManifestSnafu { table_name })?; + + decode_metadata(&bs) +} diff --git a/src/file-table-engine/src/table.rs b/src/file-table-engine/src/table.rs new file mode 100644 index 0000000000..2eb5de637a --- /dev/null +++ b/src/file-table-engine/src/table.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod immutable; diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs new file mode 100644 index 0000000000..2fb7a35107 --- /dev/null +++ b/src/file-table-engine/src/table/immutable.rs @@ -0,0 +1,130 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use common_query::physical_plan::PhysicalPlanRef; +use common_query::prelude::Expr; +use datatypes::schema::SchemaRef; +use object_store::ObjectStore; +use snafu::ResultExt; +use store_api::storage::RegionNumber; +use table::error::Result as TableResult; +use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType}; +use table::Table; + +use crate::error::{ConvertRawSnafu, Result}; +use crate::manifest::immutable::{ + read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION, +}; +use crate::manifest::table_manifest_dir; + +pub struct ImmutableFileTable { + metadata: ImmutableMetadata, + // currently, it's immutable + table_info: Arc, +} + +pub type ImmutableFileTableRef = Arc; + +#[async_trait] +impl Table for ImmutableFileTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.table_info().meta.schema.clone() + } + + fn table_type(&self) -> TableType { + self.table_info().table_type + } + + fn table_info(&self) -> TableInfoRef { + self.table_info.clone() + } + + async fn scan( + &self, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> TableResult { + todo!() + } + + async fn flush( + &self, + _region_number: Option, + _wait: Option, + ) -> TableResult<()> { + // nothing to flush + Ok(()) + } + + async fn close(&self) -> TableResult<()> { + Ok(()) + } +} + +impl ImmutableFileTable { + #[inline] + pub fn metadata(&self) -> &ImmutableMetadata { + &self.metadata + } + + pub(crate) fn new(table_info: TableInfo, metadata: ImmutableMetadata) -> Self { + Self { + metadata, + table_info: Arc::new(table_info), + } + } + + pub async fn create( + table_name: &str, + table_dir: &str, + table_info: TableInfo, + object_store: ObjectStore, + ) -> Result { + let metadata = ImmutableMetadata { + table_info: RawTableInfo::from(table_info.clone()), + version: INIT_META_VERSION, + }; + + write_table_manifest( + table_name, + &table_manifest_dir(table_dir), + &object_store, + &metadata, + ) + .await?; + + Ok(ImmutableFileTable::new(table_info, metadata)) + } + + pub(crate) async fn recover_table_info( + table_name: &str, + table_dir: &str, + object_store: &ObjectStore, + ) -> Result<(ImmutableMetadata, TableInfo)> { + let metadata = read_table_manifest(table_name, table_dir, object_store).await?; + let table_info = + TableInfo::try_from(metadata.table_info.clone()).context(ConvertRawSnafu)?; + + Ok((metadata, table_info)) + } +} diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 8fc4a258db..6e30742286 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -128,6 +128,16 @@ impl TableMetaBuilder { _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()), } } + + pub fn new_external_table() -> Self { + Self { + primary_key_indices: Some(Vec::new()), + value_indices: Some(Vec::new()), + region_numbers: Some(Vec::new()), + next_column_id: Some(0), + ..Default::default() + } + } } impl TableMeta {