diff --git a/Cargo.lock b/Cargo.lock index 8230e5ec81..40ccd68074 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4074,7 +4074,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7fb5aa1095b62794c5c4333a1a2ed45da1c86a07#7fb5aa1095b62794c5c4333a1a2ed45da1c86a07" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=6d4131f57ece9a8d250a289b3af1567eab768c86#6d4131f57ece9a8d250a289b3af1567eab768c86" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index fcb8f0dd8c..6a8a500fe1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7fb5aa1095b62794c5c4333a1a2ed45da1c86a07" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6d4131f57ece9a8d250a289b3af1567eab768c86" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 5552a2ebf9..5fc8087aae 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -15,7 +15,6 @@ pub mod bit_vec; pub mod buffer; pub mod bytes; -pub mod paths; #[allow(clippy::all)] pub mod readable_size; diff --git a/src/common/base/src/paths.rs b/src/common/base/src/paths.rs deleted file mode 100644 index 6b0c282f45..0000000000 --- a/src/common/base/src/paths.rs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Path constants for table engines, cluster states and WAL -/// All paths relative to data_home(file storage) or root path(S3, OSS etc). - -/// WAL dir for local file storage -pub const WAL_DIR: &str = "wal/"; - -/// Data dir for table engines -pub const DATA_DIR: &str = "data/"; - -/// Cluster state dir -pub const CLUSTER_DIR: &str = "cluster/"; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 10f40bca81..f55e9bc7a0 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -21,7 +21,6 @@ use api::v1::meta::Role; use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; -use common_base::paths::{CLUSTER_DIR, WAL_DIR}; use common_base::Plugins; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_error::ext::BoxedError; @@ -51,6 +50,7 @@ use storage::config::EngineConfig as StorageEngineConfig; use storage::scheduler::{LocalScheduler, SchedulerConfig}; use storage::EngineImpl; use store_api::logstore::LogStore; +use store_api::path_utils::{CLUSTER_DIR, WAL_DIR}; use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; use table::engine::{TableEngine, TableEngineProcedureRef}; use table::requests::FlushTableRequest; diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index d0ed345e9e..a0729403bb 100644 --- 
a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -23,7 +23,8 @@ use common_telemetry::{debug, logging}; use datatypes::schema::Schema; use object_store::ObjectStore; use snafu::ResultExt; -use table::engine::{table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference}; +use store_api::path_utils::table_dir; +use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference}; use table::error::TableOperationSnafu; use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::{ diff --git a/src/file-table-engine/src/test_util.rs b/src/file-table-engine/src/test_util.rs index 5eb5619939..28a852c86c 100644 --- a/src/file-table-engine/src/test_util.rs +++ b/src/file-table-engine/src/test_util.rs @@ -20,7 +20,8 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; use object_store::services::Fs; use object_store::ObjectStore; -use table::engine::{table_dir, EngineContext, TableEngine}; +use store_api::path_utils::table_dir; +use table::engine::{EngineContext, TableEngine}; use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::{self, CreateTableRequest, TableOptions}; use table::TableRef; diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs index 1a4713d628..da80bb7a76 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/meta-srv/src/procedure/create_table.rs @@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionId; use strum::AsRefStr; -use table::engine::{region_dir, TableReference}; +use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; use super::utils::{handle_request_datanode_error, handle_retry_error}; @@ -170,7 +170,8 @@ impl 
CreateTableProcedure { column_defs, primary_key, create_if_not_exists: true, - region_dir: "".to_string(), + catalog: String::new(), + schema: String::new(), options: create_table_expr.table_options.clone(), }) } @@ -199,7 +200,8 @@ impl CreateTableProcedure { let mut create_table_request = request_template.clone(); create_table_request.region_id = region_id.as_u64(); - create_table_request.region_dir = region_dir(catalog, schema, region_id); + create_table_request.catalog = catalog.to_string(); + create_table_request.schema = schema.to_string(); PbRegionRequest::Create(create_table_request) }) @@ -562,7 +564,8 @@ mod test { ], primary_key: vec![2, 1], create_if_not_exists: true, - region_dir: "".to_string(), + catalog: String::new(), + schema: String::new(), options: HashMap::new(), }; assert_eq!(template, expected); diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 3f45df8afd..09f081673a 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -31,14 +31,14 @@ use key_lock::KeyLock; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use storage::manifest::manifest_compress_type; +use store_api::path_utils::{region_name, table_dir}; use store_api::storage::{ CloseOptions, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, CompactionStrategy, EngineContext as StorageEngineContext, OpenOptions, RegionNumber, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine, }; use table::engine::{ - region_name, table_dir, CloseTableResult, EngineContext, TableEngine, TableEngineProcedure, - TableReference, + CloseTableResult, EngineContext, TableEngine, TableEngineProcedure, TableReference, }; use table::metadata::{TableId, TableInfo, TableVersion}; use table::requests::{ diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index 427e37cd23..ab8ef60405 100644 --- a/src/mito/src/engine/procedure/create.rs +++ 
b/src/mito/src/engine/procedure/create.rs @@ -23,11 +23,11 @@ use common_telemetry::metric::Timer; use datatypes::schema::{Schema, SchemaRef}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; +use store_api::path_utils::table_dir; use store_api::storage::{ ColumnId, CompactionStrategy, CreateOptions, EngineContext, OpenOptions, RegionDescriptorBuilder, RegionId, RegionNumber, StorageEngine, }; -use table::engine::table_dir; use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::CreateTableRequest; use table::TableRef; diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs index dedc5f1cc9..d703794598 100644 --- a/src/store-api/src/lib.rs +++ b/src/store-api/src/lib.rs @@ -19,6 +19,7 @@ pub mod data_source; pub mod logstore; pub mod manifest; pub mod metadata; +pub mod path_utils; pub mod region_engine; pub mod region_request; pub mod storage; diff --git a/src/store-api/src/path_utils.rs b/src/store-api/src/path_utils.rs new file mode 100644 index 0000000000..7f35709b0e --- /dev/null +++ b/src/store-api/src/path_utils.rs @@ -0,0 +1,60 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Path constants for table engines, cluster states and WAL + +use crate::storage::{RegionId, RegionNumber, TableId}; +/// All paths relative to data_home(file storage) or root path(S3, OSS etc). 
+ + /// WAL dir for local file storage + pub const WAL_DIR: &str = "wal/"; + + /// Data dir for table engines + pub const DATA_DIR: &str = "data/"; + + /// Cluster state dir + pub const CLUSTER_DIR: &str = "cluster/"; + + /// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}" + #[inline] + pub fn region_name(table_id: TableId, region_number: RegionNumber) -> String { + format!("{table_id}_{region_number:010}") + } + + #[inline] + pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String { + format!("{DATA_DIR}{catalog_name}/{schema_name}/{table_id}/") + } + + pub fn region_dir(catalog_name: &str, schema_name: &str, region_id: RegionId) -> String { + format!( + "{}{}", + table_dir(catalog_name, schema_name, region_id.table_id()), + region_name(region_id.table_id(), region_id.region_number()) + ) + } + + #[cfg(test)] + mod tests { + use super::*; + + #[test] + fn test_region_dir() { + let region_id = RegionId::new(42, 1); + assert_eq!( + region_dir("my_catalog", "my_schema", region_id), + "data/my_catalog/my_schema/42/42_0000000001" + ); + } + } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index dfcca0a98d..197c1e3a3f 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -18,6 +18,7 @@ use api::v1::region::region_request; use api::v1::Rows; use crate::metadata::{ColumnMetadata, MetadataError}; +use crate::path_utils::region_dir; use crate::storage::{AlterRequest, ColumnId, RegionId, ScanRequest}; #[derive(Debug)] @@ -67,15 +68,17 @@ impl RegionRequest { .into_iter() .map(ColumnMetadata::try_from_column_def) .collect::<Result<Vec<_>, _>>()?; + let region_id = create.region_id.into(); + let region_dir = region_dir(&create.catalog, &create.schema, region_id); Ok(vec![( - create.region_id.into(), + region_id, Self::Create(RegionCreateRequest { engine: create.engine, column_metadatas, primary_key: create.primary_key, create_if_not_exists: create.create_if_not_exists, options:
create.options, - region_dir: create.region_dir, + region_dir, }), )]) } @@ -83,14 +86,18 @@ impl RegionRequest { drop.region_id.into(), Self::Drop(RegionDropRequest {}), )]), - region_request::Body::Open(open) => Ok(vec![( - open.region_id.into(), - Self::Open(RegionOpenRequest { - engine: open.engine, - region_dir: open.region_dir, - options: open.options, - }), - )]), + region_request::Body::Open(open) => { + let region_id = open.region_id.into(); + let region_dir = region_dir(&open.catalog, &open.schema, region_id); + Ok(vec![( + region_id, + Self::Open(RegionOpenRequest { + engine: open.engine, + region_dir, + options: open.options, + }), + )]) + } region_request::Body::Close(close) => Ok(vec![( close.region_id.into(), Self::Close(RegionCloseRequest {}), diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 7d3fb5809f..38350646ce 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -15,10 +15,9 @@ use std::fmt::{self, Display}; use std::sync::Arc; -use common_base::paths::DATA_DIR; use common_procedure::BoxedProcedure; use datafusion_common::TableReference as DfTableReference; -use store_api::storage::{RegionId, RegionNumber}; +use store_api::storage::RegionNumber; use crate::error::{self, Result}; use crate::metadata::TableId; @@ -187,25 +186,6 @@ pub trait TableEngineProcedure: Send + Sync { pub type TableEngineProcedureRef = Arc<dyn TableEngineProcedure>; -/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}" -#[inline] -pub fn region_name(table_id: TableId, region_number: RegionNumber) -> String { - format!("{table_id}_{region_number:010}") -} - -#[inline] -pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String { - format!("{DATA_DIR}{catalog_name}/{schema_name}/{table_id}/") -} - -pub fn region_dir(catalog_name: &str, schema_name: &str, region_id: RegionId) -> String { - format!( - "{}{}", - table_dir(catalog_name, schema_name, region_id.table_id()), - region_name(region_id.table_id(),
region_id.region_number()) - ) -} - #[cfg(test)] mod tests { use super::*; @@ -220,13 +200,4 @@ mod tests { assert_eq!("greptime.public.test", table_ref.to_string()); } - - #[test] - fn test_region_dir() { - let region_id = RegionId::new(42, 1); - assert_eq!( - region_dir("my_catalog", "my_schema", region_id), - "data/my_catalog/my_schema/42/42_0000000001" - ); - } }