Mirror of https://github.com/GreptimeTeam/greptimedb.git

Compare commits: feature/df ... poc/create (27 commits)
| SHA1 |
|---|
| 7e79b4b2f6 |
| 4ad40af468 |
| e4b048e788 |
| ecbf372de3 |
| 3d81a17360 |
| 025cae3679 |
| 68409e28ea |
| 699406ae32 |
| 344006deca |
| 63803f2b43 |
| cf62767b98 |
| 4e53c1531d |
| 892cb66c53 |
| 8b392477c8 |
| 905593dc16 |
| 6c04cb9b19 |
| 24da3367c1 |
| 80b14965a6 |
| 5da3f86d0c |
| 151273d1df |
| b0289dbdde |
| c51730a954 |
| 207709c727 |
| deca8c44fa |
| 2edd861ce9 |
| 14f3a4ab05 |
| 34875c0346 |
Cargo.lock (generated, 21 changed lines)

@@ -4738,6 +4738,7 @@ dependencies = [
 "log-store",
 "meta-client",
 "num_cpus",
+ "object-store",
 "opentelemetry-proto 0.27.0",
 "operator",
 "otel-arrow-rust",
@@ -6698,7 +6699,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
 dependencies = [
 "cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
 ]

 [[package]]
@@ -7234,6 +7235,7 @@ dependencies = [
 name = "metric-engine"
 version = "0.15.0"
 dependencies = [
+ "ahash 0.8.11",
 "api",
 "aquamarine",
 "async-stream",
@@ -11230,6 +11232,7 @@ dependencies = [
 "common-base",
 "common-catalog",
 "common-config",
+ "common-datasource",
 "common-error",
 "common-frontend",
 "common-grpc",
@@ -11272,16 +11275,23 @@ dependencies = [
 "local-ip-address",
 "log-query",
 "loki-proto",
+ "metric-engine",
 "mime_guess",
+ "mito-codec",
+ "mito2",
 "mysql_async",
 "notify",
 "object-pool",
+ "object-store",
 "once_cell",
 "openmetrics-parser",
 "opensrv-mysql",
 "opentelemetry-proto 0.27.0",
+ "operator",
 "otel-arrow-rust",
 "parking_lot 0.12.3",
+ "parquet",
+ "partition",
 "permutation",
 "pgwire",
 "pin-project",
@@ -13856,12 +13866,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"

 [[package]]
 name = "uuid"
-version = "1.10.0"
+version = "1.17.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
+checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
 dependencies = [
- "getrandom 0.2.15",
- "rand 0.8.5",
+ "getrandom 0.3.2",
+ "js-sys",
+ "rand 0.9.0",
 "serde",
 "wasm-bindgen",
 ]
@@ -21,6 +21,7 @@ pub mod error;
 pub mod file_format;
 pub mod lister;
 pub mod object_store;
+pub mod parquet_writer;
 pub mod share_buffer;
 #[cfg(test)]
 pub mod test_util;
src/common/datasource/src/parquet_writer.rs (new file, 52 lines)

@@ -0,0 +1,52 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use object_store::Writer;
+use parquet::arrow::async_writer::AsyncFileWriter;
+use parquet::errors::ParquetError;
+
+/// Bridges opendal [Writer] with parquet [AsyncFileWriter].
+pub struct AsyncWriter {
+    inner: Writer,
+}
+
+impl AsyncWriter {
+    /// Create a [`AsyncWriter`] by given [`Writer`].
+    pub fn new(writer: Writer) -> Self {
+        Self { inner: writer }
+    }
+}
+
+impl AsyncFileWriter for AsyncWriter {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
+        Box::pin(async move {
+            self.inner
+                .write(bs)
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
+        Box::pin(async move {
+            self.inner
+                .close()
+                .await
+                .map(|_| ())
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+}
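The bridge above is what lets parquet's async writer stream directly into object storage: parquet's `AsyncArrowWriter` accepts any `AsyncFileWriter`, so the opendal `Writer` plugs in once wrapped. A minimal usage sketch in Rust, assuming the module path `common_datasource::parquet_writer`, an opendal-backed `object_store::ObjectStore` whose `writer()` yields the wrapped `Writer`, and an Arrow `RecordBatch` supplied by the caller (none of these call sites appear in the diff itself):

    use common_datasource::parquet_writer::AsyncWriter;
    use parquet::arrow::AsyncArrowWriter;

    async fn write_parquet(
        store: &object_store::ObjectStore,
        path: &str,
        batch: arrow::record_batch::RecordBatch,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // Wrap the opendal writer so parquet can drive it as an AsyncFileWriter.
        let sink = AsyncWriter::new(store.writer(path).await?);
        // AsyncArrowWriter takes ownership of the sink and buffers row groups internally.
        let mut writer = AsyncArrowWriter::try_new(sink, batch.schema(), None)?;
        writer.write(&batch).await?;
        // `close` flushes remaining data and writes the parquet footer via `complete`.
        writer.close().await?;
        Ok(())
    }

The `map_err(|err| ParquetError::External(Box::new(err)))` calls in `AsyncWriter` are what let opendal errors surface through this chain as ordinary `ParquetError`s.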
@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
 use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
 use common_meta::ddl::ProcedureExecutorRef;
 use common_meta::key::flow::FlowMetadataManagerRef;
-use common_meta::key::TableMetadataManagerRef;
+use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::node_manager::{Flownode, NodeManagerRef};
 use common_query::Output;
@@ -37,6 +37,7 @@ use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertReq
 use itertools::Itertools;
 use operator::delete::Deleter;
 use operator::insert::Inserter;
+use operator::schema_helper::SchemaHelper;
 use operator::statement::StatementExecutor;
 use partition::manager::PartitionRuleManager;
 use query::{QueryEngine, QueryEngineFactory};
@@ -546,8 +547,14 @@ impl FrontendInvoker {
             name: TABLE_FLOWNODE_SET_CACHE_NAME,
         })?;

-        let inserter = Arc::new(Inserter::new(
+        let schema_helper = SchemaHelper::new(
             catalog_manager.clone(),
+            Arc::new(TableMetadataManager::new(kv_backend.clone())),
+            procedure_executor.clone(),
+            layered_cache_registry.clone(),
+        );
+        let inserter = Arc::new(Inserter::new(
+            schema_helper,
             partition_manager.clone(),
             node_manager.clone(),
             table_flownode_cache,
@@ -588,7 +595,7 @@ impl FrontendInvoker {
             .start_timer();

         self.inserter
-            .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
+            .handle_row_inserts(requests, ctx, false, false)
             .await
             .map_err(BoxedError::new)
             .context(common_frontend::error::ExternalSnafu)
@@ -49,6 +49,7 @@ log-query.workspace = true
 log-store.workspace = true
 meta-client.workspace = true
 num_cpus.workspace = true
+object-store.workspace = true
 opentelemetry-proto.workspace = true
 operator.workspace = true
 otel-arrow-rust.workspace = true
@@ -19,6 +19,7 @@ use common_config::config::Configurable;
 use common_options::datanode::DatanodeClientOptions;
 use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
 use meta_client::MetaClientOptions;
+use object_store::config::ObjectStoreConfig;
 use query::options::QueryOptions;
 use serde::{Deserialize, Serialize};
 use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
@@ -62,6 +63,7 @@ pub struct FrontendOptions {
     pub query: QueryOptions,
     pub max_in_flight_write_bytes: Option<ReadableSize>,
     pub slow_query: Option<SlowQueryOptions>,
+    pub store: ObjectStoreConfig,
 }

 impl Default for FrontendOptions {
@@ -88,6 +90,7 @@ impl Default for FrontendOptions {
             query: QueryOptions::default(),
             max_in_flight_write_bytes: None,
             slow_query: Some(SlowQueryOptions::default()),
+            store: ObjectStoreConfig::default(),
         }
     }
 }
@@ -116,8 +119,7 @@ impl Frontend {
         if let Some(t) = self.export_metrics_task.as_ref() {
             if t.send_by_handler {
                 let inserter = self.instance.inserter().clone();
-                let statement_executor = self.instance.statement_executor().clone();
-                let handler = ExportMetricHandler::new_handler(inserter, statement_executor);
+                let handler = ExportMetricHandler::new_handler(inserter);
                 t.start(Some(handler)).context(error::StartServerSnafu)?
             } else {
                 t.start(None).context(error::StartServerSnafu)?;
@@ -39,6 +39,7 @@ use common_config::KvBackendConfig;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::KvBackendRef;
+use common_meta::node_manager::NodeManagerRef;
 use common_meta::state_store::KvStateStore;
 use common_procedure::local::{LocalManager, ManagerConfig};
 use common_procedure::options::ProcedureConfig;
@@ -49,7 +50,9 @@ use datafusion_expr::LogicalPlan;
 use log_store::raft_engine::RaftEngineBackend;
 use operator::delete::DeleterRef;
 use operator::insert::InserterRef;
+use operator::schema_helper::SchemaHelper;
 use operator::statement::{StatementExecutor, StatementExecutorRef};
+use partition::manager::PartitionRuleManagerRef;
 use pipeline::pipeline_operator::PipelineOperator;
 use prometheus::HistogramTimer;
 use promql_parser::label::Matcher;
@@ -58,6 +61,7 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
 use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
 use query::query_engine::DescribeResult;
 use query::QueryEngineRef;
+use servers::access_layer::AccessLayerFactory;
 use servers::error as server_error;
 use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
 use servers::interceptor::{
@@ -100,6 +104,7 @@ pub struct Instance {
     slow_query_recorder: Option<SlowQueryRecorder>,
     limiter: Option<LimiterRef>,
     process_manager: ProcessManagerRef,
+    access_layer_factory: AccessLayerFactory,
 }

 impl Instance {
@@ -161,6 +166,27 @@ impl Instance {
     pub fn process_manager(&self) -> &ProcessManagerRef {
         &self.process_manager
     }
+
+    pub fn create_schema_helper(&self) -> SchemaHelper {
+        SchemaHelper::new(
+            self.catalog_manager.clone(),
+            self.table_metadata_manager.clone(),
+            self.statement_executor.procedure_executor().clone(),
+            self.statement_executor.cache_invalidator().clone(),
+        )
+    }
+
+    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
+        self.inserter.partition_manager()
+    }
+
+    pub fn node_manager(&self) -> &NodeManagerRef {
+        self.inserter.node_manager()
+    }
+
+    pub fn access_layer_factory(&self) -> &AccessLayerFactory {
+        &self.access_layer_factory
+    }
 }

 fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
@@ -30,12 +30,14 @@ use operator::flow::FlowServiceOperator;
 use operator::insert::Inserter;
 use operator::procedure::ProcedureServiceOperator;
 use operator::request::Requester;
+use operator::schema_helper::SchemaHelper;
 use operator::statement::{StatementExecutor, StatementExecutorRef};
 use operator::table::TableMutationOperator;
 use partition::manager::PartitionRuleManager;
 use pipeline::pipeline_operator::PipelineOperator;
 use query::region_query::RegionQueryHandlerFactoryRef;
 use query::QueryEngineFactory;
+use servers::access_layer::AccessLayerFactory;
 use snafu::OptionExt;

 use crate::error::{self, Result};
@@ -130,8 +132,15 @@ impl FrontendBuilder {
             name: TABLE_FLOWNODE_SET_CACHE_NAME,
         })?;

-        let inserter = Arc::new(Inserter::new(
+        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
+        let schema_helper = SchemaHelper::new(
             self.catalog_manager.clone(),
+            table_metadata_manager.clone(),
+            self.procedure_executor.clone(),
+            local_cache_invalidator.clone(),
+        );
+        let inserter = Arc::new(Inserter::new(
+            schema_helper,
             partition_manager.clone(),
             node_manager.clone(),
             table_flownode_cache,
@@ -176,7 +185,7 @@ impl FrontendBuilder {
             self.catalog_manager.clone(),
             query_engine.clone(),
             self.procedure_executor,
-            kv_backend.clone(),
+            kv_backend,
             local_cache_invalidator,
             inserter.clone(),
             table_route_cache,
@@ -211,6 +220,7 @@ impl FrontendBuilder {
             Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
         });

+        let access_layer_factory = AccessLayerFactory::new(&self.options.store).await.unwrap();
         Ok(Instance {
             catalog_manager: self.catalog_manager,
             pipeline_operator,
@@ -219,10 +229,11 @@ impl FrontendBuilder {
             plugins,
             inserter,
             deleter,
-            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
+            table_metadata_manager,
             slow_query_recorder,
             limiter,
             process_manager,
+            access_layer_factory,
         })
     }
 }
@@ -408,7 +408,7 @@ impl Instance {
         ctx: QueryContextRef,
     ) -> Result<Output> {
         self.inserter
-            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
+            .handle_column_inserts(requests, ctx)
             .await
             .context(TableOperationSnafu)
     }
@@ -422,13 +422,7 @@ impl Instance {
         is_single_value: bool,
     ) -> Result<Output> {
         self.inserter
-            .handle_row_inserts(
-                requests,
-                ctx,
-                self.statement_executor.as_ref(),
-                accommodate_existing_schema,
-                is_single_value,
-            )
+            .handle_row_inserts(requests, ctx, accommodate_existing_schema, is_single_value)
             .await
             .context(TableOperationSnafu)
     }
@@ -441,10 +435,7 @@ impl Instance {
     ) -> Result<Output> {
         self.inserter
             .handle_last_non_null_inserts(
-                requests,
-                ctx,
-                self.statement_executor.as_ref(),
-                true,
+                requests, ctx, true,
                 // Influx protocol may writes multiple fields (values).
                 false,
             )
@@ -460,7 +451,7 @@ impl Instance {
         physical_table: String,
     ) -> Result<Output> {
         self.inserter
-            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
+            .handle_metric_row_inserts(requests, ctx, physical_table)
             .await
             .context(TableOperationSnafu)
     }
@@ -135,7 +135,7 @@ impl Instance {
         };

         self.inserter
-            .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
+            .handle_log_inserts(log, ctx)
             .await
             .map_err(BoxedError::new)
             .context(ExecuteGrpcRequestSnafu)
@@ -157,7 +157,7 @@ impl Instance {
         };

         self.inserter
-            .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
+            .handle_trace_inserts(rows, ctx)
             .await
             .map_err(BoxedError::new)
             .context(ExecuteGrpcRequestSnafu)
@@ -28,7 +28,6 @@ use common_query::Output;
 use common_recordbatch::RecordBatches;
 use common_telemetry::{debug, tracing};
 use operator::insert::InserterRef;
-use operator::statement::StatementExecutor;
 use prost::Message;
 use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
 use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
@@ -271,18 +270,11 @@ impl PromStoreProtocolHandler for Instance {
 /// so only implement `PromStoreProtocolHandler::write` method.
 pub struct ExportMetricHandler {
     inserter: InserterRef,
-    statement_executor: Arc<StatementExecutor>,
 }

 impl ExportMetricHandler {
-    pub fn new_handler(
-        inserter: InserterRef,
-        statement_executor: Arc<StatementExecutor>,
-    ) -> PromStoreProtocolHandlerRef {
-        Arc::new(Self {
-            inserter,
-            statement_executor,
-        })
+    pub fn new_handler(inserter: InserterRef) -> PromStoreProtocolHandlerRef {
+        Arc::new(Self { inserter })
     }
 }

@@ -295,12 +287,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
         _: bool,
     ) -> ServerResult<Output> {
         self.inserter
-            .handle_metric_row_inserts(
-                request,
-                ctx,
-                &self.statement_executor,
-                GREPTIME_PHYSICAL_TABLE.to_string(),
-            )
+            .handle_metric_row_inserts(request, ctx, GREPTIME_PHYSICAL_TABLE.to_string())
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)
@@ -24,6 +24,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::{GrpcOptions, GrpcServer};
 use servers::http::event::LogValidatorRef;
+use servers::http::prom_store::{PromBulkState, PromStoreState};
 use servers::http::{HttpServer, HttpServerBuilder};
 use servers::interceptor::LogIngestInterceptorRef;
 use servers::metrics_handler::MetricsHandler;
@@ -95,13 +96,30 @@ where
         }

         if opts.prom_store.enable {
+            let bulk_state = if opts.prom_store.bulk_mode {
+                let mut state = PromBulkState {
+                    schema_helper: self.instance.create_schema_helper(),
+                    partition_manager: self.instance.partition_manager().clone(),
+                    node_manager: self.instance.node_manager().clone(),
+                    access_layer_factory: self.instance.access_layer_factory().clone(),
+                    tx: None,
+                };
+                state.start_background_task();
+                Some(state)
+            } else {
+                None
+            };
+
+            let state = PromStoreState {
+                prom_store_handler: self.instance.clone(),
+                pipeline_handler: Some(self.instance.clone()),
+                prom_store_with_metric_engine: opts.prom_store.with_metric_engine,
+                prom_validation_mode: opts.http.prom_validation_mode,
+                bulk_state,
+            };
+
             builder = builder
-                .with_prom_handler(
-                    self.instance.clone(),
-                    Some(self.instance.clone()),
-                    opts.prom_store.with_metric_engine,
-                    opts.http.prom_validation_mode,
-                )
+                .with_prom_handler(state)
                 .with_prometheus_handler(self.instance.clone());
         }

@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
 pub struct PromStoreOptions {
     pub enable: bool,
     pub with_metric_engine: bool,
+    pub bulk_mode: bool,
 }

 impl Default for PromStoreOptions {
@@ -25,6 +26,7 @@ impl Default for PromStoreOptions {
         Self {
             enable: true,
             with_metric_engine: true,
+            bulk_mode: false,
         }
     }
 }
@@ -37,6 +39,7 @@ mod tests {
     fn test_prom_store_options() {
         let default = PromStoreOptions::default();
         assert!(default.enable);
-        assert!(default.with_metric_engine)
+        assert!(default.with_metric_engine);
+        assert!(!default.bulk_mode);
     }
 }
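For reference, a hedged sketch of constructing these options with the new flag turned on; the struct's module path is not shown in these hunks, so the import is an assumption:

    // Assumed import path; only the field names are confirmed by the diff above.
    use frontend::service_config::PromStoreOptions;

    fn bulk_ingest_options() -> PromStoreOptions {
        PromStoreOptions {
            enable: true,
            with_metric_engine: true,
            // New field: opt in to the PromBulkState-backed bulk write path.
            bulk_mode: true,
        }
    }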
@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
             .into();

         self.inserter
-            .handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false)
+            .handle_row_inserts(requests, query_ctx, false, false)
             .await
             .context(TableOperationSnafu)?;

@@ -8,6 +8,7 @@ license.workspace = true
 workspace = true

 [dependencies]
+ahash.workspace = true
 api.workspace = true
 aquamarine.workspace = true
 async-stream.workspace = true
@@ -147,7 +147,7 @@ impl MetricEngineInner {
     fn modify_rows(
         &self,
         physical_region_id: RegionId,
-        table_id: TableId,
+        logical_table_id: TableId,
         rows: &mut Rows,
         encoding: PrimaryKeyEncoding,
     ) -> Result<()> {
@@ -163,7 +163,9 @@ impl MetricEngineInner {
                 .physical_columns();
             RowsIter::new(input, name_to_id)
         };
-        let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
+        let output = self
+            .row_modifier
+            .modify_rows(iter, logical_table_id, encoding)?;
         *rows = output;
         Ok(())
     }
@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
 ///
 /// - For [`PrimaryKeyEncoding::Dense`] encoding,
 ///   it adds two columns(`__table_id`, `__tsid`) to the row.
-pub(crate) struct RowModifier {
+pub struct RowModifier {
     codec: SparsePrimaryKeyCodec,
 }

@@ -52,7 +52,7 @@ impl RowModifier {
     }

     /// Modify rows with the given primary key encoding.
-    pub(crate) fn modify_rows(
+    pub fn modify_rows(
         &self,
         iter: RowsIter,
         table_id: TableId,
@@ -74,7 +74,7 @@ impl RowModifier {

         let mut buffer = vec![];
         for mut iter in iter.iter_mut() {
-            let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
+            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
             let mut values = Vec::with_capacity(num_output_column);
             buffer.clear();
             let internal_columns = [
@@ -135,7 +135,7 @@ impl RowModifier {
             options: None,
         });
         for iter in iter.iter_mut() {
-            let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
+            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
             iter.row.values.push(table_id);
             iter.row.values.push(tsid);
         }
@@ -144,7 +144,7 @@ impl RowModifier {
     }

     /// Fills internal columns of a row with table name and a hash of tag values.
-    fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
+    pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
         let mut hasher = TsidGenerator::default();
         for (name, value) in iter.primary_keys_with_name() {
             // The type is checked before. So only null is ignored.
@@ -264,7 +264,7 @@ impl IterIndex {
 }

 /// Iterator of rows.
-pub(crate) struct RowsIter {
+pub struct RowsIter {
     rows: Rows,
     index: IterIndex,
 }
@@ -276,7 +276,7 @@ impl RowsIter {
     }

     /// Returns the iterator of rows.
-    fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
+    pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
         self.rows.rows.iter_mut().map(|row| RowIter {
             row,
             index: &self.index,
@@ -290,10 +290,22 @@ impl RowsIter {
             .iter()
             .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
     }
+
+    pub fn num_rows(&self) -> usize {
+        self.rows.rows.len()
+    }
+
+    pub fn num_columns(&self) -> usize {
+        self.rows.schema.len()
+    }
+
+    pub fn num_primary_keys(&self) -> usize {
+        self.index.num_primary_key_column
+    }
 }

 /// Iterator of a row.
-struct RowIter<'a> {
+pub struct RowIter<'a> {
     row: &'a mut Row,
     index: &'a IterIndex,
     schema: &'a Vec<ColumnSchema>,
@@ -313,7 +325,7 @@ impl RowIter<'_> {
     }

     /// Returns the primary keys.
-    fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
+    pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
         self.index.indices[..self.index.num_primary_key_column]
             .iter()
             .map(|idx| {
@@ -333,6 +345,13 @@ impl RowIter<'_> {
             .iter()
             .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
     }
+
+    /// Returns value at given offset.
+    /// # Panics
+    /// Panics if offset out-of-bound
+    pub fn value_at(&self, idx: usize) -> &Value {
+        &self.row.values[idx]
+    }
 }

 #[cfg(test)]
@@ -476,7 +495,6 @@ mod tests {
     #[test]
     fn test_fill_internal_columns() {
         let name_to_column_id = test_name_to_column_id();
-        let encoder = RowModifier::new();
         let table_id = 1025;
         let schema = test_schema();
         let row = test_row("greptimedb", "127.0.0.1");
@@ -486,7 +504,7 @@ mod tests {
         };
         let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
         let row_iter = rows_iter.iter_mut().next().unwrap();
-        let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
+        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
         assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
         assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());

@@ -514,7 +532,7 @@ mod tests {
         };
         let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
         let row_iter = rows_iter.iter_mut().next().unwrap();
-        let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
+        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
         assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
         assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
     }
@@ -41,7 +41,7 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
|
|||||||
/// Default batch size to read parquet files.
|
/// Default batch size to read parquet files.
|
||||||
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
|
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
|
||||||
/// Default row group size for parquet files.
|
/// Default row group size for parquet files.
|
||||||
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
|
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
|
||||||
|
|
||||||
/// Parquet write options.
|
/// Parquet write options.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|||||||
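Widening `DEFAULT_ROW_GROUP_SIZE` to `pub` lets code outside the engine size its row groups consistently with the SST writer, for example when building `WriterProperties` for the new parquet write path. A sketch, with the import path assumed rather than taken from this diff:

    use parquet::file::properties::WriterProperties;

    // Assumed re-export location; the diff only shows the constant becoming `pub`.
    use mito2::sst::parquet::DEFAULT_ROW_GROUP_SIZE;

    fn writer_properties() -> WriterProperties {
        WriterProperties::builder()
            // 100 * DEFAULT_READ_BATCH_SIZE rows per row group, matching the engine default.
            .set_max_row_group_size(DEFAULT_ROW_GROUP_SIZE)
            .build()
    }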
@@ -860,6 +860,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to decode object from json"))]
+    DecodeJson {
+        #[snafu(source)]
+        error: serde_json::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -991,6 +999,7 @@ impl ErrorExt for Error {
             Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
             Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
             Error::PathNotFound { .. } => StatusCode::InvalidArguments,
+            Error::DecodeJson { .. } => StatusCode::Unexpected,
         }
     }
 }
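The new `DecodeJson` variant follows the usual snafu pattern, so call sites attach it with `ResultExt::context`. A hedged sketch (the actual call sites are not part of these hunks; `DecodeJsonSnafu` is the selector snafu derives from the variant name, and `Result` is the crate alias shown above):

    use snafu::ResultExt;

    fn decode_value(payload: &str) -> Result<serde_json::Value> {
        // The serde_json::Error becomes the `source` field of Error::DecodeJson.
        serde_json::from_str(payload).context(DecodeJsonSnafu)
    }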
@@ -22,10 +22,9 @@ use api::v1::region::{
     RegionRequestHeader,
 };
 use api::v1::{
-    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
-    RowInsertRequest, RowInsertRequests, SemanticType,
+    AlterTableExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest,
+    RowInsertRequests, SemanticType,
 };
-use catalog::CatalogManagerRef;
 use client::{OutputData, OutputMeta};
 use common_catalog::consts::{
     default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
@@ -35,7 +34,6 @@ use common_grpc_expr::util::ColumnExpr;
 use common_meta::cache::TableFlownodeSetCacheRef;
 use common_meta::node_manager::{AffectedRows, NodeManagerRef};
 use common_meta::peer::Peer;
-use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use common_query::Output;
 use common_telemetry::tracing_context::TracingContext;
 use common_telemetry::{error, info, warn};
@@ -49,9 +47,7 @@ use snafu::ResultExt;
 use sql::partition::partition_rule_for_hexstring;
 use sql::statements::create::Partitions;
 use sql::statements::insert::Insert;
-use store_api::metric_engine_consts::{
-    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
-};
+use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
 use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
 use store_api::storage::{RegionId, TableId};
 use table::metadata::TableInfo;
@@ -63,7 +59,7 @@ use table::table_reference::TableReference;
 use table::TableRef;

 use crate::error::{
-    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
+    ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
     InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
 };
 use crate::expr_helper;
@@ -72,10 +68,10 @@ use crate::req_convert::common::preprocess_row_insert_requests;
 use crate::req_convert::insert::{
     fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
 };
-use crate::statement::StatementExecutor;
+use crate::schema_helper::SchemaHelper;

 pub struct Inserter {
-    catalog_manager: CatalogManagerRef,
+    pub(crate) schema_helper: SchemaHelper,
     pub(crate) partition_manager: PartitionRuleManagerRef,
     pub(crate) node_manager: NodeManagerRef,
     pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
@@ -85,7 +81,7 @@ pub type InserterRef = Arc<Inserter>;

 /// Hint for the table type to create automatically.
 #[derive(Clone)]
-enum AutoCreateTableType {
+pub(crate) enum AutoCreateTableType {
     /// A logical table with the physical table name.
     Logical(String),
     /// A physical table.
@@ -127,27 +123,34 @@ pub struct InstantAndNormalInsertRequests {

 impl Inserter {
     pub fn new(
-        catalog_manager: CatalogManagerRef,
+        schema_helper: SchemaHelper,
         partition_manager: PartitionRuleManagerRef,
         node_manager: NodeManagerRef,
         table_flownode_set_cache: TableFlownodeSetCacheRef,
     ) -> Self {
         Self {
-            catalog_manager,
+            schema_helper,
            partition_manager,
            node_manager,
            table_flownode_set_cache,
         }
     }

+    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
+        &self.partition_manager
+    }
+
+    pub fn node_manager(&self) -> &NodeManagerRef {
+        &self.node_manager
+    }
+
     pub async fn handle_column_inserts(
         &self,
         requests: InsertRequests,
         ctx: QueryContextRef,
-        statement_executor: &StatementExecutor,
     ) -> Result<Output> {
         let row_inserts = ColumnToRow::convert(requests)?;
-        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
+        self.handle_row_inserts(row_inserts, ctx, false, false)
             .await
     }

@@ -156,7 +159,6 @@ impl Inserter {
         &self,
         mut requests: RowInsertRequests,
         ctx: QueryContextRef,
-        statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
         is_single_value: bool,
     ) -> Result<Output> {
@@ -164,7 +166,6 @@ impl Inserter {
         self.handle_row_inserts_with_create_type(
             requests,
             ctx,
-            statement_executor,
             AutoCreateTableType::Physical,
             accommodate_existing_schema,
             is_single_value,
@@ -177,12 +178,10 @@ impl Inserter {
         &self,
         requests: RowInsertRequests,
         ctx: QueryContextRef,
-        statement_executor: &StatementExecutor,
     ) -> Result<Output> {
         self.handle_row_inserts_with_create_type(
             requests,
             ctx,
-            statement_executor,
             AutoCreateTableType::Log,
             false,
             false,
@@ -194,12 +193,10 @@ impl Inserter {
         &self,
         requests: RowInsertRequests,
         ctx: QueryContextRef,
-        statement_executor: &StatementExecutor,
     ) -> Result<Output> {
         self.handle_row_inserts_with_create_type(
             requests,
             ctx,
-            statement_executor,
             AutoCreateTableType::Trace,
             false,
             false,
@@ -212,14 +209,12 @@ impl Inserter {
         &self,
         requests: RowInsertRequests,
         ctx: QueryContextRef,
-        statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
         is_single_value: bool,
     ) -> Result<Output> {
         self.handle_row_inserts_with_create_type(
             requests,
             ctx,
-            statement_executor,
             AutoCreateTableType::LastNonNull,
             accommodate_existing_schema,
             is_single_value,
@@ -232,7 +227,6 @@ impl Inserter {
         &self,
         mut requests: RowInsertRequests,
         ctx: QueryContextRef,
-        statement_executor: &StatementExecutor,
         create_type: AutoCreateTableType,
         accommodate_existing_schema: bool,
         is_single_value: bool,
@@ -254,7 +248,6 @@ impl Inserter {
                 &mut requests,
                 &ctx,
                 create_type,
-                statement_executor,
                 accommodate_existing_schema,
                 is_single_value,
             )
@@ -280,7 +273,6 @@ impl Inserter {
         &self,
         mut requests: RowInsertRequests,
         ctx: QueryContextRef,
-        statement_executor: &StatementExecutor,
         physical_table: String,
     ) -> Result<Output> {
         // remove empty requests
@@ -293,7 +285,8 @@ impl Inserter {
         validate_column_count_match(&requests)?;

         // check and create physical table
-        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
+        self.schema_helper
+            .create_metric_physical_table(&ctx, physical_table.clone())
             .await?;

         // check and create logical tables
@@ -305,7 +298,6 @@ impl Inserter {
             &mut requests,
             &ctx,
             AutoCreateTableType::Logical(physical_table.to_string()),
-            statement_executor,
             true,
             true,
         )
@@ -350,8 +342,11 @@ impl Inserter {
         insert: &Insert,
         ctx: &QueryContextRef,
     ) -> Result<Output> {
-        let (inserts, table_info) =
-            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
+        let (inserts, table_info) = StatementToRegion::new(
+            self.schema_helper.catalog_manager().as_ref(),
+            &self.partition_manager,
+            ctx,
+        )
             .convert(insert, ctx)
             .await?;

@@ -482,7 +477,6 @@ impl Inserter {
         requests: &mut RowInsertRequests,
         ctx: &QueryContextRef,
         auto_create_table_type: AutoCreateTableType,
-        statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
         is_single_value: bool,
     ) -> Result<CreateAlterTableResult> {
@@ -543,7 +537,7 @@ impl Inserter {
                     instant_table_ids.insert(table_info.table_id());
                 }
                 table_infos.insert(table_info.table_id(), table.table_info());
-                if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
+                if let Some(alter_expr) = Self::get_alter_table_expr_on_demand(
                     req,
                     &table,
                     ctx,
@@ -565,9 +559,7 @@ impl Inserter {
             AutoCreateTableType::Logical(_) => {
                 if !create_tables.is_empty() {
                     // Creates logical tables in batch.
-                    let tables = self
-                        .create_logical_tables(create_tables, ctx, statement_executor)
-                        .await?;
+                    let tables = self.create_logical_tables(create_tables, ctx).await?;

                     for table in tables {
                         let table_info = table.table_info();
@@ -579,7 +571,7 @@ impl Inserter {
                 }
                 if !alter_tables.is_empty() {
                     // Alter logical tables in batch.
-                    statement_executor
+                    self.schema_helper
                         .alter_logical_tables(alter_tables, ctx.clone())
                         .await?;
                 }
@@ -590,9 +582,7 @@ impl Inserter {
                 // note that auto create table shouldn't be ttl instant table
                 // for it's a very unexpected behavior and should be set by user explicitly
                 for create_table in create_tables {
-                    let table = self
-                        .create_physical_table(create_table, None, ctx, statement_executor)
-                        .await?;
+                    let table = self.create_physical_table(create_table, None, ctx).await?;
                     let table_info = table.table_info();
                     if table_info.is_ttl_instant_table() {
                         instant_table_ids.insert(table_info.table_id());
@@ -600,8 +590,8 @@
                     table_infos.insert(table_info.table_id(), table.table_info());
                 }
                 for alter_expr in alter_tables.into_iter() {
-                    statement_executor
-                        .alter_table_inner(alter_expr, ctx.clone())
+                    self.schema_helper
+                        .alter_table_by_expr(alter_expr, ctx.clone())
                         .await?;
                 }
             }
@@ -619,9 +609,7 @@
                 create_table
                     .table_options
                     .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
-                let table = self
-                    .create_physical_table(create_table, None, ctx, statement_executor)
-                    .await?;
+                let table = self.create_physical_table(create_table, None, ctx).await?;
                 let table_info = table.table_info();
                 if table_info.is_ttl_instant_table() {
                     instant_table_ids.insert(table_info.table_id());
@@ -662,12 +650,7 @@
             );

             let table = self
-                .create_physical_table(
-                    create_table,
-                    Some(partitions),
-                    ctx,
-                    statement_executor,
-                )
+                .create_physical_table(create_table, Some(partitions), ctx)
                 .await?;
             let table_info = table.table_info();
             if table_info.is_ttl_instant_table() {
@@ -677,8 +660,8 @@
                 }
             }
             for alter_expr in alter_tables.into_iter() {
-                statement_executor
-                    .alter_table_inner(alter_expr, ctx.clone())
+                self.schema_helper
+                    .alter_table_by_expr(alter_expr, ctx.clone())
                     .await?;
             }
         }
@@ -690,79 +673,13 @@ impl Inserter {
         })
     }

-    async fn create_physical_table_on_demand(
-        &self,
-        ctx: &QueryContextRef,
-        physical_table: String,
-        statement_executor: &StatementExecutor,
-    ) -> Result<()> {
-        let catalog_name = ctx.current_catalog();
-        let schema_name = ctx.current_schema();
-
-        // check if exist
-        if self
-            .get_table(catalog_name, &schema_name, &physical_table)
-            .await?
-            .is_some()
-        {
-            return Ok(());
-        }
-
-        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
-        info!("Physical metric table `{table_reference}` does not exist, try creating table");
-
-        // schema with timestamp and field column
-        let default_schema = vec![
-            ColumnSchema {
-                column_name: GREPTIME_TIMESTAMP.to_string(),
-                datatype: ColumnDataType::TimestampMillisecond as _,
-                semantic_type: SemanticType::Timestamp as _,
-                datatype_extension: None,
-                options: None,
-            },
-            ColumnSchema {
-                column_name: GREPTIME_VALUE.to_string(),
-                datatype: ColumnDataType::Float64 as _,
-                semantic_type: SemanticType::Field as _,
-                datatype_extension: None,
-                options: None,
-            },
-        ];
-        let create_table_expr =
-            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
-
-        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
-        create_table_expr
-            .table_options
-            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
-
-        // create physical table
-        let res = statement_executor
-            .create_table_inner(create_table_expr, None, ctx.clone())
-            .await;
-
-        match res {
-            Ok(_) => {
-                info!("Successfully created table {table_reference}",);
-                Ok(())
-            }
-            Err(err) => {
-                error!(err; "Failed to create table {table_reference}");
-                Err(err)
-            }
-        }
-    }
-
     async fn get_table(
         &self,
         catalog: &str,
         schema: &str,
         table: &str,
     ) -> Result<Option<TableRef>> {
-        self.catalog_manager
-            .table(catalog, schema, table, None)
-            .await
-            .context(CatalogSnafu)
+        self.schema_helper.get_table(catalog, schema, table).await
     }

     fn get_create_table_expr_on_demand(
@@ -771,38 +688,9 @@ impl Inserter {
         create_type: &AutoCreateTableType,
         ctx: &QueryContextRef,
     ) -> Result<CreateTableExpr> {
-        let mut table_options = Vec::with_capacity(4);
-        for key in VALID_TABLE_OPTION_KEYS {
-            if let Some(value) = ctx.extension(key) {
-                table_options.push((key, value));
-            }
-        }
-
         let mut engine_name = default_engine();
-        match create_type {
-            AutoCreateTableType::Logical(physical_table) => {
+        if matches!(create_type, AutoCreateTableType::Logical(_)) {
             engine_name = METRIC_ENGINE_NAME;
-                table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
-            }
-            AutoCreateTableType::Physical => {
-                if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
-                    table_options.push((APPEND_MODE_KEY, append_mode));
-                }
-                if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
-                    table_options.push((MERGE_MODE_KEY, merge_mode));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Set append_mode to true for log table.
|
|
||||||
// because log tables should keep rows with the same ts and tags.
|
|
||||||
AutoCreateTableType::Log => {
|
|
||||||
table_options.push((APPEND_MODE_KEY, "true"));
|
|
||||||
}
|
|
||||||
AutoCreateTableType::LastNonNull => {
|
|
||||||
table_options.push((MERGE_MODE_KEY, "last_non_null"));
|
|
||||||
}
|
|
||||||
AutoCreateTableType::Trace => {
|
|
||||||
table_options.push((APPEND_MODE_KEY, "true"));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let schema = ctx.current_schema();
|
let schema = ctx.current_schema();
|
||||||
@@ -813,11 +701,9 @@ impl Inserter {
|
|||||||
build_create_table_expr(&table_ref, request_schema, engine_name)?;
|
build_create_table_expr(&table_ref, request_schema, engine_name)?;
|
||||||
|
|
||||||
info!("Table `{table_ref}` does not exist, try creating table");
|
info!("Table `{table_ref}` does not exist, try creating table");
|
||||||
for (k, v) in table_options {
|
|
||||||
create_table_expr
|
// Use the common fill_table_options_for_create function to populate table options
|
||||||
.table_options
|
fill_table_options_for_create(&mut create_table_expr.table_options, create_type, ctx);
|
||||||
.insert(k.to_string(), v.to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(create_table_expr)
|
Ok(create_table_expr)
|
||||||
}
|
}
|
||||||
@@ -830,7 +716,6 @@ impl Inserter {
|
|||||||
/// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
|
/// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
|
||||||
/// input `req`.
|
/// input `req`.
|
||||||
fn get_alter_table_expr_on_demand(
|
fn get_alter_table_expr_on_demand(
|
||||||
&self,
|
|
||||||
req: &mut RowInsertRequest,
|
req: &mut RowInsertRequest,
|
||||||
table: &TableRef,
|
table: &TableRef,
|
||||||
ctx: &QueryContextRef,
|
ctx: &QueryContextRef,
|
||||||
@@ -918,7 +803,6 @@ impl Inserter {
|
|||||||
mut create_table_expr: CreateTableExpr,
|
mut create_table_expr: CreateTableExpr,
|
||||||
partitions: Option<Partitions>,
|
partitions: Option<Partitions>,
|
||||||
ctx: &QueryContextRef,
|
ctx: &QueryContextRef,
|
||||||
statement_executor: &StatementExecutor,
|
|
||||||
) -> Result<TableRef> {
|
) -> Result<TableRef> {
|
||||||
{
|
{
|
||||||
let table_ref = TableReference::full(
|
let table_ref = TableReference::full(
|
||||||
@@ -929,8 +813,9 @@ impl Inserter {
|
|||||||
|
|
||||||
info!("Table `{table_ref}` does not exist, try creating table");
|
info!("Table `{table_ref}` does not exist, try creating table");
|
||||||
}
|
}
|
||||||
let res = statement_executor
|
let res = self
|
||||||
.create_table_inner(&mut create_table_expr, partitions, ctx.clone())
|
.schema_helper
|
||||||
|
.create_table_by_expr(&mut create_table_expr, partitions, ctx.clone())
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let table_ref = TableReference::full(
|
let table_ref = TableReference::full(
|
||||||
@@ -958,9 +843,9 @@ impl Inserter {
|
|||||||
&self,
|
&self,
|
||||||
create_table_exprs: Vec<CreateTableExpr>,
|
create_table_exprs: Vec<CreateTableExpr>,
|
||||||
ctx: &QueryContextRef,
|
ctx: &QueryContextRef,
|
||||||
statement_executor: &StatementExecutor,
|
|
||||||
) -> Result<Vec<TableRef>> {
|
) -> Result<Vec<TableRef>> {
|
||||||
let res = statement_executor
|
let res = self
|
||||||
|
.schema_helper
|
||||||
.create_logical_tables(&create_table_exprs, ctx.clone())
|
.create_logical_tables(&create_table_exprs, ctx.clone())
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -1011,7 +896,49 @@ fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_create_table_expr(
|
/// Fill table options for a new table by create type.
|
||||||
|
pub(crate) fn fill_table_options_for_create(
|
||||||
|
table_options: &mut std::collections::HashMap<String, String>,
|
||||||
|
create_type: &AutoCreateTableType,
|
||||||
|
ctx: &QueryContextRef,
|
||||||
|
) {
|
||||||
|
for key in VALID_TABLE_OPTION_KEYS {
|
||||||
|
if let Some(value) = ctx.extension(key) {
|
||||||
|
table_options.insert(key.to_string(), value.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match create_type {
|
||||||
|
AutoCreateTableType::Logical(physical_table) => {
|
||||||
|
table_options.insert(
|
||||||
|
LOGICAL_TABLE_METADATA_KEY.to_string(),
|
||||||
|
physical_table.to_string(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
AutoCreateTableType::Physical => {
|
||||||
|
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
|
||||||
|
table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
|
||||||
|
}
|
||||||
|
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
|
||||||
|
table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Set append_mode to true for log table.
|
||||||
|
// because log tables should keep rows with the same ts and tags.
|
||||||
|
AutoCreateTableType::Log => {
|
||||||
|
table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
|
||||||
|
}
|
||||||
|
AutoCreateTableType::LastNonNull => {
|
||||||
|
table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
|
||||||
|
}
|
||||||
|
AutoCreateTableType::Trace => {
|
||||||
|
table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds a [CreateTableExpr] for the given table and schema.
|
||||||
|
pub(crate) fn build_create_table_expr(
|
||||||
table: &TableReference,
|
table: &TableReference,
|
||||||
request_schema: &[ColumnSchema],
|
request_schema: &[ColumnSchema],
|
||||||
engine: &str,
|
engine: &str,
|
||||||
@@ -1144,19 +1071,14 @@ mod tests {
|
|||||||
|
|
||||||
use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
|
use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
|
||||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||||
use common_meta::cache::new_table_flownode_set_cache;
|
|
||||||
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
|
|
||||||
use common_meta::test_util::MockDatanodeManager;
|
|
||||||
use datatypes::data_type::ConcreteDataType;
|
use datatypes::data_type::ConcreteDataType;
|
||||||
use datatypes::schema::ColumnSchema;
|
use datatypes::schema::ColumnSchema;
|
||||||
use moka::future::Cache;
|
|
||||||
use session::context::QueryContext;
|
use session::context::QueryContext;
|
||||||
use table::dist_table::DummyDataSource;
|
use table::dist_table::DummyDataSource;
|
||||||
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
|
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
|
||||||
use table::TableRef;
|
use table::TableRef;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
|
|
||||||
|
|
||||||
fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
|
fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
|
||||||
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
|
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
|
||||||
@@ -1236,20 +1158,8 @@ mod tests {
|
|||||||
DEFAULT_SCHEMA_NAME,
|
DEFAULT_SCHEMA_NAME,
|
||||||
));
|
));
|
||||||
|
|
||||||
let kv_backend = prepare_mocked_backend().await;
|
let alter_expr =
|
||||||
let inserter = Inserter::new(
|
Inserter::get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true).unwrap();
|
||||||
catalog::memory::MemoryCatalogManager::new(),
|
|
||||||
create_partition_rule_manager(kv_backend.clone()).await,
|
|
||||||
Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
|
|
||||||
Arc::new(new_table_flownode_set_cache(
|
|
||||||
String::new(),
|
|
||||||
Cache::new(100),
|
|
||||||
kv_backend.clone(),
|
|
||||||
)),
|
|
||||||
);
|
|
||||||
let alter_expr = inserter
|
|
||||||
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
|
|
||||||
.unwrap();
|
|
||||||
assert!(alter_expr.is_none());
|
assert!(alter_expr.is_none());
|
||||||
|
|
||||||
// The request's schema should have updated names for timestamp and field columns
|
// The request's schema should have updated names for timestamp and field columns
|
||||||
|
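The hunks above (apparently the `impl Inserter` code, where `fill_table_options_for_create` and `build_create_table_expr` now live) move the per-request option handling into a shared helper. The sketch below is not part of the diff; it only illustrates, under the signatures shown above, how a caller might combine the two helpers. `prepare_log_table_expr` and its argument list are hypothetical names introduced for illustration.

```rust
// Illustrative sketch only (not part of this commit). It relies on the
// `build_create_table_expr` and `fill_table_options_for_create` signatures
// introduced above; `prepare_log_table_expr` is a hypothetical caller.
fn prepare_log_table_expr(
    table_ref: &TableReference,
    request_schema: &[ColumnSchema],
    ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
    // Build the base expression with the default storage engine.
    let mut create_table_expr =
        build_create_table_expr(table_ref, request_schema, default_engine())?;
    // For log tables the helper sets `append_mode = "true"`, in addition to
    // any options carried in the query context extensions.
    fill_table_options_for_create(
        &mut create_table_expr.table_options,
        &AutoCreateTableType::Log,
        ctx,
    );
    Ok(create_table_expr)
}
```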
@@ -27,6 +27,7 @@ pub mod procedure;
pub mod region_req_factory;
pub mod req_convert;
pub mod request;
+pub mod schema_helper;
pub mod statement;
pub mod table;
#[cfg(test)]
799	src/operator/src/schema_helper.rs	Normal file
@@ -0,0 +1,799 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to deal with table schemas.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{ListMetadataRequest, RegionRequestHeader};
use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
use catalog::CatalogManagerRef;
use common_catalog::consts::{
default_engine, is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_catalog::format_full_table_name;
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_route::TableRouteManager;
use common_meta::key::TableMetadataManagerRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::Partition;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing;
use common_telemetry::tracing_context::TracingContext;
use futures::future;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
use store_api::metadata::RegionMetadata;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::storage::RegionId;
use table::dist_table::DistTable;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_name::TableName;
use table::table_reference::TableReference;
use table::TableRef;

use crate::error::{
CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DecodeJsonSnafu,
EmptyDdlExprSnafu, ExecuteDdlSnafu, FindRegionLeaderSnafu, InvalidPartitionRuleSnafu,
InvalidTableNameSnafu, InvalidateTableCacheSnafu, JoinTaskSnafu, RequestRegionSnafu, Result,
SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::expr_helper;
use crate::insert::{build_create_table_expr, fill_table_options_for_create, AutoCreateTableType};
use crate::region_req_factory::RegionRequestFactory;
use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter, NAME_PATTERN_REG};

/// Helper to query and manipulate (CREATE/ALTER) table schemas.
#[derive(Clone)]
pub struct SchemaHelper {
catalog_manager: CatalogManagerRef,
table_metadata_manager: TableMetadataManagerRef,
procedure_executor: ProcedureExecutorRef,
cache_invalidator: CacheInvalidatorRef,
}

impl SchemaHelper {
/// Creates a new [`SchemaHelper`].
pub fn new(
catalog_manager: CatalogManagerRef,
table_metadata_manager: TableMetadataManagerRef,
procedure_executor: ProcedureExecutorRef,
cache_invalidator: CacheInvalidatorRef,
) -> Self {
Self {
catalog_manager,
table_metadata_manager,
procedure_executor,
cache_invalidator,
}
}

/// Gets the table by catalog, schema and table name.
pub async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
) -> Result<Option<TableRef>> {
self.catalog_manager
.table(catalog, schema, table, None)
.await
.context(CatalogSnafu)
}

// TODO(yingwen): Can we create the physical table with all columns from the prometheus metrics?
/// Creates a physical table for metric engine.
///
/// If table already exists, do nothing.
pub async fn create_metric_physical_table(
&self,
ctx: &QueryContextRef,
physical_table: String,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();

// check if exist
if self
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}

let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
common_telemetry::info!(
"Physical metric table `{table_reference}` does not exist, try creating table"
);

// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,
options: None,
},
];
let create_table_expr =
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr
.table_options
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());

// create physical table.
// TODO(yingwen): Simplify this function. But remember to start the timer.
let res = self
.create_table_by_expr(create_table_expr, None, ctx.clone())
.await;
match res {
Ok(_) => {
common_telemetry::info!("Successfully created table {table_reference}",);
Ok(())
}
Err(err) => {
common_telemetry::error!(err; "Failed to create table {table_reference}");
Err(err)
}
}
}

/// Creates a table by [CreateTableExpr].
#[tracing::instrument(skip_all)]
pub async fn create_table_by_expr(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<TableRef> {
ensure!(
!is_readonly_schema(&create_table.schema_name),
SchemaReadOnlySnafu {
name: create_table.schema_name.clone()
}
);

if create_table.engine == METRIC_ENGINE_NAME
&& create_table
.table_options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
// Create logical tables
ensure!(
partitions.is_none(),
InvalidPartitionRuleSnafu {
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
}
);
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
.await?
.into_iter()
.next()
.context(UnexpectedSnafu {
violated: "expected to create logical tables",
})
} else {
// Create other normal table
self.create_non_logic_table(create_table, partitions, query_ctx)
.await
}
}

/// Creates a non-logical table.
/// - If the schema doesn't exist, returns an error
/// - If the table already exists:
/// - If `create_if_not_exists` is true, returns the existing table
/// - If `create_if_not_exists` is false, returns an error
#[tracing::instrument(skip_all)]
pub async fn create_non_logic_table(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<TableRef> {
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

// Check if schema exists
let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await
.context(TableMetadataManagerSnafu)?;
ensure!(
schema.is_some(),
SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
);

// if table exists.
if let Some(table) = self
.catalog_manager
.table(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
Some(&query_ctx),
)
.await
.context(CatalogSnafu)?
{
return if create_table.create_if_not_exists {
Ok(table)
} else {
TableAlreadyExistsSnafu {
table: format_full_table_name(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
),
}
.fail()
};
}

ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: &create_table.table_name,
}
);

let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
);

let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
let mut table_info = create_table_info(create_table, partition_cols)?;

let resp = self
.create_table_procedure(
create_table.clone(),
partitions,
table_info.clone(),
query_ctx,
)
.await?;

let table_id = resp.table_ids.into_iter().next().context(UnexpectedSnafu {
violated: "expected table_id",
})?;
common_telemetry::info!(
"Successfully created table '{table_name}' with table id {table_id}"
);

table_info.ident.table_id = table_id;

let table_info: Arc<TableInfo> =
Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
create_table.table_id = Some(api::v1::TableId { id: table_id });

let table = DistTable::table(table_info);

Ok(table)
}

/// Creates logical tables.
#[tracing::instrument(skip_all)]
pub async fn create_logical_tables(
&self,
create_table_exprs: &[CreateTableExpr],
query_context: QueryContextRef,
) -> Result<Vec<TableRef>> {
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
ensure!(
!create_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "create logic tables"
}
);

// Check table names
for create_table in create_table_exprs {
ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: &create_table.table_name,
}
);
}

let mut raw_tables_info = create_table_exprs
.iter()
.map(|create| create_table_info(create, vec![]))
.collect::<Result<Vec<_>>>()?;
let tables_data = create_table_exprs
.iter()
.cloned()
.zip(raw_tables_info.iter().cloned())
.collect::<Vec<_>>();

let resp = self
.create_logical_tables_procedure(tables_data, query_context)
.await?;

let table_ids = resp.table_ids;
ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
});
common_telemetry::info!("Successfully created logical tables: {:?}", table_ids);

for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
table_info.ident.table_id = table_ids[i];
}
let tables_info = raw_tables_info
.into_iter()
.map(|x| x.try_into().context(CreateTableInfoSnafu))
.collect::<Result<Vec<_>>>()?;

Ok(tables_info
.into_iter()
.map(|x| DistTable::table(Arc::new(x)))
.collect())
}

/// Alters a table by [AlterTableExpr].
#[tracing::instrument(skip_all)]
pub async fn alter_table_by_expr(
&self,
expr: AlterTableExpr,
query_context: QueryContextRef,
) -> Result<Output> {
ensure!(
!is_readonly_schema(&expr.schema_name),
SchemaReadOnlySnafu {
name: expr.schema_name.clone()
}
);

let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME.to_string()
} else {
expr.catalog_name.clone()
};

let schema_name = if expr.schema_name.is_empty() {
DEFAULT_SCHEMA_NAME.to_string()
} else {
expr.schema_name.clone()
};

let table_name = expr.table_name.clone();

let table = self
.catalog_manager
.table(
&catalog_name,
&schema_name,
&table_name,
Some(&query_context),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
})?;

let table_id = table.table_info().ident.table_id;
let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
if !need_alter {
return Ok(Output::new_with_affected_rows(0));
}
common_telemetry::info!(
"Table info before alter is {:?}, expr: {:?}",
table.table_info(),
expr
);

let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;

let (req, invalidate_keys) = if physical_table_id == table_id {
// This is physical table
let req = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_table(expr),
};

let invalidate_keys = vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
];

(req, invalidate_keys)
} else {
// This is logical table
let req = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_logical_tables(vec![expr]),
};

let mut invalidate_keys = vec![
CacheIdent::TableId(physical_table_id),
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
];

let physical_table = self
.table_metadata_manager
.table_info_manager()
.get(physical_table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|x| x.into_inner());
if let Some(physical_table) = physical_table {
let physical_table_name = TableName::new(
physical_table.table_info.catalog_name,
physical_table.table_info.schema_name,
physical_table.table_info.name,
);
invalidate_keys.push(CacheIdent::TableName(physical_table_name));
}

(req, invalidate_keys)
};

self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(ExecuteDdlSnafu)?;

// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(&Context::default(), &invalidate_keys)
.await
.context(InvalidateTableCacheSnafu)?;

Ok(Output::new_with_affected_rows(0))
}

/// Alter logical tables.
pub async fn alter_logical_tables(
&self,
alter_table_exprs: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
ensure!(
!alter_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "alter logical tables"
}
);

// group by physical table id
let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
for expr in alter_table_exprs {
// Get table_id from catalog_manager
let catalog = if expr.catalog_name.is_empty() {
query_context.current_catalog()
} else {
&expr.catalog_name
};
let schema = if expr.schema_name.is_empty() {
query_context.current_schema()
} else {
expr.schema_name.to_string()
};
let table_name = &expr.table_name;
let table = self
.catalog_manager
.table(catalog, &schema, table_name, Some(&query_context))
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(catalog, &schema, table_name),
})?;
let table_id = table.table_info().ident.table_id;
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;
groups.entry(physical_table_id).or_default().push(expr);
}

// Submit procedure for each physical table
let mut handles = Vec::with_capacity(groups.len());
for (_physical_table_id, exprs) in groups {
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
handles.push(fut);
}
let _results = futures::future::try_join_all(handles).await?;

Ok(Output::new_with_affected_rows(0))
}

/// Returns the catalog manager.
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}

/// Returns the table route manager.
pub fn table_route_manager(&self) -> &TableRouteManager {
self.table_metadata_manager.table_route_manager()
}

/// Submits a procedure to create a non-logical table.
async fn create_table_procedure(
&self,
create_table: CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect();

let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_table(create_table, partitions, table_info),
};

self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
}

/// Submits a procedure to create logical tables.
async fn create_logical_tables_procedure(
&self,
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_logical_tables(tables_data),
};

self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
}

/// Submits a procedure to alter logical tables.
async fn alter_logical_tables_procedure(
&self,
tables_data: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_logical_tables(tables_data),
};

self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
}
}

/// Schema of a logical table.
pub struct LogicalSchema {
/// Name of the logical table.
pub name: String,
/// Schema of columns in the logical table.
pub columns: Vec<ColumnSchema>,
}

/// Logical table schemas.
pub struct LogicalSchemas {
/// Logical table schemas group by physical table name.
pub schemas: HashMap<String, Vec<LogicalSchema>>,
}

/// Creates or alters logical tables to match the provided schemas
/// for prometheus metrics.
pub async fn ensure_logical_tables_for_metrics(
helper: &SchemaHelper,
schemas: &LogicalSchemas,
query_ctx: &QueryContextRef,
) -> Result<()> {
let catalog_name = query_ctx.current_catalog();
let schema_name = query_ctx.current_schema();

// 1. For each physical table, creates it if it doesn't exist.
for physical_table_name in schemas.schemas.keys() {
// Check if the physical table exists and create it if it doesn't
let physical_table_opt = helper
.get_table(catalog_name, &schema_name, physical_table_name)
.await?;

if physical_table_opt.is_none() {
// Physical table doesn't exist, create it
helper
.create_metric_physical_table(query_ctx, physical_table_name.clone())
.await?;
}
}

// 2. Collects logical tables that do not exist. (CreateTableExpr)
let mut tables_to_create: Vec<CreateTableExpr> = Vec::new();

// 3. Collects alterations (columns to add) for each logical table. (AlterTableExpr)
let mut tables_to_alter: Vec<AlterTableExpr> = Vec::new();

// Process each logical table to determine if it needs to be created or altered
for (physical_table_name, logical_schemas) in &schemas.schemas {
for logical_schema in logical_schemas {
let table_name = &logical_schema.name;

// Check if the logical table exists
let table_opt = helper
.get_table(catalog_name, &schema_name, table_name)
.await?;

if let Some(existing_table) = table_opt {
// Logical table exists, determine if it needs alteration
let existing_schema = existing_table.schema();
let column_exprs = ColumnExpr::from_column_schemas(&logical_schema.columns);
let add_columns =
expr_helper::extract_add_columns_expr(&existing_schema, column_exprs)?;
let Some(add_columns) = add_columns else {
continue;
};

let alter_expr = AlterTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.clone(),
table_name: table_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
};
tables_to_alter.push(alter_expr);
} else {
// Logical table doesn't exist, prepare for creation
// Build a CreateTableExpr from the table reference and columns
let table_ref = TableReference::full(catalog_name, &schema_name, table_name);
let mut create_expr = build_create_table_expr(
&table_ref,
&logical_schema.columns,
METRIC_ENGINE_NAME,
)?;
create_expr.create_if_not_exists = true;
let create_type = AutoCreateTableType::Logical(physical_table_name.clone());
// Fill table options.
fill_table_options_for_create(
&mut create_expr.table_options,
&create_type,
query_ctx,
);

tables_to_create.push(create_expr);
}
}
}

// 4. Creates logical tables in batch using `create_logical_tables()`.
if !tables_to_create.is_empty() {
helper
.create_logical_tables(&tables_to_create, query_ctx.clone())
.await?;
}

// 5. Alters logical tables in batch using `alter_logical_tables()`.
if !tables_to_alter.is_empty() {
helper
.alter_logical_tables(tables_to_alter, query_ctx.clone())
.await?;
}

Ok(())
}

/// Gets the list of metadatas for a list of region ids.
// TODO(yingwen): Should we return RegionMetadataRef?
pub async fn metadatas_for_region_ids(
partition_manager: &PartitionRuleManagerRef,
node_manager: &NodeManagerRef,
region_ids: &[RegionId],
ctx: &QueryContextRef,
) -> Result<Vec<Option<RegionMetadata>>> {
// Groups regions by peers.
// This map contains: peer => (ListMetadataRequest, A vec of indices of regions).
let mut request_per_region = HashMap::new();
for (index, region_id) in region_ids.iter().copied().enumerate() {
let peer = partition_manager
.find_region_leader(region_id)
.await
.context(FindRegionLeaderSnafu)?;
let request_indices = request_per_region
.entry(peer)
.or_insert_with(|| (ListMetadataRequest::default(), Vec::new()));
request_indices.0.region_ids.push(region_id.as_u64());
request_indices.1.push(index);
}

// Sends requests to datanode and waits for responses.
let tasks = request_per_region
.into_iter()
.map(|(peer, (request, indices))| {
let node_manager = node_manager.clone();
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
dbname: ctx.get_db_string(),
..Default::default()
});

common_runtime::spawn_global(async move {
let request = request_factory.build_request(Body::ListMetadata(request));
let resp = node_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestRegionSnafu)?;

let metadatas: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&resp.metadata).context(DecodeJsonSnafu)?;
Ok((metadatas, indices))
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
let mut output_metadatas = vec![None; region_ids.len()];
for result in results {
let (mut metadatas, indices) = result?;
ensure!(
metadatas.len() == indices.len(),
UnexpectedSnafu {
violated: format!(
"Length mismatch between request and response, expected {} metadatas, got {}",
indices.len(),
metadatas.len()
),
}
);
for index in indices {
output_metadatas[index] = metadatas[index].take();
}
}

Ok(output_metadatas)
}
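The new `schema_helper` module above centralizes schema lookup and DDL submission. The sketch below is not part of the diff; it only shows, under the constructor and function signatures defined in the new file, how a frontend component might wire up a `SchemaHelper` and ensure the logical tables for a batch of prometheus metrics exist. The `ensure_metric_tables` wrapper and its argument list are hypothetical.

```rust
// Hypothetical wrapper (not in the diff) around the new SchemaHelper API.
// All handles are assumed to come from the frontend's existing wiring.
async fn ensure_metric_tables(
    catalog_manager: CatalogManagerRef,
    table_metadata_manager: TableMetadataManagerRef,
    procedure_executor: ProcedureExecutorRef,
    cache_invalidator: CacheInvalidatorRef,
    schemas: LogicalSchemas,
    query_ctx: QueryContextRef,
) -> Result<()> {
    let helper = SchemaHelper::new(
        catalog_manager,
        table_metadata_manager,
        procedure_executor,
        cache_invalidator,
    );
    // Creates missing physical tables, creates missing logical tables in
    // batch, and submits ALTERs for logical tables that need new columns.
    ensure_logical_tables_for_metrics(&helper, &schemas, &query_ctx).await
}
```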
@@ -18,7 +18,7 @@ mod copy_query_to;
mod copy_table_from;
mod copy_table_to;
mod cursor;
-mod ddl;
+pub(crate) mod ddl;
mod describe;
mod dml;
mod kill;
@@ -102,6 +102,14 @@ pub struct StatementExecutor {
pub type StatementExecutorRef = Arc<StatementExecutor>;

impl StatementExecutor {
+pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
+&self.procedure_executor
+}
+
+pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
+&self.cache_invalidator
+}
+
#[allow(clippy::too_many_arguments)]
pub fn new(
catalog_manager: CatalogManagerRef,
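The two accessors added above expose the executor's DDL plumbing to other components. As a hedged illustration only (not in the diff), they could be used to build a `SchemaHelper` from an existing `StatementExecutor`; the helper function below, and the assumption that `ProcedureExecutorRef` and `CacheInvalidatorRef` are cloneable `Arc` aliases, are mine.

```rust
// Sketch only: assumes ProcedureExecutorRef and CacheInvalidatorRef are Arc
// type aliases, so cloning them is cheap. Not part of this commit.
fn schema_helper_from_executor(
    executor: &StatementExecutor,
    catalog_manager: CatalogManagerRef,
    table_metadata_manager: TableMetadataManagerRef,
) -> SchemaHelper {
    SchemaHelper::new(
        catalog_manager,
        table_metadata_manager,
        executor.procedure_executor().clone(),
        executor.cache_invalidator().clone(),
    )
}
```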
|||||||
@@ -26,7 +26,7 @@ use api::v1::{
|
|||||||
};
|
};
|
||||||
use catalog::CatalogManagerRef;
|
use catalog::CatalogManagerRef;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
use common_catalog::consts::is_readonly_schema;
|
||||||
use common_catalog::{format_full_flow_name, format_full_table_name};
|
use common_catalog::{format_full_flow_name, format_full_table_name};
|
||||||
use common_error::ext::BoxedError;
|
use common_error::ext::BoxedError;
|
||||||
use common_meta::cache_invalidator::Context;
|
use common_meta::cache_invalidator::Context;
|
||||||
@@ -43,7 +43,7 @@ use common_meta::rpc::ddl::{
|
|||||||
CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
|
CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
|
||||||
SubmitDdlTaskResponse,
|
SubmitDdlTaskResponse,
|
||||||
};
|
};
|
||||||
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
|
use common_meta::rpc::router::Partition as MetaPartition;
|
||||||
use common_query::Output;
|
use common_query::Output;
|
||||||
use common_telemetry::{debug, info, tracing, warn};
|
use common_telemetry::{debug, info, tracing, warn};
|
||||||
use common_time::Timezone;
|
use common_time::Timezone;
|
||||||
@@ -74,7 +74,6 @@ use sql::statements::create::{
|
|||||||
use sql::statements::sql_value_to_value;
|
use sql::statements::sql_value_to_value;
|
||||||
use sql::statements::statement::Statement;
|
use sql::statements::statement::Statement;
|
||||||
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
|
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
|
||||||
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
|
|
||||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||||
use table::dist_table::DistTable;
|
use table::dist_table::DistTable;
|
||||||
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
|
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
|
||||||
@@ -84,12 +83,11 @@ use table::TableRef;
|
|||||||
|
|
||||||
use crate::error::{
|
use crate::error::{
|
||||||
self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
|
self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
|
||||||
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
|
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
|
||||||
DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
|
ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
|
||||||
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
|
InvalidPartitionSnafu, InvalidSqlSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
|
||||||
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
|
ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
|
||||||
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
|
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
|
||||||
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
|
|
||||||
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
|
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
|
||||||
};
|
};
|
||||||
use crate::expr_helper;
|
use crate::expr_helper;
|
||||||
@@ -97,7 +95,8 @@ use crate::statement::show::create_partitions_stmt;
|
|||||||
use crate::statement::StatementExecutor;
|
use crate::statement::StatementExecutor;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
|
/// Regex to validate table name.
|
||||||
|
pub(crate) static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StatementExecutor {
|
impl StatementExecutor {
|
||||||
@@ -182,193 +181,11 @@ impl StatementExecutor {
|
|||||||
partitions: Option<Partitions>,
|
partitions: Option<Partitions>,
|
||||||
query_ctx: QueryContextRef,
|
query_ctx: QueryContextRef,
|
||||||
) -> Result<TableRef> {
|
) -> Result<TableRef> {
|
||||||
ensure!(
|
self.inserter
|
||||||
!is_readonly_schema(&create_table.schema_name),
|
.schema_helper
|
||||||
SchemaReadOnlySnafu {
|
.create_table_by_expr(create_table, partitions, query_ctx)
|
||||||
name: create_table.schema_name.clone()
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
if create_table.engine == METRIC_ENGINE_NAME
|
|
||||||
&& create_table
|
|
||||||
.table_options
|
|
||||||
.contains_key(LOGICAL_TABLE_METADATA_KEY)
|
|
||||||
{
|
|
||||||
// Create logical tables
|
|
||||||
ensure!(
|
|
||||||
partitions.is_none(),
|
|
||||||
InvalidPartitionRuleSnafu {
|
|
||||||
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
|
|
||||||
}
|
|
||||||
);
|
|
||||||
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
|
||||||
.await?
|
|
||||||
.into_iter()
|
|
||||||
.next()
|
|
||||||
.context(error::UnexpectedSnafu {
|
|
||||||
violated: "expected to create logical tables",
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
// Create other normal table
|
|
||||||
self.create_non_logic_table(create_table, partitions, query_ctx)
|
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
|
||||||
pub async fn create_non_logic_table(
|
|
||||||
&self,
|
|
||||||
create_table: &mut CreateTableExpr,
|
|
||||||
partitions: Option<Partitions>,
|
|
||||||
query_ctx: QueryContextRef,
|
|
||||||
) -> Result<TableRef> {
|
|
||||||
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
|
|
||||||
|
|
||||||
// Check if schema exists
|
|
||||||
let schema = self
|
|
||||||
.table_metadata_manager
|
|
||||||
.schema_manager()
|
|
||||||
.get(SchemaNameKey::new(
|
|
||||||
&create_table.catalog_name,
|
|
||||||
&create_table.schema_name,
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
.context(TableMetadataManagerSnafu)?;
|
|
||||||
ensure!(
|
|
||||||
schema.is_some(),
|
|
||||||
SchemaNotFoundSnafu {
|
|
||||||
schema_info: &create_table.schema_name,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
// if table exists.
|
|
||||||
if let Some(table) = self
|
|
||||||
.catalog_manager
|
|
||||||
.table(
|
|
||||||
&create_table.catalog_name,
|
|
||||||
&create_table.schema_name,
|
|
||||||
&create_table.table_name,
|
|
||||||
Some(&query_ctx),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.context(CatalogSnafu)?
|
|
||||||
{
|
|
||||||
return if create_table.create_if_not_exists {
|
|
||||||
Ok(table)
|
|
||||||
} else {
|
|
||||||
TableAlreadyExistsSnafu {
|
|
||||||
table: format_full_table_name(
|
|
||||||
&create_table.catalog_name,
|
|
||||||
&create_table.schema_name,
|
|
||||||
&create_table.table_name,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
.fail()
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
ensure!(
|
|
||||||
NAME_PATTERN_REG.is_match(&create_table.table_name),
|
|
||||||
InvalidTableNameSnafu {
|
|
||||||
table_name: &create_table.table_name,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let table_name = TableName::new(
|
|
||||||
&create_table.catalog_name,
|
|
||||||
&create_table.schema_name,
|
|
||||||
&create_table.table_name,
|
|
||||||
);
|
|
||||||
|
|
||||||
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
|
|
||||||
let mut table_info = create_table_info(create_table, partition_cols)?;
|
|
||||||
|
|
||||||
let resp = self
|
|
||||||
.create_table_procedure(
|
|
||||||
create_table.clone(),
|
|
||||||
partitions,
|
|
||||||
table_info.clone(),
|
|
||||||
query_ctx,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let table_id = resp
|
|
||||||
.table_ids
|
|
||||||
.into_iter()
|
|
||||||
.next()
|
|
||||||
.context(error::UnexpectedSnafu {
|
|
||||||
violated: "expected table_id",
|
|
||||||
})?;
|
|
||||||
info!("Successfully created table '{table_name}' with table id {table_id}");
|
|
||||||
|
|
||||||
table_info.ident.table_id = table_id;
|
|
||||||
|
|
||||||
let table_info: Arc<TableInfo> =
|
|
||||||
Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
|
|
||||||
create_table.table_id = Some(api::v1::TableId { id: table_id });
|
|
||||||
|
|
||||||
let table = DistTable::table(table_info);
|
|
||||||
|
|
||||||
Ok(table)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
|
||||||
pub async fn create_logical_tables(
|
|
||||||
&self,
|
|
||||||
create_table_exprs: &[CreateTableExpr],
|
|
||||||
query_context: QueryContextRef,
|
|
||||||
) -> Result<Vec<TableRef>> {
|
|
||||||
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
|
|
||||||
ensure!(
|
|
||||||
!create_table_exprs.is_empty(),
|
|
||||||
EmptyDdlExprSnafu {
|
|
||||||
name: "create logic tables"
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
// Check table names
|
|
||||||
for create_table in create_table_exprs {
|
|
||||||
ensure!(
|
|
||||||
NAME_PATTERN_REG.is_match(&create_table.table_name),
|
|
||||||
InvalidTableNameSnafu {
|
|
||||||
table_name: &create_table.table_name,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut raw_tables_info = create_table_exprs
|
|
||||||
.iter()
|
|
||||||
.map(|create| create_table_info(create, vec![]))
|
|
||||||
.collect::<Result<Vec<_>>>()?;
|
|
||||||
let tables_data = create_table_exprs
|
|
||||||
.iter()
|
|
||||||
.cloned()
|
|
||||||
.zip(raw_tables_info.iter().cloned())
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let resp = self
|
|
||||||
.create_logical_tables_procedure(tables_data, query_context)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let table_ids = resp.table_ids;
|
|
||||||
ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
|
|
||||||
reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
|
|
||||||
});
|
|
||||||
info!("Successfully created logical tables: {:?}", table_ids);
|
|
||||||
|
|
||||||
for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
|
|
||||||
table_info.ident.table_id = table_ids[i];
|
|
||||||
}
|
|
||||||
let tables_info = raw_tables_info
|
|
||||||
.into_iter()
|
|
||||||
.map(|x| x.try_into().context(CreateTableInfoSnafu))
|
|
||||||
.collect::<Result<Vec<_>>>()?;
|
|
||||||
|
|
||||||
Ok(tables_info
|
|
||||||
.into_iter()
|
|
||||||
.map(|x| DistTable::table(Arc::new(x)))
|
|
||||||
.collect())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "enterprise")]
|
#[cfg(feature = "enterprise")]
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
@@ -953,64 +770,6 @@ impl StatementExecutor {
         .context(error::ExecuteDdlSnafu)
     }
 
-    #[tracing::instrument(skip_all)]
-    pub async fn alter_logical_tables(
-        &self,
-        alter_table_exprs: Vec<AlterTableExpr>,
-        query_context: QueryContextRef,
-    ) -> Result<Output> {
-        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
-        ensure!(
-            !alter_table_exprs.is_empty(),
-            EmptyDdlExprSnafu {
-                name: "alter logical tables"
-            }
-        );
-
-        // group by physical table id
-        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
-        for expr in alter_table_exprs {
-            // Get table_id from catalog_manager
-            let catalog = if expr.catalog_name.is_empty() {
-                query_context.current_catalog()
-            } else {
-                &expr.catalog_name
-            };
-            let schema = if expr.schema_name.is_empty() {
-                query_context.current_schema()
-            } else {
-                expr.schema_name.to_string()
-            };
-            let table_name = &expr.table_name;
-            let table = self
-                .catalog_manager
-                .table(catalog, &schema, table_name, Some(&query_context))
-                .await
-                .context(CatalogSnafu)?
-                .with_context(|| TableNotFoundSnafu {
-                    table_name: format_full_table_name(catalog, &schema, table_name),
-                })?;
-            let table_id = table.table_info().ident.table_id;
-            let physical_table_id = self
-                .table_metadata_manager
-                .table_route_manager()
-                .get_physical_table_id(table_id)
-                .await
-                .context(TableMetadataManagerSnafu)?;
-            groups.entry(physical_table_id).or_default().push(expr);
-        }
-
-        // Submit procedure for each physical table
-        let mut handles = Vec::with_capacity(groups.len());
-        for (_physical_table_id, exprs) in groups {
-            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
-            handles.push(fut);
-        }
-        let _results = futures::future::try_join_all(handles).await?;
-
-        Ok(Output::new_with_affected_rows(0))
-    }
-
     #[tracing::instrument(skip_all)]
     pub async fn drop_table(
         &self,
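
The removed `alter_logical_tables` above first groups the incoming alter expressions by the physical table they map to, then submits one procedure per group. A minimal, std-only sketch of that grouping step; `AlterExpr` and `physical_id_of` are illustrative placeholders, not GreptimeDB APIs:

use std::collections::HashMap;

#[derive(Debug)]
struct AlterExpr {
    table_name: String,
}

// Placeholder for the metadata lookup that maps a logical table to its physical table id.
// In the real code this goes through table_route_manager().get_physical_table_id().
fn physical_id_of(expr: &AlterExpr) -> u32 {
    expr.table_name.len() as u32 % 2
}

fn group_by_physical_table(exprs: Vec<AlterExpr>) -> HashMap<u32, Vec<AlterExpr>> {
    let mut groups: HashMap<u32, Vec<AlterExpr>> = HashMap::new();
    for expr in exprs {
        // entry().or_default() creates the bucket on first use, then pushes into it.
        groups.entry(physical_id_of(&expr)).or_default().push(expr);
    }
    groups
}

fn main() {
    let exprs = vec![
        AlterExpr { table_name: "cpu_usage".to_string() },
        AlterExpr { table_name: "mem_usage".to_string() },
    ];
    let groups = group_by_physical_table(exprs);
    // One DDL procedure would be submitted per entry in `groups`.
    println!("{} group(s)", groups.len());
}
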
@@ -1152,60 +911,6 @@ impl StatementExecutor {
         Ok(Output::new_with_affected_rows(0))
     }
 
-    /// Verifies an alter and returns whether it is necessary to perform the alter.
-    ///
-    /// # Returns
-    ///
-    /// Returns true if the alter needs to be performed; otherwise, it returns false.
-    fn verify_alter(
-        &self,
-        table_id: TableId,
-        table_info: Arc<TableInfo>,
-        expr: AlterTableExpr,
-    ) -> Result<bool> {
-        let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
-            .context(AlterExprToRequestSnafu)?;
-
-        let AlterTableRequest {
-            table_name,
-            alter_kind,
-            ..
-        } = &request;
-
-        if let AlterKind::RenameTable { new_table_name } = alter_kind {
-            ensure!(
-                NAME_PATTERN_REG.is_match(new_table_name),
-                error::UnexpectedSnafu {
-                    violated: format!("Invalid table name: {}", new_table_name)
-                }
-            );
-        } else if let AlterKind::AddColumns { columns } = alter_kind {
-            // If all the columns are marked as add_if_not_exists and they already exist in the table,
-            // there is no need to perform the alter.
-            let column_names: HashSet<_> = table_info
-                .meta
-                .schema
-                .column_schemas()
-                .iter()
-                .map(|schema| &schema.name)
-                .collect();
-            if columns.iter().all(|column| {
-                column_names.contains(&column.column_schema.name) && column.add_if_not_exists
-            }) {
-                return Ok(false);
-            }
-        }
-
-        let _ = table_info
-            .meta
-            .builder_with_alter_kind(table_name, &request.alter_kind)
-            .context(error::TableSnafu)?
-            .build()
-            .context(error::BuildTableMetaSnafu { table_name })?;
-
-        Ok(true)
-    }
-
     #[tracing::instrument(skip_all)]
     pub async fn alter_table(
         &self,
@@ -1222,116 +927,10 @@ impl StatementExecutor {
         expr: AlterTableExpr,
         query_context: QueryContextRef,
     ) -> Result<Output> {
-        ensure!(
-            !is_readonly_schema(&expr.schema_name),
-            SchemaReadOnlySnafu {
-                name: expr.schema_name.clone()
-            }
-        );
-
-        let catalog_name = if expr.catalog_name.is_empty() {
-            DEFAULT_CATALOG_NAME.to_string()
-        } else {
-            expr.catalog_name.clone()
-        };
-
-        let schema_name = if expr.schema_name.is_empty() {
-            DEFAULT_SCHEMA_NAME.to_string()
-        } else {
-            expr.schema_name.clone()
-        };
-
-        let table_name = expr.table_name.clone();
-
-        let table = self
-            .catalog_manager
-            .table(
-                &catalog_name,
-                &schema_name,
-                &table_name,
-                Some(&query_context),
-            )
+        self.inserter
+            .schema_helper
+            .alter_table_by_expr(expr, query_context)
             .await
-            .context(CatalogSnafu)?
-            .with_context(|| TableNotFoundSnafu {
-                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
-            })?;
-
-        let table_id = table.table_info().ident.table_id;
-        let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
-        if !need_alter {
-            return Ok(Output::new_with_affected_rows(0));
-        }
-        info!(
-            "Table info before alter is {:?}, expr: {:?}",
-            table.table_info(),
-            expr
-        );
-
-        let physical_table_id = self
-            .table_metadata_manager
-            .table_route_manager()
-            .get_physical_table_id(table_id)
-            .await
-            .context(TableMetadataManagerSnafu)?;
-
-        let (req, invalidate_keys) = if physical_table_id == table_id {
-            // This is physical table
-            let req = SubmitDdlTaskRequest {
-                query_context,
-                task: DdlTask::new_alter_table(expr),
-            };
-
-            let invalidate_keys = vec![
-                CacheIdent::TableId(table_id),
-                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
-            ];
-
-            (req, invalidate_keys)
-        } else {
-            // This is logical table
-            let req = SubmitDdlTaskRequest {
-                query_context,
-                task: DdlTask::new_alter_logical_tables(vec![expr]),
-            };
-
-            let mut invalidate_keys = vec![
-                CacheIdent::TableId(physical_table_id),
-                CacheIdent::TableId(table_id),
-                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
-            ];
-
-            let physical_table = self
-                .table_metadata_manager
-                .table_info_manager()
-                .get(physical_table_id)
-                .await
-                .context(TableMetadataManagerSnafu)?
-                .map(|x| x.into_inner());
-            if let Some(physical_table) = physical_table {
-                let physical_table_name = TableName::new(
-                    physical_table.table_info.catalog_name,
-                    physical_table.table_info.schema_name,
-                    physical_table.table_info.name,
-                );
-                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
-            }
-
-            (req, invalidate_keys)
-        };
-
-        self.procedure_executor
-            .submit_ddl_task(&ExecutorContext::default(), req)
-            .await
-            .context(error::ExecuteDdlSnafu)?;
-
-        // Invalidates local cache ASAP.
-        self.cache_invalidator
-            .invalidate(&Context::default(), &invalidate_keys)
-            .await
-            .context(error::InvalidateTableCacheSnafu)?;
-
-        Ok(Output::new_with_affected_rows(0))
     }
 
     #[tracing::instrument(skip_all)]
@@ -1386,58 +985,6 @@ impl StatementExecutor {
         Ok(Output::new_with_affected_rows(0))
     }
 
-    async fn create_table_procedure(
-        &self,
-        create_table: CreateTableExpr,
-        partitions: Vec<Partition>,
-        table_info: RawTableInfo,
-        query_context: QueryContextRef,
-    ) -> Result<SubmitDdlTaskResponse> {
-        let partitions = partitions.into_iter().map(Into::into).collect();
-
-        let request = SubmitDdlTaskRequest {
-            query_context,
-            task: DdlTask::new_create_table(create_table, partitions, table_info),
-        };
-
-        self.procedure_executor
-            .submit_ddl_task(&ExecutorContext::default(), request)
-            .await
-            .context(error::ExecuteDdlSnafu)
-    }
-
-    async fn create_logical_tables_procedure(
-        &self,
-        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
-        query_context: QueryContextRef,
-    ) -> Result<SubmitDdlTaskResponse> {
-        let request = SubmitDdlTaskRequest {
-            query_context,
-            task: DdlTask::new_create_logical_tables(tables_data),
-        };
-
-        self.procedure_executor
-            .submit_ddl_task(&ExecutorContext::default(), request)
-            .await
-            .context(error::ExecuteDdlSnafu)
-    }
-
-    async fn alter_logical_tables_procedure(
-        &self,
-        tables_data: Vec<AlterTableExpr>,
-        query_context: QueryContextRef,
-    ) -> Result<SubmitDdlTaskResponse> {
-        let request = SubmitDdlTaskRequest {
-            query_context,
-            task: DdlTask::new_alter_logical_tables(tables_data),
-        };
-
-        self.procedure_executor
-            .submit_ddl_task(&ExecutorContext::default(), request)
-            .await
-            .context(error::ExecuteDdlSnafu)
-    }
-
     async fn drop_table_procedure(
         &self,
         table_name: &TableName,
@@ -1585,8 +1132,61 @@ impl StatementExecutor {
     }
 }
 
+/// Verifies an alter and returns whether it is necessary to perform the alter.
+///
+/// # Returns
+///
+/// Returns true if the alter needs to be performed; otherwise, it returns false.
+pub(crate) fn verify_alter(
+    table_id: TableId,
+    table_info: Arc<TableInfo>,
+    expr: AlterTableExpr,
+) -> Result<bool> {
+    let request: AlterTableRequest =
+        common_grpc_expr::alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;
+
+    let AlterTableRequest {
+        table_name,
+        alter_kind,
+        ..
+    } = &request;
+
+    if let AlterKind::RenameTable { new_table_name } = alter_kind {
+        ensure!(
+            NAME_PATTERN_REG.is_match(new_table_name),
+            error::UnexpectedSnafu {
+                violated: format!("Invalid table name: {}", new_table_name)
+            }
+        );
+    } else if let AlterKind::AddColumns { columns } = alter_kind {
+        // If all the columns are marked as add_if_not_exists and they already exist in the table,
+        // there is no need to perform the alter.
+        let column_names: HashSet<_> = table_info
+            .meta
+            .schema
+            .column_schemas()
+            .iter()
+            .map(|schema| &schema.name)
+            .collect();
+        if columns.iter().all(|column| {
+            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
+        }) {
+            return Ok(false);
+        }
+    }
+
+    let _ = table_info
+        .meta
+        .builder_with_alter_kind(table_name, &request.alter_kind)
+        .context(error::TableSnafu)?
+        .build()
+        .context(error::BuildTableMetaSnafu { table_name })?;
+
+    Ok(true)
+}
+
 /// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
-fn parse_partitions(
+pub(crate) fn parse_partitions(
     create_table: &CreateTableExpr,
     partitions: Option<Partitions>,
     query_ctx: &QueryContextRef,
@@ -1619,7 +1219,7 @@ fn parse_partitions(
     ))
 }
 
-fn create_table_info(
+pub(crate) fn create_table_info(
     create_table: &CreateTableExpr,
     partition_columns: Vec<String>,
 ) -> Result<RawTableInfo> {
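
The `verify_alter` function added above short-circuits when every requested column is marked `add_if_not_exists` and already exists in the table, so no DDL is submitted. A small std-only sketch of that decision, using a plain `AddColumn` struct as a stand-in for the GreptimeDB request types:

use std::collections::HashSet;

struct AddColumn {
    name: String,
    add_if_not_exists: bool,
}

/// Returns false when the alter would be a no-op: every column already exists
/// and is tolerant of that (add_if_not_exists), mirroring the check above.
fn alter_is_needed(existing: &HashSet<String>, additions: &[AddColumn]) -> bool {
    !additions
        .iter()
        .all(|c| existing.contains(&c.name) && c.add_if_not_exists)
}

fn main() {
    let existing: HashSet<String> = ["host".to_string(), "ts".to_string()].into_iter().collect();
    let req = vec![AddColumn { name: "host".to_string(), add_if_not_exists: true }];
    assert!(!alter_is_needed(&existing, &req)); // column already present: skip the alter
}
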
@@ -88,7 +88,6 @@ impl PipelineOperator {
                 catalog.to_string(),
                 Arc::new(PipelineTable::new(
                     self.inserter.clone(),
-                    self.statement_executor.clone(),
                     table,
                     self.query_engine.clone(),
                 )),
@@ -30,7 +30,6 @@ use datatypes::timestamp::TimestampNanosecond;
 use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
 use itertools::Itertools;
 use operator::insert::InserterRef;
-use operator::statement::StatementExecutorRef;
 use query::dataframe::DataFrame;
 use query::QueryEngineRef;
 use session::context::{QueryContextBuilder, QueryContextRef};
@@ -61,7 +60,6 @@ pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
 /// Every catalog has its own pipeline table.
 pub struct PipelineTable {
     inserter: InserterRef,
-    statement_executor: StatementExecutorRef,
     table: TableRef,
     query_engine: QueryEngineRef,
     cache: PipelineCache,
@@ -69,15 +67,9 @@ pub struct PipelineTable {
 
 impl PipelineTable {
     /// Create a new PipelineTable.
-    pub fn new(
-        inserter: InserterRef,
-        statement_executor: StatementExecutorRef,
-        table: TableRef,
-        query_engine: QueryEngineRef,
-    ) -> Self {
+    pub fn new(inserter: InserterRef, table: TableRef, query_engine: QueryEngineRef) -> Self {
         Self {
             inserter,
-            statement_executor,
             table,
             query_engine,
             cache: PipelineCache::new(),
@@ -232,13 +224,7 @@ impl PipelineTable {
 
         let output = self
             .inserter
-            .handle_row_inserts(
-                requests,
-                Self::query_ctx(&table_info),
-                &self.statement_executor,
-                false,
-                false,
-            )
+            .handle_row_inserts(requests, Self::query_ctx(&table_info), false, false)
             .await
             .context(InsertPipelineSnafu)?;
 
@@ -36,6 +36,7 @@ chrono.workspace = true
 common-base.workspace = true
 common-catalog.workspace = true
 common-config.workspace = true
+common-datasource.workspace = true
 common-error.workspace = true
 common-frontend.workspace = true
 common-grpc.workspace = true
@@ -74,11 +75,18 @@ jsonb.workspace = true
 lazy_static.workspace = true
 log-query.workspace = true
 loki-proto.workspace = true
+metric-engine.workspace = true
 mime_guess = "2.0"
+mito-codec.workspace = true
+mito2.workspace = true
 notify.workspace = true
 object-pool = "0.5"
+object-store.workspace = true
 once_cell.workspace = true
 openmetrics-parser = "0.4"
+operator.workspace = true
+parquet.workspace = true
+partition.workspace = true
 simd-json.workspace = true
 socket2 = "0.5"
 # use crates.io version once the following PRs is merged into the nextest release
415
src/servers/src/access_layer.rs
Normal file
@@ -0,0 +1,415 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use arrow::array::{
|
||||||
|
Array, PrimitiveArray, RecordBatch, TimestampMicrosecondArray, TimestampMillisecondArray,
|
||||||
|
TimestampNanosecondArray, TimestampSecondArray,
|
||||||
|
};
|
||||||
|
use arrow::datatypes::Int64Type;
|
||||||
|
use arrow_schema::TimeUnit;
|
||||||
|
use common_datasource::parquet_writer::AsyncWriter;
|
||||||
|
use datafusion::parquet::arrow::AsyncArrowWriter;
|
||||||
|
use mito2::sst::file::{FileId, FileMeta};
|
||||||
|
use mito2::sst::parquet::{DEFAULT_ROW_GROUP_SIZE, PARQUET_METADATA_KEY};
|
||||||
|
use object_store::config::ObjectStoreConfig;
|
||||||
|
use object_store::util::{join_dir, join_path};
|
||||||
|
use object_store::ObjectStore;
|
||||||
|
use parquet::basic::{Compression, Encoding, ZstdLevel};
|
||||||
|
use parquet::file::metadata::KeyValue;
|
||||||
|
use parquet::file::properties::WriterProperties;
|
||||||
|
use snafu::ResultExt;
|
||||||
|
use store_api::metadata::RegionMetadataRef;
|
||||||
|
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
|
||||||
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
|
use crate::batch_builder::physical_schema;
|
||||||
|
use crate::error;
|
||||||
|
|
||||||
|
type AsyncParquetWriter = AsyncArrowWriter<AsyncWriter>;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AccessLayerFactory {
|
||||||
|
object_store: ObjectStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AccessLayerFactory {
|
||||||
|
pub async fn new(config: &ObjectStoreConfig) -> error::Result<AccessLayerFactory> {
|
||||||
|
let object_store = object_store::factory::new_raw_object_store(config, "")
|
||||||
|
.await
|
||||||
|
.context(error::ObjectStoreSnafu)?;
|
||||||
|
Ok(Self { object_store })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn create_sst_writer(
|
||||||
|
&self,
|
||||||
|
catalog: &str,
|
||||||
|
schema: &str,
|
||||||
|
region_metadata: RegionMetadataRef,
|
||||||
|
) -> error::Result<ParquetWriter> {
|
||||||
|
let region_dir = build_data_region_dir(catalog, schema, region_metadata.region_id);
|
||||||
|
let file_id = FileId::random();
|
||||||
|
let file_path = join_path(®ion_dir, &file_id.as_parquet());
|
||||||
|
let writer = self
|
||||||
|
.object_store
|
||||||
|
.writer(&file_path)
|
||||||
|
.await
|
||||||
|
.context(error::OpendalSnafu)?;
|
||||||
|
|
||||||
|
let schema = physical_schema();
|
||||||
|
|
||||||
|
let key_value_meta = KeyValue::new(
|
||||||
|
PARQUET_METADATA_KEY.to_string(),
|
||||||
|
region_metadata.to_json().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let props = WriterProperties::builder()
|
||||||
|
.set_key_value_metadata(Some(vec![key_value_meta]))
|
||||||
|
.set_compression(Compression::ZSTD(ZstdLevel::default()))
|
||||||
|
.set_encoding(Encoding::PLAIN)
|
||||||
|
.set_max_row_group_size(DEFAULT_ROW_GROUP_SIZE)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let writer = AsyncParquetWriter::try_new(AsyncWriter::new(writer), schema, Some(props))
|
||||||
|
.context(error::ParquetSnafu)?;
|
||||||
|
Ok(ParquetWriter {
|
||||||
|
region_id: region_metadata.region_id,
|
||||||
|
file_id,
|
||||||
|
region_metadata,
|
||||||
|
writer,
|
||||||
|
timestamp_range: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ParquetWriter {
|
||||||
|
region_id: RegionId,
|
||||||
|
file_id: FileId,
|
||||||
|
region_metadata: RegionMetadataRef,
|
||||||
|
writer: AsyncParquetWriter,
|
||||||
|
timestamp_range: Option<(i64, i64)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ParquetWriter {
|
||||||
|
pub(crate) fn file_id(&self) -> FileId {
|
||||||
|
self.file_id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ParquetWriter {
|
||||||
|
pub async fn write_record_batch(
|
||||||
|
&mut self,
|
||||||
|
batch: &RecordBatch,
|
||||||
|
timestamp_range: Option<(i64, i64)>,
|
||||||
|
) -> error::Result<()> {
|
||||||
|
if let Err(e) = self.writer.write(&batch).await.context(error::ParquetSnafu) {
|
||||||
|
common_telemetry::error!(e; "Region metadata: {:?}, batch schema: {:?}", self.region_metadata, batch.schema_ref());
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (batch_min, batch_max) =
|
||||||
|
get_or_calculate_timestamp_range(timestamp_range, batch, &self.region_metadata)?;
|
||||||
|
|
||||||
|
if let Some((min, max)) = &mut self.timestamp_range {
|
||||||
|
*min = (*min).min(batch_min);
|
||||||
|
*max = (*max).max(batch_max);
|
||||||
|
} else {
|
||||||
|
self.timestamp_range = Some((batch_min, batch_max));
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn finish(&mut self) -> error::Result<FileMeta> {
|
||||||
|
let (min, max) = self.timestamp_range.unwrap();
|
||||||
|
let timestamp_type = self
|
||||||
|
.region_metadata
|
||||||
|
.time_index_column()
|
||||||
|
.column_schema
|
||||||
|
.data_type
|
||||||
|
.as_timestamp()
|
||||||
|
.unwrap();
|
||||||
|
let min_ts = timestamp_type.create_timestamp(min);
|
||||||
|
let max_ts = timestamp_type.create_timestamp(max);
|
||||||
|
let file_meta = self.writer.finish().await.context(error::ParquetSnafu)?;
|
||||||
|
let meta = FileMeta {
|
||||||
|
region_id: self.region_id,
|
||||||
|
file_id: self.file_id,
|
||||||
|
time_range: (min_ts, max_ts),
|
||||||
|
level: 0,
|
||||||
|
file_size: self.writer.bytes_written() as u64,
|
||||||
|
available_indexes: Default::default(),
|
||||||
|
index_file_size: 0,
|
||||||
|
num_rows: file_meta.num_rows as u64,
|
||||||
|
num_row_groups: file_meta.row_groups.len() as u64,
|
||||||
|
sequence: None, //todo(hl): use flushed sequence here.
|
||||||
|
};
|
||||||
|
Ok(meta)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds the data region subdir for metric physical tables.
|
||||||
|
fn build_data_region_dir(catalog: &str, schema: &str, physical_region_id: RegionId) -> String {
|
||||||
|
let storage_path = common_meta::ddl::utils::region_storage_path(&catalog, &schema);
|
||||||
|
join_dir(
|
||||||
|
&store_api::path_utils::region_dir(&storage_path, physical_region_id),
|
||||||
|
DATA_REGION_SUBDIR,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_or_calculate_timestamp_range(
|
||||||
|
timestamp_range: Option<(i64, i64)>,
|
||||||
|
rb: &RecordBatch,
|
||||||
|
region_metadata: &RegionMetadataRef,
|
||||||
|
) -> error::Result<(i64, i64)> {
|
||||||
|
if let Some(range) = timestamp_range {
|
||||||
|
return Ok(range);
|
||||||
|
};
|
||||||
|
|
||||||
|
let ts = rb
|
||||||
|
.column_by_name(®ion_metadata.time_index_column().column_schema.name)
|
||||||
|
.expect("column not found");
|
||||||
|
let arrow::datatypes::DataType::Timestamp(unit, _) = ts.data_type() else {
|
||||||
|
unreachable!("expected timestamp types");
|
||||||
|
};
|
||||||
|
let primitives: PrimitiveArray<Int64Type> = match unit {
|
||||||
|
TimeUnit::Second => ts
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<TimestampSecondArray>()
|
||||||
|
.unwrap()
|
||||||
|
.reinterpret_cast(),
|
||||||
|
TimeUnit::Millisecond => ts
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<TimestampMillisecondArray>()
|
||||||
|
.unwrap()
|
||||||
|
.reinterpret_cast(),
|
||||||
|
TimeUnit::Microsecond => ts
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<TimestampMicrosecondArray>()
|
||||||
|
.unwrap()
|
||||||
|
.reinterpret_cast(),
|
||||||
|
TimeUnit::Nanosecond => ts
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<TimestampNanosecondArray>()
|
||||||
|
.unwrap()
|
||||||
|
.reinterpret_cast(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let min = arrow::compute::min(&primitives).unwrap();
|
||||||
|
let max = arrow::compute::max(&primitives).unwrap();
|
||||||
|
Ok((min, max))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use api::v1::SemanticType;
|
||||||
|
use arrow::array::{Float64Array, StringArray};
|
||||||
|
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
|
||||||
|
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||||
|
use common_time::Timestamp;
|
||||||
|
use datatypes::data_type::ConcreteDataType;
|
||||||
|
use datatypes::schema::ColumnSchema;
|
||||||
|
use object_store::services::MemoryConfig;
|
||||||
|
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_build_data_region_dir_basic() {
|
||||||
|
let result = build_data_region_dir("greptime", "public", RegionId::new(1024, 0));
|
||||||
|
assert_eq!(&result, "data/greptime/public/1024/1024_0000000000/data/");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_test_region_metadata() -> RegionMetadataRef {
|
||||||
|
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
|
||||||
|
builder
|
||||||
|
.push_column_metadata(ColumnMetadata {
|
||||||
|
column_schema: ColumnSchema::new(
|
||||||
|
GREPTIME_TIMESTAMP,
|
||||||
|
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||||
|
false,
|
||||||
|
),
|
||||||
|
semantic_type: SemanticType::Timestamp,
|
||||||
|
column_id: 1,
|
||||||
|
})
|
||||||
|
.push_column_metadata(ColumnMetadata {
|
||||||
|
column_schema: ColumnSchema::new(
|
||||||
|
GREPTIME_VALUE,
|
||||||
|
ConcreteDataType::float64_datatype(),
|
||||||
|
true,
|
||||||
|
),
|
||||||
|
semantic_type: SemanticType::Field,
|
||||||
|
column_id: 2,
|
||||||
|
})
|
||||||
|
.push_column_metadata(ColumnMetadata {
|
||||||
|
column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true),
|
||||||
|
semantic_type: SemanticType::Tag,
|
||||||
|
column_id: 3,
|
||||||
|
})
|
||||||
|
.primary_key(vec![3]);
|
||||||
|
let metadata = builder.build().unwrap();
|
||||||
|
Arc::new(metadata)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_test_record_batch() -> RecordBatch {
|
||||||
|
let schema = Arc::new(Schema::new(vec![
|
||||||
|
Field::new(
|
||||||
|
GREPTIME_TIMESTAMP,
|
||||||
|
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||||
|
false,
|
||||||
|
),
|
||||||
|
Field::new(GREPTIME_VALUE, DataType::Float64, true),
|
||||||
|
Field::new("tag", DataType::Utf8, true),
|
||||||
|
]));
|
||||||
|
|
||||||
|
let timestamp_array = TimestampMillisecondArray::from(vec![1000, 2000, 3000]);
|
||||||
|
let value_array = Float64Array::from(vec![Some(10.0), None, Some(30.0)]);
|
||||||
|
let tag_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
|
||||||
|
|
||||||
|
RecordBatch::try_new(
|
||||||
|
schema,
|
||||||
|
vec![
|
||||||
|
Arc::new(timestamp_array),
|
||||||
|
Arc::new(value_array),
|
||||||
|
Arc::new(tag_array),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_parquet_writer_write_and_finish() {
|
||||||
|
let object_store = ObjectStore::from_config(MemoryConfig::default())
|
||||||
|
.unwrap()
|
||||||
|
.finish();
|
||||||
|
let factory = AccessLayerFactory { object_store };
|
||||||
|
|
||||||
|
let region_metadata = create_test_region_metadata();
|
||||||
|
let mut writer = factory
|
||||||
|
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let batch = create_test_record_batch();
|
||||||
|
|
||||||
|
// Test writing a record batch
|
||||||
|
writer.write_record_batch(&batch, None).await.unwrap();
|
||||||
|
|
||||||
|
// Test finishing the writer
|
||||||
|
let file_meta = writer.finish().await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(file_meta.region_id, RegionId::new(1024, 0));
|
||||||
|
assert_eq!(file_meta.level, 0);
|
||||||
|
assert_eq!(file_meta.num_rows, 3);
|
||||||
|
assert_eq!(file_meta.num_row_groups, 1);
|
||||||
|
assert!(file_meta.file_size > 0);
|
||||||
|
|
||||||
|
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000));
|
||||||
|
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(3000));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_parquet_writer_multiple_batches() {
|
||||||
|
let object_store = ObjectStore::from_config(MemoryConfig::default())
|
||||||
|
.unwrap()
|
||||||
|
.finish();
|
||||||
|
let factory = AccessLayerFactory { object_store };
|
||||||
|
|
||||||
|
let region_metadata = create_test_region_metadata();
|
||||||
|
let mut writer = factory
|
||||||
|
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Write first batch
|
||||||
|
let batch1 = create_test_record_batch();
|
||||||
|
writer.write_record_batch(&batch1, None).await.unwrap();
|
||||||
|
|
||||||
|
// Create second batch with different timestamp range
|
||||||
|
let schema = region_metadata.schema.arrow_schema().clone();
|
||||||
|
let timestamp_array = TimestampMillisecondArray::from(vec![4000, 5000]);
|
||||||
|
let value_array = Float64Array::from(vec![Some(40.0), Some(50.0)]);
|
||||||
|
let tag_array = StringArray::from(vec![Some("d"), Some("e")]);
|
||||||
|
|
||||||
|
let batch2 = RecordBatch::try_new(
|
||||||
|
schema,
|
||||||
|
vec![
|
||||||
|
Arc::new(timestamp_array),
|
||||||
|
Arc::new(value_array),
|
||||||
|
Arc::new(tag_array),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
writer.write_record_batch(&batch2, None).await.unwrap();
|
||||||
|
|
||||||
|
let file_meta = writer.finish().await.unwrap();
|
||||||
|
|
||||||
|
// Should have combined rows from both batches
|
||||||
|
assert_eq!(file_meta.num_rows, 5);
|
||||||
|
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000));
|
||||||
|
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(5000));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_parquet_writer_with_provided_timestamp_range() {
|
||||||
|
let object_store = ObjectStore::from_config(MemoryConfig::default())
|
||||||
|
.unwrap()
|
||||||
|
.finish();
|
||||||
|
let factory = AccessLayerFactory { object_store };
|
||||||
|
|
||||||
|
let region_metadata = create_test_region_metadata();
|
||||||
|
let mut writer = factory
|
||||||
|
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let batch = create_test_record_batch();
|
||||||
|
|
||||||
|
// Provide explicit timestamp range that differs from actual data
|
||||||
|
let provided_range = (500, 6000);
|
||||||
|
writer
|
||||||
|
.write_record_batch(&batch, Some(provided_range))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let file_meta = writer.finish().await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(500));
|
||||||
|
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(6000));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_or_calculate_timestamp_range_with_provided_range() {
|
||||||
|
let region_metadata = create_test_region_metadata();
|
||||||
|
let batch = create_test_record_batch();
|
||||||
|
|
||||||
|
let provided_range = Some((100, 200));
|
||||||
|
let result = get_or_calculate_timestamp_range(provided_range, &batch, ®ion_metadata);
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert_eq!(result.unwrap(), (100, 200));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_or_calculate_timestamp_range_calculated() {
|
||||||
|
let region_metadata = create_test_region_metadata();
|
||||||
|
let batch = create_test_record_batch();
|
||||||
|
|
||||||
|
let result = get_or_calculate_timestamp_range(None, &batch, ®ion_metadata);
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert_eq!(result.unwrap(), (1000, 3000));
|
||||||
|
}
|
||||||
|
}
|
||||||
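
In `access_layer.rs` above, `get_or_calculate_timestamp_range` falls back to scanning the batch's time index column when no range is supplied, and `write_record_batch` folds each batch's range into the writer's running (min, max) for the file metadata. A std-only sketch of that folding logic; plain `i64` values stand in for the Arrow timestamp arrays:

/// Computes the (min, max) of a non-empty timestamp slice, mirroring the
/// fallback path that scans the batch when no range is provided.
fn batch_range(timestamps: &[i64]) -> Option<(i64, i64)> {
    let min = *timestamps.iter().min()?;
    let max = *timestamps.iter().max()?;
    Some((min, max))
}

/// Folds a new batch range into the writer's running range, as finish() expects.
fn merge_range(current: &mut Option<(i64, i64)>, batch: (i64, i64)) {
    match current {
        Some((min, max)) => {
            *min = (*min).min(batch.0);
            *max = (*max).max(batch.1);
        }
        None => *current = Some(batch),
    }
}

fn main() {
    let mut running = None;
    merge_range(&mut running, batch_range(&[1000, 2000, 3000]).unwrap());
    merge_range(&mut running, batch_range(&[4000, 5000]).unwrap());
    assert_eq!(running, Some((1000, 5000)));
}
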
604
src/servers/src/batch_builder.rs
Normal file
@@ -0,0 +1,604 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use api::v1::value::ValueData;
|
||||||
|
use api::v1::{ColumnDataType, ColumnSchema, OpType, SemanticType};
|
||||||
|
use arrow::array::{
|
||||||
|
ArrayBuilder, ArrayRef, BinaryBuilder, Float64Array, RecordBatch, TimestampMillisecondArray,
|
||||||
|
UInt64Array, UInt8Array,
|
||||||
|
};
|
||||||
|
use arrow::compute;
|
||||||
|
use arrow_schema::Field;
|
||||||
|
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||||
|
use common_meta::node_manager::NodeManagerRef;
|
||||||
|
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||||
|
use common_telemetry::info;
|
||||||
|
use itertools::Itertools;
|
||||||
|
use metric_engine::row_modifier::{RowModifier, RowsIter};
|
||||||
|
use mito_codec::row_converter::SparsePrimaryKeyCodec;
|
||||||
|
use operator::schema_helper::{
|
||||||
|
ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas,
|
||||||
|
SchemaHelper,
|
||||||
|
};
|
||||||
|
use partition::manager::PartitionRuleManagerRef;
|
||||||
|
use session::context::QueryContextRef;
|
||||||
|
use snafu::{OptionExt, ResultExt};
|
||||||
|
use store_api::metadata::RegionMetadataRef;
|
||||||
|
use store_api::storage::consts::{
|
||||||
|
ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
|
||||||
|
};
|
||||||
|
use store_api::storage::{ColumnId, RegionId};
|
||||||
|
use table::metadata::TableId;
|
||||||
|
|
||||||
|
use crate::error;
|
||||||
|
use crate::prom_row_builder::{PromCtx, TableBuilder};
|
||||||
|
|
||||||
|
pub struct MetricsBatchBuilder {
|
||||||
|
schema_helper: SchemaHelper,
|
||||||
|
builders:
|
||||||
|
HashMap<String /*schema*/, HashMap<RegionId /*physical table name*/, BatchEncoder>>,
|
||||||
|
partition_manager: PartitionRuleManagerRef,
|
||||||
|
node_manager: NodeManagerRef,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MetricsBatchBuilder {
|
||||||
|
pub fn new(
|
||||||
|
schema_helper: SchemaHelper,
|
||||||
|
partition_manager: PartitionRuleManagerRef,
|
||||||
|
node_manager: NodeManagerRef,
|
||||||
|
) -> Self {
|
||||||
|
MetricsBatchBuilder {
|
||||||
|
schema_helper,
|
||||||
|
builders: Default::default(),
|
||||||
|
partition_manager,
|
||||||
|
node_manager,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Detected the DDL requirements according to the staged table rows.
|
||||||
|
pub async fn create_or_alter_physical_tables(
|
||||||
|
&self,
|
||||||
|
tables: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
|
||||||
|
query_ctx: &QueryContextRef,
|
||||||
|
) -> error::Result<()> {
|
||||||
|
// Physical table name -> logical tables -> tags in logical table
|
||||||
|
let mut tags: HashMap<String, HashMap<String, HashSet<String>>> = HashMap::default();
|
||||||
|
let catalog = query_ctx.current_catalog();
|
||||||
|
let schema = query_ctx.current_schema();
|
||||||
|
|
||||||
|
for (ctx, tables) in tables {
|
||||||
|
for (logical_table_name, table_builder) in tables {
|
||||||
|
let physical_table_name = self
|
||||||
|
.determine_physical_table_name(
|
||||||
|
logical_table_name,
|
||||||
|
&ctx.physical_table,
|
||||||
|
catalog,
|
||||||
|
&schema,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
tags.entry(physical_table_name)
|
||||||
|
.or_default()
|
||||||
|
.entry(logical_table_name.clone())
|
||||||
|
.or_default()
|
||||||
|
.extend(table_builder.tags().cloned());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let logical_schemas = tags_to_logical_schemas(tags);
|
||||||
|
ensure_logical_tables_for_metrics(&self.schema_helper, &logical_schemas, query_ctx)
|
||||||
|
.await
|
||||||
|
.context(error::OperatorSnafu)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Finds physical table id for logical table.
|
||||||
|
async fn determine_physical_table_name(
|
||||||
|
&self,
|
||||||
|
logical_table_name: &str,
|
||||||
|
physical_table_name: &Option<String>,
|
||||||
|
catalog: &str,
|
||||||
|
schema: &str,
|
||||||
|
) -> error::Result<String> {
|
||||||
|
let logical_table = self
|
||||||
|
.schema_helper
|
||||||
|
.get_table(catalog, schema, logical_table_name)
|
||||||
|
.await
|
||||||
|
.context(error::OperatorSnafu)?;
|
||||||
|
if let Some(logical_table) = logical_table {
|
||||||
|
// logical table already exist, just return the physical table
|
||||||
|
let logical_table_id = logical_table.table_info().table_id();
|
||||||
|
let physical_table_id = self
|
||||||
|
.schema_helper
|
||||||
|
.table_route_manager()
|
||||||
|
.get_physical_table_id(logical_table_id)
|
||||||
|
.await
|
||||||
|
.context(error::CommonMetaSnafu)?;
|
||||||
|
let physical_table = self
|
||||||
|
.schema_helper
|
||||||
|
.catalog_manager()
|
||||||
|
.tables_by_ids(catalog, schema, &[physical_table_id])
|
||||||
|
.await
|
||||||
|
.context(error::CatalogSnafu)?
|
||||||
|
.swap_remove(0);
|
||||||
|
return Ok(physical_table.table_info().name.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Logical table not exist, try assign logical table to a physical table.
|
||||||
|
let physical_table_name = physical_table_name
|
||||||
|
.as_deref()
|
||||||
|
.unwrap_or(GREPTIME_PHYSICAL_TABLE);
|
||||||
|
Ok(physical_table_name.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retrieves physical region metadata of given logical table names.
|
||||||
|
///
|
||||||
|
/// The `logical_tables` is a list of table names, each entry contains the schema name and the table name.
|
||||||
|
/// Returns the following mapping: `schema => logical table => (logical table id, region 0 metadata of the physical table)`.
|
||||||
|
pub(crate) async fn collect_physical_region_metadata(
|
||||||
|
&self,
|
||||||
|
logical_tables: &[(String, String)],
|
||||||
|
query_ctx: &QueryContextRef,
|
||||||
|
) -> error::Result<HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>> {
|
||||||
|
let catalog = query_ctx.current_catalog();
|
||||||
|
// Logical and physical table ids.
|
||||||
|
let mut table_ids = Vec::with_capacity(logical_tables.len());
|
||||||
|
let mut physical_region_ids = HashSet::new();
|
||||||
|
for (schema, table_name) in logical_tables {
|
||||||
|
let logical_table = self
|
||||||
|
.schema_helper
|
||||||
|
.get_table(catalog, schema, table_name)
|
||||||
|
.await
|
||||||
|
.context(error::OperatorSnafu)?
|
||||||
|
.context(error::TableNotFoundSnafu {
|
||||||
|
catalog,
|
||||||
|
schema: schema,
|
||||||
|
table: table_name,
|
||||||
|
})?;
|
||||||
|
let logical_table_id = logical_table.table_info().table_id();
|
||||||
|
let physical_table_id = self
|
||||||
|
.schema_helper
|
||||||
|
.table_route_manager()
|
||||||
|
.get_physical_table_id(logical_table_id)
|
||||||
|
.await
|
||||||
|
.context(error::CommonMetaSnafu)?;
|
||||||
|
table_ids.push((logical_table_id, physical_table_id));
|
||||||
|
// We only get metadata from region 0.
|
||||||
|
physical_region_ids.insert(RegionId::new(physical_table_id, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Batch get physical metadata.
|
||||||
|
let physical_region_ids = physical_region_ids.into_iter().collect_vec();
|
||||||
|
let region_metadatas = metadatas_for_region_ids(
|
||||||
|
&self.partition_manager,
|
||||||
|
&self.node_manager,
|
||||||
|
&physical_region_ids,
|
||||||
|
query_ctx,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.context(error::OperatorSnafu)?;
|
||||||
|
let mut result_map: HashMap<_, HashMap<_, _>> = HashMap::new();
|
||||||
|
let region_metadatas: HashMap<_, _> = region_metadatas
|
||||||
|
.into_iter()
|
||||||
|
.flatten()
|
||||||
|
.map(|meta| (meta.region_id, Arc::new(meta)))
|
||||||
|
.collect();
|
||||||
|
for (i, (schema, table_name)) in logical_tables.iter().enumerate() {
|
||||||
|
let physical_table_id = table_ids[i].1;
|
||||||
|
let physical_region_id = RegionId::new(physical_table_id, 0);
|
||||||
|
let physical_metadata =
|
||||||
|
region_metadatas.get(&physical_region_id).with_context(|| {
|
||||||
|
error::UnexpectedResultSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"Physical region metadata {} for table {} not found",
|
||||||
|
physical_region_id, table_name
|
||||||
|
),
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
|
match result_map.get_mut(schema) {
|
||||||
|
Some(table_map) => {
|
||||||
|
table_map.insert(
|
||||||
|
table_name.clone(),
|
||||||
|
(table_ids[i].0, physical_metadata.clone()),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
let mut table_map = HashMap::new();
|
||||||
|
table_map.insert(
|
||||||
|
table_name.clone(),
|
||||||
|
(table_ids[i].0, physical_metadata.clone()),
|
||||||
|
);
|
||||||
|
result_map.insert(schema.to_string(), table_map);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result_map)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds [RecordBatch] from rows with primary key encoded.
|
||||||
|
/// Potentially we also need to modify the column name of timestamp and value field to
|
||||||
|
/// match the schema of physical tables.
|
||||||
|
/// Note:
|
||||||
|
/// Make sure all logical table and physical table are created when reach here and the mapping
|
||||||
|
/// from logical table name to physical table ref is stored in [physical_region_metadata].
|
||||||
|
pub(crate) async fn append_rows_to_batch(
|
||||||
|
&mut self,
|
||||||
|
current_catalog: Option<String>,
|
||||||
|
current_schema: Option<String>,
|
||||||
|
table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
|
||||||
|
physical_region_metadata: &HashMap<
|
||||||
|
String, /*schema name*/
|
||||||
|
HashMap<
|
||||||
|
String, /*logical table name*/
|
||||||
|
(TableId /*logical table id*/, RegionMetadataRef),
|
||||||
|
>,
|
||||||
|
>,
|
||||||
|
) -> error::Result<()> {
|
||||||
|
for (ctx, tables_in_schema) in table_data {
|
||||||
|
// use session catalog.
|
||||||
|
let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
|
||||||
|
// schema in PromCtx precedes session schema.
|
||||||
|
let schema = ctx
|
||||||
|
.schema
|
||||||
|
.as_deref()
|
||||||
|
.or(current_schema.as_deref())
|
||||||
|
.unwrap_or(DEFAULT_SCHEMA_NAME);
|
||||||
|
// Look up physical region metadata by schema and table name
|
||||||
|
let schema_metadata =
|
||||||
|
physical_region_metadata
|
||||||
|
.get(schema)
|
||||||
|
.context(error::TableNotFoundSnafu {
|
||||||
|
catalog,
|
||||||
|
schema,
|
||||||
|
table: "",
|
||||||
|
})?;
|
||||||
|
|
||||||
|
for (logical_table_name, table) in tables_in_schema {
|
||||||
|
let (logical_table_id, physical_table) = schema_metadata
|
||||||
|
.get(logical_table_name)
|
||||||
|
.context(error::TableNotFoundSnafu {
|
||||||
|
catalog,
|
||||||
|
schema,
|
||||||
|
table: logical_table_name,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let encoder = self
|
||||||
|
.builders
|
||||||
|
.entry(schema.to_string())
|
||||||
|
.or_default()
|
||||||
|
.entry(physical_table.region_id)
|
||||||
|
.or_insert_with(|| Self::create_sparse_encoder(&physical_table));
|
||||||
|
let name_to_id: HashMap<_, _> = physical_table
|
||||||
|
.column_metadatas
|
||||||
|
.iter()
|
||||||
|
.map(|c| (c.column_schema.name.clone(), c.column_id))
|
||||||
|
.collect();
|
||||||
|
let _ = std::mem::replace(encoder.name_to_id_mut(), name_to_id);
|
||||||
|
encoder.append_rows(*logical_table_id, std::mem::take(table))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Finishes current record batch builder and returns record batches grouped by physical table id.
|
||||||
|
pub(crate) fn finish(
|
||||||
|
self,
|
||||||
|
) -> error::Result<
|
||||||
|
HashMap<
|
||||||
|
String, /*schema name*/
|
||||||
|
HashMap<RegionId /*physical region id*/, Vec<(RecordBatch, (i64, i64))>>,
|
||||||
|
>,
|
||||||
|
> {
|
||||||
|
let mut table_batches: HashMap<String, HashMap<RegionId, Vec<(RecordBatch, (i64, i64))>>> =
|
||||||
|
HashMap::with_capacity(self.builders.len());
|
||||||
|
|
||||||
|
for (schema_name, schema_tables) in self.builders {
|
||||||
|
let schema_batches = table_batches.entry(schema_name).or_default();
|
||||||
|
for (physical_region_id, table_data) in schema_tables {
|
||||||
|
let rb = table_data.finish()?;
|
||||||
|
if !rb.is_empty() {
|
||||||
|
schema_batches
|
||||||
|
.entry(physical_region_id)
|
||||||
|
.or_default()
|
||||||
|
.extend(rb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(table_batches)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates Encoder that converts Rows into RecordBatch with primary key encoded.
|
||||||
|
fn create_sparse_encoder(physical_region_meta: &RegionMetadataRef) -> BatchEncoder {
|
||||||
|
let name_to_id: HashMap<_, _> = physical_region_meta
|
||||||
|
.column_metadatas
|
||||||
|
.iter()
|
||||||
|
.map(|c| (c.column_schema.name.clone(), c.column_id))
|
||||||
|
.collect();
|
||||||
|
BatchEncoder::new(name_to_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Columns {
|
||||||
|
encoded_primary_key_array_builder: BinaryBuilder,
|
||||||
|
timestamps: Vec<i64>,
|
||||||
|
value: Vec<f64>,
|
||||||
|
timestamp_range: Option<(i64, i64)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Columns {
|
||||||
|
fn pk_offset(&self) -> usize {
|
||||||
|
self.encoded_primary_key_array_builder
|
||||||
|
.offsets_slice()
|
||||||
|
.last()
|
||||||
|
.copied()
|
||||||
|
.unwrap_or(0) as usize
|
||||||
|
}
|
||||||
|
|
||||||
|
fn estimated_size(&self) -> usize {
|
||||||
|
let value_size = self.encoded_primary_key_array_builder.values_slice().len();
|
||||||
|
let offset_size = self.encoded_primary_key_array_builder.offsets_slice().len() * 4;
|
||||||
|
let validity_sze = self
|
||||||
|
.encoded_primary_key_array_builder
|
||||||
|
.validity_slice()
|
||||||
|
.map(|v| v.len())
|
||||||
|
.unwrap_or(0);
|
||||||
|
let timestamp_size = self.timestamps.len() * 8 + std::mem::size_of::<Vec<i64>>();
|
||||||
|
let val_size = self.value.len() * 8 + std::mem::size_of::<Vec<f64>>();
|
||||||
|
value_size + offset_size + validity_sze + timestamp_size + val_size + size_of::<Self>()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push(&mut self, pk: &[u8], val: f64, timestamp: i64) {
|
||||||
|
self.encoded_primary_key_array_builder.append_value(&pk);
|
||||||
|
self.value.push(val);
|
||||||
|
self.timestamps.push(timestamp);
|
||||||
|
if let Some((min, max)) = &mut self.timestamp_range {
|
||||||
|
*min = (*min).min(timestamp);
|
||||||
|
*max = (*max).max(timestamp);
|
||||||
|
} else {
|
||||||
|
self.timestamp_range = Some((timestamp, timestamp));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Columns {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0),
|
||||||
|
timestamps: Vec::with_capacity(16),
|
||||||
|
value: Vec::with_capacity(16),
|
||||||
|
timestamp_range: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct ColumnsBuilder {
|
||||||
|
columns: Vec<Columns>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ColumnsBuilder {
|
||||||
|
fn push(&mut self, pk: &[u8], val: f64, ts: i64) {
|
||||||
|
let last = match self.columns.last_mut() {
|
||||||
|
None => {
|
||||||
|
self.columns.push(Columns::default());
|
||||||
|
self.columns.last_mut().unwrap()
|
||||||
|
}
|
||||||
|
Some(last_builder) => {
|
||||||
|
if last_builder.pk_offset() + pk.len() >= i32::MAX as usize {
|
||||||
|
info!(
|
||||||
|
"Current builder is full {}, rows: {}/{}",
|
||||||
|
last_builder.pk_offset(),
|
||||||
|
last_builder.encoded_primary_key_array_builder.len(),
|
||||||
|
last_builder.timestamps.len()
|
||||||
|
);
|
||||||
|
// Current builder is full, create a new one
|
||||||
|
self.columns.push(Columns::default());
|
||||||
|
self.columns.last_mut().unwrap()
|
||||||
|
} else {
|
||||||
|
last_builder
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
last.push(pk, val, ts);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct BatchEncoder {
|
||||||
|
name_to_id: HashMap<String, ColumnId>,
|
||||||
|
pk_codec: SparsePrimaryKeyCodec,
|
||||||
|
columns_builder: ColumnsBuilder,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BatchEncoder {
|
||||||
|
fn new(name_to_id: HashMap<String, ColumnId>) -> BatchEncoder {
|
||||||
|
Self {
|
||||||
|
name_to_id,
|
||||||
|
pk_codec: SparsePrimaryKeyCodec::schemaless(),
|
||||||
|
columns_builder: ColumnsBuilder::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn estimated_size(&self) -> usize {
|
||||||
|
self.columns_builder
|
||||||
|
.columns
|
||||||
|
.iter()
|
||||||
|
.map(|v| v.estimated_size())
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn total_rows(&self) -> usize {
|
||||||
|
self.columns_builder
|
||||||
|
.columns
|
||||||
|
.iter()
|
||||||
|
.map(|v| v.timestamps.len())
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn name_to_id_mut(&mut self) -> &mut HashMap<String, ColumnId> {
|
||||||
|
&mut self.name_to_id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn append_rows(
|
||||||
|
&mut self,
|
||||||
|
logical_table_id: TableId,
|
||||||
|
mut table_builder: TableBuilder,
|
||||||
|
) -> error::Result<()> {
|
||||||
|
// todo(hl): we can simplified the row iter because schema in TableBuilder is known (ts, val, tags...)
|
||||||
|
let row_insert_request = table_builder.as_row_insert_request("don't care".to_string());
|
||||||
|
|
||||||
|
let mut iter = RowsIter::new(row_insert_request.rows.unwrap(), &self.name_to_id);
|
||||||
|
|
||||||
|
let mut encode_buf = vec![];
|
||||||
|
for row in iter.iter_mut() {
|
||||||
|
let (table_id, ts_id) = RowModifier::fill_internal_columns(logical_table_id, &row);
|
||||||
|
let internal_columns = [
|
||||||
|
(
|
||||||
|
ReservedColumnId::table_id(),
|
||||||
|
api::helper::pb_value_to_value_ref(&table_id, &None),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
ReservedColumnId::tsid(),
|
||||||
|
api::helper::pb_value_to_value_ref(&ts_id, &None),
|
||||||
|
),
|
||||||
|
];
|
||||||
|
self.pk_codec
|
||||||
|
.encode_to_vec(internal_columns.into_iter(), &mut encode_buf)
|
||||||
|
.context(error::EncodePrimaryKeySnafu)?;
|
||||||
|
self.pk_codec
|
||||||
|
.encode_to_vec(row.primary_keys(), &mut encode_buf)
|
||||||
|
.context(error::EncodePrimaryKeySnafu)?;
|
||||||
|
// safety: field values cannot be null in prom remote write
|
||||||
|
let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
|
||||||
|
return error::InvalidFieldValueTypeSnafu.fail();
|
||||||
|
};
|
||||||
|
// process timestamp and field. We already know the position of timestamps and values in [TableBuilder].
|
||||||
|
let ValueData::TimestampMillisecondValue(ts) =
|
||||||
|
// safety: timestamp values cannot be null
|
||||||
|
            row.value_at(0).value_data.as_ref().unwrap()
            else {
                return error::InvalidTimestampValueTypeSnafu.fail();
            };

            self.columns_builder.push(&encode_buf, *val, *ts);
        }
        Ok(())
    }

    fn finish(self) -> error::Result<Vec<(RecordBatch, (i64, i64))>> {
        if self.columns_builder.columns.is_empty() {
            return Ok(vec![]);
        }

        let mut res = Vec::with_capacity(self.columns_builder.columns.len());

        for mut columns in self.columns_builder.columns {
            let num_rows = columns.timestamps.len();
            let value = Float64Array::from(columns.value);
            let timestamp = TimestampMillisecondArray::from(columns.timestamps);

            let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef;
            // todo: now we set sequence all to 0.
            let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef;

            let pk = columns.encoded_primary_key_array_builder.finish();
            let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?;

            // Sort arrays
            let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
            let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
            let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
            let rb =
                RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type])
                    .context(error::ArrowSnafu)?;
            res.push((rb, columns.timestamp_range.unwrap()))
        }

        Ok(res)
    }
}

fn tags_to_logical_schemas(
    tags: HashMap<String, HashMap<String, HashSet<String>>>,
) -> LogicalSchemas {
    let schemas: HashMap<String, Vec<LogicalSchema>> = tags
        .into_iter()
        .map(|(physical, logical_tables)| {
            let schemas: Vec<_> = logical_tables
                .into_iter()
                .map(|(logical, tags)| {
                    let mut columns: Vec<_> = tags
                        .into_iter()
                        .map(|tag_name| ColumnSchema {
                            column_name: tag_name,
                            datatype: ColumnDataType::String as i32,
                            semantic_type: SemanticType::Tag as i32,
                            ..Default::default()
                        })
                        .collect();
                    columns.push(ColumnSchema {
                        column_name: GREPTIME_TIMESTAMP.to_string(),
                        datatype: ColumnDataType::TimestampMillisecond as i32,
                        semantic_type: SemanticType::Timestamp as i32,
                        ..Default::default()
                    });
                    columns.push(ColumnSchema {
                        column_name: GREPTIME_VALUE.to_string(),
                        datatype: ColumnDataType::Float64 as i32,
                        semantic_type: SemanticType::Field as i32,
                        ..Default::default()
                    });
                    LogicalSchema {
                        name: logical,
                        columns,
                    }
                })
                .collect();
            (physical, schemas)
        })
        .collect();

    LogicalSchemas { schemas }
}

/// Creates the schema of output record batch.
pub fn physical_schema() -> arrow::datatypes::SchemaRef {
    Arc::new(arrow::datatypes::Schema::new(vec![
        Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
        Field::new(
            GREPTIME_TIMESTAMP,
            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
        Field::new(
            PRIMARY_KEY_COLUMN_NAME,
            arrow::datatypes::DataType::Binary,
            false,
        ),
        Field::new(
            SEQUENCE_COLUMN_NAME,
            arrow::datatypes::DataType::UInt64,
            false,
        ),
        Field::new(
            OP_TYPE_COLUMN_NAME,
            arrow::datatypes::DataType::UInt8,
            false,
        ),
    ]))
}
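The `finish` method above sorts every column by the encoded primary key: `sort_to_indices` computes a sort permutation from the key column, and `take` applies the same permutation to each array so rows stay aligned. A minimal standalone sketch of that co-sorting pattern using the same `arrow::compute` calls as the diff (the array contents and the `sort_by_key` helper are illustrative, not GreptimeDB code):

use arrow::array::{ArrayRef, BinaryArray, Float64Array};
use arrow::compute;
use arrow::error::ArrowError;

/// Sorts `keys` and `values` together by the binary key column.
fn sort_by_key(
    keys: &BinaryArray,
    values: &Float64Array,
) -> Result<(ArrayRef, ArrayRef), ArrowError> {
    // Permutation that would sort the key column ascending.
    let indices = compute::sort_to_indices(keys, None, None)?;
    // Apply the same permutation to every column so rows stay aligned.
    let sorted_keys = compute::take(keys, &indices, None)?;
    let sorted_values = compute::take(values, &indices, None)?;
    Ok((sorted_keys, sorted_values))
}

fn main() -> Result<(), ArrowError> {
    let keys = BinaryArray::from(vec![&b"b"[..], &b"a"[..], &b"c"[..]]);
    let values = Float64Array::from(vec![2.0, 1.0, 3.0]);
    let (sorted_keys, sorted_values) = sort_by_key(&keys, &values)?;
    println!("{sorted_keys:?}");
    println!("{sorted_values:?}");
    Ok(())
}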
@@ -624,6 +624,64 @@ pub enum Error {
     #[snafu(display("Unknown hint: {}", hint))]
     UnknownHint { hint: String },
 
+    #[snafu(display("Failed to invoke common_meta"))]
+    CommonMeta {
+        source: common_meta::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to invoke operator"))]
+    Operator {
+        source: operator::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to encode primary key"))]
+    EncodePrimaryKey {
+        source: mito_codec::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "Invalid timestamp value type in row data, expected TimestampMillisecondValue"
+    ))]
+    InvalidTimestampValueType {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Invalid field value type in row data, expected F64Value"))]
+    InvalidFieldValueType {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to operate object store"))]
+    Opendal {
+        #[snafu(source)]
+        error: object_store::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to operate object store"))]
+    ObjectStore {
+        source: object_store::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to operate object store"))]
+    Parquet {
+        #[snafu(source)]
+        error: parquet::errors::ParquetError,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
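The new variants follow the snafu conventions already used in this file: wrapped errors are attached as `source` via `.context(...)`, and the `#[snafu(implicit)] location` is captured automatically at the call site. A small self-contained sketch of that pattern (the `ParseConfig` variant and `parse_port` helper are made up for illustration; they are not GreptimeDB code):

use snafu::{Location, ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to parse config value: {}", input))]
    ParseConfig {
        // The underlying error is wrapped as `source`, like the variants above.
        source: std::num::ParseIntError,
        input: String,
        // Captured automatically at the `.context(...)` call site.
        #[snafu(implicit)]
        location: Location,
    },
}

fn parse_port(input: &str) -> Result<u16, Error> {
    input
        .parse::<u16>()
        .context(ParseConfigSnafu { input: input.to_string() })
}

fn main() {
    // Prints the display message defined on the variant.
    println!("{}", parse_port("not-a-number").unwrap_err());
}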
@@ -747,6 +805,15 @@ impl ErrorExt for Error {
             DurationOverflow { .. } => StatusCode::InvalidArguments,
 
             HandleOtelArrowRequest { .. } => StatusCode::Internal,
+            CommonMeta { source, .. } => source.status_code(),
+            Operator { source, .. } => source.status_code(),
+            EncodePrimaryKey { source, .. } => source.status_code(),
+            InvalidTimestampValueType { .. } | InvalidFieldValueType { .. } => {
+                StatusCode::Unexpected
+            }
+            ObjectStore { source, .. } => source.status_code(),
+            Parquet { .. } => StatusCode::Internal,
+            Opendal { .. } => StatusCode::Internal,
         }
     }
 
@@ -76,7 +76,6 @@ use crate::query_handler::sql::ServerSqlQueryHandlerRef;
 use crate::query_handler::{
     InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
     OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
-    PromStoreProtocolHandlerRef,
 };
 use crate::server::Server;
 
@@ -566,20 +565,7 @@ impl HttpServerBuilder {
         }
     }
 
-    pub fn with_prom_handler(
-        self,
-        handler: PromStoreProtocolHandlerRef,
-        pipeline_handler: Option<PipelineHandlerRef>,
-        prom_store_with_metric_engine: bool,
-        prom_validation_mode: PromValidationMode,
-    ) -> Self {
-        let state = PromStoreState {
-            prom_store_handler: handler,
-            pipeline_handler,
-            prom_store_with_metric_engine,
-            prom_validation_mode,
-        };
-
+    pub fn with_prom_handler(self, state: PromStoreState) -> Self {
         Self {
             router: self.router.nest(
                 &format!("/{HTTP_API_VERSION}/prometheus"),
@@ -12,7 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Instant;
 
 use api::prom_store::remote::ReadRequest;
 use axum::body::Bytes;
@@ -22,22 +24,29 @@ use axum::response::IntoResponse;
 use axum::Extension;
 use axum_extra::TypedHeader;
 use common_catalog::consts::DEFAULT_SCHEMA_NAME;
+use common_meta::node_manager::NodeManagerRef;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
-use common_telemetry::tracing;
+use common_telemetry::{info, tracing};
 use hyper::HeaderMap;
 use lazy_static::lazy_static;
 use object_pool::Pool;
+use operator::schema_helper::SchemaHelper;
+use partition::manager::PartitionRuleManagerRef;
 use pipeline::util::to_pipeline_version;
 use pipeline::{ContextReq, PipelineDefinition};
 use prost::Message;
 use serde::{Deserialize, Serialize};
-use session::context::{Channel, QueryContext};
+use session::context::{Channel, QueryContext, QueryContextRef};
 use snafu::prelude::*;
+use tokio::sync::mpsc::Sender;
 
+use crate::access_layer::AccessLayerFactory;
+use crate::batch_builder::MetricsBatchBuilder;
 use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
 use crate::http::extractor::PipelineInfo;
 use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
 use crate::http::PromValidationMode;
+use crate::prom_row_builder::{PromCtx, TableBuilder, TablesBuilder};
 use crate::prom_store::{snappy_decompress, zstd_decompress};
 use crate::proto::{PromSeriesProcessor, PromWriteRequest};
 use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
@@ -52,12 +61,174 @@ pub const DEFAULT_ENCODING: &str = "snappy";
 pub const VM_ENCODING: &str = "zstd";
 pub const VM_PROTO_VERSION: &str = "1";
 
+/// Additional states for bulk write requests.
+#[derive(Clone)]
+pub struct PromBulkState {
+    pub schema_helper: SchemaHelper,
+    pub partition_manager: PartitionRuleManagerRef,
+    pub node_manager: NodeManagerRef,
+    pub access_layer_factory: AccessLayerFactory,
+    pub tx: Option<
+        Sender<(
+            QueryContextRef,
+            HashMap<PromCtx, HashMap<String, TableBuilder>>,
+        )>,
+    >,
+}
+
 #[derive(Clone)]
 pub struct PromStoreState {
     pub prom_store_handler: PromStoreProtocolHandlerRef,
     pub pipeline_handler: Option<PipelineHandlerRef>,
     pub prom_store_with_metric_engine: bool,
     pub prom_validation_mode: PromValidationMode,
+    pub bulk_state: Option<PromBulkState>,
+}
+
+impl PromBulkState {
+    pub fn start_background_task(&mut self) {
+        let (tx, mut rx) = tokio::sync::mpsc::channel::<(
+            QueryContextRef,
+            HashMap<PromCtx, HashMap<String, TableBuilder>>,
+        )>(16);
+
+        self.tx = Some(tx);
+        let schema_helper = self.schema_helper.clone();
+        let partition_manager = self.partition_manager.clone();
+        let node_manager = self.node_manager.clone();
+        let access_layer_factory = self.access_layer_factory.clone();
+
+        let handle = tokio::spawn(async move {
+            loop {
+                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+                let start = Instant::now();
+                let mut batch_builder = MetricsBatchBuilder::new(
+                    schema_helper.clone(),
+                    partition_manager.clone(),
+                    node_manager.clone(),
+                );
+                let mut physical_region_metadata_total = HashMap::new();
+                let mut num_batches = 0;
+                while let Some((query_context, mut tables)) = rx.recv().await {
+                    batch_builder
+                        .create_or_alter_physical_tables(&tables, &query_context)
+                        .await
+                        .unwrap();
+                    info!(
+                        "create_or_alter_physical_tables, elapsed time: {}ms",
+                        start.elapsed().as_millis()
+                    );
+
+                    // Extract logical table names from tables for metadata collection
+                    let current_schema = query_context.current_schema();
+                    let logical_tables: Vec<(String, String)> = tables
+                        .iter()
+                        .flat_map(|(ctx, table_map)| {
+                            let schema = ctx.schema.as_deref().unwrap_or(&current_schema);
+                            table_map
+                                .keys()
+                                .map(|table_name| (schema.to_string(), table_name.clone()))
+                        })
+                        .collect();
+
+                    let start = Instant::now();
+                    // Gather all region metadata for region 0 of physical tables.
+                    let physical_region_metadata = batch_builder
+                        .collect_physical_region_metadata(&logical_tables, &query_context)
+                        .await
+                        .unwrap();
+
+                    physical_region_metadata_total.extend(physical_region_metadata);
+                    info!(
+                        "collect_physical_region_metadata, elapsed time: {}ms",
+                        start.elapsed().as_millis()
+                    );
+
+                    let start = Instant::now();
+                    batch_builder
+                        .append_rows_to_batch(
+                            None,
+                            None,
+                            &mut tables,
+                            &physical_region_metadata_total,
+                        )
+                        .await
+                        .unwrap();
+                    num_batches += 1;
+                    info!(
+                        "append_rows_to_batch, elapsed time: {}ms, batches: {}",
+                        num_batches,
+                        start.elapsed().as_millis()
+                    );
+
+                    if num_batches >= 10 {
+                        break;
+                    }
+                }
+
+                let start = Instant::now();
+                let record_batches = batch_builder.finish().unwrap();
+                let physical_region_id_to_meta = physical_region_metadata_total
+                    .into_iter()
+                    .map(|(schema_name, tables)| {
+                        let region_id_to_meta = tables
+                            .into_values()
+                            .map(|(_, physical_region_meta)| {
+                                (physical_region_meta.region_id, physical_region_meta)
+                            })
+                            .collect::<HashMap<_, _>>();
+                        (schema_name, region_id_to_meta)
+                    })
+                    .collect::<HashMap<_, _>>();
+
+                info!("Finishing batches cost: {}ms", start.elapsed().as_millis());
+                let start = Instant::now();
+
+                let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
+                let mut file_metas = vec![];
+                for (schema_name, schema_batches) in record_batches {
+                    let tables_in_schema =
+                        tables_per_schema.entry(schema_name.clone()).or_insert(0);
+                    *tables_in_schema = *tables_in_schema + 1;
+                    let schema_regions = physical_region_id_to_meta
+                        .get(&schema_name)
+                        .expect("physical region schema not found");
+                    for (physical_region_id, record_batches) in schema_batches {
+                        let physical_region_metadata = schema_regions
+                            .get(&physical_region_id)
+                            .expect("physical region metadata not found");
+                        for (rb, time_range) in record_batches {
+                            let mut writer = access_layer_factory
+                                .create_sst_writer(
+                                    "greptime", //todo(hl): use the catalog name in query context.
+                                    &schema_name,
+                                    physical_region_metadata.clone(),
+                                )
+                                .await
+                                .unwrap();
+                            let start = Instant::now();
+                            info!("Created writer: {}", writer.file_id());
+                            writer
+                                .write_record_batch(&rb, Some(time_range))
+                                .await
+                                .unwrap();
+                            let file_meta = writer.finish().await.unwrap();
+                            info!(
+                                "Finished writer: {}, elapsed time: {}ms",
+                                writer.file_id(),
+                                start.elapsed().as_millis()
+                            );
+                            file_metas.push(file_meta);
+                        }
+                    }
+                }
+                info!(
+                    "upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}, file_metas: {:?}",
+                    start.elapsed().as_millis(), tables_per_schema.len(), tables_per_schema, file_metas
+                );
+            }
+        });
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
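`start_background_task` is a plain bounded-mpsc producer/consumer: request handlers enqueue decoded tables, and a spawned task drains the channel in batches before flushing SSTs. A stripped-down, runnable sketch of that wiring (a `String` payload and a batch size of 3 stand in for the real `(QueryContextRef, tables)` tuple and flush logic):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel, mirroring the capacity-16 channel used by `start_background_task`.
    let (tx, mut rx) = mpsc::channel::<String>(16);

    // Consumer: accumulate a few items, then "flush" them as one batch.
    let consumer = tokio::spawn(async move {
        let mut batch = Vec::new();
        while let Some(item) = rx.recv().await {
            batch.push(item);
            if batch.len() >= 3 {
                println!("flushing batch: {batch:?}");
                batch.clear();
            }
        }
        // Channel closed: flush whatever is left.
        if !batch.is_empty() {
            println!("flushing final batch: {batch:?}");
        }
    });

    // Producers: each request handler just sends its decoded payload and returns.
    for i in 0..7 {
        tx.send(format!("request-{i}")).await.unwrap();
    }
    drop(tx); // closing the channel lets the consumer finish

    consumer.await.unwrap();
}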
@@ -98,6 +269,7 @@ pub async fn remote_write(
         pipeline_handler,
         prom_store_with_metric_engine,
         prom_validation_mode,
+        bulk_state,
     } = state;
 
     if let Some(_vm_handshake) = params.get_vm_proto_version {
@@ -132,6 +304,32 @@ pub async fn remote_write(
         processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
     }
 
+    if let Some(state) = bulk_state {
+        let context = PromBulkContext {
+            schema_helper: state.schema_helper,
+            query_ctx: query_ctx.clone(),
+            partition_manager: state.partition_manager,
+            node_manager: state.node_manager,
+            access_layer_factory: state.access_layer_factory,
+        };
+        let builder = decode_remote_write_request_to_batch(
+            is_zstd,
+            body,
+            prom_validation_mode,
+            &mut processor,
+            context,
+        )
+        .await?;
+        state
+            .tx
+            .as_ref()
+            .unwrap()
+            .send((query_ctx, builder.tables))
+            .await
+            .unwrap();
+        return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
+    }
+
     let req =
         decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;
 
@@ -202,6 +400,15 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
     }))
 }
 
+/// Context for processing remote write requests in bulk mode.
+pub struct PromBulkContext {
+    pub(crate) schema_helper: SchemaHelper,
+    pub(crate) query_ctx: QueryContextRef,
+    pub(crate) partition_manager: PartitionRuleManagerRef,
+    pub(crate) node_manager: NodeManagerRef,
+    pub(crate) access_layer_factory: AccessLayerFactory,
+}
+
 async fn decode_remote_write_request(
     is_zstd: bool,
     body: Bytes,
@@ -236,6 +443,38 @@ async fn decode_remote_write_request(
     }
 }
 
+async fn decode_remote_write_request_to_batch(
+    is_zstd: bool,
+    body: Bytes,
+    prom_validation_mode: PromValidationMode,
+    processor: &mut PromSeriesProcessor,
+    bulk: PromBulkContext,
+) -> Result<TablesBuilder> {
+    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
+
+    // due to vmagent's limitation, there is a chance that vmagent is
+    // sending content type wrong so we have to apply a fallback with decoding
+    // the content in another method.
+    //
+    // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
+    // see https://github.com/GreptimeTeam/greptimedb/issues/3929
+    let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
+        buf
+    } else {
+        // fallback to the other compression method
+        try_decompress(!is_zstd, &body[..])?
+    };
+
+    let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
+
+    processor.use_pipeline = false;
+    request
+        .merge(buf, prom_validation_mode, processor)
+        .context(error::DecodePromRemoteRequestSnafu)?;
+
+    Ok(std::mem::take(&mut request.table_data))
+}
+
 async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
     let buf = snappy_decompress(&body[..])?;
 
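`decode_remote_write_request_to_batch` keeps the existing decompression fallback: try the codec implied by the request headers first and, if that fails, retry with the other codec. The control flow in isolation, with stub decoders standing in for the real `snappy_decompress`/`zstd_decompress` helpers (everything here is illustrative):

// Stub decoders; the real code calls snappy_decompress / zstd_decompress.
fn decompress_a(body: &[u8]) -> Result<Vec<u8>, String> {
    if body.starts_with(b"A:") {
        Ok(body[2..].to_vec())
    } else {
        Err("not A-compressed".to_string())
    }
}

fn decompress_b(body: &[u8]) -> Result<Vec<u8>, String> {
    if body.starts_with(b"B:") {
        Ok(body[2..].to_vec())
    } else {
        Err("not B-compressed".to_string())
    }
}

/// Mirrors the "try the claimed codec, then fall back to the other" shape.
fn decompress_with_fallback(prefer_b: bool, body: &[u8]) -> Result<Vec<u8>, String> {
    let primary = if prefer_b { decompress_b } else { decompress_a };
    let secondary = if prefer_b { decompress_a } else { decompress_b };
    primary(body).or_else(|_| secondary(body))
}

fn main() {
    // Client claimed codec B but actually sent codec A: the fallback still decodes it.
    let decoded = decompress_with_fallback(true, b"A:payload").unwrap();
    assert_eq!(decoded, b"payload".to_vec());
    println!("decoded {} bytes", decoded.len());
}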
@@ -21,7 +21,10 @@
 use datafusion_expr::LogicalPlan;
 use datatypes::schema::Schema;
 
+pub mod access_layer;
 pub mod addrs;
+#[allow(dead_code)]
+mod batch_builder;
 pub mod configurator;
 pub(crate) mod elasticsearch;
 pub mod error;
@@ -13,16 +13,21 @@
 // limitations under the License.
 
 use std::collections::hash_map::Entry;
+use std::collections::HashMap;
 use std::string::ToString;
+use std::time::Instant;
 
-use ahash::HashMap;
 use api::prom_store::remote::Sample;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
+use common_telemetry::info;
 use pipeline::{ContextOpt, ContextReq};
 use prost::DecodeError;
 
+use crate::batch_builder::MetricsBatchBuilder;
+use crate::error::Result;
+use crate::http::prom_store::PromBulkContext;
 use crate::http::PromValidationMode;
 use crate::proto::{decode_string, PromLabel};
 use crate::repeated_field::Clear;
@@ -38,7 +43,7 @@ pub struct PromCtx {
 #[derive(Default, Debug)]
 pub(crate) struct TablesBuilder {
     // schema -> table -> table_builder
-    tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
+    pub tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
 }
 
 impl Clear for TablesBuilder {
@@ -91,11 +96,113 @@ impl TablesBuilder {
             req
         })
     }
 
+    /// Converts [TablesBuilder] to record batch and clears inner states.
+    pub(crate) async fn as_record_batches(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
+        let mut batch_builder = MetricsBatchBuilder::new(
+            bulk_ctx.schema_helper.clone(),
+            bulk_ctx.partition_manager.clone(),
+            bulk_ctx.node_manager.clone(),
+        );
+        let mut tables = std::mem::take(&mut self.tables);
+
+        let start = Instant::now();
+        batch_builder
+            .create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx)
+            .await?;
+        info!(
+            "create_or_alter_physical_tables, elapsed time: {}ms",
+            start.elapsed().as_millis()
+        );
+
+        // Extract logical table names from tables for metadata collection
+        let current_schema = bulk_ctx.query_ctx.current_schema();
+        let logical_tables: Vec<(String, String)> = tables
+            .iter()
+            .flat_map(|(ctx, table_map)| {
+                let schema = ctx.schema.as_deref().unwrap_or(&current_schema);
+                table_map
+                    .keys()
+                    .map(|table_name| (schema.to_string(), table_name.clone()))
+            })
+            .collect();
+
+        let start = Instant::now();
+        // Gather all region metadata for region 0 of physical tables.
+        let physical_region_metadata = batch_builder
+            .collect_physical_region_metadata(&logical_tables, &bulk_ctx.query_ctx)
+            .await?;
+
+        info!(
+            "collect_physical_region_metadata, elapsed time: {}ms",
+            start.elapsed().as_millis()
+        );
+
+        let start = Instant::now();
+        batch_builder
+            .append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)
+            .await?;
+        let record_batches = batch_builder.finish()?;
+        info!(
+            "append_rows_to_batch, elapsed time: {}ms",
+            start.elapsed().as_millis()
+        );
+
+        let physical_region_id_to_meta = physical_region_metadata
+            .into_iter()
+            .map(|(schema_name, tables)| {
+                let region_id_to_meta = tables
+                    .into_values()
+                    .map(|(_, physical_region_meta)| {
+                        (physical_region_meta.region_id, physical_region_meta)
+                    })
+                    .collect::<HashMap<_, _>>();
+                (schema_name, region_id_to_meta)
+            })
+            .collect::<HashMap<_, _>>();
+
+        let start = Instant::now();
+
+        let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
+        let mut file_metas = vec![];
+        for (schema_name, schema_batches) in record_batches {
+            let tables_in_schema = tables_per_schema.entry(schema_name.clone()).or_insert(0);
+            *tables_in_schema = *tables_in_schema + 1;
+            let schema_regions = physical_region_id_to_meta
+                .get(&schema_name)
+                .expect("physical region metadata not found");
+            for (physical_region_id, record_batches) in schema_batches {
+                let physical_region_metadata = schema_regions
+                    .get(&physical_region_id)
+                    .expect("physical region metadata not found");
+                for (rb, time_range) in record_batches {
+                    let mut writer = bulk_ctx
+                        .access_layer_factory
+                        .create_sst_writer(
+                            "greptime", //todo(hl): use the catalog name in query context.
+                            &schema_name,
+                            physical_region_metadata.clone(),
+                        )
+                        .await?;
+                    writer.write_record_batch(&rb, Some(time_range)).await?;
+                    let file_meta = writer.finish().await?;
+                    file_metas.push(file_meta);
+                }
+            }
+        }
+        info!(
+            "upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}, file_metas: {:?}",
+            start.elapsed().as_millis(),
+            tables_per_schema.len(),
+            tables_per_schema, file_metas
+        );
+        Ok(())
+    }
 }
 
 /// Builder for one table.
-#[derive(Debug)]
-pub(crate) struct TableBuilder {
+#[derive(Debug, Clone)]
+pub struct TableBuilder {
     /// Column schemas.
     schema: Vec<ColumnSchema>,
     /// Rows written.
@@ -210,6 +317,13 @@ impl TableBuilder {
             rows: Some(Rows { schema, rows }),
         }
     }
 
+    pub(crate) fn tags(&self) -> impl Iterator<Item = &String> {
+        self.schema
+            .iter()
+            .filter(|v| v.semantic_type == SemanticType::Tag as i32)
+            .map(|c| &c.column_name)
+    }
 }
 
 #[cfg(test)]
@@ -28,8 +28,9 @@ use prost::DecodeError;
 use session::context::QueryContextRef;
 use snafu::OptionExt;
 
-use crate::error::InternalSnafu;
+use crate::error::{InternalSnafu, Result};
 use crate::http::event::PipelineIngestRequest;
+use crate::http::prom_store::PromBulkContext;
 use crate::http::PromValidationMode;
 use crate::pipeline::run_pipeline;
 use crate::prom_row_builder::{PromCtx, TablesBuilder};
@@ -283,7 +284,7 @@ pub(crate) fn decode_string(
 
 #[derive(Default, Debug)]
 pub struct PromWriteRequest {
-    table_data: TablesBuilder,
+    pub table_data: TablesBuilder,
     series: PromTimeSeries,
 }
 
@@ -352,6 +353,11 @@ impl PromWriteRequest {
 
         Ok(())
     }
+
+    /// Converts the write request into a record batch and reset the table data.
+    pub async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
+        self.table_data.as_record_batches(bulk_ctx).await
+    }
 }
 
 /// A hook to be injected into the PromWriteRequest decoding process.
@@ -28,6 +28,7 @@ use query::parser::PromQuery;
 use query::query_engine::DescribeResult;
 use servers::error::{Error, Result};
 use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
+use servers::http::prom_store::PromStoreState;
 use servers::http::test_helpers::TestClient;
 use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
 use servers::prom_store;
@@ -121,9 +122,16 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
     };
 
     let instance = Arc::new(DummyInstance { tx });
+    let state = PromStoreState {
+        prom_store_handler: instance.clone(),
+        pipeline_handler: None,
+        prom_store_with_metric_engine: true,
+        prom_validation_mode: PromValidationMode::Unchecked,
+        bulk_state: None,
+    };
     let server = HttpServerBuilder::new(http_opts)
         .with_sql_handler(instance.clone())
-        .with_prom_handler(instance, None, true, PromValidationMode::Unchecked)
+        .with_prom_handler(state)
         .build();
     server.build(server.make_app()).unwrap()
 }
@@ -213,6 +213,14 @@ impl TableMeta {
             .map(|(_, cs)| &cs.name)
     }
 
+    /// Returns names of primary keys.
+    pub fn primary_key_names(&self) -> impl Iterator<Item = &String> {
+        let columns_schemas = self.schema.column_schemas();
+        self.primary_key_indices
+            .iter()
+            .map(|pk_idx| &columns_schemas[*pk_idx].name)
+    }
+
     /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
     ///
     /// The returned builder would derive the next column id of this meta.
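`primary_key_names` is an index-to-name lookup over the table's column schemas. The same pattern on plain `Vec`s, with a made-up column list for illustration:

/// Maps primary-key column indices to their column names, as the new
/// `TableMeta::primary_key_names` does over its schema.
fn primary_key_names<'a>(
    columns: &'a [String],
    pk_indices: &'a [usize],
) -> impl Iterator<Item = &'a String> {
    pk_indices.iter().map(move |idx| &columns[*idx])
}

fn main() {
    let columns = vec![
        "host".to_string(),
        "region".to_string(),
        "greptime_timestamp".to_string(),
        "greptime_value".to_string(),
    ];
    // Tag columns typically form the primary key in the metric engine layout.
    let pk_indices = [0usize, 1];
    let names: Vec<&String> = primary_key_names(&columns, &pk_indices).collect();
    assert_eq!(names, [&columns[0], &columns[1]]);
    println!("{names:?}");
}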
@@ -43,6 +43,7 @@ use object_store::ObjectStore;
 use servers::grpc::builder::GrpcServerBuilder;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
+use servers::http::prom_store::PromStoreState;
 use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
 use servers::metrics_handler::MetricsHandler;
 use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -534,15 +535,17 @@ pub async fn setup_test_prom_app_with_frontend(
         ..Default::default()
     };
     let frontend_ref = instance.fe_instance().clone();
+    let state = PromStoreState {
+        prom_store_handler: frontend_ref.clone(),
+        pipeline_handler: Some(frontend_ref.clone()),
+        prom_store_with_metric_engine: true,
+        prom_validation_mode: PromValidationMode::Strict,
+        bulk_state: None,
+    };
     let http_server = HttpServerBuilder::new(http_opts)
         .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
         .with_logs_handler(instance.fe_instance().clone())
-        .with_prom_handler(
-            frontend_ref.clone(),
-            Some(frontend_ref.clone()),
-            true,
-            PromValidationMode::Strict,
-        )
+        .with_prom_handler(state)
        .with_prometheus_handler(frontend_ref)
        .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
        .build();