fix: database base ttl (#4926)

* main:
 Add common-meta dependency and implement SchemaMetadataManager

 - Introduce `common-meta` as a new dependency in `mito2`.
 - Implement `SchemaMetadataManager` for managing schema-level metadata.
 - Update `DatanodeBuilder` and `MitoEngine` to pass `KvBackendRef` for schema metadata management.
 - Add `SchemaMetadataManager` to `RegionWorkerLoop` for compaction handling.
 - Include `SchemaNameKey` usage in compaction-related code.
 - Add `database_metadata_manager` module with `SchemaMetadataManager` struct and associated logic (usage sketched after this list).
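
For context, a minimal usage sketch of the new manager, assuming the in-memory kv backend that the tests in this diff use (the helper name `database_ttl` is hypothetical):

    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::key::SchemaMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use store_api::storage::TableId;

    // Sketch: resolve the database-level TTL for a table id. The manager
    // maps table id -> table info -> (catalog, schema) -> schema options.
    async fn database_ttl(table_id: TableId) -> Option<Duration> {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let manager = SchemaMetadataManager::new(kv_backend);
        manager
            .get_schema_options_by_table_id(table_id)
            .await
            .ok()
            .flatten()
            .and_then(|opts| opts.ttl)
    }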

* fix/database-base-ttl:
 Refactor metadata management and update compaction logic

 - Remove `database_metadata_manager` and introduce `schema_metadata_manager`
 - Update compaction logic to handle TTL based on schema metadata (precedence sketched after this list)
 - Adjust tests to use `schema_metadata_manager` for setting up schema options
 - Fix engine creation in tests to pass `kv_backend` explicitly
 - Remove unused imports and apply minor code cleanups
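
The precedence this introduces is small enough to state directly; a sketch matching `find_ttl` later in this diff:

    use std::time::Duration;

    // Sketch: a table-level TTL, if set, always wins; otherwise the
    // database (schema) level TTL applies.
    fn effective_ttl(table_ttl: Option<Duration>, schema_ttl: Option<Duration>) -> Option<Duration> {
        table_ttl.or(schema_ttl)
    }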

* fix/database-base-ttl:
 Extend CREATE TABLE LIKE to inherit schema options

 - Implement inheritance of database-level options for CREATE TABLE LIKE
 - Add schema options to SHOW CREATE TABLE output (rendering rule sketched after this list)
 - Refactor create_table_stmt to include schema_options in SQL generation
 - Update error handling to include TableMetadataManagerSnafu
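
A sketch of the rendering rule, assuming the humantime formatting used by `create_sql_options` near the end of this diff (the helper name `rendered_ttl` is hypothetical):

    use std::time::Duration;

    // Sketch: SHOW CREATE TABLE emits the table-level ttl if set, else the
    // inherited database-level ttl, formatted as a human-readable duration.
    fn rendered_ttl(table_ttl: Option<Duration>, schema_ttl: Option<Duration>) -> Option<String> {
        table_ttl
            .or(schema_ttl)
            .map(|ttl| humantime::format_duration(ttl).to_string())
    }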

* fix/database-base-ttl:
 Refactor error handling and remove schema dependency in table creation

 - Replace `expect` with the `?` operator for error handling in open_compaction_region (illustrated after this list)
 - Simplify create_logical_tables by removing catalog and schema name parameters
 - Remove unnecessary schema retrieval and merging of schema options in create_table_info
 - Clean up unused imports and redundant code
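
The `expect`-to-`?` change follows the usual propagation pattern; a generic illustration, not taken from this diff:

    use std::fs;
    use std::io;

    // Sketch: `?` hands the error to the caller where `expect` would panic.
    fn read_config(path: &str) -> io::Result<String> {
        let contents = fs::read_to_string(path)?; // was: .expect("read config")
        Ok(contents)
    }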

* fix/database-base-ttl:
 Refactor error handling and update documentation comments

 - Update comment to reflect retrieval of schema options instead of metadata
 - Introduce new error type `GetSchemaMetadataSnafu` for schema metadata retrieval failures
 - Implement error handling for schema metadata retrieval in the `find_ttl` function (wrapping pattern sketched after this list)
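
For readers unfamiliar with the snafu pattern used here, a self-contained sketch with hypothetical names (the real variant, `GetSchemaMetadata`, wraps a `common_meta::error::Error` in the error hunk below):

    use snafu::{ResultExt, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Failed to get schema metadata"))]
        GetSchemaMetadata { source: std::io::Error },
    }

    // Sketch: `.context(...)` wraps the underlying error into the variant,
    // mirroring what `find_ttl` does with the metadata lookup failure.
    fn fetch_metadata(path: &str) -> Result<Vec<u8>, Error> {
        std::fs::read(path).context(GetSchemaMetadataSnafu)
    }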

* fix: toml

* fix/database-base-ttl:
 Refactor SchemaMetadataManager and adjust Cargo.toml dependencies

 - Remove unused imports in schema_metadata_manager.rs
 - Add conditional compilation for SchemaMetadataManager::new (pattern sketched after this list)
 - Update Cargo.toml: move the "testing" feature of the common-meta dependency from the main dependencies to dev-dependencies
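
The gating pattern, sketched on a hypothetical type (SchemaMetadataManager keeps its raw `kv_backend` this way so tests can seed metadata directly):

    use common_meta::kv_backend::KvBackendRef;

    pub struct Manager {
        // Present in all builds.
        name: String,
        // Compiled only for unit tests, or when a dependent crate enables
        // the "testing" feature from its dev-dependencies.
        #[cfg(any(test, feature = "testing"))]
        kv_backend: KvBackendRef,
    }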

* fix/database-base-ttl:
 Fix typos in comments and function names across multiple modules

 - Correct spelling of 'parallelism' in region_server, engine, and scan_region modules
 - Amend typo in TODO comment from 'persisent' to 'persistent' in server module
 - Update incorrect test query from 'versiona' to 'version' in federated module tests

* fix/database-base-ttl: Add schema existence check in StatementExecutor for CREATE TABLE operation
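
The check is condensed below as a self-contained sketch (names hypothetical; the real code uses `ensure!` with the operator's `SchemaNotFoundSnafu`, as shown in the StatementExecutor hunk later in this diff):

    use snafu::{ensure, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Schema not found: {schema_info}"))]
        SchemaNotFound { schema_info: String },
    }

    // Sketch: reject CREATE TABLE early when the target schema is missing.
    fn check_schema_exists(schema: Option<()>, schema_name: &str) -> Result<(), Error> {
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu { schema_info: schema_name }
        );
        Ok(())
    }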

* fix/database-base-ttl: Add warning log for failed TTL retrieval in compaction region open function
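
The lookup is treated as best-effort; condensed from the compaction hunks below (`find_ttl`, `region_id`, and `schema_metadata_manager` as defined there):

    // Sketch: on lookup failure, warn and compact without a database-level
    // TTL instead of failing the whole operation.
    let ttl = find_ttl(
        region_id.table_id(),
        current_version.options.ttl,
        &schema_metadata_manager,
    )
    .await
    .unwrap_or_else(|e| {
        warn!(e; "Failed to get ttl for region: {}", region_id);
        None
    });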

* fix/database-base-ttl:
 Refactor to use SchemaMetadataManagerRef in Datanode and MitoEngine

 - Replace KvBackendRef with SchemaMetadataManagerRef across various components (construction sketched after this list).
 - Update DatanodeBuilder and MitoEngine to pass SchemaMetadataManagerRef instead of KvBackendRef.
 - Adjust test cases to use get_schema_metadata_manager method for consistency.
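
The handle is built once per datanode and cloned down the stack, as in the DatanodeBuilder hunk below:

    use std::sync::Arc;

    use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
    use common_meta::kv_backend::KvBackendRef;

    // Sketch: construct one shared manager from the kv backend and pass the
    // Arc handle to engines and workers instead of the raw KvBackendRef.
    fn build_schema_metadata_manager(kv_backend: KvBackendRef) -> SchemaMetadataManagerRef {
        Arc::new(SchemaMetadataManager::new(kv_backend))
    }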
Lei, HUANG
2024-11-05 10:51:32 +08:00
committed by GitHub
parent f221ee30fd
commit 3dcd6b8e51
34 changed files with 689 additions and 81 deletions

Cargo.lock (generated)

@@ -6556,6 +6556,7 @@ dependencies = [
"common-error",
"common-function",
"common-macro",
"common-meta",
"common-procedure-test",
"common-query",
"common-recordbatch",


@@ -91,6 +91,7 @@ pub mod catalog_name;
pub mod datanode_table;
pub mod flow;
pub mod node_address;
mod schema_metadata_manager;
pub mod schema_name;
pub mod table_info;
pub mod table_name;
@@ -116,6 +117,7 @@ use flow::flow_route::FlowRouteValue;
use flow::table_flow::TableFlowValue;
use lazy_static::lazy_static;
use regex::Regex;
pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};


@@ -0,0 +1,122 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Schema-level metadata manager.
use std::sync::Arc;
use snafu::OptionExt;
use store_api::storage::TableId;
use crate::error::TableInfoNotFoundSnafu;
use crate::key::schema_name::{SchemaManager, SchemaNameKey};
use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::{error, SchemaOptions};
pub type SchemaMetadataManagerRef = Arc<SchemaMetadataManager>;
pub struct SchemaMetadataManager {
table_info_manager: TableInfoManagerRef,
schema_manager: SchemaManager,
#[cfg(any(test, feature = "testing"))]
kv_backend: KvBackendRef,
}
impl SchemaMetadataManager {
/// Creates a new [`SchemaMetadataManager`].
#[cfg(not(any(test, feature = "testing")))]
pub fn new(kv_backend: KvBackendRef) -> Self {
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone()));
let schema_manager = SchemaManager::new(kv_backend);
Self {
table_info_manager,
schema_manager,
}
}
/// Creates a new [`SchemaMetadataManager`].
#[cfg(any(test, feature = "testing"))]
pub fn new(kv_backend: KvBackendRef) -> Self {
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone()));
let schema_manager = SchemaManager::new(kv_backend.clone());
Self {
table_info_manager,
schema_manager,
kv_backend,
}
}
/// Gets schema options by table id.
pub async fn get_schema_options_by_table_id(
&self,
table_id: TableId,
) -> error::Result<Option<SchemaOptions>> {
let table_info = self
.table_info_manager
.get(table_id)
.await?
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id: {}", table_id),
})?;
let key = SchemaNameKey::new(
&table_info.table_info.catalog_name,
&table_info.table_info.schema_name,
);
self.schema_manager.get(key).await
}
#[cfg(any(test, feature = "testing"))]
pub async fn register_region_table_info(
&self,
table_id: TableId,
table_name: &str,
schema_name: &str,
catalog_name: &str,
schema_value: Option<crate::key::schema_name::SchemaNameValue>,
) {
use table::metadata::{RawTableInfo, TableType};
let value = crate::key::table_info::TableInfoValue::new(RawTableInfo {
ident: Default::default(),
name: table_name.to_string(),
desc: None,
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
meta: Default::default(),
table_type: TableType::Base,
});
let (txn, _) = self
.table_info_manager
.build_create_txn(table_id, &value)
.unwrap();
let resp = self.kv_backend.txn(txn).await.unwrap();
assert!(resp.succeeded, "Failed to create table metadata");
let key = SchemaNameKey {
catalog: catalog_name,
schema: schema_name,
};
self.schema_manager
.create(key, schema_value, false)
.await
.expect("Failed to create schema metadata");
common_telemetry::info!(
"Register table: {}, id: {}, schema: {}, catalog: {}",
table_name,
table_id,
schema_name,
catalog_name
);
}
}


@@ -134,6 +134,7 @@ impl TableInfoValue {
}
pub type TableInfoManagerRef = Arc<TableInfoManager>;
#[derive(Clone)]
pub struct TableInfoManager {
kv_backend: KvBackendRef,


@@ -54,4 +54,7 @@ pub type DatanodeId = u64;
// The id of the flownode.
pub type FlownodeId = u64;
/// Schema options.
pub type SchemaOptions = key::schema_name::SchemaNameValue;
pub use instruction::RegionIdent;


@@ -427,7 +427,8 @@ mod test {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
let engine = Arc::new(engine_env.create_engine(MitoConfig::default()).await);
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100));


@@ -23,6 +23,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
@@ -207,7 +208,10 @@ impl DatanodeBuilder {
(Box::new(NoopRegionServerEventListener) as _, None)
};
let region_server = self.new_region_server(region_event_listener).await?;
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone()));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
@@ -312,6 +316,7 @@ impl DatanodeBuilder {
async fn new_region_server(
&self,
schema_metadata_manager: SchemaMetadataManagerRef,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
let opts: &DatanodeOptions = &self.opts;
@@ -340,8 +345,13 @@ impl DatanodeBuilder {
);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
let engines =
Self::build_store_engines(opts, object_store_manager, self.plugins.clone()).await?;
let engines = Self::build_store_engines(
opts,
object_store_manager,
schema_metadata_manager,
self.plugins.clone(),
)
.await?;
for engine in engines {
region_server.register_engine(engine);
}
@@ -355,6 +365,7 @@ impl DatanodeBuilder {
async fn build_store_engines(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<Vec<RegionEngineRef>> {
let mut engines = vec![];
@@ -365,6 +376,7 @@ impl DatanodeBuilder {
opts,
object_store_manager.clone(),
config.clone(),
schema_metadata_manager.clone(),
plugins.clone(),
)
.await?;
@@ -390,6 +402,7 @@ impl DatanodeBuilder {
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
config: MitoConfig,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
let mito_engine = match &opts.wal {
@@ -399,6 +412,7 @@ impl DatanodeBuilder {
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await
@@ -429,6 +443,7 @@ impl DatanodeBuilder {
config,
Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await


@@ -24,6 +24,7 @@ common-datasource.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
@@ -74,6 +75,7 @@ uuid.workspace = true
[dev-dependencies]
common-function.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"


@@ -28,7 +28,8 @@ use std::time::{Duration, Instant};
use api::v1::region::compact_request;
use common_base::Plugins;
use common_telemetry::{debug, error, info};
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
@@ -37,7 +38,7 @@ use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, TableId};
use table::predicate::Predicate;
use tokio::sync::mpsc::{self, Sender};
@@ -48,8 +49,8 @@ use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
CompactRegionSnafu, Error, GetSchemaMetadataSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
};
use crate::metrics::COMPACTION_STAGE_ELAPSED;
use crate::read::projection::ProjectionMapper;
@@ -82,6 +83,7 @@ pub struct CompactionRequest {
pub(crate) cache_manager: CacheManagerRef,
pub(crate) manifest_ctx: ManifestContextRef,
pub(crate) listener: WorkerListener,
pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
}
impl CompactionRequest {
@@ -141,6 +143,7 @@ impl CompactionScheduler {
access_layer: &AccessLayerRef,
waiter: OptionOutputTx,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
@@ -158,6 +161,7 @@ impl CompactionScheduler {
self.cache_manager.clone(),
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
);
self.region_status.insert(region_id, status);
let result = self
@@ -173,6 +177,7 @@ impl CompactionScheduler {
&mut self,
region_id: RegionId,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
) {
let Some(status) = self.region_status.get_mut(&region_id) else {
return;
@@ -186,6 +191,7 @@ impl CompactionScheduler {
self.cache_manager.clone(),
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
);
// Try to schedule next compaction task for this region.
if let Err(e) = self
@@ -256,10 +262,23 @@ impl CompactionScheduler {
cache_manager,
manifest_ctx,
listener,
schema_metadata_manager,
} = request;
let ttl = find_ttl(
region_id.table_id(),
current_version.options.ttl,
&schema_metadata_manager,
)
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_id);
None
});
debug!(
"Pick compaction strategy {:?} for region: {}",
picker, region_id
"Pick compaction strategy {:?} for region: {}, ttl: {:?}",
picker, region_id, ttl
);
let compaction_region = CompactionRegion {
@@ -273,6 +292,7 @@ impl CompactionScheduler {
access_layer: access_layer.clone(),
manifest_ctx: manifest_ctx.clone(),
file_purger: None,
ttl,
};
let picker_output = {
@@ -414,6 +434,24 @@ impl PendingCompaction {
}
}
/// Finds the TTL of a table by examining table options first, then database options.
async fn find_ttl(
table_id: TableId,
table_ttl: Option<Duration>,
schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<Option<Duration>> {
if let Some(table_ttl) = table_ttl {
return Ok(Some(table_ttl));
}
let ttl = schema_metadata_manager
.get_schema_options_by_table_id(table_id)
.await
.context(GetSchemaMetadataSnafu)?
.and_then(|options| options.ttl);
Ok(ttl)
}
/// Status of running and pending region compaction tasks.
struct CompactionStatus {
/// Id of the region.
@@ -471,6 +509,7 @@ impl CompactionStatus {
cache_manager: CacheManagerRef,
manifest_ctx: &ManifestContextRef,
listener: WorkerListener,
schema_metadata_manager: SchemaMetadataManagerRef,
) -> CompactionRequest {
let current_version = self.version_control.current().version;
let start_time = Instant::now();
@@ -484,6 +523,7 @@ impl CompactionStatus {
cache_manager,
manifest_ctx: manifest_ctx.clone(),
listener,
schema_metadata_manager,
};
if let Some(pending) = self.pending_compaction.take() {
@@ -639,6 +679,9 @@ fn get_expired_ssts(
#[cfg(test)]
mod tests {
use common_meta::key::SchemaMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use tokio::sync::oneshot;
use super::*;
@@ -651,7 +694,19 @@ mod tests {
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(Arc::new(MemoryKvBackend::new()) as KvBackendRef));
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
// Nothing to compact.
let version_control = Arc::new(builder.build());
let (output_tx, output_rx) = oneshot::channel();
@@ -667,6 +722,7 @@ mod tests {
&env.access_layer,
waiter,
&manifest_ctx,
schema_metadata_manager.clone(),
)
.await
.unwrap();
@@ -686,6 +742,7 @@ mod tests {
&env.access_layer,
waiter,
&manifest_ctx,
schema_metadata_manager,
)
.await
.unwrap();
@@ -703,6 +760,19 @@ mod tests {
let mut builder = VersionControlBuilder::new();
let purger = builder.file_purger();
let region_id = builder.region_id();
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(Arc::new(MemoryKvBackend::new()) as KvBackendRef));
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
// 5 files to compact.
let end = 1000 * 1000;
@@ -726,6 +796,7 @@ mod tests {
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
)
.await
.unwrap();
@@ -755,6 +826,7 @@ mod tests {
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
)
.await
.unwrap();
@@ -769,7 +841,7 @@ mod tests {
// On compaction finished and schedule next compaction.
scheduler
.on_compaction_finished(region_id, &manifest_ctx)
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
@@ -789,6 +861,7 @@ mod tests {
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
)
.await
.unwrap();


@@ -16,7 +16,8 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::region::compact_request;
use common_telemetry::info;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{info, warn};
use object_store::manager::ObjectStoreManagerRef;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
@@ -27,7 +28,7 @@ use store_api::storage::RegionId;
use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{new_picker, PickerOutput};
use crate::compaction::CompactionSstReaderBuilder;
use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
@@ -62,6 +63,7 @@ pub struct CompactionRegion {
pub(crate) manifest_ctx: Arc<ManifestContext>,
pub(crate) current_version: VersionRef,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
pub(crate) ttl: Option<Duration>,
}
/// OpenCompactionRegionRequest represents the request to open a compaction region.
@@ -78,6 +80,7 @@ pub async fn open_compaction_region(
req: &OpenCompactionRegionRequest,
mito_config: &MitoConfig,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
) -> Result<CompactionRegion> {
let object_store = {
let name = &req.region_options.storage;
@@ -169,6 +172,16 @@ pub async fn open_compaction_region(
Arc::new(version)
};
let ttl = find_ttl(
req.region_id.table_id(),
current_version.options.ttl,
&schema_metadata_manager,
)
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
None
});
Ok(CompactionRegion {
region_id: req.region_id,
region_options: req.region_options.clone(),
@@ -180,6 +193,7 @@ pub async fn open_compaction_region(
manifest_ctx,
current_version,
file_purger: Some(file_purger),
ttl,
})
}


@@ -212,8 +212,9 @@ impl Picker for TwcsPicker {
fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
let region_id = compaction_region.region_id;
let levels = compaction_region.current_version.ssts.levels();
let ttl = compaction_region.current_version.options.ttl;
let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis());
let expired_ssts =
get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
if !expired_ssts.is_empty() {
info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
// here we mark expired SSTs as compacting to avoid them being picked.


@@ -66,6 +66,7 @@ use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
@@ -112,13 +113,21 @@ impl MitoEngine {
mut config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
config.sanitize(data_home)?;
Ok(MitoEngine {
inner: Arc::new(
EngineInner::new(config, log_store, object_store_manager, plugins).await?,
EngineInner::new(
config,
log_store,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await?,
),
})
}
@@ -278,13 +287,20 @@ impl EngineInner {
config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<EngineInner> {
let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(EngineInner {
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager, plugins)
.await?,
workers: WorkerGroup::start(
config.clone(),
log_store,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await?,
config,
wal_raw_entry_reader,
})
@@ -583,6 +599,7 @@ impl RegionEngine for MitoEngine {
// Test methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
/// Returns a new [MitoEngine] for tests.
pub async fn new_for_test<S: LogStore>(
@@ -593,6 +610,7 @@ impl MitoEngine {
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
time_provider: crate::time_provider::TimeProviderRef,
schema_metadata_manager: SchemaMetadataManagerRef,
) -> Result<MitoEngine> {
config.sanitize(data_home)?;
@@ -606,6 +624,7 @@ impl MitoEngine {
object_store_manager,
write_buffer_manager,
listener,
schema_metadata_manager,
time_provider,
)
.await?,


@@ -78,6 +78,16 @@ async fn test_alter_region() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let column_schemas = rows_schema(&request);
let region_dir = request.region_dir.clone();
engine
@@ -167,10 +177,19 @@ fn build_rows_for_tags(
async fn test_put_after_alter() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let mut column_schemas = rows_schema(&request);
let region_dir = request.region_dir.clone();
engine
@@ -266,6 +285,16 @@ async fn test_alter_region_retry() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
@@ -320,6 +349,16 @@ async fn test_alter_on_flushing() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))


@@ -98,6 +98,16 @@ async fn test_append_mode_compaction() {
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")


@@ -112,6 +112,16 @@ async fn test_compaction_region() {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
@@ -171,8 +181,18 @@ async fn test_compaction_region_with_overlapping() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
@@ -217,6 +237,17 @@ async fn test_compaction_region_with_overlapping_delete_all() {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
@@ -281,6 +312,16 @@ async fn test_readonly_during_compaction() {
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")


@@ -16,6 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::Rows;
use common_meta::key::SchemaMetadataManager;
use object_store::util::join_path;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDropRequest, RegionRequest};
@@ -40,6 +41,17 @@ async fn test_engine_drop_region() {
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
// It's okay to drop a region that doesn't exist.
engine
.handle_request(region_id, RegionRequest::Drop(RegionDropRequest {}))
@@ -87,7 +99,12 @@ async fn test_engine_drop_region() {
#[tokio::test]
async fn test_engine_drop_region_for_custom_store() {
common_telemetry::init_default_ut_logging();
async fn setup(engine: &MitoEngine, region_id: RegionId, storage_name: &str) {
async fn setup(
engine: &MitoEngine,
schema_metadata_manager: &SchemaMetadataManager,
region_id: RegionId,
storage_name: &str,
) {
let request = CreateRequestBuilder::new()
.insert_option("storage", storage_name)
.region_dir(storage_name)
@@ -97,6 +114,18 @@ async fn test_engine_drop_region_for_custom_store() {
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let table_name = format!("test_table_{}", region_id.table_id());
schema_metadata_manager
.register_region_table_info(
region_id.table_id(),
&table_name,
"test_catalog",
"test_schema",
None,
)
.await;
let rows = Rows {
schema: column_schema.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
@@ -114,12 +143,19 @@ async fn test_engine_drop_region_for_custom_store() {
&["Gcs"],
)
.await;
let schema_metadata_manager = env.get_schema_metadata_manager();
let object_store_manager = env.get_object_store_manager().unwrap();
let global_region_id = RegionId::new(1, 1);
setup(&engine, global_region_id, "default").await;
setup(
&engine,
&schema_metadata_manager,
global_region_id,
"default",
)
.await;
let custom_region_id = RegionId::new(2, 1);
setup(&engine, custom_region_id, "Gcs").await;
setup(&engine, &schema_metadata_manager, custom_region_id, "Gcs").await;
let global_region = engine.get_region(global_region_id).unwrap();
let global_region_dir = global_region.access_layer.region_dir().to_string();


@@ -64,6 +64,16 @@ async fn test_edit_region_schedule_compaction() {
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
engine
.handle_request(
region_id,


@@ -32,6 +32,16 @@ async fn test_scan_without_filtering_deleted() {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "10")


@@ -45,6 +45,16 @@ async fn test_manual_flush() {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
@@ -92,6 +102,16 @@ async fn test_flush_engine() {
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
@@ -151,6 +171,15 @@ async fn test_write_stall() {
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
@@ -215,6 +244,15 @@ async fn test_flush_empty() {
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
engine
@@ -249,8 +287,17 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
@@ -360,8 +407,17 @@ async fn test_auto_flush_engine() {
time_provider.clone(),
)
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
@@ -421,6 +477,16 @@ async fn test_flush_workers() {
let region_id0 = RegionId::new(1, 0);
let region_id1 = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id0.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().region_dir("r0").build();
let column_schemas = rows_schema(&request);
engine


@@ -98,6 +98,16 @@ async fn test_merge_mode_compaction() {
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new()
.field_num(2)
.insert_option("compaction.type", "twcs")


@@ -245,6 +245,16 @@ async fn test_open_region_skip_wal_replay() {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
@@ -423,6 +433,16 @@ async fn test_open_compaction_region() {
let engine = env.create_engine(mito_config.clone()).await;
let region_id = RegionId::new(1, 1);
let schema_metadata_manager = env.get_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
engine
@@ -444,10 +464,14 @@ async fn test_open_compaction_region() {
region_options: RegionOptions::default(),
};
let compaction_region =
open_compaction_region(&req, &mito_config, object_store_manager.clone())
.await
.unwrap();
let compaction_region = open_compaction_region(
&req,
&mito_config,
object_store_manager.clone(),
schema_metadata_manager,
)
.await
.unwrap();
assert_eq!(region_id, compaction_region.region_id);
}


@@ -76,6 +76,16 @@ async fn test_parallel_scan() {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();


@@ -151,6 +151,17 @@ async fn test_prune_memtable() {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);


@@ -29,6 +29,15 @@ async fn test_last_row(append_mode: bool) {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("append_mode", &append_mode.to_string())
.build();


@@ -151,6 +151,17 @@ async fn test_engine_truncate_after_flush() {
// Create the region.
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine


@@ -870,6 +870,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to get schema metadata"))]
GetSchemaMetadata {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1002,6 +1009,7 @@ impl ErrorExt for Error {
| ApplyFulltextIndex { source, .. } => source.status_code(),
DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal,
RegionBusy { .. } => StatusCode::RegionBusy,
GetSchemaMetadata { source, .. } => source.status_code(),
}
}


@@ -35,6 +35,9 @@ use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_datasource::compression::CompressionType;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_telemetry::warn;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
@@ -195,6 +198,7 @@ pub struct TestEnv {
log_store: Option<LogStoreImpl>,
log_store_factory: LogStoreFactory,
object_store_manager: Option<ObjectStoreManagerRef>,
schema_metadata_manager: SchemaMetadataManagerRef,
}
impl Default for TestEnv {
@@ -211,6 +215,10 @@ impl TestEnv {
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new(MemoryKvBackend::new()) as KvBackendRef)),
}
}
@@ -221,6 +229,10 @@ impl TestEnv {
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new(MemoryKvBackend::new()) as KvBackendRef)),
}
}
@@ -231,6 +243,10 @@ impl TestEnv {
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new(MemoryKvBackend::new()) as KvBackendRef)),
}
}
@@ -269,6 +285,7 @@ impl TestEnv {
config,
log_store,
object_store_manager,
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -278,6 +295,7 @@ impl TestEnv {
config,
log_store,
object_store_manager,
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -295,6 +313,7 @@ impl TestEnv {
config,
log_store,
object_store_manager,
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -304,6 +323,7 @@ impl TestEnv {
config,
log_store,
object_store_manager,
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -335,6 +355,7 @@ impl TestEnv {
manager,
listener,
Arc::new(StdTimeProvider),
self.schema_metadata_manager.clone(),
)
.await
.unwrap(),
@@ -346,6 +367,7 @@ impl TestEnv {
manager,
listener,
Arc::new(StdTimeProvider),
self.schema_metadata_manager.clone(),
)
.await
.unwrap(),
@@ -388,6 +410,7 @@ impl TestEnv {
manager,
listener,
Arc::new(StdTimeProvider),
self.schema_metadata_manager.clone(),
)
.await
.unwrap(),
@@ -399,6 +422,7 @@ impl TestEnv {
manager,
listener,
Arc::new(StdTimeProvider),
self.schema_metadata_manager.clone(),
)
.await
.unwrap(),
@@ -430,6 +454,7 @@ impl TestEnv {
manager,
listener,
time_provider.clone(),
self.schema_metadata_manager.clone(),
)
.await
.unwrap(),
@@ -441,6 +466,7 @@ impl TestEnv {
manager,
listener,
time_provider.clone(),
self.schema_metadata_manager.clone(),
)
.await
.unwrap(),
@@ -450,13 +476,13 @@ impl TestEnv {
/// Reopen the engine.
pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine {
engine.stop().await.unwrap();
match self.log_store.as_ref().unwrap().clone() {
LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
&self.data_home().display().to_string(),
config,
log_store,
self.object_store_manager.clone().unwrap(),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -466,6 +492,7 @@ impl TestEnv {
config,
log_store,
self.object_store_manager.clone().unwrap(),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -481,6 +508,7 @@ impl TestEnv {
config,
log_store,
self.object_store_manager.clone().unwrap(),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -490,6 +518,7 @@ impl TestEnv {
config,
log_store,
self.object_store_manager.clone().unwrap(),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -515,6 +544,7 @@ impl TestEnv {
Arc::new(config),
log_store,
Arc::new(object_store_manager),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -523,6 +553,7 @@ impl TestEnv {
Arc::new(config),
log_store,
Arc::new(object_store_manager),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
@@ -630,6 +661,10 @@ impl TestEnv {
Arc::new(write_cache)
}
pub fn get_schema_metadata_manager(&self) -> SchemaMetadataManagerRef {
self.schema_metadata_manager.clone()
}
}
/// Builder to mock a [RegionCreateRequest].


@@ -31,6 +31,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
@@ -132,6 +133,7 @@ impl WorkerGroup {
config: Arc<MitoConfig>,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<WorkerGroup> {
let (flush_sender, flush_receiver) = watch::channel(());
@@ -191,6 +193,7 @@ impl WorkerGroup {
flush_sender: flush_sender.clone(),
flush_receiver: flush_receiver.clone(),
plugins: plugins.clone(),
schema_metadata_manager: schema_metadata_manager.clone(),
}
.start()
})
@@ -273,6 +276,7 @@ impl WorkerGroup {
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
schema_metadata_manager: SchemaMetadataManagerRef,
time_provider: TimeProviderRef,
) -> Result<WorkerGroup> {
let (flush_sender, flush_receiver) = watch::channel(());
@@ -329,6 +333,7 @@ impl WorkerGroup {
flush_sender: flush_sender.clone(),
flush_receiver: flush_receiver.clone(),
plugins: Plugins::new(),
schema_metadata_manager: schema_metadata_manager.clone(),
}
.start()
})
@@ -405,6 +410,7 @@ struct WorkerStarter<S> {
/// Watch channel receiver to wait for background flush job.
flush_receiver: watch::Receiver<()>,
plugins: Plugins,
schema_metadata_manager: SchemaMetadataManagerRef,
}
impl<S: LogStore> WorkerStarter<S> {
@@ -455,6 +461,7 @@ impl<S: LogStore> WorkerStarter<S> {
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
region_count: REGION_COUNT.with_label_values(&[&id_string]),
region_edit_queues: RegionEditQueues::default(),
schema_metadata_manager: self.schema_metadata_manager,
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
@@ -645,6 +652,8 @@ struct RegionWorkerLoop<S> {
region_count: IntGauge,
/// Queues for region edit requests.
region_edit_queues: RegionEditQueues,
/// Database-level metadata manager.
schema_metadata_manager: SchemaMetadataManagerRef,
}
impl<S: LogStore> RegionWorkerLoop<S> {


@@ -44,6 +44,7 @@ impl<S> RegionWorkerLoop<S> {
&region.access_layer,
sender,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
)
.await
{
@@ -80,7 +81,11 @@ impl<S> RegionWorkerLoop<S> {
// Schedule next compaction if necessary.
self.compaction_scheduler
.on_compaction_finished(region_id, &region.manifest_ctx)
.on_compaction_finished(
region_id,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
)
.await;
}
@@ -107,6 +112,7 @@ impl<S> RegionWorkerLoop<S> {
&region.access_layer,
OptionOutputTx::none(),
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
)
.await
{


@@ -760,10 +760,8 @@ impl Inserter {
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Vec<TableRef>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
let res = statement_executor
.create_logical_tables(catalog_name, &schema_name, &create_table_exprs, ctx.clone())
.create_logical_tables(&create_table_exprs, ctx.clone())
.await;
match res {


@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::NAME_PATTERN;
use common_meta::rpc::ddl::{
CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
@@ -116,9 +116,21 @@ impl StatementExecutor {
.await
.context(error::FindTablePartitionRuleSnafu { table_name: table })?;
// CREATE TABLE LIKE also inherits database-level options.
let schema_options = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey {
catalog: &catalog,
schema: &schema,
})
.await
.context(TableMetadataManagerSnafu)?;
let quote_style = ctx.quote_style();
let mut create_stmt = create_table_stmt(&table_ref.table_info(), quote_style)
.context(error::ParseQuerySnafu)?;
let mut create_stmt =
create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
.context(error::ParseQuerySnafu)?;
create_stmt.name = stmt.table_name;
create_stmt.if_not_exists = false;
@@ -165,15 +177,8 @@ impl StatementExecutor {
.table_options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
let catalog_name = &create_table.catalog_name;
let schema_name = &create_table.schema_name;
return self
.create_logical_tables(
catalog_name,
schema_name,
&[create_table.clone()],
query_ctx,
)
.create_logical_tables(&[create_table.clone()], query_ctx)
.await?
.into_iter()
.next()
@@ -183,6 +188,7 @@ impl StatementExecutor {
}
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
let schema = self
.table_metadata_manager
.schema_manager()
@@ -193,12 +199,12 @@ impl StatementExecutor {
.await
.context(TableMetadataManagerSnafu)?;
let Some(schema_opts) = schema else {
return SchemaNotFoundSnafu {
ensure!(
schema.is_some(),
SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
.fail();
};
);
// if table exists.
if let Some(table) = self
@@ -240,7 +246,7 @@ impl StatementExecutor {
);
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;
let mut table_info = create_table_info(create_table, partition_cols)?;
let resp = self
.create_table_procedure(
@@ -273,8 +279,6 @@ impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub async fn create_logical_tables(
&self,
catalog_name: &str,
schema_name: &str,
create_table_exprs: &[CreateTableExpr],
query_context: QueryContextRef,
) -> Result<Vec<TableRef>> {
@@ -296,19 +300,9 @@ impl StatementExecutor {
);
}
let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(catalog_name, schema_name))
.await
.context(TableMetadataManagerSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: schema_name,
})?;
let mut raw_tables_info = create_table_exprs
.iter()
.map(|create| create_table_info(create, vec![], schema.clone()))
.map(|create| create_table_info(create, vec![]))
.collect::<Result<Vec<_>>>()?;
let tables_data = create_table_exprs
.iter()
@@ -1261,7 +1255,6 @@ fn parse_partitions(
fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
schema_opts: SchemaNameValue,
) -> Result<RawTableInfo> {
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
@@ -1310,7 +1303,6 @@ fn create_table_info(
let table_options = TableOptions::try_from_iter(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?;
let table_options = merge_options(table_options, schema_opts);
let meta = RawTableMeta {
schema: raw_schema,
@@ -1495,12 +1487,6 @@ fn convert_value(
.context(ParseSqlValueSnafu)
}
/// Merge table level table options with schema level table options.
fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions {
table_opts.ttl = table_opts.ttl.or(schema_opts.ttl);
table_opts
}
#[cfg(test)]
mod test {
use session::context::{QueryContext, QueryContextBuilder};


@@ -13,6 +13,7 @@
// limitations under the License.
use common_error::ext::BoxedError;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
use common_telemetry::tracing;
use partition::manager::PartitionInfo;
@@ -33,7 +34,7 @@ use table::TableRef;
use crate::error::{
self, CatalogSnafu, ExecuteStatementSnafu, ExternalSnafu, FindViewInfoSnafu, InvalidSqlSnafu,
Result, ViewInfoNotFoundSnafu, ViewNotFoundSnafu,
Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu, ViewNotFoundSnafu,
};
use crate::statement::StatementExecutor;
@@ -118,6 +119,16 @@ impl StatementExecutor {
.fail();
}
let schema_options = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey {
catalog: &table_name.catalog_name,
schema: &table_name.schema_name,
})
.await
.context(TableMetadataManagerSnafu)?;
let partitions = self
.partition_manager
.find_table_partitions(table.table_info().table_id())
@@ -128,7 +139,8 @@ impl StatementExecutor {
let partitions = create_partitions_stmt(partitions)?;
query::sql::show_create_table(table, partitions, query_ctx).context(ExecuteStatementSnafu)
query::sql::show_create_table(table, schema_options, partitions, query_ctx)
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]


@@ -32,6 +32,7 @@ use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::SchemaOptions;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
@@ -703,6 +704,7 @@ pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<O
pub fn show_create_table(
table: TableRef,
schema_options: Option<SchemaOptions>,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<Output> {
@@ -711,7 +713,7 @@ pub fn show_create_table(
let quote_style = query_ctx.quote_style();
let mut stmt = create_table_stmt(&table_info, quote_style)?;
let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
stmt.partitions = partitions.map(|mut p| {
p.set_quote(quote_style);
p


@@ -16,6 +16,7 @@
use std::collections::HashMap;
use common_meta::SchemaOptions;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef, COMMENT_KEY};
use humantime::format_duration;
use snafu::ResultExt;
@@ -36,7 +37,8 @@ use crate::error::{
ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu, Result, SqlSnafu,
};
fn create_sql_options(table_meta: &TableMeta) -> OptionMap {
/// Generates CREATE TABLE options from the given table metadata and schema-level options.
fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptions>) -> OptionMap {
let table_opts = &table_meta.options;
let mut options = OptionMap::default();
if let Some(write_buffer_size) = table_opts.write_buffer_size {
@@ -47,7 +49,12 @@ fn create_sql_options(table_meta: &TableMeta) -> OptionMap {
}
if let Some(ttl) = table_opts.ttl {
options.insert(TTL_KEY.to_string(), format_duration(ttl).to_string());
}
} else if let Some(database_ttl) = schema_options.and_then(|o| o.ttl) {
options.insert(
TTL_KEY.to_string(),
format_duration(database_ttl).to_string(),
);
};
for (k, v) in table_opts
.extra_options
.iter()
@@ -169,7 +176,11 @@ fn create_table_constraints(
}
/// Create a CreateTable statement from table info.
pub fn create_table_stmt(table_info: &TableInfoRef, quote_style: char) -> Result<CreateTable> {
pub fn create_table_stmt(
table_info: &TableInfoRef,
schema_options: Option<SchemaOptions>,
quote_style: char,
) -> Result<CreateTable> {
let table_meta = &table_info.meta;
let table_name = &table_info.name;
let schema = &table_info.meta.schema;
@@ -195,7 +206,7 @@ pub fn create_table_stmt(table_info: &TableInfoRef, quote_style: char) -> Result
columns,
engine: table_meta.engine.clone(),
constraints,
options: create_sql_options(table_meta),
options: create_sql_options(table_meta, schema_options),
partitions: None,
})
}
@@ -271,7 +282,7 @@ mod tests {
.unwrap(),
);
let stmt = create_table_stmt(&info, '"').unwrap();
let stmt = create_table_stmt(&info, None, '"').unwrap();
let sql = format!("\n{}", stmt);
assert_eq!(
@@ -337,7 +348,7 @@ ENGINE=mito
.unwrap(),
);
let stmt = create_table_stmt(&info, '"').unwrap();
let stmt = create_table_stmt(&info, None, '"').unwrap();
let sql = format!("\n{}", stmt);
assert_eq!(