refactor: remove TableEngine (#3181)

* refactor: remove TableEngine

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/table/src/table_reference.rs

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
This commit is contained in:
Ruihang Xia
2024-01-17 12:18:37 +08:00
committed by GitHub
parent 204b9433b8
commit fbd0197794
23 changed files with 90 additions and 510 deletions

View File

@@ -17,6 +17,4 @@ pub use client::{CachedMetaKvBackend, MetaKvBackend};
mod client;
mod manager;
#[cfg(feature = "testing")]
pub mod mock;
pub use manager::KvBackendCatalogManager;

View File

@@ -1,128 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, RwLock as StdRwLock};
use common_recordbatch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::StringVector;
use table::engine::{CloseTableResult, EngineContext, TableEngine};
use table::metadata::TableId;
use table::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
TruncateTableRequest,
};
use table::test_util::MemTable;
use table::TableRef;
#[derive(Default)]
pub struct MockTableEngine {
tables: StdRwLock<HashMap<TableId, TableRef>>,
}
#[async_trait::async_trait]
impl TableEngine for MockTableEngine {
fn name(&self) -> &str {
"MockTableEngine"
}
/// Create a table with only one column
async fn create_table(
&self,
_ctx: &EngineContext,
request: CreateTableRequest,
) -> table::Result<TableRef> {
let table_id = request.id;
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"name",
ConcreteDataType::string_datatype(),
true,
)]));
let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _];
let record_batch = RecordBatch::new(schema, data).unwrap();
let table = MemTable::new_with_catalog(
&request.table_name,
record_batch,
table_id,
request.catalog_name,
request.schema_name,
vec![0],
);
let mut tables = self.tables.write().unwrap();
let _ = tables.insert(table_id, table.clone() as TableRef);
Ok(table)
}
async fn open_table(
&self,
_ctx: &EngineContext,
request: OpenTableRequest,
) -> table::Result<Option<TableRef>> {
Ok(self.tables.read().unwrap().get(&request.table_id).cloned())
}
async fn alter_table(
&self,
_ctx: &EngineContext,
_request: AlterTableRequest,
) -> table::Result<TableRef> {
unimplemented!()
}
fn get_table(
&self,
_ctx: &EngineContext,
table_id: TableId,
) -> table::Result<Option<TableRef>> {
Ok(self.tables.read().unwrap().get(&table_id).cloned())
}
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
self.tables.read().unwrap().contains_key(&table_id)
}
async fn drop_table(
&self,
_ctx: &EngineContext,
_request: DropTableRequest,
) -> table::Result<bool> {
unimplemented!()
}
async fn close_table(
&self,
_ctx: &EngineContext,
request: CloseTableRequest,
) -> table::Result<CloseTableResult> {
let _ = self.tables.write().unwrap().remove(&request.table_id);
Ok(CloseTableResult::Released(vec![]))
}
async fn close(&self) -> table::Result<()> {
Ok(())
}
async fn truncate_table(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> table::Result<bool> {
Ok(true)
}
}

View File

@@ -20,8 +20,8 @@ use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::VectorRef;
use datatypes::schema::SchemaRef;
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use table::metadata::TableId;
use table::table_reference::TableReference;
use crate::error::{CreateVectorSnafu, Result, UnexpectedValuesLengthSnafu};
use crate::util;

View File

@@ -20,8 +20,8 @@ use api::v1::{
};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
use table::engine::TableReference;
use table::metadata::TableId;
use table::table_reference::TableReference;
use crate::error::{
DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, MissingTimestampColumnSnafu, Result,

View File

@@ -33,9 +33,9 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnId, RegionId};
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::requests::AlterKind;
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_operate_region_error;

View File

@@ -33,8 +33,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;

View File

@@ -29,8 +29,8 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;

View File

@@ -27,8 +27,8 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use super::utils::handle_retry_error;
use crate::ddl::utils::handle_operate_region_error;

View File

@@ -15,8 +15,8 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use super::{DeserializedValueWithBytes, TableMetaValue, TABLE_INFO_KEY_PREFIX};
use crate::error::Result;

View File

@@ -26,8 +26,8 @@ use base64::Engine as _;
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::error::{self, Result};
use crate::table_name::TableName;

View File

@@ -16,7 +16,7 @@ use std::fmt::{Display, Formatter};
use api::v1::meta::TableName as PbTableName;
use serde::{Deserialize, Serialize};
use table::engine::TableReference;
use table::table_reference::TableReference;
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct TableName {

View File

@@ -35,8 +35,8 @@ use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX};
use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def};
use sql::util::to_lowercase_options_map;
use table::engine::TableReference;
use table::requests::{TableOptions, FILE_TABLE_META_KEY};
use table::table_reference::TableReference;
use crate::error::{
BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,

View File

@@ -35,8 +35,8 @@ use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::statements::insert::Insert;
use table::engine::TableReference;
use table::requests::InsertRequest as TableInsertRequest;
use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{

View File

@@ -46,8 +46,8 @@ use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sql::util::format_raw_object_name;
use sqlparser::ast::{Expr, ObjectName, Value};
use table::engine::TableReference;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{

View File

@@ -42,8 +42,8 @@ use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio::io::BufReader;
use crate::error::{self, IntoVectorsSnafu, Result};

View File

@@ -32,9 +32,9 @@ use object_store::ObjectStore;
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::engine::TableReference;
use table::requests::CopyTableRequest;
use table::table::adapter::DfTableProviderAdapter;
use table::table_reference::TableReference;
use crate::error::{self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result};
use crate::statement::StatementExecutor;

View File

@@ -1,204 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use common_procedure::BoxedProcedure;
use datafusion_common::TableReference as DfTableReference;
use store_api::storage::RegionNumber;
use crate::error::{self, Result};
use crate::metadata::TableId;
use crate::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
TruncateTableRequest,
};
use crate::TableRef;
/// Represents a resolved path to a table of the form “catalog.schema.table”
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableReference<'a> {
pub catalog: &'a str,
pub schema: &'a str,
pub table: &'a str,
}
pub type OwnedTableReference = TableReference<'static>;
// TODO(LFC): Find a better place for `TableReference`,
// so that we can reuse the default catalog and schema consts.
// Could be done together with issue #559.
impl<'a> TableReference<'a> {
pub fn bare(table: &'a str) -> Self {
TableReference {
catalog: "greptime",
schema: "public",
table,
}
}
pub fn full(catalog: &'a str, schema: &'a str, table: &'a str) -> Self {
TableReference {
catalog,
schema,
table,
}
}
}
impl<'a> Display for TableReference<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}.{}.{}", self.catalog, self.schema, self.table)
}
}
impl<'a> From<TableReference<'a>> for DfTableReference<'a> {
fn from(val: TableReference<'a>) -> Self {
DfTableReference::full(val.catalog, val.schema, val.table)
}
}
/// CloseTableResult
///
/// Returns [`CloseTableResult::Released`] and closed region numbers if a table was removed
/// from the engine.
/// Returns [`CloseTableResult::PartialClosed`] and closed region numbers if only partial
/// regions were closed.
#[derive(Debug)]
pub enum CloseTableResult {
Released(Vec<RegionNumber>),
PartialClosed(Vec<RegionNumber>),
NotFound,
}
/// Table engine abstraction.
#[async_trait::async_trait]
pub trait TableEngine: Send + Sync {
/// Return engine name
fn name(&self) -> &str;
/// Create a table by given request.
///
/// Return the created table.
async fn create_table(
&self,
ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<TableRef>;
/// Open an existing table by given `request`, returns the opened table. If the table does not
/// exist, returns an `Ok(None)`.
async fn open_table(
&self,
ctx: &EngineContext,
request: OpenTableRequest,
) -> Result<Option<TableRef>>;
/// Alter table schema, options etc. by given request,
///
/// Returns the table after altered.
async fn alter_table(
&self,
ctx: &EngineContext,
request: AlterTableRequest,
) -> Result<TableRef>;
/// Returns the table by it's name.
fn get_table(&self, ctx: &EngineContext, table_id: TableId) -> Result<Option<TableRef>>;
/// Returns true when the given table is exists.
fn table_exists(&self, ctx: &EngineContext, table_id: TableId) -> bool;
/// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist.
async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<bool>;
/// Closes the (partial) given table.
///
/// Removes a table from the engine if all regions are closed.
async fn close_table(
&self,
_ctx: &EngineContext,
_request: CloseTableRequest,
) -> Result<CloseTableResult> {
error::UnsupportedSnafu {
operation: "close_table",
}
.fail()?
}
/// Close the engine.
async fn close(&self) -> Result<()>;
async fn truncate_table(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> Result<bool>;
}
pub type TableEngineRef = Arc<dyn TableEngine>;
/// Table engine context.
#[derive(Debug, Clone, Default)]
pub struct EngineContext {}
/// Procedures for table engine.
pub trait TableEngineProcedure: Send + Sync {
/// Returns a procedure that creates a table by specific `request`.
fn create_table_procedure(
&self,
ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<BoxedProcedure>;
/// Returns a procedure that alters a table by specific `request`.
fn alter_table_procedure(
&self,
ctx: &EngineContext,
request: AlterTableRequest,
) -> Result<BoxedProcedure>;
/// Returns a procedure that drops a table by specific `request`.
fn drop_table_procedure(
&self,
ctx: &EngineContext,
request: DropTableRequest,
) -> Result<BoxedProcedure>;
/// Returns a procedure that truncates a table by specific `request`.
fn truncate_table_procedure(
&self,
ctx: &EngineContext,
request: TruncateTableRequest,
) -> Result<BoxedProcedure>;
}
pub type TableEngineProcedureRef = Arc<dyn TableEngineProcedure>;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_table_reference() {
let table_ref = TableReference {
catalog: "greptime",
schema: "public",
table: "test",
};
assert_eq!("greptime.public.test", table_ref.to_string());
}
}

View File

@@ -43,12 +43,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Engine not found: {}", engine))]
EngineNotFound { engine: String, location: Location },
#[snafu(display("Engine exist: {}", engine))]
EngineExist { engine: String, location: Location },
#[snafu(display("Table projection error"))]
TableProjection {
#[snafu(source)]
@@ -164,9 +158,7 @@ impl ErrorExt for Error {
Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
Error::RegionSchemaMismatch { .. } => StatusCode::StorageUnavailable,
Error::Unsupported { .. } => StatusCode::Unsupported,
Error::ParseTableOption { .. }
| Error::EngineNotFound { .. }
| Error::EngineExist { .. } => StatusCode::InvalidArguments,
Error::ParseTableOption { .. } => StatusCode::InvalidArguments,
Error::InvalidTable { .. } | Error::MissingTimeIndexColumn { .. } => {
StatusCode::Internal

View File

@@ -15,13 +15,13 @@
#![feature(assert_matches)]
pub mod dist_table;
pub mod engine;
pub mod error;
pub mod metadata;
pub mod predicate;
pub mod requests;
pub mod stats;
pub mod table;
pub mod table_reference;
pub mod test_util;
pub mod thin_table;

View File

@@ -28,10 +28,10 @@ use serde::{Deserialize, Serialize};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY};
use store_api::storage::RegionNumber;
use crate::engine::TableReference;
use crate::error;
use crate::error::ParseTableOptionSnafu;
use crate::metadata::{TableId, TableVersion};
use crate::table_reference::TableReference;
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
pub const FILE_TABLE_LOCATION_KEY: &str = "location";

View File

@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Display};
use datafusion_common::TableReference as DfTableReference;
/// Represents a resolved path to a table of the form “catalog.schema.table”
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableReference<'a> {
pub catalog: &'a str,
pub schema: &'a str,
pub table: &'a str,
}
pub type OwnedTableReference = TableReference<'static>;
impl<'a> TableReference<'a> {
pub fn bare(table: &'a str) -> Self {
TableReference {
catalog: common_catalog::consts::DEFAULT_CATALOG_NAME,
schema: common_catalog::consts::DEFAULT_SCHEMA_NAME,
table,
}
}
pub fn full(catalog: &'a str, schema: &'a str, table: &'a str) -> Self {
TableReference {
catalog,
schema,
table,
}
}
}
impl<'a> Display for TableReference<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}.{}.{}", self.catalog, self.schema, self.table)
}
}
impl<'a> From<TableReference<'a>> for DfTableReference<'a> {
fn from(val: TableReference<'a>) -> Self {
DfTableReference::full(val.catalog, val.schema, val.table)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_table_reference() {
let table_ref = TableReference {
catalog: "greptime",
schema: "public",
table: "test",
};
assert_eq!("greptime.public.test", table_ref.to_string());
}
}

View File

@@ -14,9 +14,7 @@
mod empty_table;
mod memtable;
mod mock_engine;
pub mod table_info;
pub use empty_table::EmptyTable;
pub use memtable::MemTable;
pub use mock_engine::MockTableEngine;

View File

@@ -1,149 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use async_trait::async_trait;
use common_procedure::BoxedProcedure;
use tokio::sync::Mutex;
use crate::engine::{EngineContext, TableEngine, TableEngineProcedure};
use crate::metadata::TableId;
use crate::requests::{
AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest,
};
use crate::test_util::EmptyTable;
use crate::{Result, TableRef};
#[derive(Default)]
pub struct MockTableEngine {
tables: Mutex<HashMap<(String, String, String), TableRef>>,
}
impl MockTableEngine {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl TableEngine for MockTableEngine {
fn name(&self) -> &str {
"MockTableEngine"
}
async fn create_table(
&self,
_ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<TableRef> {
let catalog_name = request.catalog_name.clone();
let schema_name = request.schema_name.clone();
let table_name = request.table_name.clone();
let table_ref = EmptyTable::table(request);
let _ = self
.tables
.lock()
.await
.insert((catalog_name, schema_name, table_name), table_ref.clone());
Ok(table_ref)
}
async fn open_table(
&self,
_ctx: &EngineContext,
request: OpenTableRequest,
) -> Result<Option<TableRef>> {
let catalog_name = request.catalog_name;
let schema_name = request.schema_name;
let table_name = request.table_name;
let res = self
.tables
.lock()
.await
.get(&(catalog_name, schema_name, table_name))
.cloned();
Ok(res)
}
async fn alter_table(
&self,
_ctx: &EngineContext,
_request: AlterTableRequest,
) -> Result<TableRef> {
unimplemented!()
}
fn get_table(&self, _ctx: &EngineContext, _table_id: TableId) -> Result<Option<TableRef>> {
unimplemented!()
}
fn table_exists(&self, _ctx: &EngineContext, _table_id: TableId) -> bool {
unimplemented!()
}
async fn drop_table(&self, _ctx: &EngineContext, _request: DropTableRequest) -> Result<bool> {
unimplemented!()
}
async fn close(&self) -> Result<()> {
Ok(())
}
async fn truncate_table(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> Result<bool> {
unimplemented!()
}
}
impl TableEngineProcedure for MockTableEngine {
fn create_table_procedure(
&self,
_ctx: &EngineContext,
_request: CreateTableRequest,
) -> Result<BoxedProcedure> {
unimplemented!()
}
fn alter_table_procedure(
&self,
_ctx: &EngineContext,
_request: AlterTableRequest,
) -> Result<BoxedProcedure> {
unimplemented!()
}
fn drop_table_procedure(
&self,
_ctx: &EngineContext,
_request: DropTableRequest,
) -> Result<BoxedProcedure> {
unimplemented!()
}
fn truncate_table_procedure(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> Result<BoxedProcedure> {
unimplemented!()
}
}