feat: introduce file table engine (#1323)

* feat: introduce file table engine

* chore: apply cr suggestions

* refactor: refactor immutable manifest

* chore: apply cr suggestions

* refactor: refactor immutable manifest

* chore: apply suggestions from code review

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* chore: apply suggestions from CR

* chore: apply suggestions from code review

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
Weny Xu
2023-04-10 13:03:36 +09:00
committed by GitHub
parent 804348966d
commit 29c6155ae3
13 changed files with 872 additions and 0 deletions

23
Cargo.lock generated
View File

@@ -2816,6 +2816,29 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "file-table-engine"
version = "0.1.1"
dependencies = [
"async-trait",
"common-catalog",
"common-error",
"common-procedure",
"common-query",
"common-telemetry",
"common-test-util",
"datatypes",
"futures",
"object-store",
"serde",
"serde_json",
"snafu",
"storage",
"store-api",
"table",
"tokio",
]
[[package]]
name = "filetime"
version = "0.2.20"

View File

@@ -24,6 +24,7 @@ members = [
"src/common/time",
"src/datanode",
"src/datatypes",
"src/file-table-engine",
"src/frontend",
"src/log-store",
"src/meta-client",

View File

@@ -27,3 +27,4 @@ pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0;
pub const SCRIPTS_TABLE_ID: u32 = 1;
pub const MITO_ENGINE: &str = "mito";
pub const IMMUTABLE_FILE_ENGINE: &str = "file";

View File

@@ -0,0 +1,27 @@
[package]
name = "file-table-engine"
version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = []
[dependencies]
async-trait = "0.1"
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-procedure = { path = "../common/procedure" }
common-query = { path = "../common/query" }
common-telemetry = { path = "../common/telemetry" }
datatypes = { path = "../datatypes" }
futures.workspace = true
object-store = { path = "../object-store" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu.workspace = true
storage = { path = "../storage" }
store-api = { path = "../store-api" }
table = { path = "../table" }
common-test-util = { path = "../common/test-util", optional = true }
tokio.workspace = true

View File

@@ -0,0 +1,18 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use table::metadata::TableVersion;
pub mod immutable;
const INIT_TABLE_VERSION: TableVersion = 0;

View File

@@ -0,0 +1,355 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
use common_error::prelude::BoxedError;
use common_telemetry::{debug, logging};
use datatypes::schema::Schema;
use object_store::ObjectStore;
use snafu::ResultExt;
use table::engine::{table_dir, EngineContext, TableEngine, TableReference};
use table::error::TableOperationSnafu;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::{error as table_error, Result as TableResult, Table, TableRef};
use tokio::sync::Mutex;
use crate::engine::INIT_TABLE_VERSION;
use crate::error::{
BuildTableInfoSnafu, BuildTableMetaSnafu, DropTableSnafu, InvalidRawSchemaSnafu, Result,
TableExistsSnafu,
};
use crate::manifest::immutable::{delete_table_manifest, ImmutableMetadata};
use crate::manifest::table_manifest_dir;
use crate::table::immutable::{ImmutableFileTable, ImmutableFileTableRef};
/// [TableEngine] implementation.
#[derive(Clone)]
pub struct ImmutableFileTableEngine {
inner: Arc<EngineInner>,
}
#[async_trait]
impl TableEngine for ImmutableFileTableEngine {
fn name(&self) -> &str {
IMMUTABLE_FILE_ENGINE
}
async fn create_table(
&self,
ctx: &EngineContext,
request: CreateTableRequest,
) -> TableResult<TableRef> {
self.inner
.create_table(ctx, request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}
async fn open_table(
&self,
ctx: &EngineContext,
request: OpenTableRequest,
) -> TableResult<Option<TableRef>> {
self.inner
.open_table(ctx, request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}
async fn alter_table(
&self,
_ctx: &EngineContext,
_req: AlterTableRequest,
) -> TableResult<TableRef> {
table_error::UnsupportedSnafu {
operation: "ALTER TABLE",
}
.fail()
}
fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
) -> TableResult<Option<TableRef>> {
Ok(self.inner.get_table(table_ref))
}
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
self.inner.get_table(table_ref).is_some()
}
async fn drop_table(
&self,
_ctx: &EngineContext,
request: DropTableRequest,
) -> TableResult<bool> {
self.inner
.drop_table(request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}
async fn close(&self) -> TableResult<()> {
self.inner.close().await
}
}
struct EngineInner {
/// All tables opened by the engine. Map key is formatted [TableReference].
///
/// Writing to `tables` should also hold the `table_mutex`.
tables: RwLock<HashMap<String, ImmutableFileTableRef>>,
object_store: ObjectStore,
/// Table mutex is used to protect the operations such as creating/opening/closing
/// a table, to avoid things like opening the same table simultaneously.
table_mutex: Mutex<()>,
}
impl EngineInner {
async fn create_table(
&self,
_ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<TableRef> {
let CreateTableRequest {
catalog_name,
schema_name,
table_name,
create_if_not_exists,
table_options,
..
} = request;
let table_ref = TableReference {
catalog: &catalog_name,
schema: &schema_name,
table: &table_name,
};
if let Some(table) = self.get_table(&table_ref) {
return if create_if_not_exists {
Ok(table)
} else {
TableExistsSnafu { table_name }.fail()
};
}
let table_schema =
Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?);
let table_id = request.id;
let table_dir = table_dir(&catalog_name, &schema_name, table_id);
let table_full_name = table_ref.to_string();
let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
return if request.create_if_not_exists {
Ok(table)
} else {
TableExistsSnafu { table_name }.fail()
};
}
let table_meta = TableMetaBuilder::new_external_table()
.schema(table_schema)
.engine(IMMUTABLE_FILE_ENGINE)
.options(table_options)
.build()
.context(BuildTableMetaSnafu {
table_name: &table_full_name,
})?;
let table_info = TableInfoBuilder::new(&table_name, table_meta)
.ident(table_id)
.table_version(INIT_TABLE_VERSION)
.table_type(TableType::Base)
.catalog_name(catalog_name.to_string())
.schema_name(schema_name.to_string())
.desc(request.desc)
.build()
.context(BuildTableInfoSnafu {
table_name: &table_full_name,
})?;
let table = Arc::new(
ImmutableFileTable::create(
&table_full_name,
&table_dir,
table_info,
self.object_store.clone(),
)
.await?,
);
logging::info!(
"Immutable file engine created table: {} in schema: {}, table_id: {}.",
table_name,
schema_name,
table_id
);
self.tables
.write()
.unwrap()
.insert(table_full_name, table.clone());
Ok(table)
}
fn get_table_by_full_name(&self, full_name: &str) -> Option<TableRef> {
self.tables
.read()
.unwrap()
.get(full_name)
.cloned()
.map(|table| table as _)
}
fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
self.get_table_by_full_name(&table_ref.to_string())
}
async fn open_table(
&self,
_ctx: &EngineContext,
request: OpenTableRequest,
) -> TableResult<Option<TableRef>> {
let OpenTableRequest {
catalog_name,
schema_name,
table_name,
..
} = request;
let table_ref = TableReference {
catalog: &catalog_name,
schema: &schema_name,
table: &table_name,
};
let table_full_name = table_ref.to_string();
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
return Ok(Some(table));
}
let table = {
let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
return Ok(Some(table));
}
let table_id = request.table_id;
let table_dir = table_dir(&catalog_name, &schema_name, table_id);
let (metadata, table_info) = self
.recover_table_manifest_and_info(&table_full_name, &table_dir)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
debug!(
"Opening table {}, table info recovered: {:?}",
table_id, table_info
);
let table = Arc::new(ImmutableFileTable::new(table_info, metadata));
self.tables
.write()
.unwrap()
.insert(table_full_name, table.clone());
Some(table as _)
};
logging::info!(
"Immutable file engine opened table: {} in schema: {}",
table_name,
schema_name
);
Ok(table)
}
async fn drop_table(&self, req: DropTableRequest) -> Result<bool> {
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
table: &req.table_name,
};
let table_full_name = table_ref.to_string();
let _lock = self.table_mutex.lock().await;
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
let table_id = table.table_info().ident.table_id;
let table_dir = table_dir(&req.catalog_name, &req.schema_name, table_id);
delete_table_manifest(
&table_full_name,
&table_manifest_dir(&table_dir),
self.object_store.clone(),
)
.await
.map_err(BoxedError::new)
.context(DropTableSnafu {
table_name: &table_full_name,
})?;
self.tables.write().unwrap().remove(&table_full_name);
Ok(true)
} else {
Ok(false)
}
}
async fn close(&self) -> TableResult<()> {
let _lock = self.table_mutex.lock().await;
let mut tables = self.tables.write().unwrap().clone();
futures::future::try_join_all(tables.values().map(|t| t.close()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
// Releases all closed table
tables.clear();
Ok(())
}
async fn recover_table_manifest_and_info(
&self,
table_name: &str,
table_dir: &str,
) -> Result<(ImmutableMetadata, TableInfo)> {
ImmutableFileTable::recover_table_info(
table_name,
&table_manifest_dir(table_dir),
&self.object_store,
)
.await
}
}

View File

@@ -0,0 +1,161 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::prelude::*;
use serde_json::error::Error as JsonError;
use snafu::Location;
use table::metadata::{TableInfoBuilderError, TableMetaBuilderError};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to check object from path: {}, source: {}", path, source))]
CheckObject {
path: String,
location: Location,
source: object_store::Error,
},
#[snafu(display("Fail to encode object into json, source: {}", source))]
EncodeJson {
location: Location,
source: JsonError,
},
#[snafu(display("Fail to decode object from json, source: {}", source))]
DecodeJson {
location: Location,
source: JsonError,
},
#[snafu(display("Failed to drop table, table: {}, source: {}", table_name, source))]
DropTable {
source: BoxedError,
table_name: String,
location: Location,
},
#[snafu(display(
"Failed to write table manifest, table: {}, source: {}",
table_name,
source,
))]
WriteTableManifest {
source: object_store::Error,
table_name: String,
location: Location,
},
#[snafu(display("Failed to write immutable manifest, path: {}", path))]
WriteImmutableManifest { path: String, location: Location },
#[snafu(display("Failed to delete table table manifest, source: {}", source,))]
DeleteTableManifest {
source: object_store::Error,
table_name: String,
location: Location,
},
#[snafu(display(
"Failed to read table manifest, table: {}, source: {}",
table_name,
source,
))]
ReadTableManifest {
source: object_store::Error,
table_name: String,
location: Location,
},
#[snafu(display(
"Failed to build table meta for table: {}, source: {}",
table_name,
source
))]
BuildTableMeta {
source: TableMetaBuilderError,
table_name: String,
location: Location,
},
#[snafu(display(
"Failed to build table info for table: {}, source: {}",
table_name,
source
))]
BuildTableInfo {
source: TableInfoBuilderError,
table_name: String,
location: Location,
},
#[snafu(display("Table already exists: {}", table_name))]
TableExists {
location: Location,
table_name: String,
},
#[snafu(display(
"Failed to convert metadata from deserialized data, source: {}",
source
))]
ConvertRaw {
#[snafu(backtrace)]
source: table::metadata::ConvertError,
},
#[snafu(display("Invalid schema, source: {}", source))]
InvalidRawSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
TableExists { .. }
| BuildTableMeta { .. }
| BuildTableInfo { .. }
| InvalidRawSchema { .. } => StatusCode::InvalidArguments,
WriteTableManifest { .. }
| DeleteTableManifest { .. }
| ReadTableManifest { .. }
| CheckObject { .. } => StatusCode::StorageUnavailable,
EncodeJson { .. }
| DecodeJson { .. }
| ConvertRaw { .. }
| DropTable { .. }
| WriteImmutableManifest { .. } => StatusCode::Unexpected,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<Error> for common_procedure::Error {
fn from(e: Error) -> common_procedure::Error {
common_procedure::Error::from_error_ext(e)
}
}

View File

@@ -0,0 +1,18 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod engine;
pub mod error;
pub mod manifest;
pub mod table;

View File

@@ -0,0 +1,20 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod immutable;
#[inline]
pub fn table_manifest_dir(table_dir: &str) -> String {
format!("{table_dir}/manifest/")
}

View File

@@ -0,0 +1,93 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::metadata::RawTableInfo;
use crate::error::{
CheckObjectSnafu, DecodeJsonSnafu, DeleteTableManifestSnafu, EncodeJsonSnafu,
ReadTableManifestSnafu, Result, WriteImmutableManifestSnafu, WriteTableManifestSnafu,
};
pub type MetadataVersion = u32;
pub const INIT_META_VERSION: MetadataVersion = 0;
const IMMUTABLE_MANIFEST_FILE: &str = "_immutable_manifest";
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct ImmutableMetadata {
pub table_info: RawTableInfo,
pub version: MetadataVersion,
}
fn encode_metadata(item: &ImmutableMetadata) -> Result<Vec<u8>> {
serde_json::to_vec(&item).context(EncodeJsonSnafu)
}
fn decode_metadata(src: &[u8]) -> Result<ImmutableMetadata> {
serde_json::from_slice(src).context(DecodeJsonSnafu)
}
fn manifest_path(dir: &str) -> String {
format!("{}{}", dir, IMMUTABLE_MANIFEST_FILE)
}
pub(crate) async fn delete_table_manifest(
table_name: &str,
dir: &str,
object_store: ObjectStore,
) -> Result<()> {
object_store
.delete(&manifest_path(dir))
.await
.context(DeleteTableManifestSnafu { table_name })
}
pub(crate) async fn write_table_manifest(
table_name: &str,
dir: &str,
object_store: &ObjectStore,
metadata: &ImmutableMetadata,
) -> Result<()> {
let path = &manifest_path(dir);
let exist = object_store
.is_exist(path)
.await
.context(CheckObjectSnafu { path })?;
ensure!(!exist, WriteImmutableManifestSnafu { path });
let bs = encode_metadata(metadata)?;
object_store
.write(path, bs)
.await
.context(WriteTableManifestSnafu { table_name })
}
pub(crate) async fn read_table_manifest(
table_name: &str,
dir: &str,
object_store: &ObjectStore,
) -> Result<ImmutableMetadata> {
let path = manifest_path(dir);
let bs = object_store
.read(&path)
.await
.context(ReadTableManifestSnafu { table_name })?;
decode_metadata(&bs)
}

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod immutable;

View File

@@ -0,0 +1,130 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::error::Result as TableResult;
use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType};
use table::Table;
use crate::error::{ConvertRawSnafu, Result};
use crate::manifest::immutable::{
read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION,
};
use crate::manifest::table_manifest_dir;
pub struct ImmutableFileTable {
metadata: ImmutableMetadata,
// currently, it's immutable
table_info: Arc<TableInfo>,
}
pub type ImmutableFileTableRef = Arc<ImmutableFileTable>;
#[async_trait]
impl Table for ImmutableFileTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.table_info().meta.schema.clone()
}
fn table_type(&self) -> TableType {
self.table_info().table_type
}
fn table_info(&self) -> TableInfoRef {
self.table_info.clone()
}
async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
todo!()
}
async fn flush(
&self,
_region_number: Option<RegionNumber>,
_wait: Option<bool>,
) -> TableResult<()> {
// nothing to flush
Ok(())
}
async fn close(&self) -> TableResult<()> {
Ok(())
}
}
impl ImmutableFileTable {
#[inline]
pub fn metadata(&self) -> &ImmutableMetadata {
&self.metadata
}
pub(crate) fn new(table_info: TableInfo, metadata: ImmutableMetadata) -> Self {
Self {
metadata,
table_info: Arc::new(table_info),
}
}
pub async fn create(
table_name: &str,
table_dir: &str,
table_info: TableInfo,
object_store: ObjectStore,
) -> Result<ImmutableFileTable> {
let metadata = ImmutableMetadata {
table_info: RawTableInfo::from(table_info.clone()),
version: INIT_META_VERSION,
};
write_table_manifest(
table_name,
&table_manifest_dir(table_dir),
&object_store,
&metadata,
)
.await?;
Ok(ImmutableFileTable::new(table_info, metadata))
}
pub(crate) async fn recover_table_info(
table_name: &str,
table_dir: &str,
object_store: &ObjectStore,
) -> Result<(ImmutableMetadata, TableInfo)> {
let metadata = read_table_manifest(table_name, table_dir, object_store).await?;
let table_info =
TableInfo::try_from(metadata.table_info.clone()).context(ConvertRawSnafu)?;
Ok((metadata, table_info))
}
}

View File

@@ -128,6 +128,16 @@ impl TableMetaBuilder {
_ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
}
}
pub fn new_external_table() -> Self {
Self {
primary_key_indices: Some(Vec::new()),
value_indices: Some(Vec::new()),
region_numbers: Some(Vec::new()),
next_column_id: Some(0),
..Default::default()
}
}
}
impl TableMeta {