Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-23 06:30:05 +00:00)

Compare commits: feature/df...poc/create (27 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 7e79b4b2f6 | |
| | 4ad40af468 | |
| | e4b048e788 | |
| | ecbf372de3 | |
| | 3d81a17360 | |
| | 025cae3679 | |
| | 68409e28ea | |
| | 699406ae32 | |
| | 344006deca | |
| | 63803f2b43 | |
| | cf62767b98 | |
| | 4e53c1531d | |
| | 892cb66c53 | |
| | 8b392477c8 | |
| | 905593dc16 | |
| | 6c04cb9b19 | |
| | 24da3367c1 | |
| | 80b14965a6 | |
| | 5da3f86d0c | |
| | 151273d1df | |
| | b0289dbdde | |
| | c51730a954 | |
| | 207709c727 | |
| | deca8c44fa | |
| | 2edd861ce9 | |
| | 14f3a4ab05 | |
| | 34875c0346 | |
Cargo.lock (generated, 21 changes)
@@ -4738,6 +4738,7 @@ dependencies = [
"log-store",
"meta-client",
"num_cpus",
"object-store",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",
@@ -6698,7 +6699,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]

[[package]]
@@ -7234,6 +7235,7 @@ dependencies = [
name = "metric-engine"
version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
"aquamarine",
"async-stream",
@@ -11230,6 +11232,7 @@ dependencies = [
"common-base",
"common-catalog",
"common-config",
"common-datasource",
"common-error",
"common-frontend",
"common-grpc",
@@ -11272,16 +11275,23 @@ dependencies = [
"local-ip-address",
"log-query",
"loki-proto",
"metric-engine",
"mime_guess",
"mito-codec",
"mito2",
"mysql_async",
"notify",
"object-pool",
"object-store",
"once_cell",
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",
"parking_lot 0.12.3",
"parquet",
"partition",
"permutation",
"pgwire",
"pin-project",
@@ -13856,12 +13866,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"

[[package]]
name = "uuid"
version = "1.10.0"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
dependencies = [
"getrandom 0.2.15",
"rand 0.8.5",
"getrandom 0.3.2",
"js-sys",
"rand 0.9.0",
"serde",
"wasm-bindgen",
]

@@ -21,6 +21,7 @@ pub mod error;
pub mod file_format;
pub mod lister;
pub mod object_store;
pub mod parquet_writer;
pub mod share_buffer;
#[cfg(test)]
pub mod test_util;
src/common/datasource/src/parquet_writer.rs (new file, 52 lines)
@@ -0,0 +1,52 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::Writer;
use parquet::arrow::async_writer::AsyncFileWriter;
use parquet::errors::ParquetError;

/// Bridges opendal [Writer] with parquet [AsyncFileWriter].
pub struct AsyncWriter {
    inner: Writer,
}

impl AsyncWriter {
    /// Create a [`AsyncWriter`] by given [`Writer`].
    pub fn new(writer: Writer) -> Self {
        Self { inner: writer }
    }
}

impl AsyncFileWriter for AsyncWriter {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
        Box::pin(async move {
            self.inner
                .write(bs)
                .await
                .map_err(|err| ParquetError::External(Box::new(err)))
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
        Box::pin(async move {
            self.inner
                .close()
                .await
                .map(|_| ())
                .map_err(|err| ParquetError::External(Box::new(err)))
        })
    }
}
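For orientation, here is a minimal usage sketch of how this bridge can be combined with parquet's `AsyncArrowWriter`. The `write_batch_to_store` helper, its `path` argument, and the error handling are hypothetical and not part of this change:

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;

// Hypothetical helper: stream one RecordBatch to object storage as a parquet file.
async fn write_batch_to_store(
    store: &ObjectStore,
    path: &str,
    schema: Arc<Schema>,
    batch: RecordBatch,
) -> Result<(), Box<dyn std::error::Error>> {
    // `ObjectStore::writer` yields the opendal `Writer` that `AsyncWriter` wraps.
    let writer = AsyncWriter::new(store.writer(path).await?);
    // `AsyncArrowWriter` accepts any `AsyncFileWriter`, so the bridge plugs in directly.
    let mut parquet_writer = AsyncArrowWriter::try_new(writer, schema, None)?;
    parquet_writer.write(&batch).await?;
    // Closing flushes buffered row groups and the footer, which calls `AsyncWriter::complete`.
    parquet_writer.close().await?;
    Ok(())
}
```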
@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output;
@@ -37,6 +37,7 @@ use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertReq
use itertools::Itertools;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::schema_helper::SchemaHelper;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
@@ -546,8 +547,14 @@ impl FrontendInvoker {
name: TABLE_FLOWNODE_SET_CACHE_NAME,
})?;

let inserter = Arc::new(Inserter::new(
let schema_helper = SchemaHelper::new(
catalog_manager.clone(),
Arc::new(TableMetadataManager::new(kv_backend.clone())),
procedure_executor.clone(),
layered_cache_registry.clone(),
);
let inserter = Arc::new(Inserter::new(
schema_helper,
partition_manager.clone(),
node_manager.clone(),
table_flownode_cache,
@@ -588,7 +595,7 @@ impl FrontendInvoker {
.start_timer();

self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)
@@ -49,6 +49,7 @@ log-query.workspace = true
log-store.workspace = true
meta-client.workspace = true
num_cpus.workspace = true
object-store.workspace = true
opentelemetry-proto.workspace = true
operator.workspace = true
otel-arrow-rust.workspace = true
@@ -19,6 +19,7 @@ use common_config::config::Configurable;
use common_options::datanode::DatanodeClientOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use meta_client::MetaClientOptions;
use object_store::config::ObjectStoreConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
@@ -62,6 +63,7 @@ pub struct FrontendOptions {
pub query: QueryOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: Option<SlowQueryOptions>,
pub store: ObjectStoreConfig,
}

impl Default for FrontendOptions {
@@ -88,6 +90,7 @@ impl Default for FrontendOptions {
query: QueryOptions::default(),
max_in_flight_write_bytes: None,
slow_query: Some(SlowQueryOptions::default()),
store: ObjectStoreConfig::default(),
}
}
}
@@ -116,8 +119,7 @@ impl Frontend {
if let Some(t) = self.export_metrics_task.as_ref() {
if t.send_by_handler {
let inserter = self.instance.inserter().clone();
let statement_executor = self.instance.statement_executor().clone();
let handler = ExportMetricHandler::new_handler(inserter, statement_executor);
let handler = ExportMetricHandler::new_handler(inserter);
t.start(Some(handler)).context(error::StartServerSnafu)?
} else {
t.start(None).context(error::StartServerSnafu)?;
@@ -39,6 +39,7 @@ use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
@@ -49,7 +50,9 @@ use datafusion_expr::LogicalPlan;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::schema_helper::SchemaHelper;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
@@ -58,6 +61,7 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::QueryEngineRef;
use servers::access_layer::AccessLayerFactory;
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{
@@ -100,6 +104,7 @@ pub struct Instance {
slow_query_recorder: Option<SlowQueryRecorder>,
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
access_layer_factory: AccessLayerFactory,
}

impl Instance {
@@ -161,6 +166,27 @@ impl Instance {
pub fn process_manager(&self) -> &ProcessManagerRef {
&self.process_manager
}

pub fn create_schema_helper(&self) -> SchemaHelper {
SchemaHelper::new(
self.catalog_manager.clone(),
self.table_metadata_manager.clone(),
self.statement_executor.procedure_executor().clone(),
self.statement_executor.cache_invalidator().clone(),
)
}

pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
self.inserter.partition_manager()
}

pub fn node_manager(&self) -> &NodeManagerRef {
self.inserter.node_manager()
}

pub fn access_layer_factory(&self) -> &AccessLayerFactory {
&self.access_layer_factory
}
}

fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
@@ -30,12 +30,14 @@ use operator::flow::FlowServiceOperator;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::schema_helper::SchemaHelper;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::region_query::RegionQueryHandlerFactoryRef;
use query::QueryEngineFactory;
use servers::access_layer::AccessLayerFactory;
use snafu::OptionExt;

use crate::error::{self, Result};
@@ -130,8 +132,15 @@ impl FrontendBuilder {
name: TABLE_FLOWNODE_SET_CACHE_NAME,
})?;

let inserter = Arc::new(Inserter::new(
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let schema_helper = SchemaHelper::new(
self.catalog_manager.clone(),
table_metadata_manager.clone(),
self.procedure_executor.clone(),
local_cache_invalidator.clone(),
);
let inserter = Arc::new(Inserter::new(
schema_helper,
partition_manager.clone(),
node_manager.clone(),
table_flownode_cache,
@@ -176,7 +185,7 @@ impl FrontendBuilder {
self.catalog_manager.clone(),
query_engine.clone(),
self.procedure_executor,
kv_backend.clone(),
kv_backend,
local_cache_invalidator,
inserter.clone(),
table_route_cache,
@@ -211,6 +220,7 @@ impl FrontendBuilder {
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
});

let access_layer_factory = AccessLayerFactory::new(&self.options.store).await.unwrap();
Ok(Instance {
catalog_manager: self.catalog_manager,
pipeline_operator,
@@ -219,10 +229,11 @@ impl FrontendBuilder {
plugins,
inserter,
deleter,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
table_metadata_manager,
slow_query_recorder,
limiter,
process_manager,
access_layer_factory,
})
}
}
@@ -408,7 +408,7 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
.handle_column_inserts(requests, ctx)
.await
.context(TableOperationSnafu)
}
@@ -422,13 +422,7 @@ impl Instance {
is_single_value: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
is_single_value,
)
.handle_row_inserts(requests, ctx, accommodate_existing_schema, is_single_value)
.await
.context(TableOperationSnafu)
}
@@ -441,10 +435,7 @@ impl Instance {
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
true,
requests, ctx, true,
// Influx protocol may writes multiple fields (values).
false,
)
@@ -460,7 +451,7 @@ impl Instance {
physical_table: String,
) -> Result<Output> {
self.inserter
.handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
.handle_metric_row_inserts(requests, ctx, physical_table)
.await
.context(TableOperationSnafu)
}
@@ -135,7 +135,7 @@ impl Instance {
};

self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.handle_log_inserts(log, ctx)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)
@@ -157,7 +157,7 @@ impl Instance {
};

self.inserter
.handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
.handle_trace_inserts(rows, ctx)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)
@@ -28,7 +28,6 @@ use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
@@ -271,18 +270,11 @@ impl PromStoreProtocolHandler for Instance {
/// so only implement `PromStoreProtocolHandler::write` method.
pub struct ExportMetricHandler {
inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
}

impl ExportMetricHandler {
pub fn new_handler(
inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
) -> PromStoreProtocolHandlerRef {
Arc::new(Self {
inserter,
statement_executor,
})
pub fn new_handler(inserter: InserterRef) -> PromStoreProtocolHandlerRef {
Arc::new(Self { inserter })
}
}

@@ -295,12 +287,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
_: bool,
) -> ServerResult<Output> {
self.inserter
.handle_metric_row_inserts(
request,
ctx,
&self.statement_executor,
GREPTIME_PHYSICAL_TABLE.to_string(),
)
.handle_metric_row_inserts(request, ctx, GREPTIME_PHYSICAL_TABLE.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
@@ -24,6 +24,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer};
use servers::http::event::LogValidatorRef;
use servers::http::prom_store::{PromBulkState, PromStoreState};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::interceptor::LogIngestInterceptorRef;
use servers::metrics_handler::MetricsHandler;
@@ -95,13 +96,30 @@ where
}

if opts.prom_store.enable {
let bulk_state = if opts.prom_store.bulk_mode {
let mut state = PromBulkState {
schema_helper: self.instance.create_schema_helper(),
partition_manager: self.instance.partition_manager().clone(),
node_manager: self.instance.node_manager().clone(),
access_layer_factory: self.instance.access_layer_factory().clone(),
tx: None,
};
state.start_background_task();
Some(state)
} else {
None
};

let state = PromStoreState {
prom_store_handler: self.instance.clone(),
pipeline_handler: Some(self.instance.clone()),
prom_store_with_metric_engine: opts.prom_store.with_metric_engine,
prom_validation_mode: opts.http.prom_validation_mode,
bulk_state,
};

builder = builder
.with_prom_handler(
self.instance.clone(),
Some(self.instance.clone()),
opts.prom_store.with_metric_engine,
opts.http.prom_validation_mode,
)
.with_prom_handler(state)
.with_prometheus_handler(self.instance.clone());
}
@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
pub struct PromStoreOptions {
pub enable: bool,
pub with_metric_engine: bool,
pub bulk_mode: bool,
}

impl Default for PromStoreOptions {
@@ -25,6 +26,7 @@ impl Default for PromStoreOptions {
Self {
enable: true,
with_metric_engine: true,
bulk_mode: false,
}
}
}
@@ -37,6 +39,7 @@ mod tests {
fn test_prom_store_options() {
let default = PromStoreOptions::default();
assert!(default.enable);
assert!(default.with_metric_engine)
assert!(default.with_metric_engine);
assert!(!default.bulk_mode);
}
}
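The new flag is off by default; a caller opting into the bulk path would construct the options roughly as in this sketch (the surrounding server configuration is assumed to be filled in elsewhere):

```rust
// Sketch: enable the bulk ingestion path for the Prometheus remote-write endpoint.
let prom_store = PromStoreOptions {
    enable: true,
    with_metric_engine: true,
    bulk_mode: true, // newly added; PromStoreOptions::default() keeps this false
};
```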
@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
.into();

self.inserter
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false)
.handle_row_inserts(requests, query_ctx, false, false)
.await
.context(TableOperationSnafu)?;
@@ -8,6 +8,7 @@ license.workspace = true
workspace = true

[dependencies]
ahash.workspace = true
api.workspace = true
aquamarine.workspace = true
async-stream.workspace = true
@@ -147,7 +147,7 @@ impl MetricEngineInner {
fn modify_rows(
&self,
physical_region_id: RegionId,
table_id: TableId,
logical_table_id: TableId,
rows: &mut Rows,
encoding: PrimaryKeyEncoding,
) -> Result<()> {
@@ -163,7 +163,9 @@ impl MetricEngineInner {
.physical_columns();
RowsIter::new(input, name_to_id)
};
let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
let output = self
.row_modifier
.modify_rows(iter, logical_table_id, encoding)?;
*rows = output;
Ok(())
}
@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns(`__table_id`, `__tsid`) to the row.
pub(crate) struct RowModifier {
pub struct RowModifier {
codec: SparsePrimaryKeyCodec,
}

@@ -52,7 +52,7 @@ impl RowModifier {
}

/// Modify rows with the given primary key encoding.
pub(crate) fn modify_rows(
pub fn modify_rows(
&self,
iter: RowsIter,
table_id: TableId,
@@ -74,7 +74,7 @@ impl RowModifier {

let mut buffer = vec![];
for mut iter in iter.iter_mut() {
let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
let mut values = Vec::with_capacity(num_output_column);
buffer.clear();
let internal_columns = [
@@ -135,7 +135,7 @@ impl RowModifier {
options: None,
});
for iter in iter.iter_mut() {
let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
iter.row.values.push(table_id);
iter.row.values.push(tsid);
}
@@ -144,7 +144,7 @@ impl RowModifier {
}

/// Fills internal columns of a row with table name and a hash of tag values.
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
let mut hasher = TsidGenerator::default();
for (name, value) in iter.primary_keys_with_name() {
// The type is checked before. So only null is ignored.
@@ -264,7 +264,7 @@ impl IterIndex {
}

/// Iterator of rows.
pub(crate) struct RowsIter {
pub struct RowsIter {
rows: Rows,
index: IterIndex,
}
@@ -276,7 +276,7 @@ impl RowsIter {
}

/// Returns the iterator of rows.
fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
self.rows.rows.iter_mut().map(|row| RowIter {
row,
index: &self.index,
@@ -290,10 +290,22 @@ impl RowsIter {
.iter()
.map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
}

pub fn num_rows(&self) -> usize {
self.rows.rows.len()
}

pub fn num_columns(&self) -> usize {
self.rows.schema.len()
}

pub fn num_primary_keys(&self) -> usize {
self.index.num_primary_key_column
}
}

/// Iterator of a row.
struct RowIter<'a> {
pub struct RowIter<'a> {
row: &'a mut Row,
index: &'a IterIndex,
schema: &'a Vec<ColumnSchema>,
@@ -313,7 +325,7 @@ impl RowIter<'_> {
}

/// Returns the primary keys.
fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
self.index.indices[..self.index.num_primary_key_column]
.iter()
.map(|idx| {
@@ -333,6 +345,13 @@ impl RowIter<'_> {
.iter()
.map(|idx| std::mem::take(&mut self.row.values[idx.index]))
}

/// Returns value at given offset.
/// # Panics
/// Panics if offset out-of-bound
pub fn value_at(&self, idx: usize) -> &Value {
&self.row.values[idx]
}
}

#[cfg(test)]
@@ -476,7 +495,6 @@ mod tests {
#[test]
fn test_fill_internal_columns() {
let name_to_column_id = test_name_to_column_id();
let encoder = RowModifier::new();
let table_id = 1025;
let schema = test_schema();
let row = test_row("greptimedb", "127.0.0.1");
@@ -486,7 +504,7 @@ mod tests {
};
let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
let row_iter = rows_iter.iter_mut().next().unwrap();
let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
@@ -514,7 +532,7 @@ mod tests {
};
let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
let row_iter = rows_iter.iter_mut().next().unwrap();
let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
}
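Since `RowsIter`, `RowIter` and `RowModifier::fill_internal_columns` are now public and the latter no longer takes a receiver, an external caller (for example, a bulk write path) could drive them roughly like this sketch; `rows`, `name_to_column_id` and `logical_table_id` are assumed to be in scope:

```rust
// Sketch: compute the internal `__table_id`/`__tsid` values without owning a RowModifier.
let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
for row in rows_iter.iter_mut() {
    // Associated function now; previously this was a method on `RowModifier`.
    let (table_id_value, tsid) = RowModifier::fill_internal_columns(logical_table_id, &row);
    // ... append the two values to the row, as the dense encoding path does.
}
```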
@@ -41,7 +41,7 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet write options.
#[derive(Debug)]
@@ -860,6 +860,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to decode object from json"))]
DecodeJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -991,6 +999,7 @@ impl ErrorExt for Error {
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
Error::PathNotFound { .. } => StatusCode::InvalidArguments,
Error::DecodeJson { .. } => StatusCode::Unexpected,
}
}
@@ -22,10 +22,9 @@ use api::v1::region::{
RegionRequestHeader,
};
use api::v1::{
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
RowInsertRequest, RowInsertRequests, SemanticType,
AlterTableExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest,
RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
@@ -35,7 +34,6 @@ use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
@@ -49,9 +47,7 @@ use snafu::ResultExt;
use sql::partition::partition_rule_for_hexstring;
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
@@ -63,7 +59,7 @@ use table::table_reference::TableReference;
use table::TableRef;

use crate::error::{
CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
};
use crate::expr_helper;
@@ -72,10 +68,10 @@ use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{
fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
};
use crate::statement::StatementExecutor;
use crate::schema_helper::SchemaHelper;

pub struct Inserter {
catalog_manager: CatalogManagerRef,
pub(crate) schema_helper: SchemaHelper,
pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef,
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
@@ -85,7 +81,7 @@ pub type InserterRef = Arc<Inserter>;

/// Hint for the table type to create automatically.
#[derive(Clone)]
enum AutoCreateTableType {
pub(crate) enum AutoCreateTableType {
/// A logical table with the physical table name.
Logical(String),
/// A physical table.
@@ -127,27 +123,34 @@ pub struct InstantAndNormalInsertRequests {

impl Inserter {
pub fn new(
catalog_manager: CatalogManagerRef,
schema_helper: SchemaHelper,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef,
) -> Self {
Self {
catalog_manager,
schema_helper,
partition_manager,
node_manager,
table_flownode_set_cache,
}
}

pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
&self.partition_manager
}

pub fn node_manager(&self) -> &NodeManagerRef {
&self.node_manager
}

pub async fn handle_column_inserts(
&self,
requests: InsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
self.handle_row_inserts(row_inserts, ctx, false, false)
.await
}

@@ -156,7 +159,6 @@ impl Inserter {
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
@@ -164,7 +166,6 @@ impl Inserter {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Physical,
accommodate_existing_schema,
is_single_value,
@@ -177,12 +178,10 @@ impl Inserter {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Log,
false,
false,
@@ -194,12 +193,10 @@ impl Inserter {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Trace,
false,
false,
@@ -212,14 +209,12 @@ impl Inserter {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::LastNonNull,
accommodate_existing_schema,
is_single_value,
@@ -232,7 +227,6 @@ impl Inserter {
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
accommodate_existing_schema: bool,
is_single_value: bool,
@@ -254,7 +248,6 @@ impl Inserter {
&mut requests,
&ctx,
create_type,
statement_executor,
accommodate_existing_schema,
is_single_value,
)
@@ -280,7 +273,6 @@ impl Inserter {
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
physical_table: String,
) -> Result<Output> {
// remove empty requests
@@ -293,7 +285,8 @@ impl Inserter {
validate_column_count_match(&requests)?;

// check and create physical table
self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
self.schema_helper
.create_metric_physical_table(&ctx, physical_table.clone())
.await?;

// check and create logical tables
@@ -305,7 +298,6 @@ impl Inserter {
&mut requests,
&ctx,
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
true,
true,
)
@@ -350,10 +342,13 @@ impl Inserter {
insert: &Insert,
ctx: &QueryContextRef,
) -> Result<Output> {
let (inserts, table_info) =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx)
.await?;
let (inserts, table_info) = StatementToRegion::new(
self.schema_helper.catalog_manager().as_ref(),
&self.partition_manager,
ctx,
)
.convert(insert, ctx)
.await?;

let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
@@ -482,7 +477,6 @@ impl Inserter {
requests: &mut RowInsertRequests,
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<CreateAlterTableResult> {
@@ -543,7 +537,7 @@ impl Inserter {
instant_table_ids.insert(table_info.table_id());
}
table_infos.insert(table_info.table_id(), table.table_info());
if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
if let Some(alter_expr) = Self::get_alter_table_expr_on_demand(
req,
&table,
ctx,
@@ -565,9 +559,7 @@ impl Inserter {
AutoCreateTableType::Logical(_) => {
if !create_tables.is_empty() {
// Creates logical tables in batch.
let tables = self
.create_logical_tables(create_tables, ctx, statement_executor)
.await?;
let tables = self.create_logical_tables(create_tables, ctx).await?;

for table in tables {
let table_info = table.table_info();
@@ -579,7 +571,7 @@ impl Inserter {
}
if !alter_tables.is_empty() {
// Alter logical tables in batch.
statement_executor
self.schema_helper
.alter_logical_tables(alter_tables, ctx.clone())
.await?;
}
@@ -590,9 +582,7 @@ impl Inserter {
// note that auto create table shouldn't be ttl instant table
// for it's a very unexpected behavior and should be set by user explicitly
for create_table in create_tables {
let table = self
.create_physical_table(create_table, None, ctx, statement_executor)
.await?;
let table = self.create_physical_table(create_table, None, ctx).await?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
@@ -600,8 +590,8 @@ impl Inserter {
table_infos.insert(table_info.table_id(), table.table_info());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
self.schema_helper
.alter_table_by_expr(alter_expr, ctx.clone())
.await?;
}
}
@@ -619,9 +609,7 @@ impl Inserter {
create_table
.table_options
.insert(APPEND_MODE_KEY.to_string(), "false".to_string());
let table = self
.create_physical_table(create_table, None, ctx, statement_executor)
.await?;
let table = self.create_physical_table(create_table, None, ctx).await?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
@@ -662,12 +650,7 @@ impl Inserter {
);

let table = self
.create_physical_table(
create_table,
Some(partitions),
ctx,
statement_executor,
)
.create_physical_table(create_table, Some(partitions), ctx)
.await?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
@@ -677,8 +660,8 @@ impl Inserter {
}
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
self.schema_helper
.alter_table_by_expr(alter_expr, ctx.clone())
.await?;
}
}
@@ -690,79 +673,13 @@ impl Inserter {
})
}

async fn create_physical_table_on_demand(
&self,
ctx: &QueryContextRef,
physical_table: String,
statement_executor: &StatementExecutor,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();

// check if exist
if self
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}

let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
info!("Physical metric table `{table_reference}` does not exist, try creating table");

// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,
options: None,
},
];
let create_table_expr =
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;

create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr
.table_options
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());

// create physical table
let res = statement_executor
.create_table_inner(create_table_expr, None, ctx.clone())
.await;

match res {
Ok(_) => {
info!("Successfully created table {table_reference}",);
Ok(())
}
Err(err) => {
error!(err; "Failed to create table {table_reference}");
Err(err)
}
}
}

async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
) -> Result<Option<TableRef>> {
self.catalog_manager
.table(catalog, schema, table, None)
.await
.context(CatalogSnafu)
self.schema_helper.get_table(catalog, schema, table).await
}

fn get_create_table_expr_on_demand(
@@ -771,38 +688,9 @@ impl Inserter {
create_type: &AutoCreateTableType,
ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
let mut table_options = Vec::with_capacity(4);
for key in VALID_TABLE_OPTION_KEYS {
if let Some(value) = ctx.extension(key) {
table_options.push((key, value));
}
}

let mut engine_name = default_engine();
match create_type {
AutoCreateTableType::Logical(physical_table) => {
engine_name = METRIC_ENGINE_NAME;
table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
}
AutoCreateTableType::Physical => {
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
table_options.push((APPEND_MODE_KEY, append_mode));
}
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
table_options.push((MERGE_MODE_KEY, merge_mode));
}
}
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => {
table_options.push((APPEND_MODE_KEY, "true"));
}
AutoCreateTableType::LastNonNull => {
table_options.push((MERGE_MODE_KEY, "last_non_null"));
}
AutoCreateTableType::Trace => {
table_options.push((APPEND_MODE_KEY, "true"));
}
if matches!(create_type, AutoCreateTableType::Logical(_)) {
engine_name = METRIC_ENGINE_NAME;
}

let schema = ctx.current_schema();
@@ -813,11 +701,9 @@ impl Inserter {
build_create_table_expr(&table_ref, request_schema, engine_name)?;

info!("Table `{table_ref}` does not exist, try creating table");
for (k, v) in table_options {
create_table_expr
.table_options
.insert(k.to_string(), v.to_string());
}

// Use the common fill_table_options_for_create function to populate table options
fill_table_options_for_create(&mut create_table_expr.table_options, create_type, ctx);

Ok(create_table_expr)
}
@@ -830,7 +716,6 @@ impl Inserter {
/// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
/// input `req`.
fn get_alter_table_expr_on_demand(
&self,
req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
@@ -918,7 +803,6 @@ impl Inserter {
mut create_table_expr: CreateTableExpr,
partitions: Option<Partitions>,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
{
let table_ref = TableReference::full(
@@ -929,8 +813,9 @@ impl Inserter {

info!("Table `{table_ref}` does not exist, try creating table");
}
let res = statement_executor
.create_table_inner(&mut create_table_expr, partitions, ctx.clone())
let res = self
.schema_helper
.create_table_by_expr(&mut create_table_expr, partitions, ctx.clone())
.await;

let table_ref = TableReference::full(
@@ -958,9 +843,9 @@ impl Inserter {
&self,
create_table_exprs: Vec<CreateTableExpr>,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Vec<TableRef>> {
let res = statement_executor
let res = self
.schema_helper
.create_logical_tables(&create_table_exprs, ctx.clone())
.await;

@@ -1011,7 +896,49 @@ fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
Ok(())
}

fn build_create_table_expr(
/// Fill table options for a new table by create type.
pub(crate) fn fill_table_options_for_create(
table_options: &mut std::collections::HashMap<String, String>,
create_type: &AutoCreateTableType,
ctx: &QueryContextRef,
) {
for key in VALID_TABLE_OPTION_KEYS {
if let Some(value) = ctx.extension(key) {
table_options.insert(key.to_string(), value.to_string());
}
}

match create_type {
AutoCreateTableType::Logical(physical_table) => {
table_options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_table.to_string(),
);
}
AutoCreateTableType::Physical => {
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
}
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
}
}
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => {
table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
}
AutoCreateTableType::LastNonNull => {
table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
}
AutoCreateTableType::Trace => {
table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
}
}
}

/// Builds a [CreateTableExpr] for the given table and schema.
pub(crate) fn build_create_table_expr(
table: &TableReference,
request_schema: &[ColumnSchema],
engine: &str,
@@ -1144,19 +1071,14 @@ mod tests {

use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::cache::new_table_flownode_set_cache;
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use common_meta::test_util::MockDatanodeManager;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use moka::future::Cache;
use session::context::QueryContext;
use table::dist_table::DummyDataSource;
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
use table::TableRef;

use super::*;
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
@@ -1236,20 +1158,8 @@ mod tests {
DEFAULT_SCHEMA_NAME,
));

let kv_backend = prepare_mocked_backend().await;
let inserter = Inserter::new(
catalog::memory::MemoryCatalogManager::new(),
create_partition_rule_manager(kv_backend.clone()).await,
Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
Arc::new(new_table_flownode_set_cache(
String::new(),
Cache::new(100),
kv_backend.clone(),
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
.unwrap();
let alter_expr =
Inserter::get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true).unwrap();
assert!(alter_expr.is_none());

// The request's schema should have updated names for timestamp and field columns
@@ -27,6 +27,7 @@ pub mod procedure;
pub mod region_req_factory;
pub mod req_convert;
pub mod request;
pub mod schema_helper;
pub mod statement;
pub mod table;
#[cfg(test)]
src/operator/src/schema_helper.rs (new file, 799 lines)
@@ -0,0 +1,799 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Utilities to deal with table schemas.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::region::region_request::Body;
|
||||
use api::v1::region::{ListMetadataRequest, RegionRequestHeader};
|
||||
use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::{
|
||||
default_engine, is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
|
||||
};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_grpc_expr::util::ColumnExpr;
|
||||
use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
|
||||
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use common_meta::key::schema_name::SchemaNameKey;
|
||||
use common_meta::key::table_route::TableRouteManager;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::node_manager::NodeManagerRef;
|
||||
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
|
||||
use common_meta::rpc::router::Partition;
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use common_query::Output;
|
||||
use common_telemetry::tracing;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use futures::future;
|
||||
use partition::manager::PartitionRuleManagerRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::statements::create::Partitions;
|
||||
use store_api::metadata::RegionMetadata;
|
||||
use store_api::metric_engine_consts::{
|
||||
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
|
||||
};
|
||||
use store_api::storage::RegionId;
|
||||
use table::dist_table::DistTable;
|
||||
use table::metadata::{RawTableInfo, TableId, TableInfo};
|
||||
use table::table_name::TableName;
|
||||
use table::table_reference::TableReference;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DecodeJsonSnafu,
|
||||
EmptyDdlExprSnafu, ExecuteDdlSnafu, FindRegionLeaderSnafu, InvalidPartitionRuleSnafu,
|
||||
InvalidTableNameSnafu, InvalidateTableCacheSnafu, JoinTaskSnafu, RequestRegionSnafu, Result,
|
||||
SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
|
||||
TableNotFoundSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::expr_helper;
|
||||
use crate::insert::{build_create_table_expr, fill_table_options_for_create, AutoCreateTableType};
|
||||
use crate::region_req_factory::RegionRequestFactory;
|
||||
use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter, NAME_PATTERN_REG};
|
||||
|
||||
/// Helper to query and manipulate (CREATE/ALTER) table schemas.
|
||||
#[derive(Clone)]
|
||||
pub struct SchemaHelper {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
}
|
||||
|
||||
impl SchemaHelper {
|
||||
/// Creates a new [`SchemaHelper`].
|
||||
pub fn new(
|
||||
catalog_manager: CatalogManagerRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_manager,
|
||||
table_metadata_manager,
|
||||
procedure_executor,
|
||||
cache_invalidator,
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the table by catalog, schema and table name.
|
||||
pub async fn get_table(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
) -> Result<Option<TableRef>> {
|
||||
self.catalog_manager
|
||||
.table(catalog, schema, table, None)
|
||||
.await
|
||||
.context(CatalogSnafu)
|
||||
}
|
||||
|
||||
// TODO(yingwen): Can we create the physical table with all columns from the prometheus metrics?
|
||||
/// Creates a physical table for metric engine.
|
||||
///
|
||||
/// If table already exists, do nothing.
|
||||
pub async fn create_metric_physical_table(
|
||||
&self,
|
||||
ctx: &QueryContextRef,
|
||||
physical_table: String,
|
||||
) -> Result<()> {
|
||||
let catalog_name = ctx.current_catalog();
|
||||
let schema_name = ctx.current_schema();
|
||||
|
||||
// check if exist
|
||||
if self
|
||||
.get_table(catalog_name, &schema_name, &physical_table)
|
||||
.await?
|
||||
.is_some()
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
|
||||
common_telemetry::info!(
|
||||
"Physical metric table `{table_reference}` does not exist, try creating table"
|
||||
);
|
||||
|
||||
// schema with timestamp and field column
|
||||
let default_schema = vec![
|
||||
ColumnSchema {
|
||||
column_name: GREPTIME_TIMESTAMP.to_string(),
|
||||
datatype: ColumnDataType::TimestampMillisecond as _,
|
||||
semantic_type: SemanticType::Timestamp as _,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: GREPTIME_VALUE.to_string(),
|
||||
datatype: ColumnDataType::Float64 as _,
|
||||
semantic_type: SemanticType::Field as _,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
];
|
||||
let create_table_expr =
|
||||
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
|
||||
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
|
||||
create_table_expr
|
||||
.table_options
|
||||
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
|
||||
|
||||
// create physical table.
|
||||
// TODO(yingwen): Simplify this function. But remember to start the timer.
|
||||
let res = self
|
||||
.create_table_by_expr(create_table_expr, None, ctx.clone())
|
||||
.await;
|
||||
match res {
|
||||
Ok(_) => {
|
||||
common_telemetry::info!("Successfully created table {table_reference}",);
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => {
|
||||
common_telemetry::error!(err; "Failed to create table {table_reference}");
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a table by [CreateTableExpr].
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn create_table_by_expr(
|
||||
&self,
|
||||
create_table: &mut CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<TableRef> {
|
||||
ensure!(
|
||||
!is_readonly_schema(&create_table.schema_name),
|
||||
SchemaReadOnlySnafu {
|
||||
name: create_table.schema_name.clone()
|
||||
}
|
||||
);
|
||||
|
||||
if create_table.engine == METRIC_ENGINE_NAME
|
||||
&& create_table
|
||||
.table_options
|
||||
.contains_key(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
// Create logical tables
|
||||
ensure!(
|
||||
partitions.is_none(),
|
||||
InvalidPartitionRuleSnafu {
|
||||
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
|
||||
}
|
||||
);
|
||||
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.next()
|
||||
.context(UnexpectedSnafu {
|
||||
violated: "expected to create logical tables",
|
||||
})
|
||||
} else {
|
||||
// Create other normal table
|
||||
self.create_non_logic_table(create_table, partitions, query_ctx)
|
||||
.await
|
||||
}
|
||||
}
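// Illustrative sketch (added commentary, not part of the original change): a metric
// engine logical table is routed to `create_logical_tables` when its options carry
// `LOGICAL_TABLE_METADATA_KEY`; everything else goes through `create_non_logic_table`.
// The option value is assumed here to name the backing physical table; `expr`,
// `physical_table_name`, `helper` and `ctx` are placeholders from the caller.
//
//     expr.engine = METRIC_ENGINE_NAME.to_string();
//     expr.table_options
//         .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), physical_table_name.clone());
//     // No partition rule: logical tables inherit it from the physical table.
//     let table = helper.create_table_by_expr(&mut expr, None, ctx.clone()).await?;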
|
||||
|
||||
/// Creates a non-logical table.
|
||||
/// - If the schema doesn't exist, returns an error
|
||||
/// - If the table already exists:
|
||||
/// - If `create_if_not_exists` is true, returns the existing table
|
||||
/// - If `create_if_not_exists` is false, returns an error
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn create_non_logic_table(
|
||||
&self,
|
||||
create_table: &mut CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<TableRef> {
|
||||
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
|
||||
|
||||
// Check if schema exists
|
||||
let schema = self
|
||||
.table_metadata_manager
|
||||
.schema_manager()
|
||||
.get(SchemaNameKey::new(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
))
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
ensure!(
|
||||
schema.is_some(),
|
||||
SchemaNotFoundSnafu {
|
||||
schema_info: &create_table.schema_name,
|
||||
}
|
||||
);
|
||||
|
||||
// if table exists.
|
||||
if let Some(table) = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name,
|
||||
Some(&query_ctx),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
{
|
||||
return if create_table.create_if_not_exists {
|
||||
Ok(table)
|
||||
} else {
|
||||
TableAlreadyExistsSnafu {
|
||||
table: format_full_table_name(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name,
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
};
|
||||
}
|
||||
|
||||
ensure!(
|
||||
NAME_PATTERN_REG.is_match(&create_table.table_name),
|
||||
InvalidTableNameSnafu {
|
||||
table_name: &create_table.table_name,
|
||||
}
|
||||
);
|
||||
|
||||
let table_name = TableName::new(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name,
|
||||
);
|
||||
|
||||
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
|
||||
let mut table_info = create_table_info(create_table, partition_cols)?;
|
||||
|
||||
let resp = self
|
||||
.create_table_procedure(
|
||||
create_table.clone(),
|
||||
partitions,
|
||||
table_info.clone(),
|
||||
query_ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let table_id = resp.table_ids.into_iter().next().context(UnexpectedSnafu {
|
||||
violated: "expected table_id",
|
||||
})?;
|
||||
common_telemetry::info!(
|
||||
"Successfully created table '{table_name}' with table id {table_id}"
|
||||
);
|
||||
|
||||
table_info.ident.table_id = table_id;
|
||||
|
||||
let table_info: Arc<TableInfo> =
|
||||
Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
|
||||
create_table.table_id = Some(api::v1::TableId { id: table_id });
|
||||
|
||||
let table = DistTable::table(table_info);
|
||||
|
||||
Ok(table)
|
||||
}
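// Illustrative sketch (added commentary, not part of the original change): with
// `create_if_not_exists` set, a second call for the same table name returns the
// existing `TableRef` instead of a `TableAlreadyExists` error. `expr`, `helper`
// and `ctx` are placeholders from the caller.
//
//     expr.create_if_not_exists = true;
//     let first = helper.create_non_logic_table(&mut expr, None, ctx.clone()).await?;
//     let again = helper.create_non_logic_table(&mut expr, None, ctx.clone()).await?;
//     assert_eq!(first.table_info().ident.table_id, again.table_info().ident.table_id);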
|
||||
|
||||
/// Creates logical tables.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn create_logical_tables(
|
||||
&self,
|
||||
create_table_exprs: &[CreateTableExpr],
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<Vec<TableRef>> {
|
||||
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
|
||||
ensure!(
|
||||
!create_table_exprs.is_empty(),
|
||||
EmptyDdlExprSnafu {
|
||||
name: "create logic tables"
|
||||
}
|
||||
);
|
||||
|
||||
// Check table names
|
||||
for create_table in create_table_exprs {
|
||||
ensure!(
|
||||
NAME_PATTERN_REG.is_match(&create_table.table_name),
|
||||
InvalidTableNameSnafu {
|
||||
table_name: &create_table.table_name,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
let mut raw_tables_info = create_table_exprs
|
||||
.iter()
|
||||
.map(|create| create_table_info(create, vec![]))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let tables_data = create_table_exprs
|
||||
.iter()
|
||||
.cloned()
|
||||
.zip(raw_tables_info.iter().cloned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let resp = self
|
||||
.create_logical_tables_procedure(tables_data, query_context)
|
||||
.await?;
|
||||
|
||||
let table_ids = resp.table_ids;
|
||||
ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
|
||||
reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
|
||||
});
|
||||
common_telemetry::info!("Successfully created logical tables: {:?}", table_ids);
|
||||
|
||||
for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
|
||||
table_info.ident.table_id = table_ids[i];
|
||||
}
|
||||
let tables_info = raw_tables_info
|
||||
.into_iter()
|
||||
.map(|x| x.try_into().context(CreateTableInfoSnafu))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
Ok(tables_info
|
||||
.into_iter()
|
||||
.map(|x| DistTable::table(Arc::new(x)))
|
||||
.collect())
|
||||
}
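// Illustrative sketch (added commentary, not part of the original change): logical
// tables are created in one batch so they land in a single DDL procedure, and the
// returned `TableRef`s follow the order of the input expressions. `exprs`, `helper`
// and `ctx` are placeholders from the caller.
//
//     let tables = helper.create_logical_tables(&exprs, ctx.clone()).await?;
//     assert_eq!(tables.len(), exprs.len());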
|
||||
|
||||
/// Alters a table by [AlterTableExpr].
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn alter_table_by_expr(
|
||||
&self,
|
||||
expr: AlterTableExpr,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
ensure!(
|
||||
!is_readonly_schema(&expr.schema_name),
|
||||
SchemaReadOnlySnafu {
|
||||
name: expr.schema_name.clone()
|
||||
}
|
||||
);
|
||||
|
||||
let catalog_name = if expr.catalog_name.is_empty() {
|
||||
DEFAULT_CATALOG_NAME.to_string()
|
||||
} else {
|
||||
expr.catalog_name.clone()
|
||||
};
|
||||
|
||||
let schema_name = if expr.schema_name.is_empty() {
|
||||
DEFAULT_SCHEMA_NAME.to_string()
|
||||
} else {
|
||||
expr.schema_name.clone()
|
||||
};
|
||||
|
||||
let table_name = expr.table_name.clone();
|
||||
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_name,
|
||||
Some(&query_context),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
|
||||
})?;
|
||||
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
|
||||
if !need_alter {
|
||||
return Ok(Output::new_with_affected_rows(0));
|
||||
}
|
||||
common_telemetry::info!(
|
||||
"Table info before alter is {:?}, expr: {:?}",
|
||||
table.table_info(),
|
||||
expr
|
||||
);
|
||||
|
||||
let physical_table_id = self
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_physical_table_id(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
let (req, invalidate_keys) = if physical_table_id == table_id {
|
||||
// This is physical table
|
||||
let req = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_alter_table(expr),
|
||||
};
|
||||
|
||||
let invalidate_keys = vec![
|
||||
CacheIdent::TableId(table_id),
|
||||
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
|
||||
];
|
||||
|
||||
(req, invalidate_keys)
|
||||
} else {
|
||||
// This is logical table
|
||||
let req = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_alter_logical_tables(vec![expr]),
|
||||
};
|
||||
|
||||
let mut invalidate_keys = vec![
|
||||
CacheIdent::TableId(physical_table_id),
|
||||
CacheIdent::TableId(table_id),
|
||||
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
|
||||
];
|
||||
|
||||
let physical_table = self
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(physical_table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map(|x| x.into_inner());
|
||||
if let Some(physical_table) = physical_table {
|
||||
let physical_table_name = TableName::new(
|
||||
physical_table.table_info.catalog_name,
|
||||
physical_table.table_info.schema_name,
|
||||
physical_table.table_info.name,
|
||||
);
|
||||
invalidate_keys.push(CacheIdent::TableName(physical_table_name));
|
||||
}
|
||||
|
||||
(req, invalidate_keys)
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), req)
|
||||
.await
|
||||
.context(ExecuteDdlSnafu)?;
|
||||
|
||||
// Invalidates local cache ASAP.
|
||||
self.cache_invalidator
|
||||
.invalidate(&Context::default(), &invalidate_keys)
|
||||
.await
|
||||
.context(InvalidateTableCacheSnafu)?;
|
||||
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
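// Illustrative sketch (added commentary, not part of the original change): when
// `verify_alter` reports that nothing would change (for example, an AddColumns expr
// whose columns all exist and are marked `add_if_not_exists`), the method returns
// zero affected rows without submitting a DDL task or invalidating caches.
// `alter_expr`, `helper` and `ctx` are placeholders from the caller.
//
//     let output = helper.alter_table_by_expr(alter_expr, ctx.clone()).await?;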
|
||||
|
||||
/// Alters logical tables, grouping them by their physical table.
|
||||
pub async fn alter_logical_tables(
|
||||
&self,
|
||||
alter_table_exprs: Vec<AlterTableExpr>,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
|
||||
ensure!(
|
||||
!alter_table_exprs.is_empty(),
|
||||
EmptyDdlExprSnafu {
|
||||
name: "alter logical tables"
|
||||
}
|
||||
);
|
||||
|
||||
// group by physical table id
|
||||
let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
|
||||
for expr in alter_table_exprs {
|
||||
// Get table_id from catalog_manager
|
||||
let catalog = if expr.catalog_name.is_empty() {
|
||||
query_context.current_catalog()
|
||||
} else {
|
||||
&expr.catalog_name
|
||||
};
|
||||
let schema = if expr.schema_name.is_empty() {
|
||||
query_context.current_schema()
|
||||
} else {
|
||||
expr.schema_name.to_string()
|
||||
};
|
||||
let table_name = &expr.table_name;
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(catalog, &schema, table_name, Some(&query_context))
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(catalog, &schema, table_name),
|
||||
})?;
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
let physical_table_id = self
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_physical_table_id(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
groups.entry(physical_table_id).or_default().push(expr);
|
||||
}
|
||||
|
||||
// Submit procedure for each physical table
|
||||
let mut handles = Vec::with_capacity(groups.len());
|
||||
for (_physical_table_id, exprs) in groups {
|
||||
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
|
||||
handles.push(fut);
|
||||
}
|
||||
let _results = futures::future::try_join_all(handles).await?;
|
||||
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
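// Illustrative sketch (added commentary, not part of the original change): the alter
// expressions are grouped by the physical table backing each logical table, and one
// DDL procedure is submitted per group, so groups for different physical tables are
// awaited concurrently. `exprs`, `helper` and `ctx` are placeholders from the caller.
//
//     helper.alter_logical_tables(exprs, ctx.clone()).await?;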
|
||||
|
||||
/// Returns the catalog manager.
|
||||
pub fn catalog_manager(&self) -> &CatalogManagerRef {
|
||||
&self.catalog_manager
|
||||
}
|
||||
|
||||
/// Returns the table route manager.
|
||||
pub fn table_route_manager(&self) -> &TableRouteManager {
|
||||
self.table_metadata_manager.table_route_manager()
|
||||
}
|
||||
|
||||
/// Submits a procedure to create a non-logical table.
|
||||
async fn create_table_procedure(
|
||||
&self,
|
||||
create_table: CreateTableExpr,
|
||||
partitions: Vec<Partition>,
|
||||
table_info: RawTableInfo,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let partitions = partitions.into_iter().map(Into::into).collect();
|
||||
|
||||
let request = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_create_table(create_table, partitions, table_info),
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.context(ExecuteDdlSnafu)
|
||||
}
|
||||
|
||||
/// Submits a procedure to create logical tables.
|
||||
async fn create_logical_tables_procedure(
|
||||
&self,
|
||||
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_create_logical_tables(tables_data),
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.context(ExecuteDdlSnafu)
|
||||
}
|
||||
|
||||
/// Submits a procedure to alter logical tables.
|
||||
async fn alter_logical_tables_procedure(
|
||||
&self,
|
||||
tables_data: Vec<AlterTableExpr>,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_alter_logical_tables(tables_data),
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.context(ExecuteDdlSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
/// Schema of a logical table.
|
||||
pub struct LogicalSchema {
|
||||
/// Name of the logical table.
|
||||
pub name: String,
|
||||
/// Schema of columns in the logical table.
|
||||
pub columns: Vec<ColumnSchema>,
|
||||
}
|
||||
|
||||
/// Logical table schemas.
|
||||
pub struct LogicalSchemas {
|
||||
/// Logical table schemas grouped by physical table name.
|
||||
pub schemas: HashMap<String, Vec<LogicalSchema>>,
|
||||
}
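// Illustrative sketch (added commentary, not part of the original change): building a
// `LogicalSchemas` value that maps one physical table to a single logical table with
// the default timestamp/value columns plus one tag. All names are placeholders.
//
//     let logical = LogicalSchema {
//         name: "http_requests_total".to_string(),
//         columns: vec![
//             ColumnSchema {
//                 column_name: GREPTIME_TIMESTAMP.to_string(),
//                 datatype: ColumnDataType::TimestampMillisecond as _,
//                 semantic_type: SemanticType::Timestamp as _,
//                 datatype_extension: None,
//                 options: None,
//             },
//             ColumnSchema {
//                 column_name: GREPTIME_VALUE.to_string(),
//                 datatype: ColumnDataType::Float64 as _,
//                 semantic_type: SemanticType::Field as _,
//                 datatype_extension: None,
//                 options: None,
//             },
//             ColumnSchema {
//                 column_name: "host".to_string(),
//                 datatype: ColumnDataType::String as _,
//                 semantic_type: SemanticType::Tag as _,
//                 datatype_extension: None,
//                 options: None,
//             },
//         ],
//     };
//     let schemas = LogicalSchemas {
//         schemas: HashMap::from([("greptime_physical_table".to_string(), vec![logical])]),
//     };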
|
||||
|
||||
/// Creates or alters logical tables to match the provided schemas
|
||||
/// for prometheus metrics.
|
||||
pub async fn ensure_logical_tables_for_metrics(
|
||||
helper: &SchemaHelper,
|
||||
schemas: &LogicalSchemas,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<()> {
|
||||
let catalog_name = query_ctx.current_catalog();
|
||||
let schema_name = query_ctx.current_schema();
|
||||
|
||||
// 1. For each physical table, creates it if it doesn't exist.
|
||||
for physical_table_name in schemas.schemas.keys() {
|
||||
// Check if the physical table exists and create it if it doesn't
|
||||
let physical_table_opt = helper
|
||||
.get_table(catalog_name, &schema_name, physical_table_name)
|
||||
.await?;
|
||||
|
||||
if physical_table_opt.is_none() {
|
||||
// Physical table doesn't exist, create it
|
||||
helper
|
||||
.create_metric_physical_table(query_ctx, physical_table_name.clone())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Collects logical tables that do not exist. (CreateTableExpr)
|
||||
let mut tables_to_create: Vec<CreateTableExpr> = Vec::new();
|
||||
|
||||
// 3. Collects alterations (columns to add) for each logical table. (AlterTableExpr)
|
||||
let mut tables_to_alter: Vec<AlterTableExpr> = Vec::new();
|
||||
|
||||
// Process each logical table to determine if it needs to be created or altered
|
||||
for (physical_table_name, logical_schemas) in &schemas.schemas {
|
||||
for logical_schema in logical_schemas {
|
||||
let table_name = &logical_schema.name;
|
||||
|
||||
// Check if the logical table exists
|
||||
let table_opt = helper
|
||||
.get_table(catalog_name, &schema_name, table_name)
|
||||
.await?;
|
||||
|
||||
if let Some(existing_table) = table_opt {
|
||||
// Logical table exists, determine if it needs alteration
|
||||
let existing_schema = existing_table.schema();
|
||||
let column_exprs = ColumnExpr::from_column_schemas(&logical_schema.columns);
|
||||
let add_columns =
|
||||
expr_helper::extract_add_columns_expr(&existing_schema, column_exprs)?;
|
||||
let Some(add_columns) = add_columns else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let alter_expr = AlterTableExpr {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.to_string(),
|
||||
kind: Some(Kind::AddColumns(add_columns)),
|
||||
};
|
||||
tables_to_alter.push(alter_expr);
|
||||
} else {
|
||||
// Logical table doesn't exist, prepare for creation
|
||||
// Build a CreateTableExpr from the table reference and columns
|
||||
let table_ref = TableReference::full(catalog_name, &schema_name, table_name);
|
||||
let mut create_expr = build_create_table_expr(
|
||||
&table_ref,
|
||||
&logical_schema.columns,
|
||||
METRIC_ENGINE_NAME,
|
||||
)?;
|
||||
create_expr.create_if_not_exists = true;
|
||||
let create_type = AutoCreateTableType::Logical(physical_table_name.clone());
|
||||
// Fill table options.
|
||||
fill_table_options_for_create(
|
||||
&mut create_expr.table_options,
|
||||
&create_type,
|
||||
query_ctx,
|
||||
);
|
||||
|
||||
tables_to_create.push(create_expr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Creates logical tables in batch using `create_logical_tables()`.
|
||||
if !tables_to_create.is_empty() {
|
||||
helper
|
||||
.create_logical_tables(&tables_to_create, query_ctx.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// 5. Alters logical tables in batch using `alter_logical_tables()`.
|
||||
if !tables_to_alter.is_empty() {
|
||||
helper
|
||||
.alter_logical_tables(tables_to_alter, query_ctx.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
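// Illustrative usage sketch (added commentary, not part of the original change):
// given a `LogicalSchemas` value like the one sketched above, this creates missing
// physical and logical tables and adds missing columns to existing logical tables in
// two batched calls. `helper` and `query_ctx` are placeholders from the caller.
//
//     ensure_logical_tables_for_metrics(&helper, &schemas, &query_ctx).await?;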
|
||||
|
||||
/// Gets the region metadata for each region id in `region_ids`.
|
||||
// TODO(yingwen): Should we return RegionMetadataRef?
|
||||
pub async fn metadatas_for_region_ids(
|
||||
partition_manager: &PartitionRuleManagerRef,
|
||||
node_manager: &NodeManagerRef,
|
||||
region_ids: &[RegionId],
|
||||
ctx: &QueryContextRef,
|
||||
) -> Result<Vec<Option<RegionMetadata>>> {
|
||||
// Groups regions by peers.
|
||||
// This map contains: peer => (ListMetadataRequest, a vec of indices into `region_ids`).
|
||||
let mut request_per_region = HashMap::new();
|
||||
for (index, region_id) in region_ids.iter().copied().enumerate() {
|
||||
let peer = partition_manager
|
||||
.find_region_leader(region_id)
|
||||
.await
|
||||
.context(FindRegionLeaderSnafu)?;
|
||||
let request_indices = request_per_region
|
||||
.entry(peer)
|
||||
.or_insert_with(|| (ListMetadataRequest::default(), Vec::new()));
|
||||
request_indices.0.region_ids.push(region_id.as_u64());
|
||||
request_indices.1.push(index);
|
||||
}
|
||||
|
||||
// Sends requests to datanode and waits for responses.
|
||||
let tasks = request_per_region
|
||||
.into_iter()
|
||||
.map(|(peer, (request, indices))| {
|
||||
let node_manager = node_manager.clone();
|
||||
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
dbname: ctx.get_db_string(),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
common_runtime::spawn_global(async move {
|
||||
let request = request_factory.build_request(Body::ListMetadata(request));
|
||||
let resp = node_manager
|
||||
.datanode(&peer)
|
||||
.await
|
||||
.handle(request)
|
||||
.await
|
||||
.context(RequestRegionSnafu)?;
|
||||
|
||||
let metadatas: Vec<Option<RegionMetadata>> =
|
||||
serde_json::from_slice(&resp.metadata).context(DecodeJsonSnafu)?;
|
||||
Ok((metadatas, indices))
|
||||
})
|
||||
});
|
||||
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
|
||||
let mut output_metadatas = vec![None; region_ids.len()];
|
||||
for result in results {
|
||||
let (mut metadatas, indices) = result?;
|
||||
ensure!(
|
||||
metadatas.len() == indices.len(),
|
||||
UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Length mismatch between request and response, expected {} metadatas, got {}",
|
||||
indices.len(),
|
||||
metadatas.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
for (i, index) in indices.into_iter().enumerate() {
    // `metadatas` is ordered per-peer, so pair it positionally with `indices`.
    output_metadatas[index] = metadatas[i].take();
}
|
||||
}
|
||||
|
||||
Ok(output_metadatas)
|
||||
}
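// Illustrative usage sketch (added commentary, not part of the original change):
// the result vector is aligned with `region_ids`, so a region whose metadata is not
// available yields `None` at the same index. `partition_manager`, `node_manager`,
// `region_ids` and `ctx` are placeholders from the caller.
//
//     let metadatas =
//         metadatas_for_region_ids(&partition_manager, &node_manager, &region_ids, &ctx)
//             .await?;
//     assert_eq!(metadatas.len(), region_ids.len());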
|
||||
@@ -18,7 +18,7 @@ mod copy_query_to;
|
||||
mod copy_table_from;
|
||||
mod copy_table_to;
|
||||
mod cursor;
|
||||
mod ddl;
|
||||
pub(crate) mod ddl;
|
||||
mod describe;
|
||||
mod dml;
|
||||
mod kill;
|
||||
@@ -102,6 +102,14 @@ pub struct StatementExecutor {
|
||||
pub type StatementExecutorRef = Arc<StatementExecutor>;
|
||||
|
||||
impl StatementExecutor {
|
||||
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
|
||||
&self.procedure_executor
|
||||
}
|
||||
|
||||
pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
|
||||
&self.cache_invalidator
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
catalog_manager: CatalogManagerRef,
|
||||
|
||||
@@ -26,7 +26,7 @@ use api::v1::{
|
||||
};
|
||||
use catalog::CatalogManagerRef;
|
||||
use chrono::Utc;
|
||||
use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_catalog::consts::is_readonly_schema;
|
||||
use common_catalog::{format_full_flow_name, format_full_table_name};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::Context;
|
||||
@@ -43,7 +43,7 @@ use common_meta::rpc::ddl::{
|
||||
CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
|
||||
SubmitDdlTaskResponse,
|
||||
};
|
||||
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
|
||||
use common_meta::rpc::router::Partition as MetaPartition;
|
||||
use common_query::Output;
|
||||
use common_telemetry::{debug, info, tracing, warn};
|
||||
use common_time::Timezone;
|
||||
@@ -74,7 +74,6 @@ use sql::statements::create::{
|
||||
use sql::statements::sql_value_to_value;
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
|
||||
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
use table::dist_table::DistTable;
|
||||
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
|
||||
@@ -84,12 +83,11 @@ use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
|
||||
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
|
||||
DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
|
||||
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
|
||||
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
|
||||
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
|
||||
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
|
||||
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
|
||||
ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
|
||||
InvalidPartitionSnafu, InvalidSqlSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
|
||||
ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
|
||||
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
|
||||
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
|
||||
};
|
||||
use crate::expr_helper;
|
||||
@@ -97,7 +95,8 @@ use crate::statement::show::create_partitions_stmt;
|
||||
use crate::statement::StatementExecutor;
|
||||
|
||||
lazy_static! {
|
||||
static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
|
||||
/// Regex to validate table name.
|
||||
pub(crate) static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
|
||||
}
|
||||
|
||||
impl StatementExecutor {
|
||||
@@ -182,192 +181,10 @@ impl StatementExecutor {
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<TableRef> {
|
||||
ensure!(
|
||||
!is_readonly_schema(&create_table.schema_name),
|
||||
SchemaReadOnlySnafu {
|
||||
name: create_table.schema_name.clone()
|
||||
}
|
||||
);
|
||||
|
||||
if create_table.engine == METRIC_ENGINE_NAME
|
||||
&& create_table
|
||||
.table_options
|
||||
.contains_key(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
// Create logical tables
|
||||
ensure!(
|
||||
partitions.is_none(),
|
||||
InvalidPartitionRuleSnafu {
|
||||
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
|
||||
}
|
||||
);
|
||||
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.next()
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: "expected to create logical tables",
|
||||
})
|
||||
} else {
|
||||
// Create other normal table
|
||||
self.create_non_logic_table(create_table, partitions, query_ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn create_non_logic_table(
|
||||
&self,
|
||||
create_table: &mut CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<TableRef> {
|
||||
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
|
||||
|
||||
// Check if schema exists
|
||||
let schema = self
|
||||
.table_metadata_manager
|
||||
.schema_manager()
|
||||
.get(SchemaNameKey::new(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
))
|
||||
self.inserter
|
||||
.schema_helper
|
||||
.create_table_by_expr(create_table, partitions, query_ctx)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
ensure!(
|
||||
schema.is_some(),
|
||||
SchemaNotFoundSnafu {
|
||||
schema_info: &create_table.schema_name,
|
||||
}
|
||||
);
|
||||
|
||||
// if table exists.
|
||||
if let Some(table) = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name,
|
||||
Some(&query_ctx),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
{
|
||||
return if create_table.create_if_not_exists {
|
||||
Ok(table)
|
||||
} else {
|
||||
TableAlreadyExistsSnafu {
|
||||
table: format_full_table_name(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name,
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
};
|
||||
}
|
||||
|
||||
ensure!(
|
||||
NAME_PATTERN_REG.is_match(&create_table.table_name),
|
||||
InvalidTableNameSnafu {
|
||||
table_name: &create_table.table_name,
|
||||
}
|
||||
);
|
||||
|
||||
let table_name = TableName::new(
|
||||
&create_table.catalog_name,
|
||||
&create_table.schema_name,
|
||||
&create_table.table_name,
|
||||
);
|
||||
|
||||
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
|
||||
let mut table_info = create_table_info(create_table, partition_cols)?;
|
||||
|
||||
let resp = self
|
||||
.create_table_procedure(
|
||||
create_table.clone(),
|
||||
partitions,
|
||||
table_info.clone(),
|
||||
query_ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let table_id = resp
|
||||
.table_ids
|
||||
.into_iter()
|
||||
.next()
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: "expected table_id",
|
||||
})?;
|
||||
info!("Successfully created table '{table_name}' with table id {table_id}");
|
||||
|
||||
table_info.ident.table_id = table_id;
|
||||
|
||||
let table_info: Arc<TableInfo> =
|
||||
Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
|
||||
create_table.table_id = Some(api::v1::TableId { id: table_id });
|
||||
|
||||
let table = DistTable::table(table_info);
|
||||
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn create_logical_tables(
|
||||
&self,
|
||||
create_table_exprs: &[CreateTableExpr],
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<Vec<TableRef>> {
|
||||
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
|
||||
ensure!(
|
||||
!create_table_exprs.is_empty(),
|
||||
EmptyDdlExprSnafu {
|
||||
name: "create logic tables"
|
||||
}
|
||||
);
|
||||
|
||||
// Check table names
|
||||
for create_table in create_table_exprs {
|
||||
ensure!(
|
||||
NAME_PATTERN_REG.is_match(&create_table.table_name),
|
||||
InvalidTableNameSnafu {
|
||||
table_name: &create_table.table_name,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
let mut raw_tables_info = create_table_exprs
|
||||
.iter()
|
||||
.map(|create| create_table_info(create, vec![]))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let tables_data = create_table_exprs
|
||||
.iter()
|
||||
.cloned()
|
||||
.zip(raw_tables_info.iter().cloned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let resp = self
|
||||
.create_logical_tables_procedure(tables_data, query_context)
|
||||
.await?;
|
||||
|
||||
let table_ids = resp.table_ids;
|
||||
ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
|
||||
reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
|
||||
});
|
||||
info!("Successfully created logical tables: {:?}", table_ids);
|
||||
|
||||
for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
|
||||
table_info.ident.table_id = table_ids[i];
|
||||
}
|
||||
let tables_info = raw_tables_info
|
||||
.into_iter()
|
||||
.map(|x| x.try_into().context(CreateTableInfoSnafu))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
Ok(tables_info
|
||||
.into_iter()
|
||||
.map(|x| DistTable::table(Arc::new(x)))
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
@@ -953,64 +770,6 @@ impl StatementExecutor {
|
||||
.context(error::ExecuteDdlSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn alter_logical_tables(
|
||||
&self,
|
||||
alter_table_exprs: Vec<AlterTableExpr>,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
|
||||
ensure!(
|
||||
!alter_table_exprs.is_empty(),
|
||||
EmptyDdlExprSnafu {
|
||||
name: "alter logical tables"
|
||||
}
|
||||
);
|
||||
|
||||
// group by physical table id
|
||||
let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
|
||||
for expr in alter_table_exprs {
|
||||
// Get table_id from catalog_manager
|
||||
let catalog = if expr.catalog_name.is_empty() {
|
||||
query_context.current_catalog()
|
||||
} else {
|
||||
&expr.catalog_name
|
||||
};
|
||||
let schema = if expr.schema_name.is_empty() {
|
||||
query_context.current_schema()
|
||||
} else {
|
||||
expr.schema_name.to_string()
|
||||
};
|
||||
let table_name = &expr.table_name;
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(catalog, &schema, table_name, Some(&query_context))
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(catalog, &schema, table_name),
|
||||
})?;
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
let physical_table_id = self
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_physical_table_id(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
groups.entry(physical_table_id).or_default().push(expr);
|
||||
}
|
||||
|
||||
// Submit procedure for each physical table
|
||||
let mut handles = Vec::with_capacity(groups.len());
|
||||
for (_physical_table_id, exprs) in groups {
|
||||
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
|
||||
handles.push(fut);
|
||||
}
|
||||
let _results = futures::future::try_join_all(handles).await?;
|
||||
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn drop_table(
|
||||
&self,
|
||||
@@ -1152,60 +911,6 @@ impl StatementExecutor {
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
|
||||
/// Verifies an alter and returns whether it is necessary to perform the alter.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns true if the alter needs to be performed; otherwise, it returns false.
|
||||
fn verify_alter(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
table_info: Arc<TableInfo>,
|
||||
expr: AlterTableExpr,
|
||||
) -> Result<bool> {
|
||||
let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
|
||||
.context(AlterExprToRequestSnafu)?;
|
||||
|
||||
let AlterTableRequest {
|
||||
table_name,
|
||||
alter_kind,
|
||||
..
|
||||
} = &request;
|
||||
|
||||
if let AlterKind::RenameTable { new_table_name } = alter_kind {
|
||||
ensure!(
|
||||
NAME_PATTERN_REG.is_match(new_table_name),
|
||||
error::UnexpectedSnafu {
|
||||
violated: format!("Invalid table name: {}", new_table_name)
|
||||
}
|
||||
);
|
||||
} else if let AlterKind::AddColumns { columns } = alter_kind {
|
||||
// If all the columns are marked as add_if_not_exists and they already exist in the table,
|
||||
// there is no need to perform the alter.
|
||||
let column_names: HashSet<_> = table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.map(|schema| &schema.name)
|
||||
.collect();
|
||||
if columns.iter().all(|column| {
|
||||
column_names.contains(&column.column_schema.name) && column.add_if_not_exists
|
||||
}) {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
let _ = table_info
|
||||
.meta
|
||||
.builder_with_alter_kind(table_name, &request.alter_kind)
|
||||
.context(error::TableSnafu)?
|
||||
.build()
|
||||
.context(error::BuildTableMetaSnafu { table_name })?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn alter_table(
|
||||
&self,
|
||||
@@ -1222,116 +927,10 @@ impl StatementExecutor {
|
||||
expr: AlterTableExpr,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
ensure!(
|
||||
!is_readonly_schema(&expr.schema_name),
|
||||
SchemaReadOnlySnafu {
|
||||
name: expr.schema_name.clone()
|
||||
}
|
||||
);
|
||||
|
||||
let catalog_name = if expr.catalog_name.is_empty() {
|
||||
DEFAULT_CATALOG_NAME.to_string()
|
||||
} else {
|
||||
expr.catalog_name.clone()
|
||||
};
|
||||
|
||||
let schema_name = if expr.schema_name.is_empty() {
|
||||
DEFAULT_SCHEMA_NAME.to_string()
|
||||
} else {
|
||||
expr.schema_name.clone()
|
||||
};
|
||||
|
||||
let table_name = expr.table_name.clone();
|
||||
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_name,
|
||||
Some(&query_context),
|
||||
)
|
||||
self.inserter
|
||||
.schema_helper
|
||||
.alter_table_by_expr(expr, query_context)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
|
||||
})?;
|
||||
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
|
||||
if !need_alter {
|
||||
return Ok(Output::new_with_affected_rows(0));
|
||||
}
|
||||
info!(
|
||||
"Table info before alter is {:?}, expr: {:?}",
|
||||
table.table_info(),
|
||||
expr
|
||||
);
|
||||
|
||||
let physical_table_id = self
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_physical_table_id(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
let (req, invalidate_keys) = if physical_table_id == table_id {
|
||||
// This is physical table
|
||||
let req = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_alter_table(expr),
|
||||
};
|
||||
|
||||
let invalidate_keys = vec![
|
||||
CacheIdent::TableId(table_id),
|
||||
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
|
||||
];
|
||||
|
||||
(req, invalidate_keys)
|
||||
} else {
|
||||
// This is logical table
|
||||
let req = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_alter_logical_tables(vec![expr]),
|
||||
};
|
||||
|
||||
let mut invalidate_keys = vec![
|
||||
CacheIdent::TableId(physical_table_id),
|
||||
CacheIdent::TableId(table_id),
|
||||
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
|
||||
];
|
||||
|
||||
let physical_table = self
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(physical_table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map(|x| x.into_inner());
|
||||
if let Some(physical_table) = physical_table {
|
||||
let physical_table_name = TableName::new(
|
||||
physical_table.table_info.catalog_name,
|
||||
physical_table.table_info.schema_name,
|
||||
physical_table.table_info.name,
|
||||
);
|
||||
invalidate_keys.push(CacheIdent::TableName(physical_table_name));
|
||||
}
|
||||
|
||||
(req, invalidate_keys)
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), req)
|
||||
.await
|
||||
.context(error::ExecuteDdlSnafu)?;
|
||||
|
||||
// Invalidates local cache ASAP.
|
||||
self.cache_invalidator
|
||||
.invalidate(&Context::default(), &invalidate_keys)
|
||||
.await
|
||||
.context(error::InvalidateTableCacheSnafu)?;
|
||||
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -1386,58 +985,6 @@ impl StatementExecutor {
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
|
||||
async fn create_table_procedure(
|
||||
&self,
|
||||
create_table: CreateTableExpr,
|
||||
partitions: Vec<Partition>,
|
||||
table_info: RawTableInfo,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let partitions = partitions.into_iter().map(Into::into).collect();
|
||||
|
||||
let request = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_create_table(create_table, partitions, table_info),
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.context(error::ExecuteDdlSnafu)
|
||||
}
|
||||
|
||||
async fn create_logical_tables_procedure(
|
||||
&self,
|
||||
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_create_logical_tables(tables_data),
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.context(error::ExecuteDdlSnafu)
|
||||
}
|
||||
|
||||
async fn alter_logical_tables_procedure(
|
||||
&self,
|
||||
tables_data: Vec<AlterTableExpr>,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_alter_logical_tables(tables_data),
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.context(error::ExecuteDdlSnafu)
|
||||
}
|
||||
|
||||
async fn drop_table_procedure(
|
||||
&self,
|
||||
table_name: &TableName,
|
||||
@@ -1585,8 +1132,61 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
/// Verifies an alter and returns whether it is necessary to perform the alter.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns true if the alter needs to be performed; otherwise, it returns false.
|
||||
pub(crate) fn verify_alter(
|
||||
table_id: TableId,
|
||||
table_info: Arc<TableInfo>,
|
||||
expr: AlterTableExpr,
|
||||
) -> Result<bool> {
|
||||
let request: AlterTableRequest =
|
||||
common_grpc_expr::alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;
|
||||
|
||||
let AlterTableRequest {
|
||||
table_name,
|
||||
alter_kind,
|
||||
..
|
||||
} = &request;
|
||||
|
||||
if let AlterKind::RenameTable { new_table_name } = alter_kind {
|
||||
ensure!(
|
||||
NAME_PATTERN_REG.is_match(new_table_name),
|
||||
error::UnexpectedSnafu {
|
||||
violated: format!("Invalid table name: {}", new_table_name)
|
||||
}
|
||||
);
|
||||
} else if let AlterKind::AddColumns { columns } = alter_kind {
|
||||
// If all the columns are marked as add_if_not_exists and they already exist in the table,
|
||||
// there is no need to perform the alter.
|
||||
let column_names: HashSet<_> = table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.map(|schema| &schema.name)
|
||||
.collect();
|
||||
if columns.iter().all(|column| {
|
||||
column_names.contains(&column.column_schema.name) && column.add_if_not_exists
|
||||
}) {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
let _ = table_info
|
||||
.meta
|
||||
.builder_with_alter_kind(table_name, &request.alter_kind)
|
||||
.context(error::TableSnafu)?
|
||||
.build()
|
||||
.context(error::BuildTableMetaSnafu { table_name })?;
|
||||
|
||||
Ok(true)
|
||||
}
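// Illustrative sketch (added commentary, not part of the original change):
// `verify_alter` returns `Ok(false)` when every column in an AddColumns expression is
// marked `add_if_not_exists` and already exists, letting callers skip the DDL task.
// This mirrors the call site in `alter_table_by_expr`.
//
//     let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
//     if !need_alter {
//         return Ok(Output::new_with_affected_rows(0));
//     }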
|
||||
|
||||
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
|
||||
fn parse_partitions(
|
||||
pub(crate) fn parse_partitions(
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: &QueryContextRef,
|
||||
@@ -1619,7 +1219,7 @@ fn parse_partitions(
|
||||
))
|
||||
}
|
||||
|
||||
fn create_table_info(
|
||||
pub(crate) fn create_table_info(
|
||||
create_table: &CreateTableExpr,
|
||||
partition_columns: Vec<String>,
|
||||
) -> Result<RawTableInfo> {
|
||||
|
||||
@@ -88,7 +88,6 @@ impl PipelineOperator {
|
||||
catalog.to_string(),
|
||||
Arc::new(PipelineTable::new(
|
||||
self.inserter.clone(),
|
||||
self.statement_executor.clone(),
|
||||
table,
|
||||
self.query_engine.clone(),
|
||||
)),
|
||||
|
||||
@@ -30,7 +30,6 @@ use datatypes::timestamp::TimestampNanosecond;
|
||||
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
|
||||
use itertools::Itertools;
|
||||
use operator::insert::InserterRef;
|
||||
use operator::statement::StatementExecutorRef;
|
||||
use query::dataframe::DataFrame;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
@@ -61,7 +60,6 @@ pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
|
||||
/// Every catalog has its own pipeline table.
|
||||
pub struct PipelineTable {
|
||||
inserter: InserterRef,
|
||||
statement_executor: StatementExecutorRef,
|
||||
table: TableRef,
|
||||
query_engine: QueryEngineRef,
|
||||
cache: PipelineCache,
|
||||
@@ -69,15 +67,9 @@ pub struct PipelineTable {
|
||||
|
||||
impl PipelineTable {
|
||||
/// Create a new PipelineTable.
|
||||
pub fn new(
|
||||
inserter: InserterRef,
|
||||
statement_executor: StatementExecutorRef,
|
||||
table: TableRef,
|
||||
query_engine: QueryEngineRef,
|
||||
) -> Self {
|
||||
pub fn new(inserter: InserterRef, table: TableRef, query_engine: QueryEngineRef) -> Self {
|
||||
Self {
|
||||
inserter,
|
||||
statement_executor,
|
||||
table,
|
||||
query_engine,
|
||||
cache: PipelineCache::new(),
|
||||
@@ -232,13 +224,7 @@ impl PipelineTable {
|
||||
|
||||
let output = self
|
||||
.inserter
|
||||
.handle_row_inserts(
|
||||
requests,
|
||||
Self::query_ctx(&table_info),
|
||||
&self.statement_executor,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.handle_row_inserts(requests, Self::query_ctx(&table_info), false, false)
|
||||
.await
|
||||
.context(InsertPipelineSnafu)?;
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ chrono.workspace = true
|
||||
common-base.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-config.workspace = true
|
||||
common-datasource.workspace = true
|
||||
common-error.workspace = true
|
||||
common-frontend.workspace = true
|
||||
common-grpc.workspace = true
|
||||
@@ -74,11 +75,18 @@ jsonb.workspace = true
|
||||
lazy_static.workspace = true
|
||||
log-query.workspace = true
|
||||
loki-proto.workspace = true
|
||||
metric-engine.workspace = true
|
||||
mime_guess = "2.0"
|
||||
mito-codec.workspace = true
|
||||
mito2.workspace = true
|
||||
notify.workspace = true
|
||||
object-pool = "0.5"
|
||||
object-store.workspace = true
|
||||
once_cell.workspace = true
|
||||
openmetrics-parser = "0.4"
|
||||
operator.workspace = true
|
||||
parquet.workspace = true
|
||||
partition.workspace = true
|
||||
simd-json.workspace = true
|
||||
socket2 = "0.5"
|
||||
# use crates.io version once the following PRs is merged into the nextest release
|
||||
|
||||
415
src/servers/src/access_layer.rs
Normal file
@@ -0,0 +1,415 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use arrow::array::{
|
||||
Array, PrimitiveArray, RecordBatch, TimestampMicrosecondArray, TimestampMillisecondArray,
|
||||
TimestampNanosecondArray, TimestampSecondArray,
|
||||
};
|
||||
use arrow::datatypes::Int64Type;
|
||||
use arrow_schema::TimeUnit;
|
||||
use common_datasource::parquet_writer::AsyncWriter;
|
||||
use datafusion::parquet::arrow::AsyncArrowWriter;
|
||||
use mito2::sst::file::{FileId, FileMeta};
|
||||
use mito2::sst::parquet::{DEFAULT_ROW_GROUP_SIZE, PARQUET_METADATA_KEY};
|
||||
use object_store::config::ObjectStoreConfig;
|
||||
use object_store::util::{join_dir, join_path};
|
||||
use object_store::ObjectStore;
|
||||
use parquet::basic::{Compression, Encoding, ZstdLevel};
|
||||
use parquet::file::metadata::KeyValue;
|
||||
use parquet::file::properties::WriterProperties;
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::batch_builder::physical_schema;
|
||||
use crate::error;
|
||||
|
||||
type AsyncParquetWriter = AsyncArrowWriter<AsyncWriter>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AccessLayerFactory {
|
||||
object_store: ObjectStore,
|
||||
}
|
||||
|
||||
impl AccessLayerFactory {
|
||||
pub async fn new(config: &ObjectStoreConfig) -> error::Result<AccessLayerFactory> {
|
||||
let object_store = object_store::factory::new_raw_object_store(config, "")
|
||||
.await
|
||||
.context(error::ObjectStoreSnafu)?;
|
||||
Ok(Self { object_store })
|
||||
}
|
||||
|
||||
pub(crate) async fn create_sst_writer(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
region_metadata: RegionMetadataRef,
|
||||
) -> error::Result<ParquetWriter> {
|
||||
let region_dir = build_data_region_dir(catalog, schema, region_metadata.region_id);
|
||||
let file_id = FileId::random();
|
||||
let file_path = join_path(®ion_dir, &file_id.as_parquet());
|
||||
let writer = self
|
||||
.object_store
|
||||
.writer(&file_path)
|
||||
.await
|
||||
.context(error::OpendalSnafu)?;
|
||||
|
||||
let schema = physical_schema();
|
||||
|
||||
let key_value_meta = KeyValue::new(
|
||||
PARQUET_METADATA_KEY.to_string(),
|
||||
region_metadata.to_json().unwrap(),
|
||||
);
|
||||
|
||||
let props = WriterProperties::builder()
|
||||
.set_key_value_metadata(Some(vec![key_value_meta]))
|
||||
.set_compression(Compression::ZSTD(ZstdLevel::default()))
|
||||
.set_encoding(Encoding::PLAIN)
|
||||
.set_max_row_group_size(DEFAULT_ROW_GROUP_SIZE)
|
||||
.build();
|
||||
|
||||
let writer = AsyncParquetWriter::try_new(AsyncWriter::new(writer), schema, Some(props))
|
||||
.context(error::ParquetSnafu)?;
|
||||
Ok(ParquetWriter {
|
||||
region_id: region_metadata.region_id,
|
||||
file_id,
|
||||
region_metadata,
|
||||
writer,
|
||||
timestamp_range: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ParquetWriter {
|
||||
region_id: RegionId,
|
||||
file_id: FileId,
|
||||
region_metadata: RegionMetadataRef,
|
||||
writer: AsyncParquetWriter,
|
||||
timestamp_range: Option<(i64, i64)>,
|
||||
}
|
||||
|
||||
impl ParquetWriter {
|
||||
pub(crate) fn file_id(&self) -> FileId {
|
||||
self.file_id
|
||||
}
|
||||
}
|
||||
|
||||
impl ParquetWriter {
|
||||
pub async fn write_record_batch(
|
||||
&mut self,
|
||||
batch: &RecordBatch,
|
||||
timestamp_range: Option<(i64, i64)>,
|
||||
) -> error::Result<()> {
|
||||
if let Err(e) = self.writer.write(batch).await.context(error::ParquetSnafu) {
|
||||
common_telemetry::error!(e; "Region metadata: {:?}, batch schema: {:?}", self.region_metadata, batch.schema_ref());
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let (batch_min, batch_max) =
|
||||
get_or_calculate_timestamp_range(timestamp_range, batch, &self.region_metadata)?;
|
||||
|
||||
if let Some((min, max)) = &mut self.timestamp_range {
|
||||
*min = (*min).min(batch_min);
|
||||
*max = (*max).max(batch_max);
|
||||
} else {
|
||||
self.timestamp_range = Some((batch_min, batch_max));
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn finish(&mut self) -> error::Result<FileMeta> {
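// Note (added commentary): `timestamp_range` is only populated by
// `write_record_batch`, so the `unwrap` below assumes at least one batch has been
// written before `finish` is called.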
|
||||
let (min, max) = self.timestamp_range.unwrap();
|
||||
let timestamp_type = self
|
||||
.region_metadata
|
||||
.time_index_column()
|
||||
.column_schema
|
||||
.data_type
|
||||
.as_timestamp()
|
||||
.unwrap();
|
||||
let min_ts = timestamp_type.create_timestamp(min);
|
||||
let max_ts = timestamp_type.create_timestamp(max);
|
||||
let file_meta = self.writer.finish().await.context(error::ParquetSnafu)?;
|
||||
let meta = FileMeta {
|
||||
region_id: self.region_id,
|
||||
file_id: self.file_id,
|
||||
time_range: (min_ts, max_ts),
|
||||
level: 0,
|
||||
file_size: self.writer.bytes_written() as u64,
|
||||
available_indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
num_rows: file_meta.num_rows as u64,
|
||||
num_row_groups: file_meta.row_groups.len() as u64,
|
||||
sequence: None, //todo(hl): use flushed sequence here.
|
||||
};
|
||||
Ok(meta)
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the data region subdir for metric physical tables.
|
||||
fn build_data_region_dir(catalog: &str, schema: &str, physical_region_id: RegionId) -> String {
|
||||
let storage_path = common_meta::ddl::utils::region_storage_path(catalog, schema);
|
||||
join_dir(
|
||||
&store_api::path_utils::region_dir(&storage_path, physical_region_id),
|
||||
DATA_REGION_SUBDIR,
|
||||
)
|
||||
}
|
||||
|
||||
fn get_or_calculate_timestamp_range(
|
||||
timestamp_range: Option<(i64, i64)>,
|
||||
rb: &RecordBatch,
|
||||
region_metadata: &RegionMetadataRef,
|
||||
) -> error::Result<(i64, i64)> {
|
||||
if let Some(range) = timestamp_range {
|
||||
return Ok(range);
|
||||
};
|
||||
|
||||
let ts = rb
|
||||
.column_by_name(®ion_metadata.time_index_column().column_schema.name)
|
||||
.expect("column not found");
|
||||
let arrow::datatypes::DataType::Timestamp(unit, _) = ts.data_type() else {
|
||||
unreachable!("expected timestamp types");
|
||||
};
|
||||
let primitives: PrimitiveArray<Int64Type> = match unit {
|
||||
TimeUnit::Second => ts
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampSecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast(),
|
||||
TimeUnit::Millisecond => ts
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast(),
|
||||
TimeUnit::Microsecond => ts
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMicrosecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast(),
|
||||
TimeUnit::Nanosecond => ts
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampNanosecondArray>()
|
||||
.unwrap()
|
||||
.reinterpret_cast(),
|
||||
};
|
||||
|
||||
let min = arrow::compute::min(&primitives).unwrap();
|
||||
let max = arrow::compute::max(&primitives).unwrap();
|
||||
Ok((min, max))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use arrow::array::{Float64Array, StringArray};
|
||||
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use common_time::Timestamp;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use object_store::services::MemoryConfig;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_build_data_region_dir_basic() {
|
||||
let result = build_data_region_dir("greptime", "public", RegionId::new(1024, 0));
|
||||
assert_eq!(&result, "data/greptime/public/1024/1024_0000000000/data/");
|
||||
}
|
||||
|
||||
fn create_test_region_metadata() -> RegionMetadataRef {
|
||||
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
GREPTIME_TIMESTAMP,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 1,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
GREPTIME_VALUE,
|
||||
ConcreteDataType::float64_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .primary_key(vec![3]);
        let metadata = builder.build().unwrap();
        Arc::new(metadata)
    }

    fn create_test_record_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                GREPTIME_TIMESTAMP,
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(GREPTIME_VALUE, DataType::Float64, true),
            Field::new("tag", DataType::Utf8, true),
        ]));

        let timestamp_array = TimestampMillisecondArray::from(vec![1000, 2000, 3000]);
        let value_array = Float64Array::from(vec![Some(10.0), None, Some(30.0)]);
        let tag_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(timestamp_array),
                Arc::new(value_array),
                Arc::new(tag_array),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_parquet_writer_write_and_finish() {
        let object_store = ObjectStore::from_config(MemoryConfig::default())
            .unwrap()
            .finish();
        let factory = AccessLayerFactory { object_store };

        let region_metadata = create_test_region_metadata();
        let mut writer = factory
            .create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
            .await
            .unwrap();

        let batch = create_test_record_batch();

        // Test writing a record batch
        writer.write_record_batch(&batch, None).await.unwrap();

        // Test finishing the writer
        let file_meta = writer.finish().await.unwrap();

        assert_eq!(file_meta.region_id, RegionId::new(1024, 0));
        assert_eq!(file_meta.level, 0);
        assert_eq!(file_meta.num_rows, 3);
        assert_eq!(file_meta.num_row_groups, 1);
        assert!(file_meta.file_size > 0);

        assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000));
        assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(3000));
    }

    #[tokio::test]
    async fn test_parquet_writer_multiple_batches() {
        let object_store = ObjectStore::from_config(MemoryConfig::default())
            .unwrap()
            .finish();
        let factory = AccessLayerFactory { object_store };

        let region_metadata = create_test_region_metadata();
        let mut writer = factory
            .create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
            .await
            .unwrap();

        // Write first batch
        let batch1 = create_test_record_batch();
        writer.write_record_batch(&batch1, None).await.unwrap();

        // Create second batch with different timestamp range
        let schema = region_metadata.schema.arrow_schema().clone();
        let timestamp_array = TimestampMillisecondArray::from(vec![4000, 5000]);
        let value_array = Float64Array::from(vec![Some(40.0), Some(50.0)]);
        let tag_array = StringArray::from(vec![Some("d"), Some("e")]);

        let batch2 = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(timestamp_array),
                Arc::new(value_array),
                Arc::new(tag_array),
            ],
        )
        .unwrap();

        writer.write_record_batch(&batch2, None).await.unwrap();

        let file_meta = writer.finish().await.unwrap();

        // Should have combined rows from both batches
        assert_eq!(file_meta.num_rows, 5);
        assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000));
        assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(5000));
    }

    #[tokio::test]
    async fn test_parquet_writer_with_provided_timestamp_range() {
        let object_store = ObjectStore::from_config(MemoryConfig::default())
            .unwrap()
            .finish();
        let factory = AccessLayerFactory { object_store };

        let region_metadata = create_test_region_metadata();
        let mut writer = factory
            .create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
            .await
            .unwrap();

        let batch = create_test_record_batch();

        // Provide explicit timestamp range that differs from actual data
        let provided_range = (500, 6000);
        writer
            .write_record_batch(&batch, Some(provided_range))
            .await
            .unwrap();

        let file_meta = writer.finish().await.unwrap();

        assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(500));
        assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(6000));
    }

    #[test]
    fn test_get_or_calculate_timestamp_range_with_provided_range() {
        let region_metadata = create_test_region_metadata();
        let batch = create_test_record_batch();

        let provided_range = Some((100, 200));
        let result = get_or_calculate_timestamp_range(provided_range, &batch, &region_metadata);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), (100, 200));
    }

    #[test]
    fn test_get_or_calculate_timestamp_range_calculated() {
        let region_metadata = create_test_region_metadata();
        let batch = create_test_record_batch();

        let result = get_or_calculate_timestamp_range(None, &batch, &region_metadata);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), (1000, 3000));
    }
}
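Editor's note: the body of get_or_calculate_timestamp_range is not shown in this excerpt; the tests above only pin down its observable behavior (a provided range is returned as-is, otherwise the range is taken from the data). The following is a minimal sketch consistent with those tests; the helper name, the time-index lookup, and the Option-based signature are assumptions, not the actual implementation.

    // Returns the provided range if any, otherwise scans the time index column of
    // the batch for its min/max millisecond timestamps.
    fn get_or_calculate_timestamp_range_sketch(
        provided: Option<(i64, i64)>,
        batch: &RecordBatch,
        region_metadata: &RegionMetadataRef,
    ) -> Option<(i64, i64)> {
        if let Some(range) = provided {
            return Some(range);
        }
        // Assumed accessor: the region metadata knows its time index column name.
        let ts_name = &region_metadata.time_index_column().column_schema.name;
        let col = batch.column_by_name(ts_name)?;
        let ts = col.as_any().downcast_ref::<TimestampMillisecondArray>()?;
        let min = arrow::compute::min(ts)?;
        let max = arrow::compute::max(ts)?;
        Some((min, max))
    }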
604
src/servers/src/batch_builder.rs
Normal file
@@ -0,0 +1,604 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, OpType, SemanticType};
use arrow::array::{
    ArrayBuilder, ArrayRef, BinaryBuilder, Float64Array, RecordBatch, TimestampMillisecondArray,
    UInt64Array, UInt8Array,
};
use arrow::compute;
use arrow_schema::Field;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::info;
use itertools::Itertools;
use metric_engine::row_modifier::{RowModifier, RowsIter};
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use operator::schema_helper::{
    ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas,
    SchemaHelper,
};
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{
    ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
use store_api::storage::{ColumnId, RegionId};
use table::metadata::TableId;

use crate::error;
use crate::prom_row_builder::{PromCtx, TableBuilder};

pub struct MetricsBatchBuilder {
    schema_helper: SchemaHelper,
    builders:
        HashMap<String /*schema*/, HashMap<RegionId /*physical table name*/, BatchEncoder>>,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
}

impl MetricsBatchBuilder {
    pub fn new(
        schema_helper: SchemaHelper,
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
    ) -> Self {
        MetricsBatchBuilder {
            schema_helper,
            builders: Default::default(),
            partition_manager,
            node_manager,
        }
    }

    /// Detects the DDL requirements according to the staged table rows.
    pub async fn create_or_alter_physical_tables(
        &self,
        tables: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
        query_ctx: &QueryContextRef,
    ) -> error::Result<()> {
        // Physical table name -> logical tables -> tags in logical table
        let mut tags: HashMap<String, HashMap<String, HashSet<String>>> = HashMap::default();
        let catalog = query_ctx.current_catalog();
        let schema = query_ctx.current_schema();

        for (ctx, tables) in tables {
            for (logical_table_name, table_builder) in tables {
                let physical_table_name = self
                    .determine_physical_table_name(
                        logical_table_name,
                        &ctx.physical_table,
                        catalog,
                        &schema,
                    )
                    .await?;
                tags.entry(physical_table_name)
                    .or_default()
                    .entry(logical_table_name.clone())
                    .or_default()
                    .extend(table_builder.tags().cloned());
            }
        }
        let logical_schemas = tags_to_logical_schemas(tags);
        ensure_logical_tables_for_metrics(&self.schema_helper, &logical_schemas, query_ctx)
            .await
            .context(error::OperatorSnafu)?;

        Ok(())
    }

    /// Finds the physical table name for a logical table.
    async fn determine_physical_table_name(
        &self,
        logical_table_name: &str,
        physical_table_name: &Option<String>,
        catalog: &str,
        schema: &str,
    ) -> error::Result<String> {
        let logical_table = self
            .schema_helper
            .get_table(catalog, schema, logical_table_name)
            .await
            .context(error::OperatorSnafu)?;
        if let Some(logical_table) = logical_table {
            // The logical table already exists, just return its physical table.
            let logical_table_id = logical_table.table_info().table_id();
            let physical_table_id = self
                .schema_helper
                .table_route_manager()
                .get_physical_table_id(logical_table_id)
                .await
                .context(error::CommonMetaSnafu)?;
            let physical_table = self
                .schema_helper
                .catalog_manager()
                .tables_by_ids(catalog, schema, &[physical_table_id])
                .await
                .context(error::CatalogSnafu)?
                .swap_remove(0);
            return Ok(physical_table.table_info().name.clone());
        }

        // The logical table does not exist, try to assign it to a physical table.
        let physical_table_name = physical_table_name
            .as_deref()
            .unwrap_or(GREPTIME_PHYSICAL_TABLE);
        Ok(physical_table_name.to_string())
    }

    /// Retrieves physical region metadata of given logical table names.
    ///
    /// The `logical_tables` is a list of table names, each entry contains the schema name and the table name.
    /// Returns the following mapping: `schema => logical table => (logical table id, region 0 metadata of the physical table)`.
    pub(crate) async fn collect_physical_region_metadata(
        &self,
        logical_tables: &[(String, String)],
        query_ctx: &QueryContextRef,
    ) -> error::Result<HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>> {
        let catalog = query_ctx.current_catalog();
        // Logical and physical table ids.
        let mut table_ids = Vec::with_capacity(logical_tables.len());
        let mut physical_region_ids = HashSet::new();
        for (schema, table_name) in logical_tables {
            let logical_table = self
                .schema_helper
                .get_table(catalog, schema, table_name)
                .await
                .context(error::OperatorSnafu)?
                .context(error::TableNotFoundSnafu {
                    catalog,
                    schema,
                    table: table_name,
                })?;
            let logical_table_id = logical_table.table_info().table_id();
            let physical_table_id = self
                .schema_helper
                .table_route_manager()
                .get_physical_table_id(logical_table_id)
                .await
                .context(error::CommonMetaSnafu)?;
            table_ids.push((logical_table_id, physical_table_id));
            // We only get metadata from region 0.
            physical_region_ids.insert(RegionId::new(physical_table_id, 0));
        }

        // Batch get physical metadata.
        let physical_region_ids = physical_region_ids.into_iter().collect_vec();
        let region_metadatas = metadatas_for_region_ids(
            &self.partition_manager,
            &self.node_manager,
            &physical_region_ids,
            query_ctx,
        )
        .await
        .context(error::OperatorSnafu)?;
        let mut result_map: HashMap<_, HashMap<_, _>> = HashMap::new();
        let region_metadatas: HashMap<_, _> = region_metadatas
            .into_iter()
            .flatten()
            .map(|meta| (meta.region_id, Arc::new(meta)))
            .collect();
        for (i, (schema, table_name)) in logical_tables.iter().enumerate() {
            let physical_table_id = table_ids[i].1;
            let physical_region_id = RegionId::new(physical_table_id, 0);
            let physical_metadata =
                region_metadatas.get(&physical_region_id).with_context(|| {
                    error::UnexpectedResultSnafu {
                        reason: format!(
                            "Physical region metadata {} for table {} not found",
                            physical_region_id, table_name
                        ),
                    }
                })?;

            match result_map.get_mut(schema) {
                Some(table_map) => {
                    table_map.insert(
                        table_name.clone(),
                        (table_ids[i].0, physical_metadata.clone()),
                    );
                }
                None => {
                    let mut table_map = HashMap::new();
                    table_map.insert(
                        table_name.clone(),
                        (table_ids[i].0, physical_metadata.clone()),
                    );
                    result_map.insert(schema.to_string(), table_map);
                }
            }
        }

        Ok(result_map)
    }

    /// Builds [RecordBatch] from rows with primary key encoded.
    /// Potentially we also need to modify the column name of timestamp and value field to
    /// match the schema of physical tables.
    /// Note:
    /// Make sure all logical tables and physical tables are created before reaching here, and that the mapping
    /// from logical table name to physical table ref is stored in [physical_region_metadata].
    pub(crate) async fn append_rows_to_batch(
        &mut self,
        current_catalog: Option<String>,
        current_schema: Option<String>,
        table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
        physical_region_metadata: &HashMap<
            String, /*schema name*/
            HashMap<
                String, /*logical table name*/
                (TableId /*logical table id*/, RegionMetadataRef),
            >,
        >,
    ) -> error::Result<()> {
        for (ctx, tables_in_schema) in table_data {
            // Use the session catalog.
            let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
            // The schema in PromCtx takes precedence over the session schema.
            let schema = ctx
                .schema
                .as_deref()
                .or(current_schema.as_deref())
                .unwrap_or(DEFAULT_SCHEMA_NAME);
            // Look up physical region metadata by schema and table name
            let schema_metadata =
                physical_region_metadata
                    .get(schema)
                    .context(error::TableNotFoundSnafu {
                        catalog,
                        schema,
                        table: "",
                    })?;

            for (logical_table_name, table) in tables_in_schema {
                let (logical_table_id, physical_table) = schema_metadata
                    .get(logical_table_name)
                    .context(error::TableNotFoundSnafu {
                        catalog,
                        schema,
                        table: logical_table_name,
                    })?;

                let encoder = self
                    .builders
                    .entry(schema.to_string())
                    .or_default()
                    .entry(physical_table.region_id)
                    .or_insert_with(|| Self::create_sparse_encoder(&physical_table));
                let name_to_id: HashMap<_, _> = physical_table
                    .column_metadatas
                    .iter()
                    .map(|c| (c.column_schema.name.clone(), c.column_id))
                    .collect();
                let _ = std::mem::replace(encoder.name_to_id_mut(), name_to_id);
                encoder.append_rows(*logical_table_id, std::mem::take(table))?;
            }
        }

        Ok(())
    }

    /// Finishes current record batch builder and returns record batches grouped by physical table id.
    pub(crate) fn finish(
        self,
    ) -> error::Result<
        HashMap<
            String, /*schema name*/
            HashMap<RegionId /*physical region id*/, Vec<(RecordBatch, (i64, i64))>>,
        >,
    > {
        let mut table_batches: HashMap<String, HashMap<RegionId, Vec<(RecordBatch, (i64, i64))>>> =
            HashMap::with_capacity(self.builders.len());

        for (schema_name, schema_tables) in self.builders {
            let schema_batches = table_batches.entry(schema_name).or_default();
            for (physical_region_id, table_data) in schema_tables {
                let rb = table_data.finish()?;
                if !rb.is_empty() {
                    schema_batches
                        .entry(physical_region_id)
                        .or_default()
                        .extend(rb);
                }
            }
        }
        Ok(table_batches)
    }

    /// Creates Encoder that converts Rows into RecordBatch with primary key encoded.
    fn create_sparse_encoder(physical_region_meta: &RegionMetadataRef) -> BatchEncoder {
        let name_to_id: HashMap<_, _> = physical_region_meta
            .column_metadatas
            .iter()
            .map(|c| (c.column_schema.name.clone(), c.column_id))
            .collect();
        BatchEncoder::new(name_to_id)
    }
}

struct Columns {
    encoded_primary_key_array_builder: BinaryBuilder,
    timestamps: Vec<i64>,
    value: Vec<f64>,
    timestamp_range: Option<(i64, i64)>,
}

impl Columns {
    fn pk_offset(&self) -> usize {
        self.encoded_primary_key_array_builder
            .offsets_slice()
            .last()
            .copied()
            .unwrap_or(0) as usize
    }

    fn estimated_size(&self) -> usize {
        let value_size = self.encoded_primary_key_array_builder.values_slice().len();
        let offset_size = self.encoded_primary_key_array_builder.offsets_slice().len() * 4;
        let validity_size = self
            .encoded_primary_key_array_builder
            .validity_slice()
            .map(|v| v.len())
            .unwrap_or(0);
        let timestamp_size = self.timestamps.len() * 8 + std::mem::size_of::<Vec<i64>>();
        let val_size = self.value.len() * 8 + std::mem::size_of::<Vec<f64>>();
        value_size + offset_size + validity_size + timestamp_size + val_size + size_of::<Self>()
    }

    fn push(&mut self, pk: &[u8], val: f64, timestamp: i64) {
        self.encoded_primary_key_array_builder.append_value(pk);
        self.value.push(val);
        self.timestamps.push(timestamp);
        if let Some((min, max)) = &mut self.timestamp_range {
            *min = (*min).min(timestamp);
            *max = (*max).max(timestamp);
        } else {
            self.timestamp_range = Some((timestamp, timestamp));
        }
    }
}

impl Default for Columns {
    fn default() -> Self {
        Self {
            encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0),
            timestamps: Vec::with_capacity(16),
            value: Vec::with_capacity(16),
            timestamp_range: None,
        }
    }
}

#[derive(Default)]
struct ColumnsBuilder {
    columns: Vec<Columns>,
}

impl ColumnsBuilder {
    fn push(&mut self, pk: &[u8], val: f64, ts: i64) {
        let last = match self.columns.last_mut() {
            None => {
                self.columns.push(Columns::default());
                self.columns.last_mut().unwrap()
            }
            Some(last_builder) => {
                if last_builder.pk_offset() + pk.len() >= i32::MAX as usize {
                    info!(
                        "Current builder is full {}, rows: {}/{}",
                        last_builder.pk_offset(),
                        last_builder.encoded_primary_key_array_builder.len(),
                        last_builder.timestamps.len()
                    );
                    // Current builder is full, create a new one
                    self.columns.push(Columns::default());
                    self.columns.last_mut().unwrap()
                } else {
                    last_builder
                }
            }
        };
        last.push(pk, val, ts);
    }
}

struct BatchEncoder {
    name_to_id: HashMap<String, ColumnId>,
    pk_codec: SparsePrimaryKeyCodec,
    columns_builder: ColumnsBuilder,
}

impl BatchEncoder {
    fn new(name_to_id: HashMap<String, ColumnId>) -> BatchEncoder {
        Self {
            name_to_id,
            pk_codec: SparsePrimaryKeyCodec::schemaless(),
            columns_builder: ColumnsBuilder::default(),
        }
    }

    pub(crate) fn estimated_size(&self) -> usize {
        self.columns_builder
            .columns
            .iter()
            .map(|v| v.estimated_size())
            .sum()
    }

    pub(crate) fn total_rows(&self) -> usize {
        self.columns_builder
            .columns
            .iter()
            .map(|v| v.timestamps.len())
            .sum()
    }

    pub(crate) fn name_to_id_mut(&mut self) -> &mut HashMap<String, ColumnId> {
        &mut self.name_to_id
    }

    fn append_rows(
        &mut self,
        logical_table_id: TableId,
        mut table_builder: TableBuilder,
    ) -> error::Result<()> {
        // todo(hl): we can simplify the row iter because the schema in TableBuilder is known (ts, val, tags...)
        let row_insert_request = table_builder.as_row_insert_request("don't care".to_string());

        let mut iter = RowsIter::new(row_insert_request.rows.unwrap(), &self.name_to_id);

        let mut encode_buf = vec![];
        for row in iter.iter_mut() {
            let (table_id, ts_id) = RowModifier::fill_internal_columns(logical_table_id, &row);
            let internal_columns = [
                (
                    ReservedColumnId::table_id(),
                    api::helper::pb_value_to_value_ref(&table_id, &None),
                ),
                (
                    ReservedColumnId::tsid(),
                    api::helper::pb_value_to_value_ref(&ts_id, &None),
                ),
            ];
            self.pk_codec
                .encode_to_vec(internal_columns.into_iter(), &mut encode_buf)
                .context(error::EncodePrimaryKeySnafu)?;
            self.pk_codec
                .encode_to_vec(row.primary_keys(), &mut encode_buf)
                .context(error::EncodePrimaryKeySnafu)?;
            // safety: field values cannot be null in prom remote write
            let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
                return error::InvalidFieldValueTypeSnafu.fail();
            };
            // Process timestamp and field. We already know the position of timestamps and values in [TableBuilder].
            let ValueData::TimestampMillisecondValue(ts) =
                // safety: timestamp values cannot be null
                row.value_at(0).value_data.as_ref().unwrap()
            else {
                return error::InvalidTimestampValueTypeSnafu.fail();
            };

            self.columns_builder.push(&encode_buf, *val, *ts);
        }
        Ok(())
    }

    fn finish(self) -> error::Result<Vec<(RecordBatch, (i64, i64))>> {
        if self.columns_builder.columns.is_empty() {
            return Ok(vec![]);
        }

        let mut res = Vec::with_capacity(self.columns_builder.columns.len());

        for mut columns in self.columns_builder.columns {
            let num_rows = columns.timestamps.len();
            let value = Float64Array::from(columns.value);
            let timestamp = TimestampMillisecondArray::from(columns.timestamps);

            let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef;
            // todo: for now we set all sequences to 0.
            let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef;

            let pk = columns.encoded_primary_key_array_builder.finish();
            let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?;

            // Sort arrays
            let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
            let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
            let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
            let rb =
                RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type])
                    .context(error::ArrowSnafu)?;
            res.push((rb, columns.timestamp_range.unwrap()))
        }

        Ok(res)
    }
}

fn tags_to_logical_schemas(
    tags: HashMap<String, HashMap<String, HashSet<String>>>,
) -> LogicalSchemas {
    let schemas: HashMap<String, Vec<LogicalSchema>> = tags
        .into_iter()
        .map(|(physical, logical_tables)| {
            let schemas: Vec<_> = logical_tables
                .into_iter()
                .map(|(logical, tags)| {
                    let mut columns: Vec<_> = tags
                        .into_iter()
                        .map(|tag_name| ColumnSchema {
                            column_name: tag_name,
                            datatype: ColumnDataType::String as i32,
                            semantic_type: SemanticType::Tag as i32,
                            ..Default::default()
                        })
                        .collect();
                    columns.push(ColumnSchema {
                        column_name: GREPTIME_TIMESTAMP.to_string(),
                        datatype: ColumnDataType::TimestampMillisecond as i32,
                        semantic_type: SemanticType::Timestamp as i32,
                        ..Default::default()
                    });
                    columns.push(ColumnSchema {
                        column_name: GREPTIME_VALUE.to_string(),
                        datatype: ColumnDataType::Float64 as i32,
                        semantic_type: SemanticType::Field as i32,
                        ..Default::default()
                    });
                    LogicalSchema {
                        name: logical,
                        columns,
                    }
                })
                .collect();
            (physical, schemas)
        })
        .collect();

    LogicalSchemas { schemas }
}

/// Creates the schema of output record batch.
pub fn physical_schema() -> arrow::datatypes::SchemaRef {
    Arc::new(arrow::datatypes::Schema::new(vec![
        Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
        Field::new(
            GREPTIME_TIMESTAMP,
            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
        Field::new(
            PRIMARY_KEY_COLUMN_NAME,
            arrow::datatypes::DataType::Binary,
            false,
        ),
        Field::new(
            SEQUENCE_COLUMN_NAME,
            arrow::datatypes::DataType::UInt64,
            false,
        ),
        Field::new(
            OP_TYPE_COLUMN_NAME,
            arrow::datatypes::DataType::UInt8,
            false,
        ),
    ]))
}
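Editor's note: a minimal illustrative sketch (not part of the diff) of how a caller could sanity-check that a batch produced by BatchEncoder::finish matches the layout returned by physical_schema(); the assertions only restate the field order defined above, and the test name is hypothetical.

    #[test]
    fn physical_schema_layout_sketch() {
        use arrow::array::RecordBatch;

        let schema = physical_schema();
        // An empty batch is enough to check the column order expected by the SST writer:
        // value, timestamp, encoded primary key, sequence, op type.
        let batch = RecordBatch::new_empty(schema.clone());
        let names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect();
        assert_eq!(names[0], GREPTIME_VALUE);
        assert_eq!(names[1], GREPTIME_TIMESTAMP);
        assert_eq!(names[2], PRIMARY_KEY_COLUMN_NAME);
        assert_eq!(names[3], SEQUENCE_COLUMN_NAME);
        assert_eq!(names[4], OP_TYPE_COLUMN_NAME);
        assert_eq!(batch.num_rows(), 0);
    }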
@@ -624,6 +624,64 @@ pub enum Error {

    #[snafu(display("Unknown hint: {}", hint))]
    UnknownHint { hint: String },

    #[snafu(display("Failed to invoke common_meta"))]
    CommonMeta {
        source: common_meta::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to invoke operator"))]
    Operator {
        source: operator::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to encode primary key"))]
    EncodePrimaryKey {
        source: mito_codec::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Invalid timestamp value type in row data, expected TimestampMillisecondValue"
    ))]
    InvalidTimestampValueType {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid field value type in row data, expected F64Value"))]
    InvalidFieldValueType {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to operate object store"))]
    Opendal {
        #[snafu(source)]
        error: object_store::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to operate object store"))]
    ObjectStore {
        source: object_store::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to read or write parquet file"))]
    Parquet {
        #[snafu(source)]
        error: parquet::errors::ParquetError,
        #[snafu(implicit)]
        location: Location,
    },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -747,6 +805,15 @@ impl ErrorExt for Error {
            DurationOverflow { .. } => StatusCode::InvalidArguments,

            HandleOtelArrowRequest { .. } => StatusCode::Internal,
            CommonMeta { source, .. } => source.status_code(),
            Operator { source, .. } => source.status_code(),
            EncodePrimaryKey { source, .. } => source.status_code(),
            InvalidTimestampValueType { .. } | InvalidFieldValueType { .. } => {
                StatusCode::Unexpected
            }
            ObjectStore { source, .. } => source.status_code(),
            Parquet { .. } => StatusCode::Internal,
            Opendal { .. } => StatusCode::Internal,
        }
    }

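Editor's note: an illustrative sketch (not part of the diff) of how the new variants are expected to be raised through snafu's generated context selectors; the function and the store call site are placeholders, not APIs introduced by this change.

    // Hypothetical call site, assuming the `error::OpendalSnafu` selector generated by
    // the `Opendal` variant above.
    async fn flush_example(
        store: &object_store::ObjectStore,
        path: &str,
        bytes: Vec<u8>,
    ) -> crate::error::Result<()> {
        use snafu::ResultExt;

        // An opendal failure maps to Error::Opendal, which reports StatusCode::Internal.
        store.write(path, bytes).await.context(crate::error::OpendalSnafu)?;
        Ok(())
    }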
@@ -76,7 +76,6 @@ use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
    InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
    OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
    PromStoreProtocolHandlerRef,
};
use crate::server::Server;

@@ -566,20 +565,7 @@ impl HttpServerBuilder {
        }
    }

    pub fn with_prom_handler(
        self,
        handler: PromStoreProtocolHandlerRef,
        pipeline_handler: Option<PipelineHandlerRef>,
        prom_store_with_metric_engine: bool,
        prom_validation_mode: PromValidationMode,
    ) -> Self {
        let state = PromStoreState {
            prom_store_handler: handler,
            pipeline_handler,
            prom_store_with_metric_engine,
            prom_validation_mode,
        };

    pub fn with_prom_handler(self, state: PromStoreState) -> Self {
        Self {
            router: self.router.nest(
                &format!("/{HTTP_API_VERSION}/prometheus"),
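Editor's note: for context, a sketch of the call site implied by the new signature; it mirrors the test helpers further below, and the handler and option values here are placeholders.

    // Hypothetical wiring matching the new `with_prom_handler(state)` signature:
    // callers now build a PromStoreState up front and hand it to the builder.
    let state = PromStoreState {
        prom_store_handler: handler,
        pipeline_handler: Some(pipeline_handler),
        prom_store_with_metric_engine: true,
        prom_validation_mode: PromValidationMode::Strict,
        bulk_state: None,
    };
    let server = HttpServerBuilder::new(http_opts)
        .with_prom_handler(state)
        .build();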
@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::prom_store::remote::ReadRequest;
use axum::body::Bytes;
@@ -22,22 +24,29 @@ use axum::response::IntoResponse;
use axum::Extension;
use axum_extra::TypedHeader;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_telemetry::tracing;
use common_telemetry::{info, tracing};
use hyper::HeaderMap;
use lazy_static::lazy_static;
use object_pool::Pool;
use operator::schema_helper::SchemaHelper;
use partition::manager::PartitionRuleManagerRef;
use pipeline::util::to_pipeline_version;
use pipeline::{ContextReq, PipelineDefinition};
use prost::Message;
use serde::{Deserialize, Serialize};
use session::context::{Channel, QueryContext};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::prelude::*;
use tokio::sync::mpsc::Sender;

use crate::access_layer::AccessLayerFactory;
use crate::batch_builder::MetricsBatchBuilder;
use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
use crate::http::extractor::PipelineInfo;
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::http::PromValidationMode;
use crate::prom_row_builder::{PromCtx, TableBuilder, TablesBuilder};
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
@@ -52,12 +61,174 @@ pub const DEFAULT_ENCODING: &str = "snappy";
pub const VM_ENCODING: &str = "zstd";
pub const VM_PROTO_VERSION: &str = "1";

/// Additional states for bulk write requests.
#[derive(Clone)]
pub struct PromBulkState {
    pub schema_helper: SchemaHelper,
    pub partition_manager: PartitionRuleManagerRef,
    pub node_manager: NodeManagerRef,
    pub access_layer_factory: AccessLayerFactory,
    pub tx: Option<
        Sender<(
            QueryContextRef,
            HashMap<PromCtx, HashMap<String, TableBuilder>>,
        )>,
    >,
}

#[derive(Clone)]
pub struct PromStoreState {
    pub prom_store_handler: PromStoreProtocolHandlerRef,
    pub pipeline_handler: Option<PipelineHandlerRef>,
    pub prom_store_with_metric_engine: bool,
    pub prom_validation_mode: PromValidationMode,
    pub bulk_state: Option<PromBulkState>,
}

impl PromBulkState {
    pub fn start_background_task(&mut self) {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<(
            QueryContextRef,
            HashMap<PromCtx, HashMap<String, TableBuilder>>,
        )>(16);

        self.tx = Some(tx);
        let schema_helper = self.schema_helper.clone();
        let partition_manager = self.partition_manager.clone();
        let node_manager = self.node_manager.clone();
        let access_layer_factory = self.access_layer_factory.clone();

        let handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
                let start = Instant::now();
                let mut batch_builder = MetricsBatchBuilder::new(
                    schema_helper.clone(),
                    partition_manager.clone(),
                    node_manager.clone(),
                );
                let mut physical_region_metadata_total = HashMap::new();
                let mut num_batches = 0;
                while let Some((query_context, mut tables)) = rx.recv().await {
                    batch_builder
                        .create_or_alter_physical_tables(&tables, &query_context)
                        .await
                        .unwrap();
                    info!(
                        "create_or_alter_physical_tables, elapsed time: {}ms",
                        start.elapsed().as_millis()
                    );

                    // Extract logical table names from tables for metadata collection
                    let current_schema = query_context.current_schema();
                    let logical_tables: Vec<(String, String)> = tables
                        .iter()
                        .flat_map(|(ctx, table_map)| {
                            let schema = ctx.schema.as_deref().unwrap_or(&current_schema);
                            table_map
                                .keys()
                                .map(|table_name| (schema.to_string(), table_name.clone()))
                        })
                        .collect();

                    let start = Instant::now();
                    // Gather all region metadata for region 0 of physical tables.
                    let physical_region_metadata = batch_builder
                        .collect_physical_region_metadata(&logical_tables, &query_context)
                        .await
                        .unwrap();

                    physical_region_metadata_total.extend(physical_region_metadata);
                    info!(
                        "collect_physical_region_metadata, elapsed time: {}ms",
                        start.elapsed().as_millis()
                    );

                    let start = Instant::now();
                    batch_builder
                        .append_rows_to_batch(
                            None,
                            None,
                            &mut tables,
                            &physical_region_metadata_total,
                        )
                        .await
                        .unwrap();
                    num_batches += 1;
                    info!(
                        "append_rows_to_batch, elapsed time: {}ms, batches: {}",
                        start.elapsed().as_millis(),
                        num_batches
                    );

                    if num_batches >= 10 {
                        break;
                    }
                }

                let start = Instant::now();
                let record_batches = batch_builder.finish().unwrap();
                let physical_region_id_to_meta = physical_region_metadata_total
                    .into_iter()
                    .map(|(schema_name, tables)| {
                        let region_id_to_meta = tables
                            .into_values()
                            .map(|(_, physical_region_meta)| {
                                (physical_region_meta.region_id, physical_region_meta)
                            })
                            .collect::<HashMap<_, _>>();
                        (schema_name, region_id_to_meta)
                    })
                    .collect::<HashMap<_, _>>();

                info!("Finishing batches cost: {}ms", start.elapsed().as_millis());
                let start = Instant::now();

                let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
                let mut file_metas = vec![];
                for (schema_name, schema_batches) in record_batches {
                    let tables_in_schema =
                        tables_per_schema.entry(schema_name.clone()).or_insert(0);
                    *tables_in_schema += 1;
                    let schema_regions = physical_region_id_to_meta
                        .get(&schema_name)
                        .expect("physical region schema not found");
                    for (physical_region_id, record_batches) in schema_batches {
                        let physical_region_metadata = schema_regions
                            .get(&physical_region_id)
                            .expect("physical region metadata not found");
                        for (rb, time_range) in record_batches {
                            let mut writer = access_layer_factory
                                .create_sst_writer(
                                    "greptime", //todo(hl): use the catalog name in query context.
                                    &schema_name,
                                    physical_region_metadata.clone(),
                                )
                                .await
                                .unwrap();
                            let start = Instant::now();
                            info!("Created writer: {}", writer.file_id());
                            writer
                                .write_record_batch(&rb, Some(time_range))
                                .await
                                .unwrap();
                            let file_meta = writer.finish().await.unwrap();
                            info!(
                                "Finished writer: {}, elapsed time: {}ms",
                                writer.file_id(),
                                start.elapsed().as_millis()
                            );
                            file_metas.push(file_meta);
                        }
                    }
                }
                info!(
                    "upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}, file_metas: {:?}",
                    start.elapsed().as_millis(),
                    tables_per_schema.len(),
                    tables_per_schema,
                    file_metas
                );
            }
        });
    }
}

#[derive(Debug, Serialize, Deserialize)]
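Editor's note: a sketch (not part of the diff) of the producer side of the channel set up above, as it appears in the remote_write handler further below; decoded tables are simply queued and the spawned task drains them in batches. The helper name is hypothetical.

    // Hypothetical producer, assuming `start_background_task()` has already been
    // called so that `bulk_state.tx` is populated.
    async fn queue_for_bulk_write(
        bulk_state: &PromBulkState,
        query_ctx: QueryContextRef,
        tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
    ) {
        if let Some(tx) = bulk_state.tx.as_ref() {
            // The channel is bounded (16), so this applies backpressure to writers
            // when the background flush task falls behind.
            let _ = tx.send((query_ctx, tables)).await;
        }
    }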
@@ -98,6 +269,7 @@ pub async fn remote_write(
        pipeline_handler,
        prom_store_with_metric_engine,
        prom_validation_mode,
        bulk_state,
    } = state;

    if let Some(_vm_handshake) = params.get_vm_proto_version {
@@ -132,6 +304,32 @@ pub async fn remote_write(
        processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
    }

    if let Some(state) = bulk_state {
        let context = PromBulkContext {
            schema_helper: state.schema_helper,
            query_ctx: query_ctx.clone(),
            partition_manager: state.partition_manager,
            node_manager: state.node_manager,
            access_layer_factory: state.access_layer_factory,
        };
        let builder = decode_remote_write_request_to_batch(
            is_zstd,
            body,
            prom_validation_mode,
            &mut processor,
            context,
        )
        .await?;
        state
            .tx
            .as_ref()
            .unwrap()
            .send((query_ctx, builder.tables))
            .await
            .unwrap();
        return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
    }

    let req =
        decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;

@@ -202,6 +400,15 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
    }))
}

/// Context for processing remote write requests in bulk mode.
pub struct PromBulkContext {
    pub(crate) schema_helper: SchemaHelper,
    pub(crate) query_ctx: QueryContextRef,
    pub(crate) partition_manager: PartitionRuleManagerRef,
    pub(crate) node_manager: NodeManagerRef,
    pub(crate) access_layer_factory: AccessLayerFactory,
}

async fn decode_remote_write_request(
    is_zstd: bool,
    body: Bytes,
@@ -236,6 +443,38 @@ async fn decode_remote_write_request(
    }
}

async fn decode_remote_write_request_to_batch(
    is_zstd: bool,
    body: Bytes,
    prom_validation_mode: PromValidationMode,
    processor: &mut PromSeriesProcessor,
    bulk: PromBulkContext,
) -> Result<TablesBuilder> {
    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();

    // Due to vmagent's limitation, there is a chance that vmagent sends the wrong
    // content type, so we have to apply a fallback that decodes the content with
    // the other method.
    //
    // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
    // see https://github.com/GreptimeTeam/greptimedb/issues/3929
    let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
        buf
    } else {
        // fallback to the other compression method
        try_decompress(!is_zstd, &body[..])?
    };

    let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);

    processor.use_pipeline = false;
    request
        .merge(buf, prom_validation_mode, processor)
        .context(error::DecodePromRemoteRequestSnafu)?;

    Ok(std::mem::take(&mut request.table_data))
}

async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
    let buf = snappy_decompress(&body[..])?;

@@ -21,7 +21,10 @@
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;

pub mod access_layer;
pub mod addrs;
#[allow(dead_code)]
mod batch_builder;
pub mod configurator;
pub(crate) mod elasticsearch;
pub mod error;

@@ -13,16 +13,21 @@
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::string::ToString;
use std::time::Instant;

use ahash::HashMap;
use api::prom_store::remote::Sample;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::info;
use pipeline::{ContextOpt, ContextReq};
use prost::DecodeError;

use crate::batch_builder::MetricsBatchBuilder;
use crate::error::Result;
use crate::http::prom_store::PromBulkContext;
use crate::http::PromValidationMode;
use crate::proto::{decode_string, PromLabel};
use crate::repeated_field::Clear;
@@ -38,7 +43,7 @@ pub struct PromCtx {
#[derive(Default, Debug)]
pub(crate) struct TablesBuilder {
    // schema -> table -> table_builder
    tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
    pub tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
}

impl Clear for TablesBuilder {
@@ -91,11 +96,113 @@ impl TablesBuilder {
            req
        })
    }

    /// Converts [TablesBuilder] to record batch and clears inner states.
    pub(crate) async fn as_record_batches(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
        let mut batch_builder = MetricsBatchBuilder::new(
            bulk_ctx.schema_helper.clone(),
            bulk_ctx.partition_manager.clone(),
            bulk_ctx.node_manager.clone(),
        );
        let mut tables = std::mem::take(&mut self.tables);

        let start = Instant::now();
        batch_builder
            .create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx)
            .await?;
        info!(
            "create_or_alter_physical_tables, elapsed time: {}ms",
            start.elapsed().as_millis()
        );

        // Extract logical table names from tables for metadata collection
        let current_schema = bulk_ctx.query_ctx.current_schema();
        let logical_tables: Vec<(String, String)> = tables
            .iter()
            .flat_map(|(ctx, table_map)| {
                let schema = ctx.schema.as_deref().unwrap_or(&current_schema);
                table_map
                    .keys()
                    .map(|table_name| (schema.to_string(), table_name.clone()))
            })
            .collect();

        let start = Instant::now();
        // Gather all region metadata for region 0 of physical tables.
        let physical_region_metadata = batch_builder
            .collect_physical_region_metadata(&logical_tables, &bulk_ctx.query_ctx)
            .await?;

        info!(
            "collect_physical_region_metadata, elapsed time: {}ms",
            start.elapsed().as_millis()
        );

        let start = Instant::now();
        batch_builder
            .append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)
            .await?;
        let record_batches = batch_builder.finish()?;
        info!(
            "append_rows_to_batch, elapsed time: {}ms",
            start.elapsed().as_millis()
        );

        let physical_region_id_to_meta = physical_region_metadata
            .into_iter()
            .map(|(schema_name, tables)| {
                let region_id_to_meta = tables
                    .into_values()
                    .map(|(_, physical_region_meta)| {
                        (physical_region_meta.region_id, physical_region_meta)
                    })
                    .collect::<HashMap<_, _>>();
                (schema_name, region_id_to_meta)
            })
            .collect::<HashMap<_, _>>();

        let start = Instant::now();

        let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
        let mut file_metas = vec![];
        for (schema_name, schema_batches) in record_batches {
            let tables_in_schema = tables_per_schema.entry(schema_name.clone()).or_insert(0);
            *tables_in_schema += 1;
            let schema_regions = physical_region_id_to_meta
                .get(&schema_name)
                .expect("physical region metadata not found");
            for (physical_region_id, record_batches) in schema_batches {
                let physical_region_metadata = schema_regions
                    .get(&physical_region_id)
                    .expect("physical region metadata not found");
                for (rb, time_range) in record_batches {
                    let mut writer = bulk_ctx
                        .access_layer_factory
                        .create_sst_writer(
                            "greptime", //todo(hl): use the catalog name in query context.
                            &schema_name,
                            physical_region_metadata.clone(),
                        )
                        .await?;
                    writer.write_record_batch(&rb, Some(time_range)).await?;
                    let file_meta = writer.finish().await?;
                    file_metas.push(file_meta);
                }
            }
        }
        info!(
            "upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}, file_metas: {:?}",
            start.elapsed().as_millis(),
            tables_per_schema.len(),
            tables_per_schema, file_metas
        );
        Ok(())
    }
}

/// Builder for one table.
#[derive(Debug)]
pub(crate) struct TableBuilder {
#[derive(Debug, Clone)]
pub struct TableBuilder {
    /// Column schemas.
    schema: Vec<ColumnSchema>,
    /// Rows written.
@@ -210,6 +317,13 @@ impl TableBuilder {
            rows: Some(Rows { schema, rows }),
        }
    }

    pub(crate) fn tags(&self) -> impl Iterator<Item = &String> {
        self.schema
            .iter()
            .filter(|v| v.semantic_type == SemanticType::Tag as i32)
            .map(|c| &c.column_name)
    }
}

#[cfg(test)]

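Editor's note: a small illustrative sketch (not part of the diff) of what the new tags() accessor yields; the helper function is hypothetical and only mirrors how create_or_alter_physical_tables consumes it above.

    // Collect the tag column names of a staged table, e.g. to aggregate them per
    // physical table before issuing DDL.
    fn tag_names_example(builder: &TableBuilder) -> std::collections::HashSet<String> {
        builder.tags().cloned().collect()
    }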
@@ -28,8 +28,9 @@ use prost::DecodeError;
use session::context::QueryContextRef;
use snafu::OptionExt;

use crate::error::InternalSnafu;
use crate::error::{InternalSnafu, Result};
use crate::http::event::PipelineIngestRequest;
use crate::http::prom_store::PromBulkContext;
use crate::http::PromValidationMode;
use crate::pipeline::run_pipeline;
use crate::prom_row_builder::{PromCtx, TablesBuilder};
@@ -283,7 +284,7 @@ pub(crate) fn decode_string(

#[derive(Default, Debug)]
pub struct PromWriteRequest {
    table_data: TablesBuilder,
    pub table_data: TablesBuilder,
    series: PromTimeSeries,
}

@@ -352,6 +353,11 @@ impl PromWriteRequest {

        Ok(())
    }

    /// Converts the write request into a record batch and resets the table data.
    pub async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
        self.table_data.as_record_batches(bulk_ctx).await
    }
}

/// A hook to be injected into the PromWriteRequest decoding process.

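Editor's note: a sketch (not part of the diff) of how the new as_record_batch entry point composes with the existing merge() step; the decompression and pool handling are elided, and the function name is hypothetical.

    // Hypothetical end-to-end use of PromWriteRequest in bulk mode: decode the
    // remote-write payload, then convert the staged tables into sorted record
    // batches and SST files via the PromBulkContext.
    async fn bulk_ingest_example(
        buf: bytes::Bytes,
        prom_validation_mode: PromValidationMode,
        processor: &mut PromSeriesProcessor,
        bulk_ctx: &PromBulkContext,
    ) -> Result<()> {
        use snafu::ResultExt;

        let mut request = PromWriteRequest::default();
        request
            .merge(buf, prom_validation_mode, processor)
            .context(crate::error::DecodePromRemoteRequestSnafu)?;
        request.as_record_batch(bulk_ctx).await
    }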
@@ -28,6 +28,7 @@ use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PromStoreState;
use servers::http::test_helpers::TestClient;
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
use servers::prom_store;
@@ -121,9 +122,16 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
    };

    let instance = Arc::new(DummyInstance { tx });
    let state = PromStoreState {
        prom_store_handler: instance.clone(),
        pipeline_handler: None,
        prom_store_with_metric_engine: true,
        prom_validation_mode: PromValidationMode::Unchecked,
        bulk_state: None,
    };
    let server = HttpServerBuilder::new(http_opts)
        .with_sql_handler(instance.clone())
        .with_prom_handler(instance, None, true, PromValidationMode::Unchecked)
        .with_prom_handler(state)
        .build();
    server.build(server.make_app()).unwrap()
}

@@ -213,6 +213,14 @@ impl TableMeta {
            .map(|(_, cs)| &cs.name)
    }

    /// Returns names of primary keys.
    pub fn primary_key_names(&self) -> impl Iterator<Item = &String> {
        let columns_schemas = self.schema.column_schemas();
        self.primary_key_indices
            .iter()
            .map(|pk_idx| &columns_schemas[*pk_idx].name)
    }

    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
    ///
    /// The returned builder would derive the next column id of this meta.

@@ -43,6 +43,7 @@ use object_store::ObjectStore;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::prom_store::PromStoreState;
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -534,15 +535,17 @@ pub async fn setup_test_prom_app_with_frontend(
        ..Default::default()
    };
    let frontend_ref = instance.fe_instance().clone();
    let state = PromStoreState {
        prom_store_handler: frontend_ref.clone(),
        pipeline_handler: Some(frontend_ref.clone()),
        prom_store_with_metric_engine: true,
        prom_validation_mode: PromValidationMode::Strict,
        bulk_state: None,
    };
    let http_server = HttpServerBuilder::new(http_opts)
        .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
        .with_logs_handler(instance.fe_instance().clone())
        .with_prom_handler(
            frontend_ref.clone(),
            Some(frontend_ref.clone()),
            true,
            PromValidationMode::Strict,
        )
        .with_prom_handler(state)
        .with_prometheus_handler(frontend_ref)
        .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
        .build();