feat: create distributed Mito2 table (#2246)

* feat: create distributed Mito2 table

* rebase develop
This commit is contained in:
LFC
2023-08-28 20:07:52 +08:00
committed by GitHub
parent 71fc3c42d9
commit ef75e8f7c3
27 changed files with 712 additions and 247 deletions

73
Cargo.lock generated
View File

@@ -767,7 +767,7 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"syn 2.0.28",
@@ -1475,7 +1475,7 @@ version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
@@ -1488,7 +1488,7 @@ version = "4.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"syn 2.0.28",
@@ -1530,6 +1530,7 @@ dependencies = [
"datanode",
"datatypes",
"derive-new",
"derive_builder 0.12.0",
"enum_dispatch",
"futures-util",
"moka 0.9.9",
@@ -1689,7 +1690,7 @@ dependencies = [
"paste",
"regex",
"snafu",
"strum 0.21.0",
"strum 0.25.0",
"tokio",
"tokio-util",
"url",
@@ -1700,7 +1701,7 @@ name = "common-error"
version = "0.4.0-nightly"
dependencies = [
"snafu",
"strum 0.24.1",
"strum 0.25.0",
]
[[package]]
@@ -4153,11 +4154,13 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=39b0ea8d086d0ab762046b0f473aa3ef8bd347f9#39b0ea8d086d0ab762046b0f473aa3ef8bd347f9"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc#ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc"
dependencies = [
"prost",
"serde",
"serde_json",
"strum 0.25.0",
"strum_macros 0.25.2",
"tonic 0.9.2",
"tonic-build",
]
@@ -4273,15 +4276,6 @@ dependencies = [
"http",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -5345,6 +5339,7 @@ dependencies = [
"session",
"snafu",
"store-api",
"strum 0.25.0",
"table",
"tokio",
"tokio-stream",
@@ -5574,7 +5569,7 @@ dependencies = [
"snafu",
"storage",
"store-api",
"strum 0.21.0",
"strum 0.25.0",
"table",
"tokio",
"tokio-util",
@@ -5654,7 +5649,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56b0d8a0db9bf6d2213e11f2c701cb91387b0614361625ab7b9743b41aa4938f"
dependencies = [
"darling 0.20.3",
"heck 0.4.1",
"heck",
"num-bigint",
"proc-macro-crate 1.3.1",
"proc-macro-error",
@@ -7098,7 +7093,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
dependencies = [
"bytes",
"heck 0.4.1",
"heck",
"itertools 0.10.5",
"lazy_static",
"log",
@@ -8918,7 +8913,7 @@ dependencies = [
"snafu",
"snap",
"sql",
"strum 0.24.1",
"strum 0.25.0",
"table",
"tikv-jemalloc-ctl",
"tokio",
@@ -9140,7 +9135,7 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"syn 1.0.109",
@@ -9403,7 +9398,7 @@ checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9"
dependencies = [
"dotenvy",
"either",
"heck 0.4.1",
"heck",
"once_cell",
"proc-macro2",
"quote",
@@ -9608,23 +9603,11 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strum"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2"
dependencies = [
"strum_macros 0.21.1",
]
[[package]]
name = "strum"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
dependencies = [
"strum_macros 0.24.3",
]
[[package]]
name = "strum"
@@ -9635,25 +9618,13 @@ dependencies = [
"strum_macros 0.25.2",
]
[[package]]
name = "strum_macros"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec"
dependencies = [
"heck 0.3.3",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "strum_macros"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"rustversion",
@@ -9666,7 +9637,7 @@ version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"rustversion",
@@ -9716,7 +9687,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ae64fb7ad0670c7d6d53d57b1b91beb2212afc30e164cc8edb02d6b2cff32a"
dependencies = [
"gix",
"heck 0.4.1",
"heck",
"prettyplease 0.2.12",
"prost",
"prost-build",
@@ -9738,7 +9709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "658f6cbbd29a250869b87e1bb5a4b42db534cacfc1c03284f2536cd36b6c1617"
dependencies = [
"git2",
"heck 0.4.1",
"heck",
"prettyplease 0.2.12",
"prost",
"prost-build",
@@ -10777,7 +10748,7 @@ version = "0.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95d27d749378ceab6ec22188ed7ad102205c89ddb92ab662371c850ffc71aa1a"
dependencies = [
"heck 0.4.1",
"heck",
"log",
"proc-macro2",
"quote",
@@ -10795,7 +10766,7 @@ version = "0.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c8d9ecedde2fd77e975c38eeb9ca40b34ad0247b2259c6e6bbd2a8d6cc2444f"
dependencies = [
"heck 0.4.1",
"heck",
"log",
"proc-macro2",
"quote",

View File

@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "39b0ea8d086d0ab762046b0f473aa3ef8bd347f9" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc" }
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"
@@ -93,6 +93,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4", features = [
"visitor",
] }
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
tokio = { version = "1.28", features = ["full"] }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }

View File

@@ -37,7 +37,6 @@ use greptime_proto::v1;
use greptime_proto::v1::ddl_request::Expr;
use greptime_proto::v1::greptime_request::Request;
use greptime_proto::v1::query_request::Query;
use greptime_proto::v1::region::region_request;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{DdlRequest, IntervalMonthDayNano, QueryRequest, SemanticType};
use snafu::prelude::*;
@@ -333,21 +332,6 @@ fn query_request_type(request: &QueryRequest) -> &'static str {
}
}
/// Returns the type name of the [RegionRequest].
pub fn region_request_type(request: &region_request::Body) -> &'static str {
match request {
region_request::Body::Inserts(_) => "region.inserts",
region_request::Body::Deletes(_) => "region.deletes",
region_request::Body::Create(_) => "region.create",
region_request::Body::Drop(_) => "region.drop",
region_request::Body::Open(_) => "region.open",
region_request::Body::Close(_) => "region.close",
region_request::Body::Alter(_) => "region.alter",
region_request::Body::Flush(_) => "region.flush",
region_request::Body::Compact(_) => "region.compact",
}
}
/// Returns the type name of the [DdlRequest].
fn ddl_request_type(request: &DdlRequest) -> &'static str {
match request.expr {

View File

@@ -22,6 +22,7 @@ common-telemetry = { workspace = true }
common-time = { workspace = true }
datafusion.workspace = true
datatypes = { workspace = true }
derive_builder.workspace = true
enum_dispatch = "0.3"
futures-util.workspace = true
moka = { version = "0.9", features = ["future"] }

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::health_check_client::HealthCheckClient;
use api::v1::prometheus_gateway_client::PrometheusGatewayClient;
use api::v1::region::region_client::RegionClient as PbRegionClient;
use api::v1::HealthCheckRequest;
use arrow_flight::flight_service_client::FlightServiceClient;
use common_grpc::channel_manager::ChannelManager;
@@ -82,11 +83,6 @@ impl Client {
Default::default()
}
pub fn with_manager(channel_manager: ChannelManager) -> Self {
let inner = Arc::new(Inner::with_manager(channel_manager));
Self { inner }
}
pub fn with_urls<U, A>(urls: A) -> Self
where
U: AsRef<str>,
@@ -157,6 +153,11 @@ impl Client {
})
}
/// Builds a raw region gRPC client over one of this client's channels.
/// Crate-private: callers should go through `RegionRequester` instead.
pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
    let (_, channel) = self.find_channel()?;
    Ok(PbRegionClient::new(channel))
}
pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
let (_, channel) = self.find_channel()?;
Ok(PrometheusGatewayClient::new(channel))

View File

@@ -18,6 +18,7 @@ mod database;
pub mod error;
pub mod load_balance;
mod metrics;
pub mod region;
mod stream_insert;
pub use api;

View File

@@ -25,3 +25,4 @@ pub const METRIC_GRPC_FLUSH_TABLE: &str = "grpc.flush_table";
pub const METRIC_GRPC_COMPACT_TABLE: &str = "grpc.compact_table";
pub const METRIC_GRPC_TRUNCATE_TABLE: &str = "grpc.truncate_table";
pub const METRIC_GRPC_DO_GET: &str = "grpc.do_get";
pub(crate) const METRIC_REGION_REQUEST_GRPC: &str = "grpc.region_request";

146
src/client/src/region.rs Normal file
View File

@@ -0,0 +1,146 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{region_request, RegionRequest, RegionRequestHeader, RegionResponse};
use api::v1::ResponseHeader;
use common_error::status_code::StatusCode;
use common_telemetry::timer;
use snafu::OptionExt;
use crate::error::{IllegalDatabaseResponseSnafu, Result, ServerSnafu};
use crate::{metrics, Client};
/// Number of rows affected by a region request.
type AffectedRows = u64;

/// Sends region requests (create/drop/insert/...) to a Datanode's region
/// gRPC service, timing each call under a per-request-type metric label.
#[derive(Debug)]
pub struct RegionRequester {
    // Tracing context propagated in the request header; currently always None.
    trace_id: Option<u64>,
    span_id: Option<u64>,
    client: Client,
}

impl RegionRequester {
    /// Creates a requester over the given client connection.
    pub fn new(client: Client) -> Self {
        // TODO(LFC): Pass in trace_id and span_id from some context when we have it.
        Self {
            trace_id: None,
            span_id: None,
            client,
        }
    }

    /// Sends the request body to the region server and returns the number of
    /// affected rows; fails if the response carries a non-success status.
    pub async fn handle(self, request: region_request::Body) -> Result<AffectedRows> {
        // Stringified variant name used as the metric label; presumably the
        // proto `Body` derives strum's `AsRefStr` — confirm in greptime-proto.
        let request_type = request.as_ref().to_string();
        let request = RegionRequest {
            header: Some(RegionRequestHeader {
                trace_id: self.trace_id,
                span_id: self.span_id,
            }),
            body: Some(request),
        };

        // Times the full round trip, including channel lookup below.
        let _timer = timer!(
            metrics::METRIC_REGION_REQUEST_GRPC,
            &[("request_type", request_type)]
        );

        let mut client = self.client.raw_region_client()?;

        let RegionResponse {
            header,
            affected_rows,
        } = client.handle(request).await?.into_inner();

        check_response_header(header)?;

        Ok(affected_rows)
    }
}
/// Verifies that a region server response header carries a successful status.
///
/// Returns `IllegalDatabaseResponse` when the header or status is missing, or
/// when the status code is unknown; returns `Server` for a known failure code.
fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
    let Some(status) = header.and_then(|h| h.status) else {
        return IllegalDatabaseResponseSnafu {
            err_msg: "either response header or status is missing",
        }
        .fail();
    };

    if StatusCode::is_success(status.status_code) {
        return Ok(());
    }

    // Failure path: map the raw code back to a known StatusCode if possible.
    match StatusCode::from_u32(status.status_code) {
        Some(code) => ServerSnafu {
            code,
            msg: status.err_msg,
        }
        .fail(),
        None => IllegalDatabaseResponseSnafu {
            err_msg: format!("unknown server status: {:?}", status),
        }
        .fail(),
    }
}
#[cfg(test)]
mod test {
    use api::v1::Status as PbStatus;

    use super::*;
    use crate::Error::{IllegalDatabaseResponse, Server};

    /// Exercises every branch of `check_response_header`.
    #[test]
    fn test_check_response_header() {
        // Header missing entirely.
        let result = check_response_header(None);
        assert!(matches!(
            result.unwrap_err(),
            IllegalDatabaseResponse { .. }
        ));

        // Header present but inner status missing.
        let result = check_response_header(Some(ResponseHeader { status: None }));
        assert!(matches!(
            result.unwrap_err(),
            IllegalDatabaseResponse { .. }
        ));

        // Success status passes through.
        let result = check_response_header(Some(ResponseHeader {
            status: Some(PbStatus {
                status_code: StatusCode::Success as u32,
                err_msg: "".to_string(),
            }),
        }));
        assert!(result.is_ok());

        // A code that maps to no known StatusCode variant.
        let result = check_response_header(Some(ResponseHeader {
            status: Some(PbStatus {
                status_code: u32::MAX,
                err_msg: "".to_string(),
            }),
        }));
        assert!(matches!(
            result.unwrap_err(),
            IllegalDatabaseResponse { .. }
        ));

        // A known failure code surfaces as Server with its message preserved.
        let result = check_response_header(Some(ResponseHeader {
            status: Some(PbStatus {
                status_code: StatusCode::Internal as u32,
                err_msg: "blabla".to_string(),
            }),
        }));
        let Server { code, msg } = result.unwrap_err() else {
            unreachable!()
        };
        assert_eq!(code, StatusCode::Internal);
        assert_eq!(msg, "blabla");
    }
}

View File

@@ -35,6 +35,12 @@ pub const INFORMATION_SCHEMA_TABLES_TABLE_ID: u32 = 3;
pub const INFORMATION_SCHEMA_COLUMNS_TABLE_ID: u32 = 4;
pub const MITO_ENGINE: &str = "mito";
pub const MITO2_ENGINE: &str = "mito2";
/// Returns the name of the default table engine (currently `"mito"`).
pub fn default_engine() -> &'static str {
    MITO_ENGINE
}
pub const IMMUTABLE_FILE_ENGINE: &str = "file";
pub const SEMANTIC_TYPE_PRIMARY_KEY: &str = "TAG";

View File

@@ -27,7 +27,7 @@ orc-rust = "0.2"
paste = "1.0"
regex = "1.7"
snafu.workspace = true
strum = { version = "0.21", features = ["derive"] }
strum.workspace = true
tokio-util.workspace = true
tokio.workspace = true
url = "2.3"

View File

@@ -6,4 +6,4 @@ license.workspace = true
[dependencies]
snafu = { version = "0.7", features = ["backtraces"] }
strum = { version = "0.24", features = ["std", "derive"] }
strum.workspace = true

View File

@@ -38,7 +38,7 @@ use catalog::remote::CachedMetaKvBackend;
use catalog::CatalogManagerRef;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_catalog::consts::MITO_ENGINE;
use common_catalog::consts::default_engine;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
@@ -213,7 +213,6 @@ impl Instance {
let create_expr_factory = CreateExprFactory;
let row_inserter = Arc::new(RowInserter::new(
MITO_ENGINE.to_string(),
catalog_manager.clone(),
create_expr_factory,
dist_instance.clone(),
@@ -286,7 +285,6 @@ impl Instance {
let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone());
let row_inserter = Arc::new(RowInserter::new(
MITO_ENGINE.to_string(),
catalog_manager.clone(),
create_expr_factory,
grpc_query_handler.clone(),
@@ -366,7 +364,7 @@ impl Instance {
catalog_name, schema_name, table_name,
);
let _ = self
.create_table_by_columns(ctx, table_name, columns, MITO_ENGINE)
.create_table_by_columns(ctx, table_name, columns, default_engine())
.await?;
info!(
"Successfully created table on insertion: {}.{}.{}",

View File

@@ -17,6 +17,7 @@ use api::v1::ddl_request::Expr;
use api::v1::greptime_request::Request;
use api::v1::{AlterExpr, ColumnSchema, DdlRequest, Row, RowInsertRequest, RowInsertRequests};
use catalog::CatalogManagerRef;
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_query::Output;
use common_telemetry::info;
@@ -30,7 +31,6 @@ use crate::error::{CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertio
use crate::expr_factory::CreateExprFactory;
pub struct RowInserter {
engine_name: String,
catalog_manager: CatalogManagerRef,
create_expr_factory: CreateExprFactory,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
@@ -38,13 +38,11 @@ pub struct RowInserter {
impl RowInserter {
pub fn new(
engine_name: String,
catalog_manager: CatalogManagerRef,
create_expr_factory: CreateExprFactory,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
) -> Self {
Self {
engine_name,
catalog_manager,
create_expr_factory,
grpc_query_handler,
@@ -105,7 +103,7 @@ impl RowInserter {
let (column_schemas, _) = extract_schema_and_rows(req)?;
let create_table_expr = self
.create_expr_factory
.create_table_expr_by_column_schemas(table_name, column_schemas, &self.engine_name)?;
.create_table_expr_by_column_schemas(table_name, column_schemas, default_engine())?;
let req = Request::Ddl(DdlRequest {
expr: Some(Expr::CreateTable(create_table_expr)),

View File

@@ -44,6 +44,7 @@ serde_json = "1.0"
servers = { workspace = true }
snafu.workspace = true
store-api = { workspace = true }
strum.workspace = true
table = { workspace = true }
tokio-stream = { version = "0.1", features = ["net"] }
tokio.workspace = true
@@ -56,6 +57,7 @@ uuid.workspace = true
[dev-dependencies]
chrono.workspace = true
client = { workspace = true, features = ["testing"] }
common-procedure-test = { workspace = true }
session = { workspace = true }
tracing = "0.1"

View File

@@ -516,6 +516,9 @@ pub enum Error {
operation: String,
location: Location,
},
#[snafu(display("Primary key '{key}' not found when creating region request, at {location}"))]
PrimaryKeyNotFound { key: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -570,6 +573,7 @@ impl ErrorExt for Error {
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InvalidHeartbeatRequest { .. }
| Error::PrimaryKeyNotFound { .. }
| Error::TooManyPartitions { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }

View File

@@ -19,7 +19,4 @@ pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";
pub(crate) const METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_META: &str =
"meta.procedure.create_table.create_meta";
pub(crate) const METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_TABLE: &str =
"meta.procedure.create_table.create_table";
pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table";

View File

@@ -12,8 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest};
use api::v1::SemanticType;
use async_trait::async_trait;
use client::region::RegionRequester;
use client::Database;
use common_catalog::consts::MITO2_ENGINE;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::key::table_name::TableNameKey;
@@ -25,13 +30,16 @@ use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::engine::{region_dir, TableReference};
use table::metadata::{RawTableInfo, TableId};
use super::utils::{handle_request_datanode_error, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableMetadataManagerSnafu};
use crate::error::{self, PrimaryKeyNotFoundSnafu, Result, TableMetadataManagerSnafu};
use crate::metrics;
pub struct CreateTableProcedure {
context: DdlContext,
@@ -69,6 +77,10 @@ impl CreateTableProcedure {
&self.creator.data.task.table_info
}
fn table_id(&self) -> TableId {
self.table_info().ident.table_id
}
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.creator.data.region_routes
}
@@ -99,17 +111,126 @@ impl CreateTableProcedure {
return Ok(Status::Done);
}
self.creator.data.state = CreateTableState::DatanodeCreateTable;
self.creator.data.state = if expr.engine == MITO2_ENGINE {
CreateTableState::DatanodeCreateRegions
} else {
CreateTableState::DatanodeCreateTable
};
Ok(Status::executing(true))
}
/// Builds a `CreateRequest` template shared by all regions of this table.
/// `region_id` and `region_dir` are left blank here and filled in per region
/// by `on_datanode_create_regions`.
fn create_region_request_template(&self) -> Result<PbCreateRegionRequest> {
    let create_table_expr = &self.creator.data.task.create_table;

    // Column ids are assigned positionally; each column's semantic type is
    // derived from the time index / primary key declarations.
    let column_defs = create_table_expr
        .column_defs
        .iter()
        .enumerate()
        .map(|(i, c)| {
            let semantic_type = if create_table_expr.time_index == c.name {
                SemanticType::Timestamp
            } else if create_table_expr.primary_keys.contains(&c.name) {
                SemanticType::Tag
            } else {
                SemanticType::Field
            };

            ColumnDef {
                name: c.name.clone(),
                column_id: i as u32,
                datatype: c.datatype,
                is_nullable: c.is_nullable,
                default_constraint: c.default_constraint.clone(),
                semantic_type: semantic_type as i32,
            }
        })
        .collect::<Vec<_>>();

    // Maps primary key names to their column ids, preserving the declared key
    // order; a name that matches no column is an InvalidArguments error.
    let primary_key = create_table_expr
        .primary_keys
        .iter()
        .map(|key| {
            column_defs
                .iter()
                .find_map(|c| {
                    if &c.name == key {
                        Some(c.column_id)
                    } else {
                        None
                    }
                })
                .context(PrimaryKeyNotFoundSnafu { key })
        })
        .collect::<Result<_>>()?;

    Ok(PbCreateRegionRequest {
        region_id: 0,
        engine: create_table_expr.engine.to_string(),
        column_defs,
        primary_key,
        create_if_not_exists: true,
        region_dir: "".to_string(),
        options: create_table_expr.table_options.clone(),
    })
}
/// Fans out region-create requests to every leader Datanode (one background
/// task per Datanode, covering all of its leader regions), then advances the
/// procedure to the `CreateMetadata` state.
async fn on_datanode_create_regions(&mut self) -> Result<Status> {
    let create_table_data = &self.creator.data;
    let region_routes = &create_table_data.region_routes;

    let create_table_expr = &create_table_data.task.create_table;
    let catalog = &create_table_expr.catalog_name;
    let schema = &create_table_expr.schema_name;

    // One template shared by all regions; only region_id/region_dir differ.
    let request_template = self.create_region_request_template()?;

    let leaders = find_leaders(region_routes);
    let mut create_table_tasks = Vec::with_capacity(leaders.len());

    for datanode in leaders {
        let clients = self.context.datanode_clients.clone();

        let regions = find_leader_regions(region_routes, &datanode);
        let requests = regions
            .iter()
            .map(|region_number| {
                let region_id = RegionId::new(self.table_id(), *region_number);

                let mut create_table_request = request_template.clone();
                create_table_request.region_id = region_id.as_u64();
                create_table_request.region_dir = region_dir(catalog, schema, region_id);

                PbRegionRequest::Create(create_table_request)
            })
            .collect::<Vec<_>>();

        create_table_tasks.push(common_runtime::spawn_bg(async move {
            // Requests to a single Datanode are sent sequentially; the first
            // failure aborts that Datanode's task.
            for request in requests {
                let client = clients.get_client(&datanode).await;
                let requester = RegionRequester::new(client);

                if let Err(err) = requester.handle(request).await {
                    return Err(handle_request_datanode_error(datanode)(err));
                }
            }
            Ok(())
        }));
    }

    // Any join error or per-Datanode request error fails this step before the
    // state is advanced, so a retry re-runs the whole fan-out.
    join_all(create_table_tasks)
        .await
        .into_iter()
        .map(|e| e.context(error::JoinSnafu).flatten())
        .collect::<Result<Vec<_>>>()?;

    self.creator.data.state = CreateTableState::CreateMetadata;

    Ok(Status::executing(true))
}
async fn on_create_metadata(&self) -> Result<Status> {
let _timer = common_telemetry::timer!(
crate::metrics::METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_META
);
let table_id = self.table_info().ident.table_id as TableId;
let table_id = self.table_id();
let manager = &self.context.table_metadata_manager;
let raw_table_info = self.table_info().clone();
@@ -124,15 +245,12 @@ impl CreateTableProcedure {
}
async fn on_datanode_create_table(&mut self) -> Result<Status> {
let _timer = common_telemetry::timer!(
crate::metrics::METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_TABLE
);
let region_routes = &self.creator.data.region_routes;
let table_name = self.table_name();
let clients = self.context.datanode_clients.clone();
let leaders = find_leaders(region_routes);
let mut joins = Vec::with_capacity(leaders.len());
let table_id = self.table_info().ident.table_id;
let table_id = self.table_id();
for datanode in leaders {
let client = clients.get_client(&datanode).await;
@@ -172,9 +290,17 @@ impl Procedure for CreateTableProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.creator.data.state {
let state = &self.creator.data.state;
let _timer = common_telemetry::timer!(
metrics::METRIC_META_PROCEDURE_CREATE_TABLE,
&[("step", state.as_ref().to_string())]
);
match state {
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateTable => self.on_datanode_create_table().await,
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
@@ -213,12 +339,14 @@ impl TableCreator {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
enum CreateTableState {
/// Prepares to create the table
Prepare,
/// Datanode creates the table
DatanodeCreateTable,
/// Create regions on the Datanode
DatanodeCreateRegions,
/// Creates metadata
CreateMetadata,
}
@@ -236,3 +364,323 @@ impl CreateTableData {
self.task.table_ref()
}
}
#[cfg(test)]
mod test {
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use api::v1::region::region_server::RegionServer;
use api::v1::region::RegionResponse;
use api::v1::{
ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr, ResponseHeader,
Status as PbStatus,
};
use chrono::DateTime;
use client::client_manager::DatanodeClients;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
use table::metadata::{RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use tokio::sync::mpsc;
use tonic::transport::Server;
use tower::service_fn;
use super::*;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::sequence::Sequence;
use crate::service::store::kv::KvBackendAdapter;
use crate::service::store::memory::MemStore;
use crate::test_util::new_region_route;
/// Builds a `CreateTableProcedure` fixture: a mito2 table "my_table" (id 42)
/// with one timestamp, two tags and one field column, three regions each led
/// by a different peer, backed by an in-memory kv store.
fn create_table_procedure() -> CreateTableProcedure {
    let create_table_expr = CreateTableExpr {
        catalog_name: "my_catalog".to_string(),
        schema_name: "my_schema".to_string(),
        table_name: "my_table".to_string(),
        desc: "blabla".to_string(),
        column_defs: vec![
            PbColumnDef {
                name: "ts".to_string(),
                datatype: ColumnDataType::TimestampMillisecond as i32,
                is_nullable: false,
                default_constraint: vec![],
            },
            PbColumnDef {
                name: "my_tag1".to_string(),
                datatype: ColumnDataType::String as i32,
                is_nullable: true,
                default_constraint: vec![],
            },
            PbColumnDef {
                name: "my_tag2".to_string(),
                datatype: ColumnDataType::String as i32,
                is_nullable: true,
                default_constraint: vec![],
            },
            PbColumnDef {
                name: "my_field_column".to_string(),
                datatype: ColumnDataType::Int32 as i32,
                is_nullable: true,
                default_constraint: vec![],
            },
        ],
        time_index: "ts".to_string(),
        // Deliberately not in column order, to exercise primary key id mapping.
        primary_keys: vec!["my_tag2".to_string(), "my_tag1".to_string()],
        create_if_not_exists: false,
        table_options: HashMap::new(),
        table_id: None,
        region_numbers: vec![1, 2, 3],
        engine: MITO2_ENGINE.to_string(),
    };

    let raw_table_info = RawTableInfo {
        ident: TableIdent::new(42),
        name: "my_table".to_string(),
        desc: Some("blabla".to_string()),
        catalog_name: "my_catalog".to_string(),
        schema_name: "my_schema".to_string(),
        meta: RawTableMeta {
            schema: RawSchema {
                column_schemas: vec![
                    ColumnSchema::new(
                        "ts".to_string(),
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    ),
                    ColumnSchema::new(
                        "my_tag1".to_string(),
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "my_tag2".to_string(),
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "my_field_column".to_string(),
                        ConcreteDataType::int32_datatype(),
                        true,
                    ),
                ],
                timestamp_index: Some(0),
                version: 0,
            },
            primary_key_indices: vec![1, 2],
            value_indices: vec![2],
            engine: MITO2_ENGINE.to_string(),
            next_column_id: 3,
            region_numbers: vec![1, 2, 3],
            engine_options: HashMap::new(),
            options: TableOptions::default(),
            created_on: DateTime::default(),
            partition_key_indices: vec![],
        },
        table_type: TableType::Base,
    };

    let peers = vec![
        Peer::new(1, "127.0.0.1:4001"),
        Peer::new(2, "127.0.0.1:4002"),
        Peer::new(3, "127.0.0.1:4003"),
    ];
    // Each region gets a different leader peer (region 1 -> peer index 3, etc.;
    // see `new_region_route` for the exact argument semantics).
    let region_routes = vec![
        new_region_route(1, &peers, 3),
        new_region_route(2, &peers, 2),
        new_region_route(3, &peers, 1),
    ];

    let kv_store = Arc::new(MemStore::new());

    let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone());
    let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);

    CreateTableProcedure::new(
        1,
        CreateTableTask::new(create_table_expr, vec![], raw_table_info),
        region_routes,
        DdlContext {
            datanode_clients: Arc::new(DatanodeClients::default()),
            mailbox,
            server_addr: "127.0.0.1:4321".to_string(),
            table_metadata_manager: Arc::new(TableMetadataManager::new(
                KvBackendAdapter::wrap(kv_store),
            )),
        },
    )
}
/// Verifies the region request template: positional column ids, semantic
/// types from time index / primary keys, and primary key ids in declared
/// order ("my_tag2", "my_tag1" -> [2, 1]).
#[test]
fn test_create_region_request_template() {
    let procedure = create_table_procedure();

    let template = procedure.create_region_request_template().unwrap();

    let expected = PbCreateRegionRequest {
        region_id: 0,
        engine: MITO2_ENGINE.to_string(),
        column_defs: vec![
            ColumnDef {
                name: "ts".to_string(),
                column_id: 0,
                datatype: ColumnDataType::TimestampMillisecond as i32,
                is_nullable: false,
                default_constraint: vec![],
                semantic_type: SemanticType::Timestamp as i32,
            },
            ColumnDef {
                name: "my_tag1".to_string(),
                column_id: 1,
                datatype: ColumnDataType::String as i32,
                is_nullable: true,
                default_constraint: vec![],
                semantic_type: SemanticType::Tag as i32,
            },
            ColumnDef {
                name: "my_tag2".to_string(),
                column_id: 2,
                datatype: ColumnDataType::String as i32,
                is_nullable: true,
                default_constraint: vec![],
                semantic_type: SemanticType::Tag as i32,
            },
            ColumnDef {
                name: "my_field_column".to_string(),
                column_id: 3,
                datatype: ColumnDataType::Int32 as i32,
                is_nullable: true,
                default_constraint: vec![],
                semantic_type: SemanticType::Field as i32,
            },
        ],
        primary_key: vec![2, 1],
        create_if_not_exists: true,
        region_dir: "".to_string(),
        options: HashMap::new(),
    };
    assert_eq!(template, expected);
}
/// A fake region gRPC server for tests: reports every created region id
/// through an mpsc channel, served over an in-memory duplex connection
/// (no real network involved).
#[derive(Clone)]
struct TestingRegionServerHandler {
    runtime: Arc<Runtime>,
    // Receives the RegionId of each create request the handler sees.
    create_region_notifier: mpsc::Sender<RegionId>,
}

impl TestingRegionServerHandler {
    fn new(create_region_notifier: mpsc::Sender<RegionId>) -> Self {
        Self {
            runtime: Arc::new(RuntimeBuilder::default().worker_threads(2).build().unwrap()),
            create_region_notifier,
        }
    }

    /// Spins up the fake server on one end of a duplex stream and returns a
    /// `Client` whose channel for `datanode.addr` is wired to the other end.
    fn new_client(&self, datanode: &Peer) -> Client {
        let (client, server) = tokio::io::duplex(1024);

        let handler =
            RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());
        tokio::spawn(async move {
            Server::builder()
                .add_service(RegionServer::new(handler))
                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(
                    server,
                )]))
                .await
        });

        let channel_manager = ChannelManager::new();
        let mut client = Some(client);
        channel_manager
            .reset_with_connector(
                datanode.addr.clone(),
                service_fn(move |_| {
                    // The connector hands out the single duplex endpoint;
                    // `take` panics if it is ever invoked twice.
                    let client = client.take().unwrap();
                    async move { Ok::<_, std::io::Error>(client) }
                }),
            )
            .unwrap();
        Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
    }
}
#[async_trait]
impl RegionServerHandler for TestingRegionServerHandler {
    /// Handles only `Create` requests (anything else is a test bug): notifies
    /// the test of the created region id and replies with a success status.
    async fn handle(&self, request: PbRegionRequest) -> servers::error::Result<RegionResponse> {
        let PbRegionRequest::Create(request) = request else {
            unreachable!()
        };
        let region_id = request.region_id.into();

        self.create_region_notifier.send(region_id).await.unwrap();

        Ok(RegionResponse {
            header: Some(ResponseHeader {
                // 0 is the success status code.
                status: Some(PbStatus {
                    status_code: 0,
                    err_msg: "".to_string(),
                }),
            }),
            affected_rows: 0,
        })
    }
}
#[tokio::test]
async fn test_on_datanode_create_regions() {
    let mut procedure = create_table_procedure();

    let (tx, mut rx) = mpsc::channel(10);
    let region_server = TestingRegionServerHandler::new(tx);

    // Wire an in-memory test client for every region-leader datanode.
    for peer in find_leaders(&procedure.creator.data.region_routes) {
        let client = region_server.new_client(&peer);
        procedure
            .context
            .datanode_clients
            .insert_client(peer, client)
            .await;
    }

    // The procedure is expected to create exactly these three regions.
    let expected_created_regions = Arc::new(Mutex::new(HashSet::from([
        RegionId::new(42, 1),
        RegionId::new(42, 2),
        RegionId::new(42, 3),
    ])));

    // Drain the notifications: remove each created region from the expected
    // set, stopping after all of them arrived (or the channel closes early).
    let handle = tokio::spawn({
        let expected_created_regions = expected_created_regions.clone();
        let total = expected_created_regions.lock().unwrap().len();
        async move {
            for _ in 0..total {
                let Some(region_id) = rx.recv().await else { break };
                expected_created_regions.lock().unwrap().remove(&region_id);
            }
        }
    });

    let status = procedure.on_datanode_create_regions().await.unwrap();
    assert!(matches!(status, Status::Executing { persist: true }));
    assert!(matches!(
        procedure.creator.data.state,
        CreateTableState::CreateMetadata
    ));

    // Every expected region must have been reported as created.
    handle.await.unwrap();
    assert!(expected_created_regions.lock().unwrap().is_empty());
}
}

View File

@@ -158,7 +158,7 @@ mod tests {
use super::super::tests::{TestingEnv, TestingEnvBuilder};
use super::{State, *};
use crate::table_routes::tests::new_region_route;
use crate::test_util::new_region_route;
#[tokio::test]
async fn test_next_state() {

View File

@@ -72,7 +72,7 @@ pub(crate) async fn fetch_tables(
#[cfg(test)]
pub(crate) mod tests {
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
@@ -141,25 +141,4 @@ pub(crate) mod tests {
.await
.unwrap();
}
/// Builds a test `RegionRoute`: a bare region with the given number, led by
/// the peer in `peers` whose id equals `leader_node` (if present), and with
/// no followers.
pub(crate) fn new_region_route(
region_number: u64,
peers: &[Peer],
leader_node: u64,
) -> RegionRoute {
// Only the id matters for these tests; name/partition/attrs stay empty.
let region = Region {
id: region_number.into(),
name: "".to_string(),
partition: None,
attrs: BTreeMap::new(),
};
// The leader may be `None` if `leader_node` does not match any peer id.
let leader_peer = peers.iter().find(|peer| peer.id == leader_node).cloned();
RegionRoute {
region,
leader_peer,
follower_peers: vec![],
}
}
}

View File

@@ -15,6 +15,8 @@
use std::sync::Arc;
use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_procedure::local::{LocalManager, ManagerConfig};
use crate::cluster::MetaPeerClientBuilder;
@@ -28,6 +30,21 @@ use crate::sequence::Sequence;
use crate::service::store::kv::KvBackendAdapter;
use crate::service::store::memory::MemStore;
/// Builds a test `RegionRoute`: a default region carrying only `region_id`,
/// led by the peer in `peers` whose id equals `leader_node` (if present),
/// with no followers.
pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) -> RegionRoute {
// Everything except the id keeps its `Default` value.
let region = Region {
id: region_id.into(),
..Default::default()
};
// The leader may be `None` if `leader_node` does not match any peer id.
let leader_peer = peers.iter().find(|peer| peer.id == leader_node).cloned();
RegionRoute {
region,
leader_peer,
follower_peers: vec![],
}
}
pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
let kv_store = Arc::new(MemStore::new());

View File

@@ -49,7 +49,7 @@ serde_json = "1.0"
snafu.workspace = true
storage = { workspace = true }
store-api = { workspace = true }
strum = "0.21"
strum.workspace = true
table = { workspace = true }
tokio-util.workspace = true
tokio.workspace = true

View File

@@ -83,7 +83,7 @@ sha1 = "0.10"
snafu.workspace = true
snap = "1"
sql = { workspace = true }
strum = { version = "0.24", features = ["derive"] }
strum.workspace = true
table = { workspace = true }
tokio-rustls = "0.24"
tokio-stream = { version = "0.1", features = ["net"] }

View File

@@ -88,9 +88,8 @@ impl GrpcServer {
) -> Self {
let database_handler =
GreptimeRequestHandler::new(query_handler, user_provider.clone(), runtime.clone());
let region_server_handler = region_server_handler.map(|handler| {
RegionServerRequestHandler::new(handler, user_provider.clone(), runtime.clone())
});
let region_server_handler =
region_server_handler.map(|handler| RegionServerRequestHandler::new(handler, runtime));
Self {
shutdown_tx: Mutex::new(None),
user_provider,

View File

@@ -71,7 +71,7 @@ impl GreptimeRequestHandler {
query_ctx.set_current_user(user_info);
let handler = self.handler.clone();
let request_type = request_type(&query);
let request_type = request_type(&query).to_string();
let db = query_ctx.get_db_string();
let timer = RequestTimer::new(db.clone(), request_type);
@@ -180,13 +180,13 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
pub(crate) struct RequestTimer {
start: Instant,
db: String,
request_type: &'static str,
request_type: String,
status_code: StatusCode,
}
impl RequestTimer {
/// Returns a new timer.
pub fn new(db: String, request_type: &'static str) -> RequestTimer {
pub fn new(db: String, request_type: String) -> RequestTimer {
RequestTimer {
start: Instant::now(),
db,
@@ -208,7 +208,7 @@ impl Drop for RequestTimer {
self.start.elapsed(),
&[
(METRIC_DB_LABEL, std::mem::take(&mut self.db)),
(METRIC_TYPE_LABEL, self.request_type.to_string()),
(METRIC_TYPE_LABEL, std::mem::take(&mut self.request_type)),
(METRIC_CODE_LABEL, self.status_code.to_string())
]
);

View File

@@ -14,30 +14,17 @@
use std::sync::Arc;
use api::helper::region_request_type;
use api::v1::auth_header::AuthScheme;
use api::v1::region::region_server::Region as RegionServer;
use api::v1::region::{region_request, RegionRequest, RegionResponse};
use api::v1::{Basic, RequestHeader};
use async_trait::async_trait;
use auth::{Identity, Password, UserInfoRef, UserProviderRef};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_runtime::Runtime;
use common_telemetry::{debug, error};
use metrics::increment_counter;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
use crate::error::{
AuthSnafu, InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result,
UnsupportedAuthSchemeSnafu,
};
use crate::grpc::greptime_handler::RequestTimer;
use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result};
use crate::grpc::TonicResult;
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_CODE_LABEL};
#[async_trait]
pub trait RegionServerHandler: Send + Sync {
@@ -49,21 +36,12 @@ pub type RegionServerHandlerRef = Arc<dyn RegionServerHandler>;
#[derive(Clone)]
pub struct RegionServerRequestHandler {
handler: Arc<dyn RegionServerHandler>,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
}
impl RegionServerRequestHandler {
pub fn new(
handler: Arc<dyn RegionServerHandler>,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
) -> Self {
Self {
handler,
user_provider,
runtime,
}
pub fn new(handler: Arc<dyn RegionServerHandler>, runtime: Arc<Runtime>) -> Self {
Self { handler, runtime }
}
async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
@@ -71,15 +49,7 @@ impl RegionServerRequestHandler {
reason: "Expecting non-empty GreptimeRequest.",
})?;
let header = request.header.as_ref();
let query_ctx = create_query_context(header);
let user_info = self.auth(header, &query_ctx).await?;
query_ctx.set_current_user(user_info);
let handler = self.handler.clone();
let request_type = region_request_type(&query);
let db = query_ctx.get_db_string();
let timer = RequestTimer::new(db.clone(), request_type);
// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpected by Tonic runtime;
@@ -100,85 +70,8 @@ impl RegionServerRequestHandler {
})
});
handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
e
})?
handle.await.context(JoinTaskSnafu)?
}
/// Authenticates the request against the configured user provider.
///
/// Returns `Ok(None)` when no user provider is configured (auth disabled).
/// Otherwise requires an auth header; only the Basic scheme is supported,
/// and every failure bumps the auth-failure metric before being returned.
async fn auth(
&self,
header: Option<&RequestHeader>,
query_ctx: &QueryContextRef,
) -> Result<Option<UserInfoRef>> {
// No provider configured: authentication is disabled, proceed anonymously.
let Some(user_provider) = self.user_provider.as_ref() else {
return Ok(None);
};
// A provider exists, so a missing/empty auth header is an error.
let auth_scheme = header
.and_then(|header| {
header
.authorization
.as_ref()
.and_then(|x| x.auth_scheme.clone())
})
.context(NotFoundAuthHeaderSnafu)?;
match auth_scheme {
// Basic auth: validate username/password in the request's catalog/schema.
AuthScheme::Basic(Basic { username, password }) => user_provider
.auth(
Identity::UserId(&username, None),
Password::PlainText(password.into()),
query_ctx.current_catalog(),
query_ctx.current_schema(),
)
.await
.context(AuthSnafu),
// Token auth is not implemented for region requests.
AuthScheme::Token(_) => UnsupportedAuthSchemeSnafu {
name: "Token AuthScheme".to_string(),
}
.fail(),
}
.map(Some)
// Count every auth failure, labeled by its status code, then propagate it.
.map_err(|e| {
increment_counter!(
METRIC_AUTH_FAILURE,
&[(METRIC_CODE_LABEL, format!("{}", e.status_code()))]
);
e
})
}
}
/// Builds a `QueryContext` from an optional request header.
///
/// Catalog/schema resolution order: the header's `dbname` (parsed into a
/// catalog/schema pair) wins; otherwise the explicit `catalog`/`schema`
/// fields are used, each falling back to the defaults when empty; with no
/// header at all, the default catalog and schema are used.
pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef {
let (catalog, schema) = header
.map(|header| {
// We provide dbname field in newer versions of protos/sdks
// parse dbname from header in priority
if !header.dbname.is_empty() {
parse_catalog_and_schema_from_db_string(&header.dbname)
} else {
(
if !header.catalog.is_empty() {
&header.catalog
} else {
DEFAULT_CATALOG_NAME
},
if !header.schema.is_empty() {
&header.schema
} else {
DEFAULT_SCHEMA_NAME
},
)
}
})
.unwrap_or((DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME));
// Trace id is carried over from the header when present.
QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.try_trace_id(header.and_then(|h: &RequestHeader| h.trace_id))
.build()
}
#[async_trait]

View File

@@ -14,6 +14,7 @@
use std::cmp::Ordering;
use common_catalog::consts::default_engine;
use itertools::Itertools;
use once_cell::sync::Lazy;
use snafu::{ensure, OptionExt, ResultExt};
@@ -143,7 +144,7 @@ impl<'a> ParserContext<'a> {
let partitions = self.parse_partitions()?;
let engine = self.parse_table_engine(common_catalog::consts::MITO_ENGINE)?;
let engine = self.parse_table_engine(default_engine())?;
let options = self
.parser
.parse_options(Keyword::WITH)

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use common_base::paths::DATA_DIR;
use common_procedure::BoxedProcedure;
use datafusion_common::TableReference as DfTableReference;
use store_api::storage::RegionNumber;
use store_api::storage::{RegionId, RegionNumber};
use crate::error::{self, Result};
use crate::metadata::TableId;
@@ -198,6 +198,14 @@ pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> St
format!("{DATA_DIR}{catalog_name}/{schema_name}/{table_id}/")
}
/// Returns the storage directory of a region: the owning table's directory
/// followed by the region's name, e.g.
/// `data/my_catalog/my_schema/42/42_0000000001` (see `test_region_dir`).
pub fn region_dir(catalog_name: &str, schema_name: &str, region_id: RegionId) -> String {
format!(
"{}{}",
table_dir(catalog_name, schema_name, region_id.table_id()),
region_name(region_id.table_id(), region_id.region_number())
)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -212,4 +220,13 @@ mod tests {
assert_eq!("greptime.public.test", table_ref.to_string());
}
#[test]
fn test_region_dir() {
// Region 1 of table 42: expect "<table dir>" + "<table id>_<zero-padded region number>".
let region_id = RegionId::new(42, 1);
assert_eq!(
region_dir("my_catalog", "my_schema", region_id),
"data/my_catalog/my_schema/42/42_0000000001"
);
}
}