feat: support multi table engines (#1277)

* feat: support multi table engines

* refactor: adapt SqlHandler to support multiple table engines

* refactor: refactor TableEngineManager

* chore: apply review suggestions

* chore: apply review suggestions

* chore: apply review suggestions

* chore: snafu context styling
This commit is contained in:
Weny Xu
2023-04-03 23:49:12 +09:00
committed by GitHub
parent 68d3247791
commit 451f9d2d4e
34 changed files with 454 additions and 135 deletions

View File

@@ -101,6 +101,13 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Table engine not found: {}, source: {}", engine_name, source))]
TableEngineNotFound {
engine_name: String,
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display("Table not found: {}", table_name))]
TableNotFound {
table_name: String,
@@ -533,6 +540,7 @@ impl ErrorExt for Error {
Insert { source, .. } => source.status_code(),
Delete { source, .. } => source.status_code(),
TableEngineNotFound { source, .. } => source.status_code(),
TableNotFound { .. } => StatusCode::TableNotFound,
ColumnNotFound { .. } => StatusCode::TableColumnNotFound,

View File

@@ -45,6 +45,7 @@ use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::engine::manager::MemoryTableEngineManager;
use table::requests::FlushTableRequest;
use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
@@ -117,6 +118,8 @@ impl Instance {
object_store,
));
let engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine.clone()));
// create remote catalog manager
let (catalog_manager, table_id_provider) = match opts.mode {
Mode::Standalone => {
@@ -141,7 +144,7 @@ impl Instance {
)
} else {
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
catalog::local::LocalCatalogManager::try_new(engine_manager)
.await
.context(CatalogSnafu)?,
);
@@ -155,7 +158,7 @@ impl Instance {
Mode::Distributed => {
let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
table_engine.clone(),
engine_manager,
opts.node_id.context(MissingNodeIdSnafu)?,
Arc::new(MetaKvBackend {
client: meta_client.as_ref().unwrap().clone(),
@@ -194,7 +197,7 @@ impl Instance {
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(
table_engine.clone(),
Arc::new(MemoryTableEngineManager::new(table_engine.clone())),
catalog_manager.clone(),
table_engine,
procedure_manager.clone(),

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_procedure::ProcedureManagerRef;
@@ -21,12 +23,14 @@ use query::sql::{show_databases, show_tables};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::show::{ShowDatabases, ShowTables};
use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{TableEngineProcedureRef, TableEngineRef, TableReference};
use table::requests::*;
use table::TableRef;
use table::{Table, TableRef};
use crate::error::{
CloseTableEngineSnafu, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu,
self, CloseTableEngineSnafu, ExecuteSqlSnafu, Result, TableEngineNotFoundSnafu,
TableNotFoundSnafu,
};
use crate::instance::sql::table_idents_to_full_name;
@@ -53,7 +57,7 @@ pub enum SqlRequest {
// Handler to execute SQL except query
#[derive(Clone)]
pub struct SqlHandler {
table_engine: TableEngineRef,
table_engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
procedure_manager: Option<ProcedureManagerRef>,
@@ -61,13 +65,13 @@ pub struct SqlHandler {
impl SqlHandler {
pub fn new(
table_engine: TableEngineRef,
table_engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
procedure_manager: Option<ProcedureManagerRef>,
) -> Self {
Self {
table_engine,
table_engine_manager,
catalog_manager,
engine_procedure,
procedure_manager,
@@ -103,23 +107,38 @@ impl SqlHandler {
result
}
pub(crate) fn get_table(&self, table_ref: &TableReference) -> Result<TableRef> {
self.table_engine
.get_table(&EngineContext::default(), table_ref)
.with_context(|_| GetTableSnafu {
table_name: table_ref.to_string(),
})?
pub async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
let TableReference {
catalog,
schema,
table,
} = table_ref;
let table = self
.catalog_manager
.table(catalog, schema, table)
.await
.context(error::CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})
})?;
Ok(table)
}
pub fn table_engine(&self) -> TableEngineRef {
self.table_engine.clone()
pub fn table_engine_manager(&self) -> TableEngineManagerRef {
self.table_engine_manager.clone()
}
pub fn table_engine(&self, table: Arc<dyn Table>) -> Result<TableEngineRef> {
let engine_name = &table.table_info().meta.engine;
let engine = self
.table_engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?;
Ok(engine)
}
pub async fn close(&self) -> Result<()> {
self.table_engine
self.table_engine_manager
.close()
.await
.map_err(BoxedError::new)

View File

@@ -35,20 +35,24 @@ impl SqlHandler {
let full_table_name = table_ref.to_string();
// fetches table via catalog
let table = self.get_table(&table_ref).await?;
// checks the table engine exist
let table_engine = self.table_engine(table)?;
ensure!(
self.table_engine.table_exists(&ctx, &table_ref),
table_engine.table_exists(&ctx, &table_ref),
error::TableNotFoundSnafu {
table_name: &full_table_name,
}
);
let is_rename = req.is_rename_table();
let table =
self.table_engine
.alter_table(&ctx, req)
.await
.context(error::AlterTableSnafu {
table_name: full_table_name,
})?;
let table = table_engine
.alter_table(&ctx, req)
.await
.context(error::AlterTableSnafu {
table_name: full_table_name,
})?;
if is_rename {
let table_info = &table.table_info();
let rename_table_req = RenameTableRequest {

View File

@@ -40,7 +40,7 @@ impl SqlHandler {
schema: &req.schema_name,
table: &req.table_name,
};
let table = self.get_table(&table_ref)?;
let table = self.get_table(&table_ref).await?;
let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

View File

@@ -32,7 +32,7 @@ impl SqlHandler {
schema: &req.schema_name,
table: &req.table_name,
};
let table = self.get_table(&table_ref)?;
let table = self.get_table(&table_ref).await?;
let stream = table
.scan(None, &[], None)

View File

@@ -34,7 +34,7 @@ use crate::error::{
self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu,
IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu,
RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SubmitProcedureSnafu,
UnrecognizedTableOptionSnafu, WaitProcedureSnafu,
TableEngineNotFoundSnafu, UnrecognizedTableOptionSnafu, WaitProcedureSnafu,
};
use crate::sql::SqlHandler;
@@ -107,8 +107,14 @@ impl SqlHandler {
// determine catalog and schema from the very beginning
let table_name = req.table_name.clone();
let table = self
.table_engine
let table_engine =
self.table_engine_manager
.engine(&req.engine)
.context(TableEngineNotFoundSnafu {
engine_name: &req.engine,
})?;
let table = table_engine
.create_table(&ctx, req)
.await
.with_context(|_| CreateTableSnafu {
@@ -138,10 +144,16 @@ impl SqlHandler {
req: CreateTableRequest,
) -> Result<Output> {
let table_name = req.table_name.clone();
let table_engine =
self.table_engine_manager
.engine(&req.engine)
.context(TableEngineNotFoundSnafu {
engine_name: &req.engine,
})?;
let procedure = CreateTableProcedure::new(
req,
self.catalog_manager.clone(),
self.table_engine.clone(),
table_engine.clone(),
self.engine_procedure.clone(),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -286,6 +298,7 @@ impl SqlHandler {
primary_key_indices: primary_keys,
create_if_not_exists: stmt.if_not_exists,
table_options,
engine: stmt.engine,
};
Ok(request)
}

View File

@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::error::TableNotExistSnafu;
use catalog::DeregisterTableRequest;
use common_error::prelude::BoxedError;
use common_query::Output;
use common_telemetry::info;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table::engine::{EngineContext, TableReference};
use table::requests::DropTableRequest;
@@ -38,20 +39,36 @@ impl SqlHandler {
};
let table_full_name = table_reference.to_string();
let table = self
.catalog_manager
.table(&req.catalog_name, &req.schema_name, &req.table_name)
.await
.context(error::CatalogSnafu)?
.context(TableNotExistSnafu {
table: &table_full_name,
})
.map_err(BoxedError::new)
.context(error::DropTableSnafu {
table_name: &table_full_name,
})?;
self.catalog_manager
.deregister_table(deregister_table_req)
.await
.map_err(BoxedError::new)
.context(error::DropTableSnafu {
table_name: table_full_name.clone(),
table_name: &table_full_name,
})?;
let ctx = EngineContext {};
self.table_engine()
let engine = self.table_engine(table)?;
engine
.drop_table(&ctx, req)
.await
.map_err(BoxedError::new)
.context(error::DropTableSnafu {
.with_context(|_| error::DropTableSnafu {
table_name: table_full_name.clone(),
})?;

View File

@@ -46,7 +46,7 @@ impl SqlHandler {
table: &req.table_name.to_string(),
};
let table = self.get_table(&table_ref)?;
let table = self.get_table(&table_ref).await?;
let affected_rows = table.insert(req).await.with_context(|_| InsertSnafu {
table_name: table_ref.to_string(),

View File

@@ -15,7 +15,9 @@
use std::sync::Arc;
use std::time::Duration;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
@@ -23,6 +25,7 @@ use mito::config::EngineConfig;
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use servers::Mode;
use snafu::ResultExt;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, TableOptions};
@@ -122,7 +125,11 @@ pub(crate) async fn create_test_table(
];
let table_name = "demo";
let table_engine: TableEngineRef = instance.sql_handler().table_engine();
let table_engine: TableEngineRef = instance
.sql_handler()
.table_engine_manager()
.engine(MITO_ENGINE)
.unwrap();
let table = table_engine
.create_table(
&EngineContext::default(),
@@ -137,6 +144,7 @@ pub(crate) async fn create_test_table(
primary_key_indices: vec![0], // "host" is in primary keys
table_options: TableOptions::default(),
region_numbers: vec![0],
engine: MITO_ENGINE.to_string(),
},
)
.await
@@ -160,10 +168,11 @@ pub async fn create_mock_sql_handler() -> SqlHandler {
MockEngine::default(),
object_store,
));
let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone()));
let catalog_manager = Arc::new(
catalog::local::LocalCatalogManager::try_new(mock_engine.clone())
catalog::local::LocalCatalogManager::try_new(engine_manager.clone())
.await
.unwrap(),
);
SqlHandler::new(mock_engine.clone(), catalog_manager, mock_engine, None)
SqlHandler::new(engine_manager, catalog_manager, mock_engine, None)
}