feat: Procedure to create table and register table to catalog (#1040)

* feat: Add table-procedures crate

* feat: Implement procedure to create table

* feat: Integrate procedure manager to datanode

* test: Test CreateTableProcedure

* refactor: Rename table-procedures to table-procedure

* feat: Implement create_table_by_procedure

* chore: Remove comment

* chore: Add todo

* feat: Add procedure config to standalone mode

* feat: Register table-procedure loaders

* feat: Address review comments

CreateTableProcedure just returns an error if the subprocedure fails

* chore: Address CR comments
Authored by Yingwen on 2023-02-27 11:49:23 +08:00, committed by GitHub
parent df751c38b4, commit bd377ef329
20 changed files with 834 additions and 16 deletions
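
As a quick orientation before the per-file diffs: the new create-table path builds a CreateTableProcedure, submits it to the procedure manager, and waits on a watcher until the procedure is done or failed. The helper below is an illustrative sketch only, not part of the commit; error handling is collapsed to strings, whereas the real code in SqlHandler::create_table_by_procedure further below uses snafu error variants.

use common_procedure::{ProcedureManagerRef, ProcedureState, ProcedureWithId};
use table_procedure::CreateTableProcedure;

// Illustrative sketch: submit a procedure and wait for it to finish.
async fn create_table_via_procedure(
    procedure_manager: &ProcedureManagerRef,
    procedure: CreateTableProcedure,
) -> Result<(), String> {
    // Wrap the procedure together with a freshly generated procedure id.
    let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
    let procedure_id = procedure_with_id.id;
    // Submit it to the manager and obtain a watch channel that reports its state.
    let mut watcher = procedure_manager
        .submit(procedure_with_id)
        .await
        .map_err(|e| e.to_string())?;
    // Poll the watcher until the procedure reports Done or Failed.
    loop {
        watcher.changed().await.map_err(|e| e.to_string())?;
        match *watcher.borrow() {
            ProcedureState::Running => (),
            ProcedureState::Done => return Ok(()),
            ProcedureState::Failed => return Err(format!("procedure {procedure_id} failed")),
        }
    }
}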

Cargo.lock (generated)

@@ -2186,6 +2186,7 @@ dependencies = [
"common-error",
"common-grpc",
"common-grpc-expr",
"common-procedure",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -2218,6 +2219,7 @@ dependencies = [
"store-api",
"substrait 0.1.0",
"table",
"table-procedure",
"tempdir",
"tokio",
"tokio-stream",
@@ -7473,6 +7475,28 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "table-procedure"
version = "0.1.0"
dependencies = [
"async-trait",
"catalog",
"common-error",
"common-procedure",
"common-telemetry",
"datatypes",
"log-store",
"mito",
"object-store",
"serde",
"serde_json",
"snafu",
"storage",
"table",
"tempdir",
"tokio",
]
[[package]]
name = "tagptr"
version = "0.2.0"


@@ -37,6 +37,7 @@ members = [
"src/storage",
"src/store-api",
"src/table",
"src/table-procedure",
"tests-integration",
"tests/runner",
]


@@ -29,3 +29,7 @@ tcp_nodelay = false
max_inflight_tasks = 4
max_files_in_level0 = 16
max_purge_tasks = 32
[procedure.store]
type = 'File'
data_dir = '/tmp/greptimedb/procedure/'


@@ -14,7 +14,6 @@ purge_threshold = '50GB'
read_batch_size = 128
sync_write = false
[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'
@@ -42,3 +41,7 @@ enable = true
addr = '127.0.0.1:4003'
runtime_size = 2
check_pwd = false
[procedure.store]
type = 'File'
data_dir = '/tmp/greptimedb/procedure/'


@@ -14,7 +14,9 @@
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig};
use datanode::datanode::{
Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig,
};
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::ResultExt;
@@ -65,6 +67,8 @@ struct StartCommand {
data_dir: Option<String>,
#[clap(long)]
wal_dir: Option<String>,
#[clap(long)]
procedure_dir: Option<String>,
}
impl StartCommand {
@@ -134,6 +138,11 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(wal_dir) = cmd.wal_dir {
opts.wal.dir = wal_dir;
}
if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
}
Ok(opts)
}
}


@@ -18,7 +18,7 @@ use clap::Parser;
use common_base::Plugins;
use common_telemetry::info;
use datanode::datanode::{
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig,
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions};
@@ -81,6 +81,7 @@ pub struct StandaloneOptions {
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub procedure: Option<ProcedureConfig>,
}
impl Default for StandaloneOptions {
@@ -99,6 +100,7 @@ impl Default for StandaloneOptions {
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
procedure: None,
}
}
}
@@ -125,6 +127,7 @@ impl StandaloneOptions {
wal: self.wal,
storage: self.storage,
compaction: self.compaction,
procedure: self.procedure,
..Default::default()
}
}


@@ -123,8 +123,6 @@ pub(crate) struct ManagerContext {
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
lock_map: LockMap,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
// TODO(yingwen): Now we never clean the messages. But when the root procedure is done, we
// should be able to remove its message and all its child messages.
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
}


@@ -21,6 +21,7 @@ common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-grpc-expr = { path = "../common/grpc-expr" }
common-procedure = { path = "../common/procedure" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
@@ -52,6 +53,7 @@ storage = { path = "../storage" }
store-api = { path = "../store-api" }
substrait = { path = "../common/substrait" }
table = { path = "../table" }
table-procedure = { path = "../table-procedure" }
tokio.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }
tonic.workspace = true


@@ -144,6 +144,27 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Storage config for procedure manager.
pub store: ObjectStoreConfig,
}
impl Default for ProcedureConfig {
fn default() -> ProcedureConfig {
ProcedureConfig::from_file_path("/tmp/greptimedb/procedure/".to_string())
}
}
impl ProcedureConfig {
pub fn from_file_path(path: String) -> ProcedureConfig {
ProcedureConfig {
store: ObjectStoreConfig::File(FileConfig { data_dir: path }),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct DatanodeOptions {
@@ -159,6 +180,7 @@ pub struct DatanodeOptions {
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub procedure: Option<ProcedureConfig>,
}
impl Default for DatanodeOptions {
@@ -176,6 +198,7 @@ impl Default for DatanodeOptions {
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
procedure: None,
}
}
}
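
For reference, a hedged sketch of enabling the new procedure store programmatically. It only uses ProcedureConfig::from_file_path and the DatanodeOptions field added above, and the directory is simply the example default path:

use datanode::datanode::{DatanodeOptions, ProcedureConfig};

// Illustrative only: options with the procedure store backed by local files.
fn options_with_procedure_store() -> DatanodeOptions {
    DatanodeOptions {
        // Persist procedure state under this directory.
        procedure: Some(ProcedureConfig::from_file_path(
            "/tmp/greptimedb/procedure/".to_string(),
        )),
        ..Default::default()
    }
}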


@@ -15,6 +15,7 @@
use std::any::Any;
use common_error::prelude::*;
use common_procedure::ProcedureId;
use common_recordbatch::error::Error as RecordBatchError;
use datafusion::parquet;
use datatypes::prelude::ConcreteDataType;
@@ -394,6 +395,31 @@ pub enum Error {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display("Failed to recover procedure, source: {}", source))]
RecoverProcedure {
#[snafu(backtrace)]
source: common_procedure::error::Error,
},
#[snafu(display("Failed to submit procedure, source: {}", source))]
SubmitProcedure {
#[snafu(backtrace)]
source: common_procedure::error::Error,
},
#[snafu(display("Failed to wait procedure done, source: {}", source))]
WaitProcedure {
source: tokio::sync::watch::error::RecvError,
backtrace: Backtrace,
},
// TODO(yingwen): Use procedure's error.
#[snafu(display("Failed to execute procedure, procedure_id: {}", procedure_id))]
ProcedureExec {
procedure_id: ProcedureId,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -470,6 +496,10 @@ impl ErrorExt for Error {
CopyTable { source, .. } => source.status_code(),
TableScanExec { source, .. } => source.status_code(),
UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => {
source.status_code()
}
WaitProcedure { .. } | ProcedureExec { .. } => StatusCode::Internal,
}
}


@@ -21,6 +21,8 @@ use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::readable_size::ReadableSize;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::info;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
@@ -45,11 +47,11 @@ use table::table::TableIdProviderRef;
use table::Table;
use crate::datanode::{
DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result,
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -173,12 +175,32 @@ impl Instance {
catalog_manager.clone(),
)),
};
let procedure_manager = create_procedure_manager(&opts.procedure).await?;
// Recover procedures.
if let Some(procedure_manager) = &procedure_manager {
table_engine.register_procedure_loaders(&**procedure_manager);
table_procedure::register_procedure_loaders(
catalog_manager.clone(),
table_engine.clone(),
table_engine.clone(),
&**procedure_manager,
);
procedure_manager
.recover()
.await
.context(RecoverProcedureSnafu)?;
}
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(
table_engine,
table_engine.clone(),
catalog_manager.clone(),
query_engine.clone(),
table_engine,
procedure_manager,
),
catalog_manager,
script_executor,
@@ -400,3 +422,21 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngin
.context(OpenLogStoreSnafu)?;
Ok(logstore)
}
async fn create_procedure_manager(
procedure_config: &Option<ProcedureConfig>,
) -> Result<Option<ProcedureManagerRef>> {
let Some(procedure_config) = procedure_config else {
return Ok(None);
};
info!(
"Creating procedure manager with config: {:?}",
procedure_config
);
let object_store = new_object_store(&procedure_config.store).await?;
let manager_config = ManagerConfig { object_store };
Ok(Some(Arc::new(LocalManager::new(manager_config))))
}


@@ -96,9 +96,11 @@ impl Instance {
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(
table_engine,
table_engine.clone(),
catalog_manager.clone(),
query_engine.clone(),
table_engine,
None,
),
catalog_manager,
script_executor,


@@ -13,6 +13,7 @@
// limitations under the License.
use catalog::CatalogManagerRef;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::error;
use query::query_engine::QueryEngineRef;
@@ -23,7 +24,7 @@ use sql::statements::delete::Delete;
use sql::statements::describe::DescribeTable;
use sql::statements::explain::Explain;
use sql::statements::show::{ShowDatabases, ShowTables};
use table::engine::{EngineContext, TableEngineRef, TableReference};
use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference};
use table::requests::*;
use table::TableRef;
@@ -57,6 +58,8 @@ pub struct SqlHandler {
table_engine: TableEngineRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
engine_procedure: TableEngineProcedureRef,
procedure_manager: Option<ProcedureManagerRef>,
}
impl SqlHandler {
@@ -64,11 +67,15 @@ impl SqlHandler {
table_engine: TableEngineRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
engine_procedure: TableEngineProcedureRef,
procedure_manager: Option<ProcedureManagerRef>,
) -> Self {
Self {
table_engine,
catalog_manager,
query_engine,
engine_procedure,
procedure_manager,
}
}
@@ -250,7 +257,13 @@ mod tests {
let factory = QueryEngineFactory::new(catalog_list.clone());
let query_engine = factory.query_engine();
let sql_handler = SqlHandler::new(table_engine, catalog_list.clone(), query_engine.clone());
let sql_handler = SqlHandler::new(
table_engine.clone(),
catalog_list.clone(),
query_engine.clone(),
table_engine,
None,
);
let stmt = match QueryLanguageParser::parse_sql(sql).unwrap() {
QueryStatement::Sql(Statement::Insert(i)) => i,


@@ -15,9 +15,9 @@
use std::collections::HashMap;
use catalog::{RegisterSchemaRequest, RegisterTableRequest};
use common_procedure::{ProcedureManagerRef, ProcedureState, ProcedureWithId};
use common_query::Output;
use common_telemetry::tracing::info;
use common_telemetry::tracing::log::error;
use common_telemetry::tracing::{error, info};
use datatypes::schema::RawSchema;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
@@ -28,12 +28,13 @@ use store_api::storage::consts::TIME_INDEX_NAME;
use table::engine::{EngineContext, TableReference};
use table::metadata::TableId;
use table::requests::*;
use table_procedure::CreateTableProcedure;
use crate::error::{
self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu,
IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu,
RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
UnrecognizedTableOptionSnafu,
ProcedureExecSnafu, RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
SubmitProcedureSnafu, UnrecognizedTableOptionSnafu, WaitProcedureSnafu,
};
use crate::sql::SqlHandler;
@@ -72,6 +73,10 @@ impl SqlHandler {
}
pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result<Output> {
if let Some(procedure_manager) = &self.procedure_manager {
return self.create_table_by_procedure(procedure_manager, req).await;
}
let ctx = EngineContext {};
// first check if catalog and schema exist
let catalog = self
@@ -127,6 +132,43 @@ impl SqlHandler {
Ok(Output::AffectedRows(0))
}
pub(crate) async fn create_table_by_procedure(
&self,
procedure_manager: &ProcedureManagerRef,
req: CreateTableRequest,
) -> Result<Output> {
let table_name = req.table_name.clone();
let procedure = CreateTableProcedure::new(
req,
self.catalog_manager.clone(),
self.table_engine.clone(),
self.engine_procedure.clone(),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Create table {} by procedure {}", table_name, procedure_id);
let mut watcher = procedure_manager
.submit(procedure_with_id)
.await
.context(SubmitProcedureSnafu)?;
// TODO(yingwen): Wrap this into a function and add error to ProcedureState::Failed.
loop {
watcher.changed().await.context(WaitProcedureSnafu)?;
match *watcher.borrow() {
ProcedureState::Running => (),
ProcedureState::Done => {
return Ok(Output::AffectedRows(0));
}
ProcedureState::Failed => {
return ProcedureExecSnafu { procedure_id }.fail();
}
}
}
}
/// Converts [CreateTable] to [SqlRequest::CreateTable].
pub(crate) fn create_to_request(
&self,


@@ -140,7 +140,13 @@ pub async fn create_mock_sql_handler() -> SqlHandler {
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let factory = QueryEngineFactory::new(catalog_list);
SqlHandler::new(mock_engine, catalog_manager, factory.query_engine())
SqlHandler::new(
mock_engine.clone(),
catalog_manager,
factory.query_engine(),
mock_engine,
None,
)
}
pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance {


@@ -0,0 +1,25 @@
[package]
name = "table-procedure"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
catalog = { path = "../catalog" }
common-error = { path = "../common/error" }
common-procedure = { path = "../common/procedure" }
common-telemetry = { path = "../common/telemetry" }
datatypes = { path = "../datatypes" }
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
table = { path = "../table" }
[dev-dependencies]
log-store = { path = "../log-store" }
mito = { path = "../mito" }
object-store = { path = "../object-store" }
storage = { path = "../storage" }
tokio.workspace = true
tempdir = "0.3"


@@ -0,0 +1,388 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Procedure to create a table.
use async_trait::async_trait;
use catalog::{CatalogManagerRef, RegisterTableRequest};
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Result, Status,
};
use common_telemetry::logging;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference};
use table::requests::CreateTableRequest;
use crate::error::{
AccessCatalogSnafu, CatalogNotFoundSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu,
SerializeProcedureSnafu, SubprocedureFailedSnafu,
};
/// Procedure to create a table.
pub struct CreateTableProcedure {
data: CreateTableData,
catalog_manager: CatalogManagerRef,
table_engine: TableEngineRef,
engine_procedure: TableEngineProcedureRef,
}
#[async_trait]
impl Procedure for CreateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, ctx: &Context) -> Result<Status> {
match self.data.state {
CreateTableState::Prepare => self.on_prepare(),
CreateTableState::EngineCreateTable => self.on_engine_create_table(ctx).await,
CreateTableState::RegisterCatalog => self.on_register_catalog().await,
}
}
fn dump(&self) -> Result<String> {
let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?;
Ok(json)
}
fn lock_key(&self) -> LockKey {
// We lock the whole table.
let table_name = self.data.table_ref().to_string();
LockKey::single(table_name)
}
}
impl CreateTableProcedure {
const TYPE_NAME: &str = "table-procedures::CreateTableProcedure";
/// Returns a new [CreateTableProcedure].
pub fn new(
request: CreateTableRequest,
catalog_manager: CatalogManagerRef,
table_engine: TableEngineRef,
engine_procedure: TableEngineProcedureRef,
) -> CreateTableProcedure {
CreateTableProcedure {
data: CreateTableData {
state: CreateTableState::Prepare,
request,
subprocedure_id: None,
},
catalog_manager,
table_engine,
engine_procedure,
}
}
/// Register the loader of this procedure to the `procedure_manager`.
///
/// # Panics
/// Panics on error.
pub fn register_loader(
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
table_engine: TableEngineRef,
procedure_manager: &dyn ProcedureManager,
) {
procedure_manager
.register_loader(
Self::TYPE_NAME,
Box::new(move |data| {
Self::from_json(
data,
catalog_manager.clone(),
table_engine.clone(),
engine_procedure.clone(),
)
.map(|p| Box::new(p) as _)
}),
)
.unwrap()
}
/// Recover the procedure from json.
fn from_json(
json: &str,
catalog_manager: CatalogManagerRef,
table_engine: TableEngineRef,
engine_procedure: TableEngineProcedureRef,
) -> Result<Self> {
let data: CreateTableData =
serde_json::from_str(json).context(DeserializeProcedureSnafu)?;
Ok(CreateTableProcedure {
data,
catalog_manager,
table_engine,
engine_procedure,
})
}
fn on_prepare(&mut self) -> Result<Status> {
// Check whether catalog and schema exist.
let catalog = self
.catalog_manager
.catalog(&self.data.request.catalog_name)
.context(AccessCatalogSnafu)?
.with_context(|| {
logging::error!(
"Failed to create table {}, catalog not found",
self.data.table_ref()
);
CatalogNotFoundSnafu {
name: &self.data.request.catalog_name,
}
})?;
catalog
.schema(&self.data.request.schema_name)
.context(AccessCatalogSnafu)?
.with_context(|| {
logging::error!(
"Failed to create table {}, schema not found",
self.data.table_ref(),
);
SchemaNotFoundSnafu {
name: &self.data.request.schema_name,
}
})?;
self.data.state = CreateTableState::EngineCreateTable;
// Assign procedure id to the subprocedure.
self.data.subprocedure_id = Some(ProcedureId::random());
Ok(Status::executing(true))
}
async fn on_engine_create_table(&mut self, ctx: &Context) -> Result<Status> {
// Safety: subprocedure id is always set in this state.
let sub_id = self.data.subprocedure_id.unwrap();
// Query subprocedure state.
let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else {
// We need to submit the subprocedure if it doesn't exist. We always need to
// do this check as we might not have submitted the subprocedure yet when the
// manager recovers this procedure from the procedure store.
logging::info!(
"On engine create table {}, not found, sub_id: {}",
self.data.request.table_name,
sub_id
);
// If the sub procedure is not found, we create a new sub procedure with the same id.
let engine_ctx = EngineContext::default();
let procedure = self
.engine_procedure
.create_table_procedure(&engine_ctx, self.data.request.clone())
.map_err(Error::external)?;
return Ok(Status::Suspended {
subprocedures: vec![ProcedureWithId {
id: sub_id,
procedure,
}],
persist: true,
});
};
match sub_state {
ProcedureState::Running => Ok(Status::Suspended {
subprocedures: Vec::new(),
persist: false,
}),
ProcedureState::Done => {
logging::info!(
"On engine create table {}, done, sub_id: {}",
self.data.request.table_name,
sub_id
);
// The subprocedure is done, so we can execute the next step.
self.data.state = CreateTableState::RegisterCatalog;
Ok(Status::executing(true))
}
ProcedureState::Failed => {
// Return an error if the subprocedure failed.
SubprocedureFailedSnafu {
subprocedure_id: sub_id,
}
.fail()?
}
}
}
async fn on_register_catalog(&mut self) -> Result<Status> {
let catalog = self
.catalog_manager
.catalog(&self.data.request.catalog_name)
.context(AccessCatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &self.data.request.catalog_name,
})?;
let schema = catalog
.schema(&self.data.request.schema_name)
.context(AccessCatalogSnafu)?
.context(SchemaNotFoundSnafu {
name: &self.data.request.schema_name,
})?;
let table_exists = schema
.table(&self.data.request.table_name)
.map_err(Error::external)?
.is_some();
if table_exists {
// Table already exists.
return Ok(Status::Done);
}
let engine_ctx = EngineContext::default();
let table_ref = self.data.table_ref();
// Safety: The procedure owns the lock so the table should exist.
let table = self
.table_engine
.get_table(&engine_ctx, &table_ref)
.map_err(Error::external)?
.unwrap();
let register_req = RegisterTableRequest {
catalog: self.data.request.catalog_name.clone(),
schema: self.data.request.schema_name.clone(),
table_name: self.data.request.table_name.clone(),
table_id: self.data.request.id,
table,
};
self.catalog_manager
.register_table(register_req)
.await
.map_err(Error::external)?;
Ok(Status::Done)
}
}
/// Represents each step while creating a table in the datanode.
#[derive(Debug, Serialize, Deserialize)]
enum CreateTableState {
/// Validate request and prepare to create table.
Prepare,
/// Create table in the table engine.
EngineCreateTable,
/// Register the table to the catalog.
RegisterCatalog,
}
/// Serializable data of [CreateTableProcedure].
#[derive(Debug, Serialize, Deserialize)]
struct CreateTableData {
/// Current state.
state: CreateTableState,
/// Request to create this table.
request: CreateTableRequest,
/// Id of the subprocedure to create this table in the engine.
///
/// This id is `Some` while the procedure is in [CreateTableState::EngineCreateTable]
/// state.
subprocedure_id: Option<ProcedureId>,
}
impl CreateTableData {
fn table_ref(&self) -> TableReference {
TableReference {
catalog: &self.request.catalog_name,
schema: &self.request.schema_name,
table: &self.request.table_name,
}
}
}
#[cfg(test)]
mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::engine::{EngineContext, TableEngine};
use super::*;
use crate::test_util::TestEnv;
fn schema_for_test() -> RawSchema {
let column_schemas = vec![
// Key
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
// Nullable value column: cpu
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
// Non-null value column: memory
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true),
];
RawSchema::new(column_schemas)
}
fn new_create_request(table_name: &str) -> CreateTableRequest {
CreateTableRequest {
id: 1,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
desc: Some("a test table".to_string()),
schema: schema_for_test(),
region_numbers: vec![0, 1],
create_if_not_exists: true,
primary_key_indices: vec![0],
table_options: Default::default(),
}
}
#[tokio::test]
async fn test_create_table_procedure() {
let TestEnv {
dir: _dir,
table_engine,
procedure_manager,
catalog_manager,
} = TestEnv::new("create");
let table_name = "test_create";
let request = new_create_request(table_name);
let procedure = CreateTableProcedure::new(
request.clone(),
catalog_manager,
table_engine.clone(),
table_engine.clone(),
);
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
let engine_ctx = EngineContext::default();
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.unwrap()
.is_none());
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
watcher.changed().await.unwrap();
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.unwrap()
.is_some());
}
}


@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::prelude::*;
use common_procedure::ProcedureId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
#[snafu(display("Failed to serialize procedure to json, source: {}", source))]
SerializeProcedure {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to deserialize procedure from json, source: {}", source))]
DeserializeProcedure {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display("Invalid raw schema, source: {}", source))]
InvalidRawSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to access catalog, source: {}", source))]
AccessCatalog {
#[snafu(backtrace)]
source: catalog::error::Error,
},
#[snafu(display("Catalog {} not found", name))]
CatalogNotFound { name: String },
#[snafu(display("Schema {} not found", name))]
SchemaNotFound { name: String },
#[snafu(display("Subprocedure {} failed", subprocedure_id))]
SubprocedureFailed {
subprocedure_id: ProcedureId,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
SerializeProcedure { .. } | DeserializeProcedure { .. } | SubprocedureFailed { .. } => {
StatusCode::Internal
}
InvalidRawSchema { source, .. } => source.status_code(),
AccessCatalog { source } => source.status_code(),
CatalogNotFound { .. } | SchemaNotFound { .. } => StatusCode::InvalidArguments,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<Error> for common_procedure::Error {
fn from(e: Error) -> common_procedure::Error {
common_procedure::Error::external(e)
}
}


@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Procedures for table operations.
mod create;
pub mod error;
#[cfg(test)]
mod test_util;
use catalog::CatalogManagerRef;
use common_procedure::ProcedureManager;
pub use create::CreateTableProcedure;
use table::engine::{TableEngineProcedureRef, TableEngineRef};
/// Register all procedure loaders to the procedure manager.
///
/// # Panics
/// Panics on error.
pub fn register_procedure_loaders(
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
table_engine: TableEngineRef,
procedure_manager: &dyn ProcedureManager,
) {
CreateTableProcedure::register_loader(
catalog_manager,
engine_procedure,
table_engine,
procedure_manager,
);
}
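
A hedged usage sketch of this entry point, mirroring the datanode integration earlier in the diff. Signatures are taken from this file and from common-procedure; the construction of the engine, catalog manager, and procedure manager is assumed to happen elsewhere, and the return type of recover() is assumed to be common_procedure::Result<()>.

use catalog::CatalogManagerRef;
use common_procedure::ProcedureManagerRef;
use table::engine::{TableEngineProcedureRef, TableEngineRef};

// Illustrative only: register the loaders, then replay any persisted procedures on startup.
async fn init_table_procedures(
    catalog_manager: CatalogManagerRef,
    engine_procedure: TableEngineProcedureRef,
    table_engine: TableEngineRef,
    procedure_manager: ProcedureManagerRef,
) -> common_procedure::Result<()> {
    // Register the CreateTableProcedure loader so persisted procedures can be re-instantiated.
    table_procedure::register_procedure_loaders(
        catalog_manager,
        engine_procedure,
        table_engine,
        &*procedure_manager,
    );
    // Recover (replay) procedures that were persisted before a restart.
    procedure_manager.recover().await
}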


@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use catalog::local::MemoryCatalogManager;
use catalog::CatalogManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use tempdir::TempDir;
pub struct TestEnv {
pub dir: TempDir,
pub table_engine: Arc<MitoEngine<EngineImpl<NoopLogStore>>>,
pub procedure_manager: ProcedureManagerRef,
pub catalog_manager: CatalogManagerRef,
}
impl TestEnv {
pub fn new(prefix: &str) -> TestEnv {
let dir = TempDir::new(prefix).unwrap();
let store_dir = format!("{}/db", dir.path().to_string_lossy());
let accessor = Fs::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let storage_engine = EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
);
let table_engine = Arc::new(MitoEngine::new(
EngineConfig::default(),
storage_engine,
object_store,
));
let procedure_dir = format!("{}/procedure", dir.path().to_string_lossy());
let accessor = Fs::default().root(&procedure_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { object_store }));
let catalog_manager = Arc::new(MemoryCatalogManager::default());
TestEnv {
dir,
table_engine,
procedure_manager,
catalog_manager,
}
}
}