Compare commits

...

27 Commits

Author SHA1 Message Date
Lei, HUANG
7e79b4b2f6 poc/create-alter-for-metrics:
### Commit Message

 Enhance Prometheus Bulk Write Handling

 - **`server.rs`**: Introduced `start_background_task` in `PromBulkState` to handle asynchronous batch processing and SST file writing. Added a new `tx` field to manage task communication.
 - **`access_layer.rs`**: Added `file_id` method to `ParquetWriter` for file identification.
 - **`batch_builder.rs`**: Modified `MetricsBatchBuilder` to utilize session catalog and schema, and updated batch processing logic to handle column metadata.
 - **`prom_store.rs`**: Updated `remote_write` to use `decode_remote_write_request_to_batch` for batch processing and send data to the background task.
 - **`prom_row_builder.rs`**: Made `TableBuilder` and `TablesBuilder` fields public for external access.
 - **`proto.rs`**: Exposed `table_data` in `PromWriteRequest` for batch processing.
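
The background-task wiring described above can be sketched roughly as follows. This is a hypothetical, simplified illustration (the `EncodedBatch` type, the channel capacity, and the `write_sst` helper are invented for the example); it only shows the general pattern of a `tx` sender stored on `PromBulkState` and a spawned task that drains batches and writes them out:

```rust
use tokio::sync::mpsc;

/// Stand-in for the encoded record batches produced by the bulk write path.
struct EncodedBatch {
    bytes: Vec<u8>,
}

struct PromBulkState {
    /// Sender used by request handlers to push work to the background task.
    tx: Option<mpsc::Sender<EncodedBatch>>,
}

impl PromBulkState {
    fn start_background_task(&mut self) {
        let (tx, mut rx) = mpsc::channel::<EncodedBatch>(1024);
        self.tx = Some(tx);
        tokio::spawn(async move {
            // Drain batches until every sender is dropped, then exit.
            while let Some(batch) = rx.recv().await {
                // Placeholder for the real SST/Parquet writing logic.
                let _ = write_sst(batch).await;
            }
        });
    }
}

async fn write_sst(batch: EncodedBatch) -> std::io::Result<()> {
    // In the real code this would go through the access layer's Parquet writer.
    println!("writing {} bytes", batch.bytes.len());
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut state = PromBulkState { tx: None };
    state.start_background_task();
    let sender = state.tx.clone().expect("task started");
    sender
        .send(EncodedBatch { bytes: vec![0; 16] })
        .await
        .expect("receiver alive");
}
```

Keeping the sender as an `Option` lets the state be constructed with `tx: None` first and the task started afterwards, which matches how the server builder initializes the bulk state in the diff further below.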

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-30 08:27:26 +00:00
Lei, HUANG
4ad40af468 poc/create-alter-for-metrics:
- **Refactor `BatchEncoder` and `Columns`**:
   - Introduced `Columns` and `ColumnsBuilder` to manage primary key, timestamp, and value data more efficiently.
   - Updated `BatchEncoder` to utilize `ColumnsBuilder` for handling multiple record batches.
   - Modified `finish` method to return a vector of record batches instead of a single optional batch.
   - Added methods for estimating size and counting total rows in `BatchEncoder`.

 - **Enhance `MetricsBatchBuilder`**:
   - Changed the structure to store multiple record batches per region in `MetricsBatchBuilder`.

 - **Update `TablesBuilder` in `prom_row_builder.rs`**:
   - Modified to handle multiple record batches per region and collect file metadata for logging.

 - **Remove unnecessary logging in `prom_store.rs`**:
   - Removed the "Use bulk mode" log statement.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-29 14:07:57 +00:00
Lei, HUANG
e4b048e788 poc/create-alter-for-metrics:
### Commit Message

 Enhance Schema Handling and Add Telemetry Logging

 - **`access_layer.rs`**: Refactored to use `physical_schema` for schema creation and improved error handling with telemetry logging for batch writing.
 - **`batch_builder.rs`**: Introduced `physical_schema` function for schema creation and updated data structures to use `RegionId` for physical tables. Removed redundant schema function.
 - **`prom_store.rs`**: Added telemetry logging to track bulk mode usage and processing time.
 - **`prom_row_builder.rs`**: Implemented telemetry logging to measure elapsed time for key operations like table creation, metadata collection, and batch appending.
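
The elapsed-time measurements mentioned above presumably follow the usual `std::time::Instant` pattern; a minimal stand-in (with `println!` in place of the project's telemetry macros) looks like:

```rust
use std::time::Instant;

fn append_batches(batches: &[Vec<u8>]) {
    let start = Instant::now();
    for batch in batches {
        // Placeholder for the real append / metadata-collection work.
        let _ = batch.len();
    }
    // The real code presumably routes this through the project's telemetry macros.
    println!(
        "appended {} batches in {:?}",
        batches.len(),
        start.elapsed()
    );
}

fn main() {
    append_batches(&[vec![0u8; 8], vec![0u8; 16]]);
}
```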

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-28 09:58:33 +00:00
Lei, HUANG
ecbf372de3 poc/create-alter-for-metrics:
### Add Object Store Integration and Access Layer Factory

 - **Cargo.lock, Cargo.toml**: Added `object-store` as a dependency to integrate object storage capabilities.
 - **frontend.rs, instance.rs, builder.rs, server.rs**: Introduced `ObjectStoreConfig` and `AccessLayerFactory` to manage object storage configurations and access layers.
 - **access_layer.rs**: Made `AccessLayerFactory` clonable and its constructor public to facilitate object store access layer creation.
 - **prom_store.rs**: Updated `PromBulkState` to include `AccessLayerFactory` and modified `remote_write` to utilize the access layer for processing requests.
 - **lib.rs**: Made `access_layer` module public to expose access layer functionalities.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-27 08:50:25 +00:00
Lei, HUANG
3d81a17360 poc/create-alter-for-metrics:
### Commit Message

 Enhance Schema and Table Handling in MetricsBatchBuilder

 - **`access_layer.rs`**: Made `create_sst_writer` function public within the crate to facilitate SST writing.
 - **`batch_builder.rs`**: Updated `MetricsBatchBuilder` to handle builders as a nested `HashMap` for schemas and logical table names. Modified `finish` method to return record batches grouped by schema and logical table name.
 - **`prom_row_builder.rs`**: Renamed `as_record_batch` to `as_record_batches` and implemented logic to write record batches using `AccessLayerFactory`.
 - **`proto.rs`**: Updated method call from `as_record_batch` to `as_record_batches` to reflect the new method name.
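
A toy sketch of the nested-`HashMap` grouping described above (with a placeholder builder instead of real Arrow builders), where `finish` returns results keyed by schema and logical table name:

```rust
use std::collections::HashMap;

/// Placeholder for the real per-table builder holding Arrow column builders.
#[derive(Default)]
struct TableBatchBuilder {
    rows: usize,
}

impl TableBatchBuilder {
    fn finish(self) -> Vec<String> {
        // The real builder returns Arrow RecordBatches; a string stands in here.
        vec![format!("batch with {} rows", self.rows)]
    }
}

#[derive(Default)]
struct MetricsBatchBuilder {
    /// schema name -> logical table name -> builder
    builders: HashMap<String, HashMap<String, TableBatchBuilder>>,
}

impl MetricsBatchBuilder {
    fn builder_mut(&mut self, schema: &str, table: &str) -> &mut TableBatchBuilder {
        self.builders
            .entry(schema.to_string())
            .or_default()
            .entry(table.to_string())
            .or_default()
    }

    /// Return batches grouped by (schema, logical table name), as in the commit.
    fn finish(self) -> HashMap<(String, String), Vec<String>> {
        let mut out = HashMap::new();
        for (schema, tables) in self.builders {
            for (table, builder) in tables {
                out.insert((schema.clone(), table), builder.finish());
            }
        }
        out
    }
}

fn main() {
    let mut builder = MetricsBatchBuilder::default();
    builder.builder_mut("public", "http_requests_total").rows += 2;
    for ((schema, table), batches) in builder.finish() {
        println!("{schema}.{table}: {batches:?}");
    }
}
```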

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-26 13:00:54 +00:00
Lei, HUANG
025cae3679 poc/create-alter-for-metrics:
- **Refactor `AccessLayerFactory`**: Updated the method for joining paths in `access_layer.rs` by replacing `join_dir` with `join_path` for file path construction.
 - **Enhance Testing**: Added comprehensive tests in `access_layer.rs` to verify the functionality of the Parquet writer, including writing multiple batches and handling provided timestamp ranges.
 - **Optimize `MetricsBatchBuilder`**: Simplified the loop in `batch_builder.rs` by removing unnecessary mutability in the encoder variable.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-26 11:31:29 +00:00
Lei, HUANG
68409e28ea poc/create-alter-for-metrics:
### Add Parquet Writer and Access Layer

 - **`Cargo.lock`, `Cargo.toml`**: Updated dependencies to include `parquet`, `object-store`, and `common-datasource`.
 - **`parquet_writer.rs`**: Introduced a new module for writing Parquet files asynchronously using `AsyncWriter`.
 - **`access_layer.rs`**: Added a new access layer for managing Parquet file writing, including `AccessLayerFactory` and `ParquetWriter` for handling record batches and metadata.

 ### Enhance Batch Builder

 - **`batch_builder.rs`**: Enhanced `BatchEncoder` to handle timestamp ranges and return optional record batches with timestamp metadata.

 ### Update Error Handling

 - **`error.rs`**: Added new error variants for handling object store and Parquet operations.

 ### Modify Mito2 Parquet Constants

 - **`parquet.rs`**: Changed visibility of `DEFAULT_ROW_GROUP_SIZE` to public for broader access.

 ### Add Tests

 - **`access_layer.rs`**: Added basic tests for building data region directories.
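
For context, the end-to-end Parquet write that the new access layer performs can be approximated with the synchronous `ArrowWriter` below; the commit's `AsyncWriter` bridges an opendal `Writer` to parquet's `AsyncFileWriter` to do the same thing asynchronously. The schema and data here are invented for the example:

```rust
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // Write the batch into an in-memory buffer; the commit swaps this for an
    // object-store-backed async writer bridged to parquet's AsyncFileWriter.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    println!("wrote {} bytes of parquet", buf.len());
    Ok(())
}
```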

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-26 09:17:53 +00:00
evenyag
699406ae32 refactor: update MetricsBatchBuilder to use physical region metadata
- Update append_rows_to_batch to accept physical_region_metadata parameter
- Refactor metadata lookup to use HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>
- Fix collect_physical_region_metadata call in TablesBuilder to extract logical table info
- Remove unused TableName import from batch_builder.rs

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:41 +00:00
evenyag
344006deca feat: Implements collect_physical_region_metadata
Add partition_manager and node_manager to PromBulkState and PromBulkContext

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:41 +00:00
Lei, HUANG
63803f2b43 poc/create-alter-for-metrics:
### Commit Message

 Enhance `MetricsBatchBuilder` and `TablesBuilder` functionalities

 - **`batch_builder.rs`**:
   - Made `collect_physical_region_metadata` and `append_rows_to_batch` methods public to allow external access.
   - Implemented the `finish` method to complete record batch building and return batches grouped by physical table ID.

 - **`prom_row_builder.rs`**:
   - Updated `TablesBuilder::build` to utilize `MetricsBatchBuilder` for creating or altering physical tables and appending rows.
   - Integrated collection of physical region metadata and finalized record batch processing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:40 +00:00
Lei, HUANG
cf62767b98 poc/create-alter-for-metrics:
### Commit Message

 Enhance error handling in `batch_builder.rs` and `error.rs`

 - **`batch_builder.rs`**:
   - Replaced `todo!` with specific error handling for invalid timestamp and field value types using `InvalidTimestampValueTypeSnafu` and `InvalidFieldValueTypeSnafu`.

 - **`error.rs`**:
   - Added new error variants `InvalidTimestampValueType` and `InvalidFieldValueType` with descriptive messages.
   - Updated `status_code` method to handle new error types with `StatusCode::Unexpected`.
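
A minimal sketch of what such snafu error variants and their use look like (the variant fields and the `check_timestamp` helper are illustrative; the real `status_code` mapping to `StatusCode::Unexpected` is omitted):

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Invalid timestamp value type: {value_type}"))]
    InvalidTimestampValueType { value_type: String },

    #[snafu(display("Invalid field value type: {value_type}"))]
    InvalidFieldValueType { value_type: String },
}

fn check_timestamp(value_type: &str) -> Result<(), Error> {
    if value_type != "timestamp_millisecond" {
        // Replaces a `todo!`-style placeholder with a concrete, descriptive error.
        return InvalidTimestampValueTypeSnafu { value_type }.fail();
    }
    Ok(())
}

fn main() {
    if let Err(err) = check_timestamp("string") {
        println!("{err}");
    }
}
```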

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:40 +00:00
Lei, HUANG
4e53c1531d poc/create-alter-for-metrics:
### Commit Message

 Enhance batch sorting in `batch_builder.rs`

 - Implement sorting of batches by primary key using `compute::sort_to_indices`.
 - Update `op_type` and `sequence` to use `Arc` for consistency.
 - Apply sorting to `value`, `timestamp`, and primary key arrays using `compute::take`.
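
The sort-and-take pattern referenced above, in a self-contained form (column types are assumed; the real code also permutes `op_type` and `sequence`):

```rust
use arrow::array::{ArrayRef, BinaryArray, Float64Array, TimestampMillisecondArray};
use arrow::compute::{sort_to_indices, take};
use arrow::error::ArrowError;

/// Sort three parallel columns by the primary-key column.
fn sort_by_primary_key(
    primary_key: &BinaryArray,
    timestamps: &TimestampMillisecondArray,
    values: &Float64Array,
) -> Result<(ArrayRef, ArrayRef, ArrayRef), ArrowError> {
    // Compute the permutation that sorts the primary keys...
    let indices = sort_to_indices(primary_key, None, None)?;
    // ...then apply the same permutation to every column so rows stay aligned.
    Ok((
        take(primary_key, &indices, None)?,
        take(timestamps, &indices, None)?,
        take(values, &indices, None)?,
    ))
}

fn main() -> Result<(), ArrowError> {
    let pk = BinaryArray::from(vec![b"b".as_ref(), b"a".as_ref()]);
    let ts = TimestampMillisecondArray::from(vec![2_000_i64, 1_000]);
    let vals = Float64Array::from(vec![2.0, 1.0]);
    let (sorted_pk, _, _) = sort_by_primary_key(&pk, &ts, &vals)?;
    println!("{sorted_pk:?}");
    Ok(())
}
```

Sorting once into an index vector and permuting each column with `take` keeps the columns aligned without copying the batch more than once.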

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:40 +00:00
Lei, HUANG
892cb66c53 poc/create-alter-for-metrics:
- **Add `ahash` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `ahash` as a dependency for the `metric-engine` project.
 - **Refactor `MetricsBatchBuilder`**: Modified `MetricsBatchBuilder` in `batch_builder.rs` to include a `builders` field and refactored methods to use `BatchEncoder` for appending rows.
 - **Update `BatchEncoder`**: Changed `BatchEncoder` in `batch_builder.rs` to use a `name_to_id` map and added `append_rows` and `finish` methods for handling row encoding.
 - **Modify `LogicalSchemas` Structure**: Updated `schema_helper.rs` to use `HashMap` instead of `ahash::HashMap` for the `schemas` field in `LogicalSchemas`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:39 +00:00
Lei, HUANG
8b392477c8 poc/create-alter-for-metrics:
### Commit Summary

 - **Add New Dependencies**: Updated `Cargo.lock` and `Cargo.toml` to include `metric-engine` and `mito-codec` dependencies.
 - **Enhance `MetricEngineInner`**: Modified `put.rs` to use `logical_table_id` instead of `table_id` and adjusted method calls accordingly.
 - **Expose Structs and Methods**: Made `RowModifier`, `RowsIter`, and `RowIter` structs and their methods public in `row_modifier.rs`.
 - **Implement Batch Processing**: Added batch processing logic in `batch_builder.rs` to handle row conversion to record batches with primary key encoding.
 - **Error Handling**: Introduced `EncodePrimaryKey` error variant in `error.rs` for handling primary key encoding errors.
 - **Clone Support for `TableBuilder`**: Added `Clone` trait to `TableBuilder` in `prom_row_builder.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:39 +00:00
evenyag
905593dc16 feat: add bulk mode flag for prom store with SchemaHelper integration
Add a new bulk_mode flag to PromStoreOptions that enables bulk processing
for prometheus metrics ingestion. When enabled, initializes PromBulkState
with a SchemaHelper instance for efficient schema management.

Changes:
- Add bulk_mode field to PromStoreOptions (default: false)
- Add create_schema_helper() method to Instance for SchemaHelper construction
- Add getter methods to StatementExecutor for procedure_executor and cache_invalidator
- Update server initialization to conditionally create PromBulkState when bulk_mode is enabled
- Fix clippy warnings in schema_helper.rs

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:38 +00:00
evenyag
6c04cb9b19 feat: use schema_helper to create/alter physical table
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:38 +00:00
Lei, HUANG
24da3367c1 poc/create-alter-for-metrics:
- **Add `operator` dependency**: Updated `Cargo.lock` and `Cargo.toml` to include the `operator` dependency.
 - **Expose structs and functions in `schema_helper.rs`**: Made `LogicalSchema`, `LogicalSchemas`, and `ensure_logical_tables_for_metrics` public in `src/operator/src/schema_helper.rs`.
 - **Refactor `batch_builder.rs`**:
   - Changed the logic for handling physical and logical tables, including the introduction of `tags_to_logical_schemas` function.
   - Modified `determine_physical_table_name` to return a string instead of a table reference.
   - Updated logic for managing tags and logical schemas in `MetricsBatchBuilder`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:38 +00:00
Lei, HUANG
80b14965a6 feat/frontend-stager:
### Commit Summary

 - **Enhancements in `MetricsBatchBuilder`:**
   - Added support for session catalog and schema in `create_or_alter_physical_tables`.
   - Introduced `physical_tables` mapping for logical to physical table references.
   - Implemented `rows_to_batch` to build `RecordBatch` from rows with primary key encoded.

 - **Function Modifications:**
   - Updated `determine_physical_table` to use session catalog and schema.
   - Modified `build_create_table_expr` and `build_alter_table_expr` to include additional parameters for schema compatibility.

 - **Error Handling:**
   - Added error handling for missing physical tables in `rows_to_batch`.
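
As a loose illustration of "rows with primary key encoded" (this is not the project's `SparsePrimaryKeyCodec`; it is a simplified length-prefixed encoding invented for the example):

```rust
/// Greatly simplified stand-in for primary-key encoding: concatenate
/// length-prefixed tag names and values into one binary key.
fn encode_primary_key(tags: &[(&str, &str)]) -> Vec<u8> {
    let mut key = Vec::new();
    for (name, value) in tags {
        key.extend_from_slice(&(name.len() as u16).to_be_bytes());
        key.extend_from_slice(name.as_bytes());
        key.extend_from_slice(&(value.len() as u16).to_be_bytes());
        key.extend_from_slice(value.as_bytes());
    }
    key
}

fn main() {
    let pk = encode_primary_key(&[("host", "web-01"), ("job", "node")]);
    println!("encoded primary key: {} bytes", pk.len());
}
```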

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:37 +00:00
Lei, HUANG
5da3f86d0c feat/frontend-stager:
### Add `MetricsBatchBuilder` for Physical Table Management

 - **New Feature**: Introduced `MetricsBatchBuilder` in `batch_builder.rs` to manage the creation and alteration of physical tables based on logical table tags.
 - **Error Handling**: Added `CommonMeta` error variant in `error.rs` to handle errors from `common_meta`.
 - **Enhancements**:
   - Added `tags` method in `prom_row_builder.rs` to retrieve tag names from `TableBuilder`.
   - Implemented `primary_key_names` method in `metadata.rs` to return primary key names from `TableMeta`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:37 +00:00
evenyag
151273d1df feat: get region metadata by ids
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:36 +00:00
evenyag
b0289dbdde refactor: extract logics for filling options
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:36 +00:00
evenyag
c51730a954 feat: create or alter logical tables for metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:36 +00:00
evenyag
207709c727 refactor: remove Inserter::create_physical_table_on_demand
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:35 +00:00
evenyag
deca8c44fa refactor: StatementExecutor calls SchemaHelper to change schema
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:35 +00:00
evenyag
2edd861ce9 refactor: use SchemaHelper in Inserter
Remove the dependency on StatementExecutor from Inserter

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:34 +00:00
evenyag
14f3a4ab05 refactor: copy alter logic to SchemaHelper
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:34 +00:00
evenyag
34875c0346 refactor: define schema_helper mod to handle schema creation
It has the same logic as Inserter and StatementExecutor

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:34 +00:00
38 changed files with 2672 additions and 768 deletions

Cargo.lock generated
View File

@@ -4738,6 +4738,7 @@ dependencies = [
"log-store", "log-store",
"meta-client", "meta-client",
"num_cpus", "num_cpus",
"object-store",
"opentelemetry-proto 0.27.0", "opentelemetry-proto 0.27.0",
"operator", "operator",
"otel-arrow-rust", "otel-arrow-rust",
@@ -6698,7 +6699,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"windows-targets 0.48.5", "windows-targets 0.52.6",
] ]
[[package]] [[package]]
@@ -7234,6 +7235,7 @@ dependencies = [
name = "metric-engine" name = "metric-engine"
version = "0.15.0" version = "0.15.0"
dependencies = [ dependencies = [
"ahash 0.8.11",
"api", "api",
"aquamarine", "aquamarine",
"async-stream", "async-stream",
@@ -11230,6 +11232,7 @@ dependencies = [
"common-base", "common-base",
"common-catalog", "common-catalog",
"common-config", "common-config",
"common-datasource",
"common-error", "common-error",
"common-frontend", "common-frontend",
"common-grpc", "common-grpc",
@@ -11272,16 +11275,23 @@ dependencies = [
"local-ip-address", "local-ip-address",
"log-query", "log-query",
"loki-proto", "loki-proto",
"metric-engine",
"mime_guess", "mime_guess",
"mito-codec",
"mito2",
"mysql_async", "mysql_async",
"notify", "notify",
"object-pool", "object-pool",
"object-store",
"once_cell", "once_cell",
"openmetrics-parser", "openmetrics-parser",
"opensrv-mysql", "opensrv-mysql",
"opentelemetry-proto 0.27.0", "opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust", "otel-arrow-rust",
"parking_lot 0.12.3", "parking_lot 0.12.3",
"parquet",
"partition",
"permutation", "permutation",
"pgwire", "pgwire",
"pin-project", "pin-project",
@@ -13856,12 +13866,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.10.0" version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
dependencies = [ dependencies = [
"getrandom 0.2.15", "getrandom 0.3.2",
"rand 0.8.5", "js-sys",
"rand 0.9.0",
"serde", "serde",
"wasm-bindgen", "wasm-bindgen",
] ]

View File

@@ -21,6 +21,7 @@ pub mod error;
pub mod file_format; pub mod file_format;
pub mod lister; pub mod lister;
pub mod object_store; pub mod object_store;
pub mod parquet_writer;
pub mod share_buffer; pub mod share_buffer;
#[cfg(test)] #[cfg(test)]
pub mod test_util; pub mod test_util;

View File

@@ -0,0 +1,52 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::Writer;
use parquet::arrow::async_writer::AsyncFileWriter;
use parquet::errors::ParquetError;
/// Bridges opendal [Writer] with parquet [AsyncFileWriter].
pub struct AsyncWriter {
inner: Writer,
}
impl AsyncWriter {
/// Create a [`AsyncWriter`] by given [`Writer`].
pub fn new(writer: Writer) -> Self {
Self { inner: writer }
}
}
impl AsyncFileWriter for AsyncWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
self.inner
.write(bs)
.await
.map_err(|err| ParquetError::External(Box::new(err)))
})
}
fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
self.inner
.close()
.await
.map(|_| ())
.map_err(|err| ParquetError::External(Box::new(err)))
})
}
}

View File

@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef}; use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
use common_meta::ddl::ProcedureExecutorRef; use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::FlowMetadataManagerRef; use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::TableMetadataManagerRef; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output; use common_query::Output;
@@ -37,6 +37,7 @@ use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertReq
use itertools::Itertools; use itertools::Itertools;
use operator::delete::Deleter; use operator::delete::Deleter;
use operator::insert::Inserter; use operator::insert::Inserter;
use operator::schema_helper::SchemaHelper;
use operator::statement::StatementExecutor; use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager; use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory}; use query::{QueryEngine, QueryEngineFactory};
@@ -546,8 +547,14 @@ impl FrontendInvoker {
name: TABLE_FLOWNODE_SET_CACHE_NAME, name: TABLE_FLOWNODE_SET_CACHE_NAME,
})?; })?;
let inserter = Arc::new(Inserter::new( let schema_helper = SchemaHelper::new(
catalog_manager.clone(), catalog_manager.clone(),
Arc::new(TableMetadataManager::new(kv_backend.clone())),
procedure_executor.clone(),
layered_cache_registry.clone(),
);
let inserter = Arc::new(Inserter::new(
schema_helper,
partition_manager.clone(), partition_manager.clone(),
node_manager.clone(), node_manager.clone(),
table_flownode_cache, table_flownode_cache,
@@ -588,7 +595,7 @@ impl FrontendInvoker {
.start_timer(); .start_timer();
self.inserter self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor, false, false) .handle_row_inserts(requests, ctx, false, false)
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu) .context(common_frontend::error::ExternalSnafu)

View File

@@ -49,6 +49,7 @@ log-query.workspace = true
log-store.workspace = true log-store.workspace = true
meta-client.workspace = true meta-client.workspace = true
num_cpus.workspace = true num_cpus.workspace = true
object-store.workspace = true
opentelemetry-proto.workspace = true opentelemetry-proto.workspace = true
operator.workspace = true operator.workspace = true
otel-arrow-rust.workspace = true otel-arrow-rust.workspace = true

View File

@@ -19,6 +19,7 @@ use common_config::config::Configurable;
use common_options::datanode::DatanodeClientOptions; use common_options::datanode::DatanodeClientOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions}; use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use meta_client::MetaClientOptions; use meta_client::MetaClientOptions;
use object_store::config::ObjectStoreConfig;
use query::options::QueryOptions; use query::options::QueryOptions;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask}; use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
@@ -62,6 +63,7 @@ pub struct FrontendOptions {
pub query: QueryOptions, pub query: QueryOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>, pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: Option<SlowQueryOptions>, pub slow_query: Option<SlowQueryOptions>,
pub store: ObjectStoreConfig,
} }
impl Default for FrontendOptions { impl Default for FrontendOptions {
@@ -88,6 +90,7 @@ impl Default for FrontendOptions {
query: QueryOptions::default(), query: QueryOptions::default(),
max_in_flight_write_bytes: None, max_in_flight_write_bytes: None,
slow_query: Some(SlowQueryOptions::default()), slow_query: Some(SlowQueryOptions::default()),
store: ObjectStoreConfig::default(),
} }
} }
} }
@@ -116,8 +119,7 @@ impl Frontend {
if let Some(t) = self.export_metrics_task.as_ref() { if let Some(t) = self.export_metrics_task.as_ref() {
if t.send_by_handler { if t.send_by_handler {
let inserter = self.instance.inserter().clone(); let inserter = self.instance.inserter().clone();
let statement_executor = self.instance.statement_executor().clone(); let handler = ExportMetricHandler::new_handler(inserter);
let handler = ExportMetricHandler::new_handler(inserter, statement_executor);
t.start(Some(handler)).context(error::StartServerSnafu)? t.start(Some(handler)).context(error::StartServerSnafu)?
} else { } else {
t.start(None).context(error::StartServerSnafu)?; t.start(None).context(error::StartServerSnafu)?;

View File

@@ -39,6 +39,7 @@ use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt}; use common_error::ext::{BoxedError, ErrorExt};
use common_meta::key::TableMetadataManagerRef; use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::state_store::KvStateStore; use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig; use common_procedure::options::ProcedureConfig;
@@ -49,7 +50,9 @@ use datafusion_expr::LogicalPlan;
use log_store::raft_engine::RaftEngineBackend; use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef; use operator::delete::DeleterRef;
use operator::insert::InserterRef; use operator::insert::InserterRef;
use operator::schema_helper::SchemaHelper;
use operator::statement::{StatementExecutor, StatementExecutorRef}; use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator; use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer; use prometheus::HistogramTimer;
use promql_parser::label::Matcher; use promql_parser::label::Matcher;
@@ -58,6 +61,7 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult; use query::query_engine::DescribeResult;
use query::QueryEngineRef; use query::QueryEngineRef;
use servers::access_layer::AccessLayerFactory;
use servers::error as server_error; use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu}; use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{ use servers::interceptor::{
@@ -100,6 +104,7 @@ pub struct Instance {
slow_query_recorder: Option<SlowQueryRecorder>, slow_query_recorder: Option<SlowQueryRecorder>,
limiter: Option<LimiterRef>, limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef, process_manager: ProcessManagerRef,
access_layer_factory: AccessLayerFactory,
} }
impl Instance { impl Instance {
@@ -161,6 +166,27 @@ impl Instance {
pub fn process_manager(&self) -> &ProcessManagerRef { pub fn process_manager(&self) -> &ProcessManagerRef {
&self.process_manager &self.process_manager
} }
pub fn create_schema_helper(&self) -> SchemaHelper {
SchemaHelper::new(
self.catalog_manager.clone(),
self.table_metadata_manager.clone(),
self.statement_executor.procedure_executor().clone(),
self.statement_executor.cache_invalidator().clone(),
)
}
pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
self.inserter.partition_manager()
}
pub fn node_manager(&self) -> &NodeManagerRef {
self.inserter.node_manager()
}
pub fn access_layer_factory(&self) -> &AccessLayerFactory {
&self.access_layer_factory
}
} }
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> { fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {

View File

@@ -30,12 +30,14 @@ use operator::flow::FlowServiceOperator;
use operator::insert::Inserter; use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator; use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester; use operator::request::Requester;
use operator::schema_helper::SchemaHelper;
use operator::statement::{StatementExecutor, StatementExecutorRef}; use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::table::TableMutationOperator; use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager; use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator; use pipeline::pipeline_operator::PipelineOperator;
use query::region_query::RegionQueryHandlerFactoryRef; use query::region_query::RegionQueryHandlerFactoryRef;
use query::QueryEngineFactory; use query::QueryEngineFactory;
use servers::access_layer::AccessLayerFactory;
use snafu::OptionExt; use snafu::OptionExt;
use crate::error::{self, Result}; use crate::error::{self, Result};
@@ -130,8 +132,15 @@ impl FrontendBuilder {
name: TABLE_FLOWNODE_SET_CACHE_NAME, name: TABLE_FLOWNODE_SET_CACHE_NAME,
})?; })?;
let inserter = Arc::new(Inserter::new( let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let schema_helper = SchemaHelper::new(
self.catalog_manager.clone(), self.catalog_manager.clone(),
table_metadata_manager.clone(),
self.procedure_executor.clone(),
local_cache_invalidator.clone(),
);
let inserter = Arc::new(Inserter::new(
schema_helper,
partition_manager.clone(), partition_manager.clone(),
node_manager.clone(), node_manager.clone(),
table_flownode_cache, table_flownode_cache,
@@ -176,7 +185,7 @@ impl FrontendBuilder {
self.catalog_manager.clone(), self.catalog_manager.clone(),
query_engine.clone(), query_engine.clone(),
self.procedure_executor, self.procedure_executor,
kv_backend.clone(), kv_backend,
local_cache_invalidator, local_cache_invalidator,
inserter.clone(), inserter.clone(),
table_route_cache, table_route_cache,
@@ -211,6 +220,7 @@ impl FrontendBuilder {
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes())) Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
}); });
let access_layer_factory = AccessLayerFactory::new(&self.options.store).await.unwrap();
Ok(Instance { Ok(Instance {
catalog_manager: self.catalog_manager, catalog_manager: self.catalog_manager,
pipeline_operator, pipeline_operator,
@@ -219,10 +229,11 @@ impl FrontendBuilder {
plugins, plugins,
inserter, inserter,
deleter, deleter,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), table_metadata_manager,
slow_query_recorder, slow_query_recorder,
limiter, limiter,
process_manager, process_manager,
access_layer_factory,
}) })
} }
} }

View File

@@ -408,7 +408,7 @@ impl Instance {
ctx: QueryContextRef, ctx: QueryContextRef,
) -> Result<Output> { ) -> Result<Output> {
self.inserter self.inserter
.handle_column_inserts(requests, ctx, self.statement_executor.as_ref()) .handle_column_inserts(requests, ctx)
.await .await
.context(TableOperationSnafu) .context(TableOperationSnafu)
} }
@@ -422,13 +422,7 @@ impl Instance {
is_single_value: bool, is_single_value: bool,
) -> Result<Output> { ) -> Result<Output> {
self.inserter self.inserter
.handle_row_inserts( .handle_row_inserts(requests, ctx, accommodate_existing_schema, is_single_value)
requests,
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
is_single_value,
)
.await .await
.context(TableOperationSnafu) .context(TableOperationSnafu)
} }
@@ -441,10 +435,7 @@ impl Instance {
) -> Result<Output> { ) -> Result<Output> {
self.inserter self.inserter
.handle_last_non_null_inserts( .handle_last_non_null_inserts(
requests, requests, ctx, true,
ctx,
self.statement_executor.as_ref(),
true,
// Influx protocol may writes multiple fields (values). // Influx protocol may writes multiple fields (values).
false, false,
) )
@@ -460,7 +451,7 @@ impl Instance {
physical_table: String, physical_table: String,
) -> Result<Output> { ) -> Result<Output> {
self.inserter self.inserter
.handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table) .handle_metric_row_inserts(requests, ctx, physical_table)
.await .await
.context(TableOperationSnafu) .context(TableOperationSnafu)
} }

View File

@@ -135,7 +135,7 @@ impl Instance {
}; };
self.inserter self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref()) .handle_log_inserts(log, ctx)
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu) .context(ExecuteGrpcRequestSnafu)
@@ -157,7 +157,7 @@ impl Instance {
}; };
self.inserter self.inserter
.handle_trace_inserts(rows, ctx, self.statement_executor.as_ref()) .handle_trace_inserts(rows, ctx)
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu) .context(ExecuteGrpcRequestSnafu)

View File

@@ -28,7 +28,6 @@ use common_query::Output;
use common_recordbatch::RecordBatches; use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing}; use common_telemetry::{debug, tracing};
use operator::insert::InserterRef; use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message; use prost::Message;
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult}; use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF}; use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
@@ -271,18 +270,11 @@ impl PromStoreProtocolHandler for Instance {
/// so only implement `PromStoreProtocolHandler::write` method. /// so only implement `PromStoreProtocolHandler::write` method.
pub struct ExportMetricHandler { pub struct ExportMetricHandler {
inserter: InserterRef, inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
} }
impl ExportMetricHandler { impl ExportMetricHandler {
pub fn new_handler( pub fn new_handler(inserter: InserterRef) -> PromStoreProtocolHandlerRef {
inserter: InserterRef, Arc::new(Self { inserter })
statement_executor: Arc<StatementExecutor>,
) -> PromStoreProtocolHandlerRef {
Arc::new(Self {
inserter,
statement_executor,
})
} }
} }
@@ -295,12 +287,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
_: bool, _: bool,
) -> ServerResult<Output> { ) -> ServerResult<Output> {
self.inserter self.inserter
.handle_metric_row_inserts( .handle_metric_row_inserts(request, ctx, GREPTIME_PHYSICAL_TABLE.to_string())
request,
ctx,
&self.statement_executor,
GREPTIME_PHYSICAL_TABLE.to_string(),
)
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu) .context(error::ExecuteGrpcQuerySnafu)

View File

@@ -24,6 +24,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer}; use servers::grpc::{GrpcOptions, GrpcServer};
use servers::http::event::LogValidatorRef; use servers::http::event::LogValidatorRef;
use servers::http::prom_store::{PromBulkState, PromStoreState};
use servers::http::{HttpServer, HttpServerBuilder}; use servers::http::{HttpServer, HttpServerBuilder};
use servers::interceptor::LogIngestInterceptorRef; use servers::interceptor::LogIngestInterceptorRef;
use servers::metrics_handler::MetricsHandler; use servers::metrics_handler::MetricsHandler;
@@ -95,13 +96,30 @@ where
} }
if opts.prom_store.enable { if opts.prom_store.enable {
let bulk_state = if opts.prom_store.bulk_mode {
let mut state = PromBulkState {
schema_helper: self.instance.create_schema_helper(),
partition_manager: self.instance.partition_manager().clone(),
node_manager: self.instance.node_manager().clone(),
access_layer_factory: self.instance.access_layer_factory().clone(),
tx: None,
};
state.start_background_task();
Some(state)
} else {
None
};
let state = PromStoreState {
prom_store_handler: self.instance.clone(),
pipeline_handler: Some(self.instance.clone()),
prom_store_with_metric_engine: opts.prom_store.with_metric_engine,
prom_validation_mode: opts.http.prom_validation_mode,
bulk_state,
};
builder = builder builder = builder
.with_prom_handler( .with_prom_handler(state)
self.instance.clone(),
Some(self.instance.clone()),
opts.prom_store.with_metric_engine,
opts.http.prom_validation_mode,
)
.with_prometheus_handler(self.instance.clone()); .with_prometheus_handler(self.instance.clone());
} }

View File

@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
pub struct PromStoreOptions { pub struct PromStoreOptions {
pub enable: bool, pub enable: bool,
pub with_metric_engine: bool, pub with_metric_engine: bool,
pub bulk_mode: bool,
} }
impl Default for PromStoreOptions { impl Default for PromStoreOptions {
@@ -25,6 +26,7 @@ impl Default for PromStoreOptions {
Self { Self {
enable: true, enable: true,
with_metric_engine: true, with_metric_engine: true,
bulk_mode: false,
} }
} }
} }
@@ -37,6 +39,7 @@ mod tests {
fn test_prom_store_options() { fn test_prom_store_options() {
let default = PromStoreOptions::default(); let default = PromStoreOptions::default();
assert!(default.enable); assert!(default.enable);
assert!(default.with_metric_engine) assert!(default.with_metric_engine);
assert!(!default.bulk_mode);
} }
} }

View File

@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
.into(); .into();
self.inserter self.inserter
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false) .handle_row_inserts(requests, query_ctx, false, false)
.await .await
.context(TableOperationSnafu)?; .context(TableOperationSnafu)?;

View File

@@ -8,6 +8,7 @@ license.workspace = true
workspace = true workspace = true
[dependencies] [dependencies]
ahash.workspace = true
api.workspace = true api.workspace = true
aquamarine.workspace = true aquamarine.workspace = true
async-stream.workspace = true async-stream.workspace = true

View File

@@ -147,7 +147,7 @@ impl MetricEngineInner {
fn modify_rows( fn modify_rows(
&self, &self,
physical_region_id: RegionId, physical_region_id: RegionId,
table_id: TableId, logical_table_id: TableId,
rows: &mut Rows, rows: &mut Rows,
encoding: PrimaryKeyEncoding, encoding: PrimaryKeyEncoding,
) -> Result<()> { ) -> Result<()> {
@@ -163,7 +163,9 @@ impl MetricEngineInner {
.physical_columns(); .physical_columns();
RowsIter::new(input, name_to_id) RowsIter::new(input, name_to_id)
}; };
let output = self.row_modifier.modify_rows(iter, table_id, encoding)?; let output = self
.row_modifier
.modify_rows(iter, logical_table_id, encoding)?;
*rows = output; *rows = output;
Ok(()) Ok(())
} }

View File

@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
/// ///
/// - For [`PrimaryKeyEncoding::Dense`] encoding, /// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns(`__table_id`, `__tsid`) to the row. /// it adds two columns(`__table_id`, `__tsid`) to the row.
pub(crate) struct RowModifier { pub struct RowModifier {
codec: SparsePrimaryKeyCodec, codec: SparsePrimaryKeyCodec,
} }
@@ -52,7 +52,7 @@ impl RowModifier {
} }
/// Modify rows with the given primary key encoding. /// Modify rows with the given primary key encoding.
pub(crate) fn modify_rows( pub fn modify_rows(
&self, &self,
iter: RowsIter, iter: RowsIter,
table_id: TableId, table_id: TableId,
@@ -74,7 +74,7 @@ impl RowModifier {
let mut buffer = vec![]; let mut buffer = vec![];
for mut iter in iter.iter_mut() { for mut iter in iter.iter_mut() {
let (table_id, tsid) = self.fill_internal_columns(table_id, &iter); let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
let mut values = Vec::with_capacity(num_output_column); let mut values = Vec::with_capacity(num_output_column);
buffer.clear(); buffer.clear();
let internal_columns = [ let internal_columns = [
@@ -135,7 +135,7 @@ impl RowModifier {
options: None, options: None,
}); });
for iter in iter.iter_mut() { for iter in iter.iter_mut() {
let (table_id, tsid) = self.fill_internal_columns(table_id, &iter); let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
iter.row.values.push(table_id); iter.row.values.push(table_id);
iter.row.values.push(tsid); iter.row.values.push(tsid);
} }
@@ -144,7 +144,7 @@ impl RowModifier {
} }
/// Fills internal columns of a row with table name and a hash of tag values. /// Fills internal columns of a row with table name and a hash of tag values.
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) { pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
let mut hasher = TsidGenerator::default(); let mut hasher = TsidGenerator::default();
for (name, value) in iter.primary_keys_with_name() { for (name, value) in iter.primary_keys_with_name() {
// The type is checked before. So only null is ignored. // The type is checked before. So only null is ignored.
@@ -264,7 +264,7 @@ impl IterIndex {
} }
/// Iterator of rows. /// Iterator of rows.
pub(crate) struct RowsIter { pub struct RowsIter {
rows: Rows, rows: Rows,
index: IterIndex, index: IterIndex,
} }
@@ -276,7 +276,7 @@ impl RowsIter {
} }
/// Returns the iterator of rows. /// Returns the iterator of rows.
fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> { pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
self.rows.rows.iter_mut().map(|row| RowIter { self.rows.rows.iter_mut().map(|row| RowIter {
row, row,
index: &self.index, index: &self.index,
@@ -290,10 +290,22 @@ impl RowsIter {
.iter() .iter()
.map(|idx| std::mem::take(&mut self.rows.schema[idx.index])) .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
} }
pub fn num_rows(&self) -> usize {
self.rows.rows.len()
}
pub fn num_columns(&self) -> usize {
self.rows.schema.len()
}
pub fn num_primary_keys(&self) -> usize {
self.index.num_primary_key_column
}
} }
/// Iterator of a row. /// Iterator of a row.
struct RowIter<'a> { pub struct RowIter<'a> {
row: &'a mut Row, row: &'a mut Row,
index: &'a IterIndex, index: &'a IterIndex,
schema: &'a Vec<ColumnSchema>, schema: &'a Vec<ColumnSchema>,
@@ -313,7 +325,7 @@ impl RowIter<'_> {
} }
/// Returns the primary keys. /// Returns the primary keys.
fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> { pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
self.index.indices[..self.index.num_primary_key_column] self.index.indices[..self.index.num_primary_key_column]
.iter() .iter()
.map(|idx| { .map(|idx| {
@@ -333,6 +345,13 @@ impl RowIter<'_> {
.iter() .iter()
.map(|idx| std::mem::take(&mut self.row.values[idx.index])) .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
} }
/// Returns value at given offset.
/// # Panics
/// Panics if offset out-of-bound
pub fn value_at(&self, idx: usize) -> &Value {
&self.row.values[idx]
}
} }
#[cfg(test)] #[cfg(test)]
@@ -476,7 +495,6 @@ mod tests {
#[test] #[test]
fn test_fill_internal_columns() { fn test_fill_internal_columns() {
let name_to_column_id = test_name_to_column_id(); let name_to_column_id = test_name_to_column_id();
let encoder = RowModifier::new();
let table_id = 1025; let table_id = 1025;
let schema = test_schema(); let schema = test_schema();
let row = test_row("greptimedb", "127.0.0.1"); let row = test_row("greptimedb", "127.0.0.1");
@@ -486,7 +504,7 @@ mod tests {
}; };
let mut rows_iter = RowsIter::new(rows, &name_to_column_id); let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
let row_iter = rows_iter.iter_mut().next().unwrap(); let row_iter = rows_iter.iter_mut().next().unwrap();
let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter); let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
assert_eq!(encoded_table_id, ValueData::U32Value(1025).into()); assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into()); assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
@@ -514,7 +532,7 @@ mod tests {
}; };
let mut rows_iter = RowsIter::new(rows, &name_to_column_id); let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
let row_iter = rows_iter.iter_mut().next().unwrap(); let row_iter = rows_iter.iter_mut().next().unwrap();
let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter); let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
assert_eq!(encoded_table_id, ValueData::U32Value(1025).into()); assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into()); assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
} }

View File

@@ -41,7 +41,7 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
/// Default batch size to read parquet files. /// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024; pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files. /// Default row group size for parquet files.
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE; pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
/// Parquet write options. /// Parquet write options.
#[derive(Debug)] #[derive(Debug)]

View File

@@ -860,6 +860,14 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display("Failed to decode object from json"))]
DecodeJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -991,6 +999,7 @@ impl ErrorExt for Error {
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments, Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected, Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
Error::PathNotFound { .. } => StatusCode::InvalidArguments, Error::PathNotFound { .. } => StatusCode::InvalidArguments,
Error::DecodeJson { .. } => StatusCode::Unexpected,
} }
} }

View File

@@ -22,10 +22,9 @@ use api::v1::region::{
RegionRequestHeader, RegionRequestHeader,
}; };
use api::v1::{ use api::v1::{
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests, AlterTableExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest,
RowInsertRequest, RowInsertRequests, SemanticType, RowInsertRequests, SemanticType,
}; };
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta}; use client::{OutputData, OutputMeta};
use common_catalog::consts::{ use common_catalog::consts::{
default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
@@ -35,7 +34,6 @@ use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef; use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef}; use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer; use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output; use common_query::Output;
use common_telemetry::tracing_context::TracingContext; use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn}; use common_telemetry::{error, info, warn};
@@ -49,9 +47,7 @@ use snafu::ResultExt;
use sql::partition::partition_rule_for_hexstring; use sql::partition::partition_rule_for_hexstring;
use sql::statements::create::Partitions; use sql::statements::create::Partitions;
use sql::statements::insert::Insert; use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{ use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY}; use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId}; use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo; use table::metadata::TableInfo;
@@ -63,7 +59,7 @@ use table::table_reference::TableReference;
use table::TableRef; use table::TableRef;
use crate::error::{ use crate::error::{
CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu, InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
}; };
use crate::expr_helper; use crate::expr_helper;
@@ -72,10 +68,10 @@ use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{ use crate::req_convert::insert::{
fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
}; };
use crate::statement::StatementExecutor; use crate::schema_helper::SchemaHelper;
pub struct Inserter { pub struct Inserter {
catalog_manager: CatalogManagerRef, pub(crate) schema_helper: SchemaHelper,
pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef, pub(crate) node_manager: NodeManagerRef,
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef, pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
@@ -85,7 +81,7 @@ pub type InserterRef = Arc<Inserter>;
/// Hint for the table type to create automatically. /// Hint for the table type to create automatically.
#[derive(Clone)] #[derive(Clone)]
enum AutoCreateTableType { pub(crate) enum AutoCreateTableType {
/// A logical table with the physical table name. /// A logical table with the physical table name.
Logical(String), Logical(String),
/// A physical table. /// A physical table.
@@ -127,27 +123,34 @@ pub struct InstantAndNormalInsertRequests {
impl Inserter { impl Inserter {
pub fn new( pub fn new(
catalog_manager: CatalogManagerRef, schema_helper: SchemaHelper,
partition_manager: PartitionRuleManagerRef, partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef, node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef, table_flownode_set_cache: TableFlownodeSetCacheRef,
) -> Self { ) -> Self {
Self { Self {
catalog_manager, schema_helper,
partition_manager, partition_manager,
node_manager, node_manager,
table_flownode_set_cache, table_flownode_set_cache,
} }
} }
pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
&self.partition_manager
}
pub fn node_manager(&self) -> &NodeManagerRef {
&self.node_manager
}
pub async fn handle_column_inserts( pub async fn handle_column_inserts(
&self, &self,
requests: InsertRequests, requests: InsertRequests,
ctx: QueryContextRef, ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> { ) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?; let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false) self.handle_row_inserts(row_inserts, ctx, false, false)
.await .await
} }
@@ -156,7 +159,6 @@ impl Inserter {
&self, &self,
mut requests: RowInsertRequests, mut requests: RowInsertRequests,
ctx: QueryContextRef, ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool, accommodate_existing_schema: bool,
is_single_value: bool, is_single_value: bool,
) -> Result<Output> { ) -> Result<Output> {
@@ -164,7 +166,6 @@ impl Inserter {
self.handle_row_inserts_with_create_type( self.handle_row_inserts_with_create_type(
requests, requests,
ctx, ctx,
statement_executor,
AutoCreateTableType::Physical, AutoCreateTableType::Physical,
accommodate_existing_schema, accommodate_existing_schema,
is_single_value, is_single_value,
@@ -177,12 +178,10 @@ impl Inserter {
&self, &self,
requests: RowInsertRequests, requests: RowInsertRequests,
ctx: QueryContextRef, ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> { ) -> Result<Output> {
self.handle_row_inserts_with_create_type( self.handle_row_inserts_with_create_type(
requests, requests,
ctx, ctx,
statement_executor,
AutoCreateTableType::Log, AutoCreateTableType::Log,
false, false,
false, false,
@@ -194,12 +193,10 @@ impl Inserter {
&self, &self,
requests: RowInsertRequests, requests: RowInsertRequests,
ctx: QueryContextRef, ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> { ) -> Result<Output> {
self.handle_row_inserts_with_create_type( self.handle_row_inserts_with_create_type(
requests, requests,
ctx, ctx,
statement_executor,
AutoCreateTableType::Trace, AutoCreateTableType::Trace,
false, false,
false, false,
@@ -212,14 +209,12 @@ impl Inserter {
&self, &self,
requests: RowInsertRequests, requests: RowInsertRequests,
ctx: QueryContextRef, ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool, accommodate_existing_schema: bool,
is_single_value: bool, is_single_value: bool,
) -> Result<Output> { ) -> Result<Output> {
self.handle_row_inserts_with_create_type( self.handle_row_inserts_with_create_type(
requests, requests,
ctx, ctx,
statement_executor,
AutoCreateTableType::LastNonNull, AutoCreateTableType::LastNonNull,
accommodate_existing_schema, accommodate_existing_schema,
is_single_value, is_single_value,
@@ -232,7 +227,6 @@ impl Inserter {
&self, &self,
mut requests: RowInsertRequests, mut requests: RowInsertRequests,
ctx: QueryContextRef, ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType, create_type: AutoCreateTableType,
accommodate_existing_schema: bool, accommodate_existing_schema: bool,
is_single_value: bool, is_single_value: bool,
@@ -254,7 +248,6 @@ impl Inserter {
&mut requests, &mut requests,
&ctx, &ctx,
create_type, create_type,
statement_executor,
accommodate_existing_schema, accommodate_existing_schema,
is_single_value, is_single_value,
) )
@@ -280,7 +273,6 @@ impl Inserter {
&self, &self,
mut requests: RowInsertRequests, mut requests: RowInsertRequests,
ctx: QueryContextRef, ctx: QueryContextRef,
statement_executor: &StatementExecutor,
physical_table: String, physical_table: String,
) -> Result<Output> { ) -> Result<Output> {
// remove empty requests // remove empty requests
@@ -293,7 +285,8 @@ impl Inserter {
validate_column_count_match(&requests)?; validate_column_count_match(&requests)?;
// check and create physical table // check and create physical table
self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor) self.schema_helper
.create_metric_physical_table(&ctx, physical_table.clone())
.await?; .await?;
// check and create logical tables // check and create logical tables
@@ -305,7 +298,6 @@ impl Inserter {
&mut requests, &mut requests,
&ctx, &ctx,
AutoCreateTableType::Logical(physical_table.to_string()), AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
true, true,
true, true,
) )
@@ -350,10 +342,13 @@ impl Inserter {
insert: &Insert, insert: &Insert,
ctx: &QueryContextRef, ctx: &QueryContextRef,
) -> Result<Output> { ) -> Result<Output> {
let (inserts, table_info) = let (inserts, table_info) = StatementToRegion::new(
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx) self.schema_helper.catalog_manager().as_ref(),
.convert(insert, ctx) &self.partition_manager,
.await?; ctx,
)
.convert(insert, ctx)
.await?;
let table_infos = let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter()); HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
@@ -482,7 +477,6 @@ impl Inserter {
requests: &mut RowInsertRequests, requests: &mut RowInsertRequests,
ctx: &QueryContextRef, ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType, auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool, accommodate_existing_schema: bool,
is_single_value: bool, is_single_value: bool,
) -> Result<CreateAlterTableResult> { ) -> Result<CreateAlterTableResult> {
@@ -543,7 +537,7 @@ impl Inserter {
instant_table_ids.insert(table_info.table_id()); instant_table_ids.insert(table_info.table_id());
} }
table_infos.insert(table_info.table_id(), table.table_info()); table_infos.insert(table_info.table_id(), table.table_info());
if let Some(alter_expr) = self.get_alter_table_expr_on_demand( if let Some(alter_expr) = Self::get_alter_table_expr_on_demand(
req, req,
&table, &table,
ctx, ctx,
@@ -565,9 +559,7 @@ impl Inserter {
AutoCreateTableType::Logical(_) => { AutoCreateTableType::Logical(_) => {
if !create_tables.is_empty() { if !create_tables.is_empty() {
// Creates logical tables in batch. // Creates logical tables in batch.
let tables = self let tables = self.create_logical_tables(create_tables, ctx).await?;
.create_logical_tables(create_tables, ctx, statement_executor)
.await?;
for table in tables { for table in tables {
let table_info = table.table_info(); let table_info = table.table_info();
@@ -579,7 +571,7 @@ impl Inserter {
} }
if !alter_tables.is_empty() { if !alter_tables.is_empty() {
// Alter logical tables in batch. // Alter logical tables in batch.
statement_executor self.schema_helper
.alter_logical_tables(alter_tables, ctx.clone()) .alter_logical_tables(alter_tables, ctx.clone())
.await?; .await?;
} }
@@ -590,9 +582,7 @@ impl Inserter {
// note that auto create table shouldn't be ttl instant table // note that auto create table shouldn't be ttl instant table
// for it's a very unexpected behavior and should be set by user explicitly // for it's a very unexpected behavior and should be set by user explicitly
for create_table in create_tables { for create_table in create_tables {
let table = self let table = self.create_physical_table(create_table, None, ctx).await?;
.create_physical_table(create_table, None, ctx, statement_executor)
.await?;
let table_info = table.table_info(); let table_info = table.table_info();
if table_info.is_ttl_instant_table() { if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id()); instant_table_ids.insert(table_info.table_id());
@@ -600,8 +590,8 @@ impl Inserter {
table_infos.insert(table_info.table_id(), table.table_info()); table_infos.insert(table_info.table_id(), table.table_info());
} }
for alter_expr in alter_tables.into_iter() { for alter_expr in alter_tables.into_iter() {
statement_executor self.schema_helper
.alter_table_inner(alter_expr, ctx.clone()) .alter_table_by_expr(alter_expr, ctx.clone())
.await?; .await?;
} }
} }
@@ -619,9 +609,7 @@ impl Inserter {
create_table create_table
.table_options .table_options
.insert(APPEND_MODE_KEY.to_string(), "false".to_string()); .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
let table = self let table = self.create_physical_table(create_table, None, ctx).await?;
.create_physical_table(create_table, None, ctx, statement_executor)
.await?;
let table_info = table.table_info(); let table_info = table.table_info();
if table_info.is_ttl_instant_table() { if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id()); instant_table_ids.insert(table_info.table_id());
@@ -662,12 +650,7 @@ impl Inserter {
); );
let table = self let table = self
.create_physical_table( .create_physical_table(create_table, Some(partitions), ctx)
create_table,
Some(partitions),
ctx,
statement_executor,
)
.await?; .await?;
let table_info = table.table_info(); let table_info = table.table_info();
if table_info.is_ttl_instant_table() { if table_info.is_ttl_instant_table() {
@@ -677,8 +660,8 @@ impl Inserter {
} }
} }
for alter_expr in alter_tables.into_iter() { for alter_expr in alter_tables.into_iter() {
statement_executor self.schema_helper
.alter_table_inner(alter_expr, ctx.clone()) .alter_table_by_expr(alter_expr, ctx.clone())
.await?; .await?;
} }
} }
@@ -690,79 +673,13 @@ impl Inserter {
}) })
} }
async fn create_physical_table_on_demand(
&self,
ctx: &QueryContextRef,
physical_table: String,
statement_executor: &StatementExecutor,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
// check if exist
if self
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
info!("Physical metric table `{table_reference}` does not exist, try creating table");
// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,
options: None,
},
];
let create_table_expr =
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr
.table_options
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
// create physical table
let res = statement_executor
.create_table_inner(create_table_expr, None, ctx.clone())
.await;
match res {
Ok(_) => {
info!("Successfully created table {table_reference}",);
Ok(())
}
Err(err) => {
error!(err; "Failed to create table {table_reference}");
Err(err)
}
}
}
async fn get_table( async fn get_table(
&self, &self,
catalog: &str, catalog: &str,
schema: &str, schema: &str,
table: &str, table: &str,
) -> Result<Option<TableRef>> { ) -> Result<Option<TableRef>> {
self.catalog_manager self.schema_helper.get_table(catalog, schema, table).await
.table(catalog, schema, table, None)
.await
.context(CatalogSnafu)
} }
fn get_create_table_expr_on_demand( fn get_create_table_expr_on_demand(
@@ -771,38 +688,9 @@ impl Inserter {
create_type: &AutoCreateTableType, create_type: &AutoCreateTableType,
ctx: &QueryContextRef, ctx: &QueryContextRef,
) -> Result<CreateTableExpr> { ) -> Result<CreateTableExpr> {
let mut table_options = Vec::with_capacity(4);
for key in VALID_TABLE_OPTION_KEYS {
if let Some(value) = ctx.extension(key) {
table_options.push((key, value));
}
}
let mut engine_name = default_engine(); let mut engine_name = default_engine();
match create_type { if matches!(create_type, AutoCreateTableType::Logical(_)) {
AutoCreateTableType::Logical(physical_table) => { engine_name = METRIC_ENGINE_NAME;
engine_name = METRIC_ENGINE_NAME;
table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
}
AutoCreateTableType::Physical => {
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
table_options.push((APPEND_MODE_KEY, append_mode));
}
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
table_options.push((MERGE_MODE_KEY, merge_mode));
}
}
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => {
table_options.push((APPEND_MODE_KEY, "true"));
}
AutoCreateTableType::LastNonNull => {
table_options.push((MERGE_MODE_KEY, "last_non_null"));
}
AutoCreateTableType::Trace => {
table_options.push((APPEND_MODE_KEY, "true"));
}
} }
let schema = ctx.current_schema(); let schema = ctx.current_schema();
@@ -813,11 +701,9 @@ impl Inserter {
build_create_table_expr(&table_ref, request_schema, engine_name)?; build_create_table_expr(&table_ref, request_schema, engine_name)?;
info!("Table `{table_ref}` does not exist, try creating table"); info!("Table `{table_ref}` does not exist, try creating table");
for (k, v) in table_options {
create_table_expr // Use the common fill_table_options_for_create function to populate table options
.table_options fill_table_options_for_create(&mut create_table_expr.table_options, create_type, ctx);
.insert(k.to_string(), v.to_string());
}
Ok(create_table_expr) Ok(create_table_expr)
} }
@@ -830,7 +716,6 @@ impl Inserter {
/// When `accommodate_existing_schema` is true and `is_single_value` is true, it also considers fields when modifying the /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also considers fields when modifying the
/// input `req`. /// input `req`.
fn get_alter_table_expr_on_demand( fn get_alter_table_expr_on_demand(
&self,
req: &mut RowInsertRequest, req: &mut RowInsertRequest,
table: &TableRef, table: &TableRef,
ctx: &QueryContextRef, ctx: &QueryContextRef,
@@ -918,7 +803,6 @@ impl Inserter {
mut create_table_expr: CreateTableExpr, mut create_table_expr: CreateTableExpr,
partitions: Option<Partitions>, partitions: Option<Partitions>,
ctx: &QueryContextRef, ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> { ) -> Result<TableRef> {
{ {
let table_ref = TableReference::full( let table_ref = TableReference::full(
@@ -929,8 +813,9 @@ impl Inserter {
info!("Table `{table_ref}` does not exist, try creating table"); info!("Table `{table_ref}` does not exist, try creating table");
} }
let res = statement_executor let res = self
.create_table_inner(&mut create_table_expr, partitions, ctx.clone()) .schema_helper
.create_table_by_expr(&mut create_table_expr, partitions, ctx.clone())
.await; .await;
let table_ref = TableReference::full( let table_ref = TableReference::full(
@@ -958,9 +843,9 @@ impl Inserter {
&self, &self,
create_table_exprs: Vec<CreateTableExpr>, create_table_exprs: Vec<CreateTableExpr>,
ctx: &QueryContextRef, ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Vec<TableRef>> { ) -> Result<Vec<TableRef>> {
let res = statement_executor let res = self
.schema_helper
.create_logical_tables(&create_table_exprs, ctx.clone()) .create_logical_tables(&create_table_exprs, ctx.clone())
.await; .await;
@@ -1011,7 +896,49 @@ fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
Ok(()) Ok(())
} }
fn build_create_table_expr( /// Fill table options for a new table by create type.
pub(crate) fn fill_table_options_for_create(
table_options: &mut std::collections::HashMap<String, String>,
create_type: &AutoCreateTableType,
ctx: &QueryContextRef,
) {
for key in VALID_TABLE_OPTION_KEYS {
if let Some(value) = ctx.extension(key) {
table_options.insert(key.to_string(), value.to_string());
}
}
match create_type {
AutoCreateTableType::Logical(physical_table) => {
table_options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_table.to_string(),
);
}
AutoCreateTableType::Physical => {
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
}
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
}
}
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => {
table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
}
AutoCreateTableType::LastNonNull => {
table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
}
AutoCreateTableType::Trace => {
table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
}
}
}
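
A minimal usage sketch (not part of the diff) of the extracted helper above, assuming a `table_ref`, `request_schema`, and `ctx: QueryContextRef` are already in scope; it mirrors how `get_create_table_expr_on_demand` now delegates option filling:

let mut create_table_expr =
build_create_table_expr(&table_ref, request_schema, default_engine())?;
// Fill the options shared by this auto-create path, e.g. append_mode = "true" for Log.
fill_table_options_for_create(
&mut create_table_expr.table_options,
&AutoCreateTableType::Log,
&ctx,
);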
/// Builds a [CreateTableExpr] for the given table and schema.
pub(crate) fn build_create_table_expr(
table: &TableReference, table: &TableReference,
request_schema: &[ColumnSchema], request_schema: &[ColumnSchema],
engine: &str, engine: &str,
@@ -1144,19 +1071,14 @@ mod tests {
use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value}; use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::cache::new_table_flownode_set_cache;
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use common_meta::test_util::MockDatanodeManager;
use datatypes::data_type::ConcreteDataType; use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema; use datatypes::schema::ColumnSchema;
use moka::future::Cache;
use session::context::QueryContext; use session::context::QueryContext;
use table::dist_table::DummyDataSource; use table::dist_table::DummyDataSource;
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType}; use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
use table::TableRef; use table::TableRef;
use super::*; use super::*;
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef { fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![ let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
@@ -1236,20 +1158,8 @@ mod tests {
DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
)); ));
let kv_backend = prepare_mocked_backend().await; let alter_expr =
let inserter = Inserter::new( Inserter::get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true).unwrap();
catalog::memory::MemoryCatalogManager::new(),
create_partition_rule_manager(kv_backend.clone()).await,
Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
Arc::new(new_table_flownode_set_cache(
String::new(),
Cache::new(100),
kv_backend.clone(),
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
.unwrap();
assert!(alter_expr.is_none()); assert!(alter_expr.is_none());
// The request's schema should have updated names for timestamp and field columns // The request's schema should have updated names for timestamp and field columns

View File

@@ -27,6 +27,7 @@ pub mod procedure;
pub mod region_req_factory; pub mod region_req_factory;
pub mod req_convert; pub mod req_convert;
pub mod request; pub mod request;
pub mod schema_helper;
pub mod statement; pub mod statement;
pub mod table; pub mod table;
#[cfg(test)] #[cfg(test)]

View File

@@ -0,0 +1,799 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to deal with table schemas.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{ListMetadataRequest, RegionRequestHeader};
use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
use catalog::CatalogManagerRef;
use common_catalog::consts::{
default_engine, is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_catalog::format_full_table_name;
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_route::TableRouteManager;
use common_meta::key::TableMetadataManagerRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::Partition;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing;
use common_telemetry::tracing_context::TracingContext;
use futures::future;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
use store_api::metadata::RegionMetadata;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::storage::RegionId;
use table::dist_table::DistTable;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_name::TableName;
use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DecodeJsonSnafu,
EmptyDdlExprSnafu, ExecuteDdlSnafu, FindRegionLeaderSnafu, InvalidPartitionRuleSnafu,
InvalidTableNameSnafu, InvalidateTableCacheSnafu, JoinTaskSnafu, RequestRegionSnafu, Result,
SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::expr_helper;
use crate::insert::{build_create_table_expr, fill_table_options_for_create, AutoCreateTableType};
use crate::region_req_factory::RegionRequestFactory;
use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter, NAME_PATTERN_REG};
/// Helper to query and manipulate (CREATE/ALTER) table schemas.
#[derive(Clone)]
pub struct SchemaHelper {
catalog_manager: CatalogManagerRef,
table_metadata_manager: TableMetadataManagerRef,
procedure_executor: ProcedureExecutorRef,
cache_invalidator: CacheInvalidatorRef,
}
impl SchemaHelper {
/// Creates a new [`SchemaHelper`].
pub fn new(
catalog_manager: CatalogManagerRef,
table_metadata_manager: TableMetadataManagerRef,
procedure_executor: ProcedureExecutorRef,
cache_invalidator: CacheInvalidatorRef,
) -> Self {
Self {
catalog_manager,
table_metadata_manager,
procedure_executor,
cache_invalidator,
}
}
/// Gets the table by catalog, schema and table name.
pub async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
) -> Result<Option<TableRef>> {
self.catalog_manager
.table(catalog, schema, table, None)
.await
.context(CatalogSnafu)
}
// TODO(yingwen): Can we create the physical table with all columns from the prometheus metrics?
/// Creates a physical table for metric engine.
///
/// If table already exists, do nothing.
pub async fn create_metric_physical_table(
&self,
ctx: &QueryContextRef,
physical_table: String,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
// check if exist
if self
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
common_telemetry::info!(
"Physical metric table `{table_reference}` does not exist, try creating table"
);
// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,
options: None,
},
];
let create_table_expr =
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr
.table_options
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
// create physical table.
// TODO(yingwen): Simplify this function. But remember to start the timer.
let res = self
.create_table_by_expr(create_table_expr, None, ctx.clone())
.await;
match res {
Ok(_) => {
common_telemetry::info!("Successfully created table {table_reference}",);
Ok(())
}
Err(err) => {
common_telemetry::error!(err; "Failed to create table {table_reference}");
Err(err)
}
}
}
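
A hedged sketch of how a caller (for example, the Prometheus remote-write path) might invoke the helper above; `schema_helper` and `query_ctx` are assumed to be in scope and the physical table name is hypothetical:

// Creates the shared metric-engine physical table if it is missing; a no-op otherwise.
schema_helper
.create_metric_physical_table(&query_ctx, "greptime_physical_table".to_string())
.await?;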
/// Creates a table by [CreateTableExpr].
#[tracing::instrument(skip_all)]
pub async fn create_table_by_expr(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<TableRef> {
ensure!(
!is_readonly_schema(&create_table.schema_name),
SchemaReadOnlySnafu {
name: create_table.schema_name.clone()
}
);
if create_table.engine == METRIC_ENGINE_NAME
&& create_table
.table_options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
// Create logical tables
ensure!(
partitions.is_none(),
InvalidPartitionRuleSnafu {
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
}
);
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
.await?
.into_iter()
.next()
.context(UnexpectedSnafu {
violated: "expected to create logical tables",
})
} else {
// Create other normal table
self.create_non_logic_table(create_table, partitions, query_ctx)
.await
}
}
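
An illustrative call (assumptions: `helper: &SchemaHelper`, `table_ref`, `request_schema`, and `query_ctx` exist) showing the non-logical branch of `create_table_by_expr`, which falls through to `create_non_logic_table` because the expression uses the default engine:

let mut expr = build_create_table_expr(&table_ref, request_schema, default_engine())?;
expr.create_if_not_exists = true;
let table = helper
.create_table_by_expr(&mut expr, None, query_ctx.clone())
.await?;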
/// Creates a non-logical table.
/// - If the schema doesn't exist, returns an error
/// - If the table already exists:
/// - If `create_if_not_exists` is true, returns the existing table
/// - If `create_if_not_exists` is false, returns an error
#[tracing::instrument(skip_all)]
pub async fn create_non_logic_table(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<TableRef> {
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
// Check if schema exists
let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await
.context(TableMetadataManagerSnafu)?;
ensure!(
schema.is_some(),
SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
);
// if table exists.
if let Some(table) = self
.catalog_manager
.table(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
Some(&query_ctx),
)
.await
.context(CatalogSnafu)?
{
return if create_table.create_if_not_exists {
Ok(table)
} else {
TableAlreadyExistsSnafu {
table: format_full_table_name(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
),
}
.fail()
};
}
ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: &create_table.table_name,
}
);
let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
);
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
let mut table_info = create_table_info(create_table, partition_cols)?;
let resp = self
.create_table_procedure(
create_table.clone(),
partitions,
table_info.clone(),
query_ctx,
)
.await?;
let table_id = resp.table_ids.into_iter().next().context(UnexpectedSnafu {
violated: "expected table_id",
})?;
common_telemetry::info!(
"Successfully created table '{table_name}' with table id {table_id}"
);
table_info.ident.table_id = table_id;
let table_info: Arc<TableInfo> =
Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
create_table.table_id = Some(api::v1::TableId { id: table_id });
let table = DistTable::table(table_info);
Ok(table)
}
/// Creates logical tables.
#[tracing::instrument(skip_all)]
pub async fn create_logical_tables(
&self,
create_table_exprs: &[CreateTableExpr],
query_context: QueryContextRef,
) -> Result<Vec<TableRef>> {
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
ensure!(
!create_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "create logic tables"
}
);
// Check table names
for create_table in create_table_exprs {
ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: &create_table.table_name,
}
);
}
let mut raw_tables_info = create_table_exprs
.iter()
.map(|create| create_table_info(create, vec![]))
.collect::<Result<Vec<_>>>()?;
let tables_data = create_table_exprs
.iter()
.cloned()
.zip(raw_tables_info.iter().cloned())
.collect::<Vec<_>>();
let resp = self
.create_logical_tables_procedure(tables_data, query_context)
.await?;
let table_ids = resp.table_ids;
ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
});
common_telemetry::info!("Successfully created logical tables: {:?}", table_ids);
for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
table_info.ident.table_id = table_ids[i];
}
let tables_info = raw_tables_info
.into_iter()
.map(|x| x.try_into().context(CreateTableInfoSnafu))
.collect::<Result<Vec<_>>>()?;
Ok(tables_info
.into_iter()
.map(|x| DistTable::table(Arc::new(x)))
.collect())
}
/// Alters a table by [AlterTableExpr].
#[tracing::instrument(skip_all)]
pub async fn alter_table_by_expr(
&self,
expr: AlterTableExpr,
query_context: QueryContextRef,
) -> Result<Output> {
ensure!(
!is_readonly_schema(&expr.schema_name),
SchemaReadOnlySnafu {
name: expr.schema_name.clone()
}
);
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME.to_string()
} else {
expr.catalog_name.clone()
};
let schema_name = if expr.schema_name.is_empty() {
DEFAULT_SCHEMA_NAME.to_string()
} else {
expr.schema_name.clone()
};
let table_name = expr.table_name.clone();
let table = self
.catalog_manager
.table(
&catalog_name,
&schema_name,
&table_name,
Some(&query_context),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
})?;
let table_id = table.table_info().ident.table_id;
let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
if !need_alter {
return Ok(Output::new_with_affected_rows(0));
}
common_telemetry::info!(
"Table info before alter is {:?}, expr: {:?}",
table.table_info(),
expr
);
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;
let (req, invalidate_keys) = if physical_table_id == table_id {
// This is physical table
let req = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_table(expr),
};
let invalidate_keys = vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
];
(req, invalidate_keys)
} else {
// This is logical table
let req = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_logical_tables(vec![expr]),
};
let mut invalidate_keys = vec![
CacheIdent::TableId(physical_table_id),
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
];
let physical_table = self
.table_metadata_manager
.table_info_manager()
.get(physical_table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|x| x.into_inner());
if let Some(physical_table) = physical_table {
let physical_table_name = TableName::new(
physical_table.table_info.catalog_name,
physical_table.table_info.schema_name,
physical_table.table_info.name,
);
invalidate_keys.push(CacheIdent::TableName(physical_table_name));
}
(req, invalidate_keys)
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(ExecuteDdlSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(&Context::default(), &invalidate_keys)
.await
.context(InvalidateTableCacheSnafu)?;
Ok(Output::new_with_affected_rows(0))
}
/// Alter logical tables.
pub async fn alter_logical_tables(
&self,
alter_table_exprs: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
ensure!(
!alter_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "alter logical tables"
}
);
// group by physical table id
let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
for expr in alter_table_exprs {
// Get table_id from catalog_manager
let catalog = if expr.catalog_name.is_empty() {
query_context.current_catalog()
} else {
&expr.catalog_name
};
let schema = if expr.schema_name.is_empty() {
query_context.current_schema()
} else {
expr.schema_name.to_string()
};
let table_name = &expr.table_name;
let table = self
.catalog_manager
.table(catalog, &schema, table_name, Some(&query_context))
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(catalog, &schema, table_name),
})?;
let table_id = table.table_info().ident.table_id;
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;
groups.entry(physical_table_id).or_default().push(expr);
}
// Submit procedure for each physical table
let mut handles = Vec::with_capacity(groups.len());
for (_physical_table_id, exprs) in groups {
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
handles.push(fut);
}
let _results = futures::future::try_join_all(handles).await?;
Ok(Output::new_with_affected_rows(0))
}
/// Returns the catalog manager.
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
/// Returns the table route manager.
pub fn table_route_manager(&self) -> &TableRouteManager {
self.table_metadata_manager.table_route_manager()
}
/// Submits a procedure to create a non-logical table.
async fn create_table_procedure(
&self,
create_table: CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect();
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_table(create_table, partitions, table_info),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
}
/// Submits a procedure to create logical tables.
async fn create_logical_tables_procedure(
&self,
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_logical_tables(tables_data),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
}
/// Submits a procedure to alter logical tables.
async fn alter_logical_tables_procedure(
&self,
tables_data: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_logical_tables(tables_data),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
}
}
/// Schema of a logical table.
pub struct LogicalSchema {
/// Name of the logical table.
pub name: String,
/// Schema of columns in the logical table.
pub columns: Vec<ColumnSchema>,
}
/// Logical table schemas.
pub struct LogicalSchemas {
/// Logical table schemas group by physical table name.
pub schemas: HashMap<String, Vec<LogicalSchema>>,
}
/// Creates or alters logical tables to match the provided schemas
/// for prometheus metrics.
pub async fn ensure_logical_tables_for_metrics(
helper: &SchemaHelper,
schemas: &LogicalSchemas,
query_ctx: &QueryContextRef,
) -> Result<()> {
let catalog_name = query_ctx.current_catalog();
let schema_name = query_ctx.current_schema();
// 1. For each physical table, creates it if it doesn't exist.
for physical_table_name in schemas.schemas.keys() {
// Check if the physical table exists and create it if it doesn't
let physical_table_opt = helper
.get_table(catalog_name, &schema_name, physical_table_name)
.await?;
if physical_table_opt.is_none() {
// Physical table doesn't exist, create it
helper
.create_metric_physical_table(query_ctx, physical_table_name.clone())
.await?;
}
}
// 2. Collects logical tables that do not exist. (CreateTableExpr)
let mut tables_to_create: Vec<CreateTableExpr> = Vec::new();
// 3. Collects alterations (columns to add) for each logical table. (AlterTableExpr)
let mut tables_to_alter: Vec<AlterTableExpr> = Vec::new();
// Process each logical table to determine if it needs to be created or altered
for (physical_table_name, logical_schemas) in &schemas.schemas {
for logical_schema in logical_schemas {
let table_name = &logical_schema.name;
// Check if the logical table exists
let table_opt = helper
.get_table(catalog_name, &schema_name, table_name)
.await?;
if let Some(existing_table) = table_opt {
// Logical table exists, determine if it needs alteration
let existing_schema = existing_table.schema();
let column_exprs = ColumnExpr::from_column_schemas(&logical_schema.columns);
let add_columns =
expr_helper::extract_add_columns_expr(&existing_schema, column_exprs)?;
let Some(add_columns) = add_columns else {
continue;
};
let alter_expr = AlterTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.clone(),
table_name: table_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
};
tables_to_alter.push(alter_expr);
} else {
// Logical table doesn't exist, prepare for creation
// Build a CreateTableExpr from the table reference and columns
let table_ref = TableReference::full(catalog_name, &schema_name, table_name);
let mut create_expr = build_create_table_expr(
&table_ref,
&logical_schema.columns,
METRIC_ENGINE_NAME,
)?;
create_expr.create_if_not_exists = true;
let create_type = AutoCreateTableType::Logical(physical_table_name.clone());
// Fill table options.
fill_table_options_for_create(
&mut create_expr.table_options,
&create_type,
query_ctx,
);
tables_to_create.push(create_expr);
}
}
}
// 4. Creates logical tables in batch using `create_logical_tables()`.
if !tables_to_create.is_empty() {
helper
.create_logical_tables(&tables_to_create, query_ctx.clone())
.await?;
}
// 5. Alters logical tables in batch using `alter_logical_tables()`.
if !tables_to_alter.is_empty() {
helper
.alter_logical_tables(tables_to_alter, query_ctx.clone())
.await?;
}
Ok(())
}
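
An illustrative sketch (the helper, query context, and table/metric names here are assumed or hypothetical) of driving this function for two Prometheus metrics that share one physical table:

let columns = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,
options: None,
},
];
let schemas = LogicalSchemas {
schemas: HashMap::from([(
"greptime_physical_table".to_string(),
vec![
LogicalSchema { name: "http_requests_total".to_string(), columns: columns.clone() },
LogicalSchema { name: "http_request_duration_seconds".to_string(), columns },
],
)]),
};
// Creates any missing physical/logical tables and adds missing columns in batch.
ensure_logical_tables_for_metrics(helper, &schemas, query_ctx).await?;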
/// Gets the list of metadatas for a list of region ids.
// TODO(yingwen): Should we return RegionMetadataRef?
pub async fn metadatas_for_region_ids(
partition_manager: &PartitionRuleManagerRef,
node_manager: &NodeManagerRef,
region_ids: &[RegionId],
ctx: &QueryContextRef,
) -> Result<Vec<Option<RegionMetadata>>> {
// Groups regions by peers.
// This map contains: peer => (ListMetadataRequest, a vec of indices into `region_ids`).
let mut request_per_region = HashMap::new();
for (index, region_id) in region_ids.iter().copied().enumerate() {
let peer = partition_manager
.find_region_leader(region_id)
.await
.context(FindRegionLeaderSnafu)?;
let request_indices = request_per_region
.entry(peer)
.or_insert_with(|| (ListMetadataRequest::default(), Vec::new()));
request_indices.0.region_ids.push(region_id.as_u64());
request_indices.1.push(index);
}
// Sends requests to datanode and waits for responses.
let tasks = request_per_region
.into_iter()
.map(|(peer, (request, indices))| {
let node_manager = node_manager.clone();
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
dbname: ctx.get_db_string(),
..Default::default()
});
common_runtime::spawn_global(async move {
let request = request_factory.build_request(Body::ListMetadata(request));
let resp = node_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestRegionSnafu)?;
let metadatas: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&resp.metadata).context(DecodeJsonSnafu)?;
Ok((metadatas, indices))
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
let mut output_metadatas = vec![None; region_ids.len()];
for result in results {
let (mut metadatas, indices) = result?;
ensure!(
metadatas.len() == indices.len(),
UnexpectedSnafu {
violated: format!(
"Length mismatch between request and response, expected {} metadatas, got {}",
indices.len(),
metadatas.len()
),
}
);
// `metadatas` is aligned with `indices`, so pair them positionally.
for (i, index) in indices.into_iter().enumerate() {
output_metadatas[index] = metadatas[i].take();
}
}
Ok(output_metadatas)
}
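
A hedged usage sketch, assuming `partition_manager`, `node_manager`, and `ctx` handles are available and the region ids are hypothetical:

// Entries come back as None when a region's metadata is not found on its datanode.
let region_ids = vec![RegionId::new(1024, 0), RegionId::new(1024, 1)];
let metadatas =
metadatas_for_region_ids(&partition_manager, &node_manager, &region_ids, &ctx).await?;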

View File

@@ -18,7 +18,7 @@ mod copy_query_to;
mod copy_table_from; mod copy_table_from;
mod copy_table_to; mod copy_table_to;
mod cursor; mod cursor;
mod ddl; pub(crate) mod ddl;
mod describe; mod describe;
mod dml; mod dml;
mod kill; mod kill;
@@ -102,6 +102,14 @@ pub struct StatementExecutor {
pub type StatementExecutorRef = Arc<StatementExecutor>; pub type StatementExecutorRef = Arc<StatementExecutor>;
impl StatementExecutor { impl StatementExecutor {
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
&self.procedure_executor
}
pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
&self.cache_invalidator
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
catalog_manager: CatalogManagerRef, catalog_manager: CatalogManagerRef,

View File

@@ -26,7 +26,7 @@ use api::v1::{
}; };
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
use chrono::Utc; use chrono::Utc;
use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::is_readonly_schema;
use common_catalog::{format_full_flow_name, format_full_table_name}; use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context; use common_meta::cache_invalidator::Context;
@@ -43,7 +43,7 @@ use common_meta::rpc::ddl::{
CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest, CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
SubmitDdlTaskResponse, SubmitDdlTaskResponse,
}; };
use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_meta::rpc::router::Partition as MetaPartition;
use common_query::Output; use common_query::Output;
use common_telemetry::{debug, info, tracing, warn}; use common_telemetry::{debug, info, tracing, warn};
use common_time::Timezone; use common_time::Timezone;
@@ -74,7 +74,6 @@ use sql::statements::create::{
use sql::statements::sql_value_to_value; use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement; use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue}; use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::dist_table::DistTable; use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
@@ -84,12 +83,11 @@ use table::TableRef;
use crate::error::{ use crate::error::{
self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu, self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu, UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
}; };
use crate::expr_helper; use crate::expr_helper;
@@ -97,7 +95,8 @@ use crate::statement::show::create_partitions_stmt;
use crate::statement::StatementExecutor; use crate::statement::StatementExecutor;
lazy_static! { lazy_static! {
static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap(); /// Regex to validate table name.
pub(crate) static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
} }
impl StatementExecutor { impl StatementExecutor {
@@ -182,192 +181,10 @@ impl StatementExecutor {
partitions: Option<Partitions>, partitions: Option<Partitions>,
query_ctx: QueryContextRef, query_ctx: QueryContextRef,
) -> Result<TableRef> { ) -> Result<TableRef> {
ensure!( self.inserter
!is_readonly_schema(&create_table.schema_name), .schema_helper
SchemaReadOnlySnafu { .create_table_by_expr(create_table, partitions, query_ctx)
name: create_table.schema_name.clone()
}
);
if create_table.engine == METRIC_ENGINE_NAME
&& create_table
.table_options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
// Create logical tables
ensure!(
partitions.is_none(),
InvalidPartitionRuleSnafu {
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
}
);
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
.await?
.into_iter()
.next()
.context(error::UnexpectedSnafu {
violated: "expected to create logical tables",
})
} else {
// Create other normal table
self.create_non_logic_table(create_table, partitions, query_ctx)
.await
}
}
#[tracing::instrument(skip_all)]
pub async fn create_non_logic_table(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<TableRef> {
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
// Check if schema exists
let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await .await
.context(TableMetadataManagerSnafu)?;
ensure!(
schema.is_some(),
SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
);
// if table exists.
if let Some(table) = self
.catalog_manager
.table(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
Some(&query_ctx),
)
.await
.context(CatalogSnafu)?
{
return if create_table.create_if_not_exists {
Ok(table)
} else {
TableAlreadyExistsSnafu {
table: format_full_table_name(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
),
}
.fail()
};
}
ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: &create_table.table_name,
}
);
let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
);
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
let mut table_info = create_table_info(create_table, partition_cols)?;
let resp = self
.create_table_procedure(
create_table.clone(),
partitions,
table_info.clone(),
query_ctx,
)
.await?;
let table_id = resp
.table_ids
.into_iter()
.next()
.context(error::UnexpectedSnafu {
violated: "expected table_id",
})?;
info!("Successfully created table '{table_name}' with table id {table_id}");
table_info.ident.table_id = table_id;
let table_info: Arc<TableInfo> =
Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
create_table.table_id = Some(api::v1::TableId { id: table_id });
let table = DistTable::table(table_info);
Ok(table)
}
#[tracing::instrument(skip_all)]
pub async fn create_logical_tables(
&self,
create_table_exprs: &[CreateTableExpr],
query_context: QueryContextRef,
) -> Result<Vec<TableRef>> {
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
ensure!(
!create_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "create logic tables"
}
);
// Check table names
for create_table in create_table_exprs {
ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: &create_table.table_name,
}
);
}
let mut raw_tables_info = create_table_exprs
.iter()
.map(|create| create_table_info(create, vec![]))
.collect::<Result<Vec<_>>>()?;
let tables_data = create_table_exprs
.iter()
.cloned()
.zip(raw_tables_info.iter().cloned())
.collect::<Vec<_>>();
let resp = self
.create_logical_tables_procedure(tables_data, query_context)
.await?;
let table_ids = resp.table_ids;
ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
});
info!("Successfully created logical tables: {:?}", table_ids);
for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
table_info.ident.table_id = table_ids[i];
}
let tables_info = raw_tables_info
.into_iter()
.map(|x| x.try_into().context(CreateTableInfoSnafu))
.collect::<Result<Vec<_>>>()?;
Ok(tables_info
.into_iter()
.map(|x| DistTable::table(Arc::new(x)))
.collect())
} }
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
@@ -953,64 +770,6 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu) .context(error::ExecuteDdlSnafu)
} }
#[tracing::instrument(skip_all)]
pub async fn alter_logical_tables(
&self,
alter_table_exprs: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
ensure!(
!alter_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "alter logical tables"
}
);
// group by physical table id
let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
for expr in alter_table_exprs {
// Get table_id from catalog_manager
let catalog = if expr.catalog_name.is_empty() {
query_context.current_catalog()
} else {
&expr.catalog_name
};
let schema = if expr.schema_name.is_empty() {
query_context.current_schema()
} else {
expr.schema_name.to_string()
};
let table_name = &expr.table_name;
let table = self
.catalog_manager
.table(catalog, &schema, table_name, Some(&query_context))
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(catalog, &schema, table_name),
})?;
let table_id = table.table_info().ident.table_id;
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;
groups.entry(physical_table_id).or_default().push(expr);
}
// Submit procedure for each physical table
let mut handles = Vec::with_capacity(groups.len());
for (_physical_table_id, exprs) in groups {
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
handles.push(fut);
}
let _results = futures::future::try_join_all(handles).await?;
Ok(Output::new_with_affected_rows(0))
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn drop_table( pub async fn drop_table(
&self, &self,
@@ -1152,60 +911,6 @@ impl StatementExecutor {
Ok(Output::new_with_affected_rows(0)) Ok(Output::new_with_affected_rows(0))
} }
/// Verifies an alter and returns whether it is necessary to perform the alter.
///
/// # Returns
///
/// Returns true if the alter needs to be performed; otherwise, it returns false.
fn verify_alter(
&self,
table_id: TableId,
table_info: Arc<TableInfo>,
expr: AlterTableExpr,
) -> Result<bool> {
let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
.context(AlterExprToRequestSnafu)?;
let AlterTableRequest {
table_name,
alter_kind,
..
} = &request;
if let AlterKind::RenameTable { new_table_name } = alter_kind {
ensure!(
NAME_PATTERN_REG.is_match(new_table_name),
error::UnexpectedSnafu {
violated: format!("Invalid table name: {}", new_table_name)
}
);
} else if let AlterKind::AddColumns { columns } = alter_kind {
// If all the columns are marked as add_if_not_exists and they already exist in the table,
// there is no need to perform the alter.
let column_names: HashSet<_> = table_info
.meta
.schema
.column_schemas()
.iter()
.map(|schema| &schema.name)
.collect();
if columns.iter().all(|column| {
column_names.contains(&column.column_schema.name) && column.add_if_not_exists
}) {
return Ok(false);
}
}
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
Ok(true)
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn alter_table( pub async fn alter_table(
&self, &self,
@@ -1222,116 +927,10 @@ impl StatementExecutor {
expr: AlterTableExpr, expr: AlterTableExpr,
query_context: QueryContextRef, query_context: QueryContextRef,
) -> Result<Output> { ) -> Result<Output> {
ensure!( self.inserter
!is_readonly_schema(&expr.schema_name), .schema_helper
SchemaReadOnlySnafu { .alter_table_by_expr(expr, query_context)
name: expr.schema_name.clone()
}
);
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME.to_string()
} else {
expr.catalog_name.clone()
};
let schema_name = if expr.schema_name.is_empty() {
DEFAULT_SCHEMA_NAME.to_string()
} else {
expr.schema_name.clone()
};
let table_name = expr.table_name.clone();
let table = self
.catalog_manager
.table(
&catalog_name,
&schema_name,
&table_name,
Some(&query_context),
)
.await .await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
})?;
let table_id = table.table_info().ident.table_id;
let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
if !need_alter {
return Ok(Output::new_with_affected_rows(0));
}
info!(
"Table info before alter is {:?}, expr: {:?}",
table.table_info(),
expr
);
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;
let (req, invalidate_keys) = if physical_table_id == table_id {
// This is physical table
let req = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_table(expr),
};
let invalidate_keys = vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
];
(req, invalidate_keys)
} else {
// This is logical table
let req = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_logical_tables(vec![expr]),
};
let mut invalidate_keys = vec![
CacheIdent::TableId(physical_table_id),
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
];
let physical_table = self
.table_metadata_manager
.table_info_manager()
.get(physical_table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|x| x.into_inner());
if let Some(physical_table) = physical_table {
let physical_table_name = TableName::new(
physical_table.table_info.catalog_name,
physical_table.table_info.schema_name,
physical_table.table_info.name,
);
invalidate_keys.push(CacheIdent::TableName(physical_table_name));
}
(req, invalidate_keys)
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(error::ExecuteDdlSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(&Context::default(), &invalidate_keys)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(Output::new_with_affected_rows(0))
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
@@ -1386,58 +985,6 @@ impl StatementExecutor {
Ok(Output::new_with_affected_rows(0)) Ok(Output::new_with_affected_rows(0))
} }
async fn create_table_procedure(
&self,
create_table: CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect();
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_table(create_table, partitions, table_info),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn create_logical_tables_procedure(
&self,
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_logical_tables(tables_data),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn alter_logical_tables_procedure(
&self,
tables_data: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_logical_tables(tables_data),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn drop_table_procedure( async fn drop_table_procedure(
&self, &self,
table_name: &TableName, table_name: &TableName,
@@ -1585,8 +1132,61 @@ impl StatementExecutor {
} }
} }
/// Verifies an alter and returns whether it is necessary to perform the alter.
///
/// # Returns
///
/// Returns true if the alter needs to be performed; otherwise, it returns false.
pub(crate) fn verify_alter(
table_id: TableId,
table_info: Arc<TableInfo>,
expr: AlterTableExpr,
) -> Result<bool> {
let request: AlterTableRequest =
common_grpc_expr::alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;
let AlterTableRequest {
table_name,
alter_kind,
..
} = &request;
if let AlterKind::RenameTable { new_table_name } = alter_kind {
ensure!(
NAME_PATTERN_REG.is_match(new_table_name),
error::UnexpectedSnafu {
violated: format!("Invalid table name: {}", new_table_name)
}
);
} else if let AlterKind::AddColumns { columns } = alter_kind {
// If all the columns are marked as add_if_not_exists and they already exist in the table,
// there is no need to perform the alter.
let column_names: HashSet<_> = table_info
.meta
.schema
.column_schemas()
.iter()
.map(|schema| &schema.name)
.collect();
if columns.iter().all(|column| {
column_names.contains(&column.column_schema.name) && column.add_if_not_exists
}) {
return Ok(false);
}
}
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
Ok(true)
}
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns. /// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
fn parse_partitions( pub(crate) fn parse_partitions(
create_table: &CreateTableExpr, create_table: &CreateTableExpr,
partitions: Option<Partitions>, partitions: Option<Partitions>,
query_ctx: &QueryContextRef, query_ctx: &QueryContextRef,
@@ -1619,7 +1219,7 @@ fn parse_partitions(
)) ))
} }
fn create_table_info( pub(crate) fn create_table_info(
create_table: &CreateTableExpr, create_table: &CreateTableExpr,
partition_columns: Vec<String>, partition_columns: Vec<String>,
) -> Result<RawTableInfo> { ) -> Result<RawTableInfo> {

View File

@@ -88,7 +88,6 @@ impl PipelineOperator {
catalog.to_string(), catalog.to_string(),
Arc::new(PipelineTable::new( Arc::new(PipelineTable::new(
self.inserter.clone(), self.inserter.clone(),
self.statement_executor.clone(),
table, table,
self.query_engine.clone(), self.query_engine.clone(),
)), )),

View File

@@ -30,7 +30,6 @@ use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector}; use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools; use itertools::Itertools;
use operator::insert::InserterRef; use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame; use query::dataframe::DataFrame;
use query::QueryEngineRef; use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef}; use session::context::{QueryContextBuilder, QueryContextRef};
@@ -61,7 +60,6 @@ pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
/// Every catalog has its own pipeline table. /// Every catalog has its own pipeline table.
pub struct PipelineTable { pub struct PipelineTable {
inserter: InserterRef, inserter: InserterRef,
statement_executor: StatementExecutorRef,
table: TableRef, table: TableRef,
query_engine: QueryEngineRef, query_engine: QueryEngineRef,
cache: PipelineCache, cache: PipelineCache,
@@ -69,15 +67,9 @@ pub struct PipelineTable {
impl PipelineTable { impl PipelineTable {
/// Create a new PipelineTable. /// Create a new PipelineTable.
pub fn new( pub fn new(inserter: InserterRef, table: TableRef, query_engine: QueryEngineRef) -> Self {
inserter: InserterRef,
statement_executor: StatementExecutorRef,
table: TableRef,
query_engine: QueryEngineRef,
) -> Self {
Self { Self {
inserter, inserter,
statement_executor,
table, table,
query_engine, query_engine,
cache: PipelineCache::new(), cache: PipelineCache::new(),
@@ -232,13 +224,7 @@ impl PipelineTable {
let output = self let output = self
.inserter .inserter
.handle_row_inserts( .handle_row_inserts(requests, Self::query_ctx(&table_info), false, false)
requests,
Self::query_ctx(&table_info),
&self.statement_executor,
false,
false,
)
.await .await
.context(InsertPipelineSnafu)?; .context(InsertPipelineSnafu)?;

View File

@@ -36,6 +36,7 @@ chrono.workspace = true
common-base.workspace = true common-base.workspace = true
common-catalog.workspace = true common-catalog.workspace = true
common-config.workspace = true common-config.workspace = true
common-datasource.workspace = true
common-error.workspace = true common-error.workspace = true
common-frontend.workspace = true common-frontend.workspace = true
common-grpc.workspace = true common-grpc.workspace = true
@@ -74,11 +75,18 @@ jsonb.workspace = true
lazy_static.workspace = true lazy_static.workspace = true
log-query.workspace = true log-query.workspace = true
loki-proto.workspace = true loki-proto.workspace = true
metric-engine.workspace = true
mime_guess = "2.0" mime_guess = "2.0"
mito-codec.workspace = true
mito2.workspace = true
notify.workspace = true notify.workspace = true
object-pool = "0.5" object-pool = "0.5"
object-store.workspace = true
once_cell.workspace = true once_cell.workspace = true
openmetrics-parser = "0.4" openmetrics-parser = "0.4"
operator.workspace = true
parquet.workspace = true
partition.workspace = true
simd-json.workspace = true simd-json.workspace = true
socket2 = "0.5" socket2 = "0.5"
# use crates.io version once the following PRs is merged into the nextest release # use crates.io version once the following PRs is merged into the nextest release

View File

@@ -0,0 +1,415 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::array::{
Array, PrimitiveArray, RecordBatch, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::datatypes::Int64Type;
use arrow_schema::TimeUnit;
use common_datasource::parquet_writer::AsyncWriter;
use datafusion::parquet::arrow::AsyncArrowWriter;
use mito2::sst::file::{FileId, FileMeta};
use mito2::sst::parquet::{DEFAULT_ROW_GROUP_SIZE, PARQUET_METADATA_KEY};
use object_store::config::ObjectStoreConfig;
use object_store::util::{join_dir, join_path};
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
use store_api::storage::RegionId;
use crate::batch_builder::physical_schema;
use crate::error;
type AsyncParquetWriter = AsyncArrowWriter<AsyncWriter>;
#[derive(Clone)]
pub struct AccessLayerFactory {
object_store: ObjectStore,
}
impl AccessLayerFactory {
pub async fn new(config: &ObjectStoreConfig) -> error::Result<AccessLayerFactory> {
let object_store = object_store::factory::new_raw_object_store(config, "")
.await
.context(error::ObjectStoreSnafu)?;
Ok(Self { object_store })
}
pub(crate) async fn create_sst_writer(
&self,
catalog: &str,
schema: &str,
region_metadata: RegionMetadataRef,
) -> error::Result<ParquetWriter> {
let region_dir = build_data_region_dir(catalog, schema, region_metadata.region_id);
let file_id = FileId::random();
let file_path = join_path(&region_dir, &file_id.as_parquet());
let writer = self
.object_store
.writer(&file_path)
.await
.context(error::OpendalSnafu)?;
let schema = physical_schema();
let key_value_meta = KeyValue::new(
PARQUET_METADATA_KEY.to_string(),
region_metadata.to_json().unwrap(),
);
let props = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(DEFAULT_ROW_GROUP_SIZE)
.build();
let writer = AsyncParquetWriter::try_new(AsyncWriter::new(writer), schema, Some(props))
.context(error::ParquetSnafu)?;
Ok(ParquetWriter {
region_id: region_metadata.region_id,
file_id,
region_metadata,
writer,
timestamp_range: None,
})
}
}
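To make the writer configuration above easier to see in isolation, here is a minimal sketch of the same idea: attach custom key-value metadata to the parquet footer via `WriterProperties` and write one batch. It assumes only the `parquet` crate (with its `arrow` and `zstd` features) and the `arrow` crate; it uses the synchronous `ArrowWriter` into an in-memory buffer instead of the async writer over object storage, and the schema, key name, and payload are illustrative placeholders, not the real `PARQUET_METADATA_KEY` or region metadata JSON.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;

fn main() -> parquet::errors::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "greptime_value",
        DataType::Float64,
        false,
    )]));
    // Attach custom metadata (here a placeholder JSON string) to the footer,
    // mirroring how region metadata is embedded under PARQUET_METADATA_KEY.
    let props = WriterProperties::builder()
        .set_key_value_metadata(Some(vec![KeyValue::new(
            "greptime:metadata".to_string(),
            "{\"example\":true}".to_string(),
        )]))
        .set_compression(Compression::ZSTD(ZstdLevel::default()))
        .build();
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props))?;
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef],
    )
    .expect("valid batch");
    writer.write(&batch)?;
    // Closing flushes row groups and writes the footer carrying the key-value metadata.
    writer.close()?;
    assert!(!buf.is_empty());
    Ok(())
}
```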
pub struct ParquetWriter {
region_id: RegionId,
file_id: FileId,
region_metadata: RegionMetadataRef,
writer: AsyncParquetWriter,
timestamp_range: Option<(i64, i64)>,
}
impl ParquetWriter {
pub(crate) fn file_id(&self) -> FileId {
self.file_id
}
}
impl ParquetWriter {
pub async fn write_record_batch(
&mut self,
batch: &RecordBatch,
timestamp_range: Option<(i64, i64)>,
) -> error::Result<()> {
if let Err(e) = self.writer.write(&batch).await.context(error::ParquetSnafu) {
common_telemetry::error!(e; "Region metadata: {:?}, batch schema: {:?}", self.region_metadata, batch.schema_ref());
return Err(e);
}
let (batch_min, batch_max) =
get_or_calculate_timestamp_range(timestamp_range, batch, &self.region_metadata)?;
if let Some((min, max)) = &mut self.timestamp_range {
*min = (*min).min(batch_min);
*max = (*max).max(batch_max);
} else {
self.timestamp_range = Some((batch_min, batch_max));
};
Ok(())
}
pub async fn finish(&mut self) -> error::Result<FileMeta> {
let (min, max) = self.timestamp_range.unwrap();
let timestamp_type = self
.region_metadata
.time_index_column()
.column_schema
.data_type
.as_timestamp()
.unwrap();
let min_ts = timestamp_type.create_timestamp(min);
let max_ts = timestamp_type.create_timestamp(max);
let file_meta = self.writer.finish().await.context(error::ParquetSnafu)?;
let meta = FileMeta {
region_id: self.region_id,
file_id: self.file_id,
time_range: (min_ts, max_ts),
level: 0,
file_size: self.writer.bytes_written() as u64,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: file_meta.num_rows as u64,
num_row_groups: file_meta.row_groups.len() as u64,
sequence: None, //todo(hl): use flushed sequence here.
};
Ok(meta)
}
}
/// Builds the data region subdir for metric physical tables.
fn build_data_region_dir(catalog: &str, schema: &str, physical_region_id: RegionId) -> String {
let storage_path = common_meta::ddl::utils::region_storage_path(&catalog, &schema);
join_dir(
&store_api::path_utils::region_dir(&storage_path, physical_region_id),
DATA_REGION_SUBDIR,
)
}
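For readers unfamiliar with the helpers, the directory layout they compose is shown by the test further below; the following is a plain-string illustration of that layout only (it does not replace `region_storage_path`/`region_dir`, and the format is assumed from the asserted value in `test_build_data_region_dir_basic`).

```rust
/// Illustrative only: mirrors the layout asserted in the test below, i.e.
/// data/{catalog}/{schema}/{table_id}/{table_id}_{region_number:010}/data/
fn example_data_region_dir(catalog: &str, schema: &str, table_id: u32, region_number: u32) -> String {
    format!("data/{catalog}/{schema}/{table_id}/{table_id}_{region_number:010}/data/")
}

fn main() {
    assert_eq!(
        example_data_region_dir("greptime", "public", 1024, 0),
        "data/greptime/public/1024/1024_0000000000/data/"
    );
}
```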
fn get_or_calculate_timestamp_range(
timestamp_range: Option<(i64, i64)>,
rb: &RecordBatch,
region_metadata: &RegionMetadataRef,
) -> error::Result<(i64, i64)> {
if let Some(range) = timestamp_range {
return Ok(range);
};
let ts = rb
.column_by_name(&region_metadata.time_index_column().column_schema.name)
.expect("column not found");
let arrow::datatypes::DataType::Timestamp(unit, _) = ts.data_type() else {
unreachable!("expected timestamp types");
};
let primitives: PrimitiveArray<Int64Type> = match unit {
TimeUnit::Second => ts
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast(),
TimeUnit::Millisecond => ts
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast(),
TimeUnit::Microsecond => ts
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast(),
TimeUnit::Nanosecond => ts
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast(),
};
let min = arrow::compute::min(&primitives).unwrap();
let max = arrow::compute::max(&primitives).unwrap();
Ok((min, max))
}
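The range calculation above relies on every Arrow timestamp unit being backed by `i64`, so the array can be reinterpreted as a plain `Int64` array and fed to the generic min/max kernels without copying. A small, self-contained sketch of just that trick, assuming only the `arrow` crate:

```rust
use arrow::array::{PrimitiveArray, TimestampMillisecondArray};
use arrow::datatypes::Int64Type;

fn main() {
    let ts = TimestampMillisecondArray::from(vec![3_000, 1_000, 2_000]);
    // Same i64 storage, different logical type: no data is copied.
    let as_i64: PrimitiveArray<Int64Type> = ts.reinterpret_cast();
    assert_eq!(arrow::compute::min(&as_i64), Some(1_000));
    assert_eq!(arrow::compute::max(&as_i64), Some(3_000));
}
```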
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use arrow::array::{Float64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_time::Timestamp;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use object_store::services::MemoryConfig;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use super::*;
#[test]
fn test_build_data_region_dir_basic() {
let result = build_data_region_dir("greptime", "public", RegionId::new(1024, 0));
assert_eq!(&result, "data/greptime/public/1024/1024_0000000000/data/");
}
fn create_test_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
GREPTIME_TIMESTAMP,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
GREPTIME_VALUE,
ConcreteDataType::float64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 3,
})
.primary_key(vec![3]);
let metadata = builder.build().unwrap();
Arc::new(metadata)
}
fn create_test_record_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new(
GREPTIME_TIMESTAMP,
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(GREPTIME_VALUE, DataType::Float64, true),
Field::new("tag", DataType::Utf8, true),
]));
let timestamp_array = TimestampMillisecondArray::from(vec![1000, 2000, 3000]);
let value_array = Float64Array::from(vec![Some(10.0), None, Some(30.0)]);
let tag_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
RecordBatch::try_new(
schema,
vec![
Arc::new(timestamp_array),
Arc::new(value_array),
Arc::new(tag_array),
],
)
.unwrap()
}
#[tokio::test]
async fn test_parquet_writer_write_and_finish() {
let object_store = ObjectStore::from_config(MemoryConfig::default())
.unwrap()
.finish();
let factory = AccessLayerFactory { object_store };
let region_metadata = create_test_region_metadata();
let mut writer = factory
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
.await
.unwrap();
let batch = create_test_record_batch();
// Test writing a record batch
writer.write_record_batch(&batch, None).await.unwrap();
// Test finishing the writer
let file_meta = writer.finish().await.unwrap();
assert_eq!(file_meta.region_id, RegionId::new(1024, 0));
assert_eq!(file_meta.level, 0);
assert_eq!(file_meta.num_rows, 3);
assert_eq!(file_meta.num_row_groups, 1);
assert!(file_meta.file_size > 0);
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000));
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(3000));
}
#[tokio::test]
async fn test_parquet_writer_multiple_batches() {
let object_store = ObjectStore::from_config(MemoryConfig::default())
.unwrap()
.finish();
let factory = AccessLayerFactory { object_store };
let region_metadata = create_test_region_metadata();
let mut writer = factory
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
.await
.unwrap();
// Write first batch
let batch1 = create_test_record_batch();
writer.write_record_batch(&batch1, None).await.unwrap();
// Create second batch with different timestamp range
let schema = region_metadata.schema.arrow_schema().clone();
let timestamp_array = TimestampMillisecondArray::from(vec![4000, 5000]);
let value_array = Float64Array::from(vec![Some(40.0), Some(50.0)]);
let tag_array = StringArray::from(vec![Some("d"), Some("e")]);
let batch2 = RecordBatch::try_new(
schema,
vec![
Arc::new(timestamp_array),
Arc::new(value_array),
Arc::new(tag_array),
],
)
.unwrap();
writer.write_record_batch(&batch2, None).await.unwrap();
let file_meta = writer.finish().await.unwrap();
// Should have combined rows from both batches
assert_eq!(file_meta.num_rows, 5);
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000));
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(5000));
}
#[tokio::test]
async fn test_parquet_writer_with_provided_timestamp_range() {
let object_store = ObjectStore::from_config(MemoryConfig::default())
.unwrap()
.finish();
let factory = AccessLayerFactory { object_store };
let region_metadata = create_test_region_metadata();
let mut writer = factory
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
.await
.unwrap();
let batch = create_test_record_batch();
// Provide explicit timestamp range that differs from actual data
let provided_range = (500, 6000);
writer
.write_record_batch(&batch, Some(provided_range))
.await
.unwrap();
let file_meta = writer.finish().await.unwrap();
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(500));
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(6000));
}
#[test]
fn test_get_or_calculate_timestamp_range_with_provided_range() {
let region_metadata = create_test_region_metadata();
let batch = create_test_record_batch();
let provided_range = Some((100, 200));
let result = get_or_calculate_timestamp_range(provided_range, &batch, &region_metadata);
assert!(result.is_ok());
assert_eq!(result.unwrap(), (100, 200));
}
#[test]
fn test_get_or_calculate_timestamp_range_calculated() {
let region_metadata = create_test_region_metadata();
let batch = create_test_record_batch();
let result = get_or_calculate_timestamp_range(None, &batch, &region_metadata);
assert!(result.is_ok());
assert_eq!(result.unwrap(), (1000, 3000));
}
}

View File

@@ -0,0 +1,604 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, OpType, SemanticType};
use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, Float64Array, RecordBatch, TimestampMillisecondArray,
UInt64Array, UInt8Array,
};
use arrow::compute;
use arrow_schema::Field;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::info;
use itertools::Itertools;
use metric_engine::row_modifier::{RowModifier, RowsIter};
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use operator::schema_helper::{
ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas,
SchemaHelper,
};
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{
ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
use store_api::storage::{ColumnId, RegionId};
use table::metadata::TableId;
use crate::error;
use crate::prom_row_builder::{PromCtx, TableBuilder};
pub struct MetricsBatchBuilder {
schema_helper: SchemaHelper,
builders:
HashMap<String /*schema*/, HashMap<RegionId /*physical region id*/, BatchEncoder>>,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
}
impl MetricsBatchBuilder {
pub fn new(
schema_helper: SchemaHelper,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
) -> Self {
MetricsBatchBuilder {
schema_helper,
builders: Default::default(),
partition_manager,
node_manager,
}
}
/// Detects the DDL requirements according to the staged table rows.
pub async fn create_or_alter_physical_tables(
&self,
tables: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
query_ctx: &QueryContextRef,
) -> error::Result<()> {
// Physical table name -> logical tables -> tags in logical table
let mut tags: HashMap<String, HashMap<String, HashSet<String>>> = HashMap::default();
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
for (ctx, tables) in tables {
for (logical_table_name, table_builder) in tables {
let physical_table_name = self
.determine_physical_table_name(
logical_table_name,
&ctx.physical_table,
catalog,
&schema,
)
.await?;
tags.entry(physical_table_name)
.or_default()
.entry(logical_table_name.clone())
.or_default()
.extend(table_builder.tags().cloned());
}
}
let logical_schemas = tags_to_logical_schemas(tags);
ensure_logical_tables_for_metrics(&self.schema_helper, &logical_schemas, query_ctx)
.await
.context(error::OperatorSnafu)?;
Ok(())
}
/// Finds physical table id for logical table.
async fn determine_physical_table_name(
&self,
logical_table_name: &str,
physical_table_name: &Option<String>,
catalog: &str,
schema: &str,
) -> error::Result<String> {
let logical_table = self
.schema_helper
.get_table(catalog, schema, logical_table_name)
.await
.context(error::OperatorSnafu)?;
if let Some(logical_table) = logical_table {
// Logical table already exists, just return the physical table name.
let logical_table_id = logical_table.table_info().table_id();
let physical_table_id = self
.schema_helper
.table_route_manager()
.get_physical_table_id(logical_table_id)
.await
.context(error::CommonMetaSnafu)?;
let physical_table = self
.schema_helper
.catalog_manager()
.tables_by_ids(catalog, schema, &[physical_table_id])
.await
.context(error::CatalogSnafu)?
.swap_remove(0);
return Ok(physical_table.table_info().name.clone());
}
// Logical table does not exist; try to assign the logical table to a physical table.
let physical_table_name = physical_table_name
.as_deref()
.unwrap_or(GREPTIME_PHYSICAL_TABLE);
Ok(physical_table_name.to_string())
}
/// Retrieves physical region metadata of given logical table names.
///
/// `logical_tables` is a list of table names; each entry contains the schema name and the table name.
/// Returns the following mapping: `schema => logical table => (logical table id, region 0 metadata of the physical table)`.
pub(crate) async fn collect_physical_region_metadata(
&self,
logical_tables: &[(String, String)],
query_ctx: &QueryContextRef,
) -> error::Result<HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>> {
let catalog = query_ctx.current_catalog();
// Logical and physical table ids.
let mut table_ids = Vec::with_capacity(logical_tables.len());
let mut physical_region_ids = HashSet::new();
for (schema, table_name) in logical_tables {
let logical_table = self
.schema_helper
.get_table(catalog, schema, table_name)
.await
.context(error::OperatorSnafu)?
.context(error::TableNotFoundSnafu {
catalog,
schema,
table: table_name,
})?;
let logical_table_id = logical_table.table_info().table_id();
let physical_table_id = self
.schema_helper
.table_route_manager()
.get_physical_table_id(logical_table_id)
.await
.context(error::CommonMetaSnafu)?;
table_ids.push((logical_table_id, physical_table_id));
// We only get metadata from region 0.
physical_region_ids.insert(RegionId::new(physical_table_id, 0));
}
// Batch get physical metadata.
let physical_region_ids = physical_region_ids.into_iter().collect_vec();
let region_metadatas = metadatas_for_region_ids(
&self.partition_manager,
&self.node_manager,
&physical_region_ids,
query_ctx,
)
.await
.context(error::OperatorSnafu)?;
let mut result_map: HashMap<_, HashMap<_, _>> = HashMap::new();
let region_metadatas: HashMap<_, _> = region_metadatas
.into_iter()
.flatten()
.map(|meta| (meta.region_id, Arc::new(meta)))
.collect();
for (i, (schema, table_name)) in logical_tables.iter().enumerate() {
let physical_table_id = table_ids[i].1;
let physical_region_id = RegionId::new(physical_table_id, 0);
let physical_metadata =
region_metadatas.get(&physical_region_id).with_context(|| {
error::UnexpectedResultSnafu {
reason: format!(
"Physical region metadata {} for table {} not found",
physical_region_id, table_name
),
}
})?;
match result_map.get_mut(schema) {
Some(table_map) => {
table_map.insert(
table_name.clone(),
(table_ids[i].0, physical_metadata.clone()),
);
}
None => {
let mut table_map = HashMap::new();
table_map.insert(
table_name.clone(),
(table_ids[i].0, physical_metadata.clone()),
);
result_map.insert(schema.to_string(), table_map);
}
}
}
Ok(result_map)
}
/// Builds [RecordBatch] from rows with primary key encoded.
/// Potentially we also need to rename the timestamp and value columns to
/// match the schema of the physical tables.
/// Note:
/// Make sure all logical and physical tables are created before reaching here, and that the mapping
/// from logical table name to physical table ref is stored in [physical_region_metadata].
pub(crate) async fn append_rows_to_batch(
&mut self,
current_catalog: Option<String>,
current_schema: Option<String>,
table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
physical_region_metadata: &HashMap<
String, /*schema name*/
HashMap<
String, /*logical table name*/
(TableId /*logical table id*/, RegionMetadataRef),
>,
>,
) -> error::Result<()> {
for (ctx, tables_in_schema) in table_data {
// use session catalog.
let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
// schema in PromCtx precedes session schema.
let schema = ctx
.schema
.as_deref()
.or(current_schema.as_deref())
.unwrap_or(DEFAULT_SCHEMA_NAME);
// Look up physical region metadata by schema and table name
let schema_metadata =
physical_region_metadata
.get(schema)
.context(error::TableNotFoundSnafu {
catalog,
schema,
table: "",
})?;
for (logical_table_name, table) in tables_in_schema {
let (logical_table_id, physical_table) = schema_metadata
.get(logical_table_name)
.context(error::TableNotFoundSnafu {
catalog,
schema,
table: logical_table_name,
})?;
let encoder = self
.builders
.entry(schema.to_string())
.or_default()
.entry(physical_table.region_id)
.or_insert_with(|| Self::create_sparse_encoder(&physical_table));
let name_to_id: HashMap<_, _> = physical_table
.column_metadatas
.iter()
.map(|c| (c.column_schema.name.clone(), c.column_id))
.collect();
let _ = std::mem::replace(encoder.name_to_id_mut(), name_to_id);
encoder.append_rows(*logical_table_id, std::mem::take(table))?;
}
}
Ok(())
}
/// Finishes the current record batch builders and returns record batches grouped by schema and physical region id.
pub(crate) fn finish(
self,
) -> error::Result<
HashMap<
String, /*schema name*/
HashMap<RegionId /*physical region id*/, Vec<(RecordBatch, (i64, i64))>>,
>,
> {
let mut table_batches: HashMap<String, HashMap<RegionId, Vec<(RecordBatch, (i64, i64))>>> =
HashMap::with_capacity(self.builders.len());
for (schema_name, schema_tables) in self.builders {
let schema_batches = table_batches.entry(schema_name).or_default();
for (physical_region_id, table_data) in schema_tables {
let rb = table_data.finish()?;
if !rb.is_empty() {
schema_batches
.entry(physical_region_id)
.or_default()
.extend(rb);
}
}
}
Ok(table_batches)
}
/// Creates an encoder that converts rows into record batches with the primary key encoded.
fn create_sparse_encoder(physical_region_meta: &RegionMetadataRef) -> BatchEncoder {
let name_to_id: HashMap<_, _> = physical_region_meta
.column_metadatas
.iter()
.map(|c| (c.column_schema.name.clone(), c.column_id))
.collect();
BatchEncoder::new(name_to_id)
}
}
struct Columns {
encoded_primary_key_array_builder: BinaryBuilder,
timestamps: Vec<i64>,
value: Vec<f64>,
timestamp_range: Option<(i64, i64)>,
}
impl Columns {
fn pk_offset(&self) -> usize {
self.encoded_primary_key_array_builder
.offsets_slice()
.last()
.copied()
.unwrap_or(0) as usize
}
fn estimated_size(&self) -> usize {
let value_size = self.encoded_primary_key_array_builder.values_slice().len();
let offset_size = self.encoded_primary_key_array_builder.offsets_slice().len() * 4;
let validity_size = self
.encoded_primary_key_array_builder
.validity_slice()
.map(|v| v.len())
.unwrap_or(0);
let timestamp_size = self.timestamps.len() * 8 + std::mem::size_of::<Vec<i64>>();
let val_size = self.value.len() * 8 + std::mem::size_of::<Vec<f64>>();
value_size + offset_size + validity_size + timestamp_size + val_size + std::mem::size_of::<Self>()
}
fn push(&mut self, pk: &[u8], val: f64, timestamp: i64) {
self.encoded_primary_key_array_builder.append_value(&pk);
self.value.push(val);
self.timestamps.push(timestamp);
if let Some((min, max)) = &mut self.timestamp_range {
*min = (*min).min(timestamp);
*max = (*max).max(timestamp);
} else {
self.timestamp_range = Some((timestamp, timestamp));
}
}
}
impl Default for Columns {
fn default() -> Self {
Self {
encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0),
timestamps: Vec::with_capacity(16),
value: Vec::with_capacity(16),
timestamp_range: None,
}
}
}
#[derive(Default)]
struct ColumnsBuilder {
columns: Vec<Columns>,
}
impl ColumnsBuilder {
fn push(&mut self, pk: &[u8], val: f64, ts: i64) {
let last = match self.columns.last_mut() {
None => {
self.columns.push(Columns::default());
self.columns.last_mut().unwrap()
}
Some(last_builder) => {
if last_builder.pk_offset() + pk.len() >= i32::MAX as usize {
info!(
"Current builder is full {}, rows: {}/{}",
last_builder.pk_offset(),
last_builder.encoded_primary_key_array_builder.len(),
last_builder.timestamps.len()
);
// Current builder is full, create a new one
self.columns.push(Columns::default());
self.columns.last_mut().unwrap()
} else {
last_builder
}
}
};
last.push(pk, val, ts);
}
}
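The `i32::MAX` check above exists because `BinaryBuilder` produces a `BinaryArray` with 32-bit offsets, so a single array can hold at most `i32::MAX` bytes of encoded primary keys; once a builder approaches that limit, a fresh `Columns` is started. A tiny sketch of the offset bookkeeping that `pk_offset()` reads, assuming only the `arrow` crate:

```rust
use arrow::array::{Array, BinaryBuilder};

fn main() {
    let mut builder = BinaryBuilder::new();
    builder.append_value(b"abc");
    builder.append_value(b"de");
    // Offsets are i32 and always start at 0; the last offset is the total
    // number of value bytes, which is compared against i32::MAX before
    // rolling over to a new builder.
    assert_eq!(builder.offsets_slice(), &[0, 3, 5]);
    let array = builder.finish();
    assert_eq!(array.len(), 2);
}
```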
struct BatchEncoder {
name_to_id: HashMap<String, ColumnId>,
pk_codec: SparsePrimaryKeyCodec,
columns_builder: ColumnsBuilder,
}
impl BatchEncoder {
fn new(name_to_id: HashMap<String, ColumnId>) -> BatchEncoder {
Self {
name_to_id,
pk_codec: SparsePrimaryKeyCodec::schemaless(),
columns_builder: ColumnsBuilder::default(),
}
}
pub(crate) fn estimated_size(&self) -> usize {
self.columns_builder
.columns
.iter()
.map(|v| v.estimated_size())
.sum()
}
pub(crate) fn total_rows(&self) -> usize {
self.columns_builder
.columns
.iter()
.map(|v| v.timestamps.len())
.sum()
}
pub(crate) fn name_to_id_mut(&mut self) -> &mut HashMap<String, ColumnId> {
&mut self.name_to_id
}
fn append_rows(
&mut self,
logical_table_id: TableId,
mut table_builder: TableBuilder,
) -> error::Result<()> {
// todo(hl): we can simplify the row iter because the schema in TableBuilder is known (ts, val, tags...)
let row_insert_request = table_builder.as_row_insert_request("don't care".to_string());
let mut iter = RowsIter::new(row_insert_request.rows.unwrap(), &self.name_to_id);
let mut encode_buf = vec![];
for row in iter.iter_mut() {
let (table_id, ts_id) = RowModifier::fill_internal_columns(logical_table_id, &row);
let internal_columns = [
(
ReservedColumnId::table_id(),
api::helper::pb_value_to_value_ref(&table_id, &None),
),
(
ReservedColumnId::tsid(),
api::helper::pb_value_to_value_ref(&ts_id, &None),
),
];
self.pk_codec
.encode_to_vec(internal_columns.into_iter(), &mut encode_buf)
.context(error::EncodePrimaryKeySnafu)?;
self.pk_codec
.encode_to_vec(row.primary_keys(), &mut encode_buf)
.context(error::EncodePrimaryKeySnafu)?;
// safety: field values cannot be null in prom remote write
let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
return error::InvalidFieldValueTypeSnafu.fail();
};
// process timestamp and field. We already know the position of timestamps and values in [TableBuilder].
let ValueData::TimestampMillisecondValue(ts) =
// safety: timestamp values cannot be null
row.value_at(0).value_data.as_ref().unwrap()
else {
return error::InvalidTimestampValueTypeSnafu.fail();
};
self.columns_builder.push(&encode_buf, *val, *ts);
}
Ok(())
}
fn finish(self) -> error::Result<Vec<(RecordBatch, (i64, i64))>> {
if self.columns_builder.columns.is_empty() {
return Ok(vec![]);
}
let mut res = Vec::with_capacity(self.columns_builder.columns.len());
for mut columns in self.columns_builder.columns {
let num_rows = columns.timestamps.len();
let value = Float64Array::from(columns.value);
let timestamp = TimestampMillisecondArray::from(columns.timestamps);
let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef;
// todo: for now we set all sequences to 0.
let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef;
let pk = columns.encoded_primary_key_array_builder.finish();
let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?;
// Sort arrays
let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
let rb =
RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type])
.context(error::ArrowSnafu)?;
res.push((rb, columns.timestamp_range.unwrap()))
}
Ok(res)
}
}
fn tags_to_logical_schemas(
tags: HashMap<String, HashMap<String, HashSet<String>>>,
) -> LogicalSchemas {
let schemas: HashMap<String, Vec<LogicalSchema>> = tags
.into_iter()
.map(|(physical, logical_tables)| {
let schemas: Vec<_> = logical_tables
.into_iter()
.map(|(logical, tags)| {
let mut columns: Vec<_> = tags
.into_iter()
.map(|tag_name| ColumnSchema {
column_name: tag_name,
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
..Default::default()
})
.collect();
columns.push(ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
});
columns.push(ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as i32,
semantic_type: SemanticType::Field as i32,
..Default::default()
});
LogicalSchema {
name: logical,
columns,
}
})
.collect();
(physical, schemas)
})
.collect();
LogicalSchemas { schemas }
}
/// Creates the schema of output record batch.
pub fn physical_schema() -> arrow::datatypes::SchemaRef {
Arc::new(arrow::datatypes::Schema::new(vec![
Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
Field::new(
GREPTIME_TIMESTAMP,
arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
Field::new(
PRIMARY_KEY_COLUMN_NAME,
arrow::datatypes::DataType::Binary,
false,
),
Field::new(
SEQUENCE_COLUMN_NAME,
arrow::datatypes::DataType::UInt64,
false,
),
Field::new(
OP_TYPE_COLUMN_NAME,
arrow::datatypes::DataType::UInt8,
false,
),
]))
}
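To make the output format concrete, here is a hedged sketch of what `BatchEncoder::finish` assembles for one `Columns`: the five columns of the physical schema, with every array re-ordered by the encoded primary key via `sort_to_indices` + `take`. Only the `arrow` crate is assumed; the column names and the primary-key bytes are illustrative stand-ins for the constants above and for the sparse-codec output.

```rust
use std::sync::Arc;

use arrow::array::{
    ArrayRef, BinaryArray, Float64Array, RecordBatch, TimestampMillisecondArray, UInt64Array,
    UInt8Array,
};
use arrow::compute;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

fn main() -> arrow::error::Result<()> {
    // Illustrative stand-ins for GREPTIME_VALUE, GREPTIME_TIMESTAMP,
    // PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME and OP_TYPE_COLUMN_NAME.
    let schema = Arc::new(Schema::new(vec![
        Field::new("greptime_value", DataType::Float64, false),
        Field::new(
            "greptime_timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("__primary_key", DataType::Binary, false),
        Field::new("__sequence", DataType::UInt64, false),
        Field::new("__op_type", DataType::UInt8, false),
    ]));
    let value = Float64Array::from(vec![2.0, 1.0]);
    let ts = TimestampMillisecondArray::from(vec![2_000, 1_000]);
    let pk = BinaryArray::from(vec![b"b".as_slice(), b"a".as_slice()]);
    // Sort every column by the encoded primary key, as finish() does.
    let indices = compute::sort_to_indices(&pk, None, None)?;
    let value = compute::take(&value, &indices, None)?;
    let ts = compute::take(&ts, &indices, None)?;
    let pk = compute::take(&pk, &indices, None)?;
    let sequence = Arc::new(UInt64Array::from_value(0, 2)) as ArrayRef;
    let op_type = Arc::new(UInt8Array::from_value(1, 2)) as ArrayRef; // OpType::Put
    let rb = RecordBatch::try_new(schema, vec![value, ts, pk, sequence, op_type])?;
    assert_eq!(rb.num_rows(), 2);
    Ok(())
}
```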

View File

@@ -624,6 +624,64 @@ pub enum Error {
#[snafu(display("Unknown hint: {}", hint))] #[snafu(display("Unknown hint: {}", hint))]
UnknownHint { hint: String }, UnknownHint { hint: String },
#[snafu(display("Failed to invoke common_meta"))]
CommonMeta {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to invoke operator"))]
Operator {
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to encode primary key"))]
EncodePrimaryKey {
source: mito_codec::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Invalid timestamp value type in row data, expected TimestampMillisecondValue"
))]
InvalidTimestampValueType {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid field value type in row data, expected F64Value"))]
InvalidFieldValueType {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to operate object store"))]
Opendal {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to operate object store"))]
ObjectStore {
source: object_store::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to operate object store"))]
Parquet {
#[snafu(source)]
error: parquet::errors::ParquetError,
#[snafu(implicit)]
location: Location,
},
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -747,6 +805,15 @@ impl ErrorExt for Error {
DurationOverflow { .. } => StatusCode::InvalidArguments, DurationOverflow { .. } => StatusCode::InvalidArguments,
HandleOtelArrowRequest { .. } => StatusCode::Internal, HandleOtelArrowRequest { .. } => StatusCode::Internal,
CommonMeta { source, .. } => source.status_code(),
Operator { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),
InvalidTimestampValueType { .. } | InvalidFieldValueType { .. } => {
StatusCode::Unexpected
}
ObjectStore { source, .. } => source.status_code(),
Parquet { .. } => StatusCode::Internal,
Opendal { .. } => StatusCode::Internal,
} }
} }

View File

@@ -76,7 +76,6 @@ use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{ use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef, InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
PromStoreProtocolHandlerRef,
}; };
use crate::server::Server; use crate::server::Server;
@@ -566,20 +565,7 @@ impl HttpServerBuilder {
} }
} }
pub fn with_prom_handler( pub fn with_prom_handler(self, state: PromStoreState) -> Self {
self,
handler: PromStoreProtocolHandlerRef,
pipeline_handler: Option<PipelineHandlerRef>,
prom_store_with_metric_engine: bool,
prom_validation_mode: PromValidationMode,
) -> Self {
let state = PromStoreState {
prom_store_handler: handler,
pipeline_handler,
prom_store_with_metric_engine,
prom_validation_mode,
};
Self { Self {
router: self.router.nest( router: self.router.nest(
&format!("/{HTTP_API_VERSION}/prometheus"), &format!("/{HTTP_API_VERSION}/prometheus"),

View File

@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use api::prom_store::remote::ReadRequest; use api::prom_store::remote::ReadRequest;
use axum::body::Bytes; use axum::body::Bytes;
@@ -22,22 +24,29 @@ use axum::response::IntoResponse;
use axum::Extension; use axum::Extension;
use axum_extra::TypedHeader; use axum_extra::TypedHeader;
use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_telemetry::tracing; use common_telemetry::{info, tracing};
use hyper::HeaderMap; use hyper::HeaderMap;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use object_pool::Pool; use object_pool::Pool;
use operator::schema_helper::SchemaHelper;
use partition::manager::PartitionRuleManagerRef;
use pipeline::util::to_pipeline_version; use pipeline::util::to_pipeline_version;
use pipeline::{ContextReq, PipelineDefinition}; use pipeline::{ContextReq, PipelineDefinition};
use prost::Message; use prost::Message;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session::context::{Channel, QueryContext}; use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::prelude::*; use snafu::prelude::*;
use tokio::sync::mpsc::Sender;
use crate::access_layer::AccessLayerFactory;
use crate::batch_builder::MetricsBatchBuilder;
use crate::error::{self, InternalSnafu, PipelineSnafu, Result}; use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
use crate::http::extractor::PipelineInfo; use crate::http::extractor::PipelineInfo;
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS}; use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::http::PromValidationMode; use crate::http::PromValidationMode;
use crate::prom_row_builder::{PromCtx, TableBuilder, TablesBuilder};
use crate::prom_store::{snappy_decompress, zstd_decompress}; use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::proto::{PromSeriesProcessor, PromWriteRequest}; use crate::proto::{PromSeriesProcessor, PromWriteRequest};
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse}; use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
@@ -52,12 +61,174 @@ pub const DEFAULT_ENCODING: &str = "snappy";
pub const VM_ENCODING: &str = "zstd"; pub const VM_ENCODING: &str = "zstd";
pub const VM_PROTO_VERSION: &str = "1"; pub const VM_PROTO_VERSION: &str = "1";
/// Additional state for bulk write requests.
#[derive(Clone)]
pub struct PromBulkState {
pub schema_helper: SchemaHelper,
pub partition_manager: PartitionRuleManagerRef,
pub node_manager: NodeManagerRef,
pub access_layer_factory: AccessLayerFactory,
pub tx: Option<
Sender<(
QueryContextRef,
HashMap<PromCtx, HashMap<String, TableBuilder>>,
)>,
>,
}
#[derive(Clone)] #[derive(Clone)]
pub struct PromStoreState { pub struct PromStoreState {
pub prom_store_handler: PromStoreProtocolHandlerRef, pub prom_store_handler: PromStoreProtocolHandlerRef,
pub pipeline_handler: Option<PipelineHandlerRef>, pub pipeline_handler: Option<PipelineHandlerRef>,
pub prom_store_with_metric_engine: bool, pub prom_store_with_metric_engine: bool,
pub prom_validation_mode: PromValidationMode, pub prom_validation_mode: PromValidationMode,
pub bulk_state: Option<PromBulkState>,
}
impl PromBulkState {
pub fn start_background_task(&mut self) {
let (tx, mut rx) = tokio::sync::mpsc::channel::<(
QueryContextRef,
HashMap<PromCtx, HashMap<String, TableBuilder>>,
)>(16);
self.tx = Some(tx);
let schema_helper = self.schema_helper.clone();
let partition_manager = self.partition_manager.clone();
let node_manager = self.node_manager.clone();
let access_layer_factory = self.access_layer_factory.clone();
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let start = Instant::now();
let mut batch_builder = MetricsBatchBuilder::new(
schema_helper.clone(),
partition_manager.clone(),
node_manager.clone(),
);
let mut physical_region_metadata_total = HashMap::new();
let mut num_batches = 0;
while let Some((query_context, mut tables)) = rx.recv().await {
batch_builder
.create_or_alter_physical_tables(&tables, &query_context)
.await
.unwrap();
info!(
"create_or_alter_physical_tables, elapsed time: {}ms",
start.elapsed().as_millis()
);
// Extract logical table names from tables for metadata collection
let current_schema = query_context.current_schema();
let logical_tables: Vec<(String, String)> = tables
.iter()
.flat_map(|(ctx, table_map)| {
let schema = ctx.schema.as_deref().unwrap_or(&current_schema);
table_map
.keys()
.map(|table_name| (schema.to_string(), table_name.clone()))
})
.collect();
let start = Instant::now();
// Gather all region metadata for region 0 of physical tables.
let physical_region_metadata = batch_builder
.collect_physical_region_metadata(&logical_tables, &query_context)
.await
.unwrap();
physical_region_metadata_total.extend(physical_region_metadata);
info!(
"collect_physical_region_metadata, elapsed time: {}ms",
start.elapsed().as_millis()
);
let start = Instant::now();
batch_builder
.append_rows_to_batch(
None,
None,
&mut tables,
&physical_region_metadata_total,
)
.await
.unwrap();
num_batches += 1;
info!(
"append_rows_to_batch, elapsed time: {}ms, batches: {}",
start.elapsed().as_millis(),
num_batches
);
if num_batches >= 10 {
break;
}
}
let start = Instant::now();
let record_batches = batch_builder.finish().unwrap();
let physical_region_id_to_meta = physical_region_metadata_total
.into_iter()
.map(|(schema_name, tables)| {
let region_id_to_meta = tables
.into_values()
.map(|(_, physical_region_meta)| {
(physical_region_meta.region_id, physical_region_meta)
})
.collect::<HashMap<_, _>>();
(schema_name, region_id_to_meta)
})
.collect::<HashMap<_, _>>();
info!("Finishing batches cost: {}ms", start.elapsed().as_millis());
let start = Instant::now();
let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
let mut file_metas = vec![];
for (schema_name, schema_batches) in record_batches {
let tables_in_schema =
tables_per_schema.entry(schema_name.clone()).or_insert(0);
*tables_in_schema += 1;
let schema_regions = physical_region_id_to_meta
.get(&schema_name)
.expect("physical region schema not found");
for (physical_region_id, record_batches) in schema_batches {
let physical_region_metadata = schema_regions
.get(&physical_region_id)
.expect("physical region metadata not found");
for (rb, time_range) in record_batches {
let mut writer = access_layer_factory
.create_sst_writer(
"greptime", //todo(hl): use the catalog name in query context.
&schema_name,
physical_region_metadata.clone(),
)
.await
.unwrap();
let start = Instant::now();
info!("Created writer: {}", writer.file_id());
writer
.write_record_batch(&rb, Some(time_range))
.await
.unwrap();
let file_meta = writer.finish().await.unwrap();
info!(
"Finished writer: {}, elapsed time: {}ms",
writer.file_id(),
start.elapsed().as_millis()
);
file_metas.push(file_meta);
}
}
}
info!(
"upload sst files, elapsed time: {}ms, schema num: {}, tables_per_schema: {:?}, file_metas: {:?}",
start.elapsed().as_millis(),
tables_per_schema.len(),
tables_per_schema,
file_metas
);
}
});
}
} }
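The channel-plus-background-task shape above can be hard to see through the SST-writing details, so here is a heavily simplified, self-contained sketch of the same batching pattern, assuming only `tokio` (with the `macros` and runtime features). The payload type and the "flush" step are placeholders for the staged tables and the record-batch/SST path; this is not the production logic.

```rust
use std::collections::HashMap;

use tokio::sync::mpsc;

/// Receive staged tables from a channel, accumulate up to a fixed number of
/// batches, then flush them in one pass (here the "flush" just counts rows).
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<HashMap<String, Vec<i64>>>(16);

    let worker = tokio::spawn(async move {
        let mut staged: Vec<HashMap<String, Vec<i64>>> = Vec::new();
        while let Some(tables) = rx.recv().await {
            staged.push(tables);
            if staged.len() >= 10 {
                break; // flush threshold reached, mirrors `num_batches >= 10`
            }
        }
        // "Flush": in the real task this is where record batches are built
        // and SST files are written through the access layer.
        staged
            .iter()
            .map(|t| t.values().map(Vec::len).sum::<usize>())
            .sum::<usize>()
    });

    for i in 0..3 {
        let mut tables = HashMap::new();
        tables.insert(format!("metric_{i}"), vec![1, 2, 3]);
        tx.send(tables).await.unwrap();
    }
    drop(tx); // closing the channel ends the loop even below the threshold

    let total_rows = worker.await.unwrap();
    assert_eq!(total_rows, 9);
}
```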
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@@ -98,6 +269,7 @@ pub async fn remote_write(
pipeline_handler, pipeline_handler,
prom_store_with_metric_engine, prom_store_with_metric_engine,
prom_validation_mode, prom_validation_mode,
bulk_state,
} = state; } = state;
if let Some(_vm_handshake) = params.get_vm_proto_version { if let Some(_vm_handshake) = params.get_vm_proto_version {
@@ -132,6 +304,32 @@ pub async fn remote_write(
processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def); processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
} }
if let Some(state) = bulk_state {
let context = PromBulkContext {
schema_helper: state.schema_helper,
query_ctx: query_ctx.clone(),
partition_manager: state.partition_manager,
node_manager: state.node_manager,
access_layer_factory: state.access_layer_factory,
};
let builder = decode_remote_write_request_to_batch(
is_zstd,
body,
prom_validation_mode,
&mut processor,
context,
)
.await?;
state
.tx
.as_ref()
.unwrap()
.send((query_ctx, builder.tables))
.await
.unwrap();
return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
}
let req = let req =
decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?; decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;
@@ -202,6 +400,15 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
})) }))
} }
/// Context for processing remote write requests in bulk mode.
pub struct PromBulkContext {
pub(crate) schema_helper: SchemaHelper,
pub(crate) query_ctx: QueryContextRef,
pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef,
pub(crate) access_layer_factory: AccessLayerFactory,
}
async fn decode_remote_write_request( async fn decode_remote_write_request(
is_zstd: bool, is_zstd: bool,
body: Bytes, body: Bytes,
@@ -236,6 +443,38 @@ async fn decode_remote_write_request(
} }
} }
async fn decode_remote_write_request_to_batch(
is_zstd: bool,
body: Bytes,
prom_validation_mode: PromValidationMode,
processor: &mut PromSeriesProcessor,
bulk: PromBulkContext,
) -> Result<TablesBuilder> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
// Due to a vmagent limitation, there is a chance that vmagent sends the
// wrong content type, so we have to fall back to decoding the content
// with the other method.
//
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
// see https://github.com/GreptimeTeam/greptimedb/issues/3929
let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
buf
} else {
// fallback to the other compression method
try_decompress(!is_zstd, &body[..])?
};
let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
processor.use_pipeline = false;
request
.merge(buf, prom_validation_mode, processor)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(std::mem::take(&mut request.table_data))
}
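The decompression fallback above is worth isolating. Below is a hedged sketch of the same try-one-codec-then-the-other idea written directly against the `snap` and `zstd` crates; the real code goes through this crate's `snappy_decompress`/`zstd_decompress` helpers and its own error type, so treat this as an illustration only.

```rust
use std::io;

fn snappy(body: &[u8]) -> io::Result<Vec<u8>> {
    snap::raw::Decoder::new()
        .decompress_vec(body)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

fn zstd_decode(body: &[u8]) -> io::Result<Vec<u8>> {
    zstd::stream::decode_all(body)
}

/// The content type reported by the client (e.g. vmagent) may be wrong, so a
/// failed decode is retried with the other codec before giving up.
fn decompress_with_fallback(is_zstd: bool, body: &[u8]) -> io::Result<Vec<u8>> {
    if is_zstd {
        zstd_decode(body).or_else(|_| snappy(body))
    } else {
        snappy(body).or_else(|_| zstd_decode(body))
    }
}

fn main() -> io::Result<()> {
    let payload = b"remote write body";
    let compressed = snap::raw::Encoder::new()
        .compress_vec(payload)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    // The caller claims zstd, but the bytes are snappy: the fallback still decodes them.
    let decoded = decompress_with_fallback(true, &compressed)?;
    assert_eq!(decoded, payload.to_vec());
    Ok(())
}
```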
async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> { async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
let buf = snappy_decompress(&body[..])?; let buf = snappy_decompress(&body[..])?;

View File

@@ -21,7 +21,10 @@
use datafusion_expr::LogicalPlan; use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema; use datatypes::schema::Schema;
pub mod access_layer;
pub mod addrs; pub mod addrs;
#[allow(dead_code)]
mod batch_builder;
pub mod configurator; pub mod configurator;
pub(crate) mod elasticsearch; pub(crate) mod elasticsearch;
pub mod error; pub mod error;

View File

@@ -13,16 +13,21 @@
// limitations under the License. // limitations under the License.
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::string::ToString; use std::string::ToString;
use std::time::Instant;
use ahash::HashMap;
use api::prom_store::remote::Sample; use api::prom_store::remote::Sample;
use api::v1::value::ValueData; use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value}; use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::info;
use pipeline::{ContextOpt, ContextReq}; use pipeline::{ContextOpt, ContextReq};
use prost::DecodeError; use prost::DecodeError;
use crate::batch_builder::MetricsBatchBuilder;
use crate::error::Result;
use crate::http::prom_store::PromBulkContext;
use crate::http::PromValidationMode; use crate::http::PromValidationMode;
use crate::proto::{decode_string, PromLabel}; use crate::proto::{decode_string, PromLabel};
use crate::repeated_field::Clear; use crate::repeated_field::Clear;
@@ -38,7 +43,7 @@ pub struct PromCtx {
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub(crate) struct TablesBuilder { pub(crate) struct TablesBuilder {
// schema -> table -> table_builder // schema -> table -> table_builder
tables: HashMap<PromCtx, HashMap<String, TableBuilder>>, pub tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
} }
impl Clear for TablesBuilder { impl Clear for TablesBuilder {
@@ -91,11 +96,113 @@ impl TablesBuilder {
req req
}) })
} }
/// Converts [TablesBuilder] to record batches and clears the inner state.
pub(crate) async fn as_record_batches(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
let mut batch_builder = MetricsBatchBuilder::new(
bulk_ctx.schema_helper.clone(),
bulk_ctx.partition_manager.clone(),
bulk_ctx.node_manager.clone(),
);
let mut tables = std::mem::take(&mut self.tables);
let start = Instant::now();
batch_builder
.create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx)
.await?;
info!(
"create_or_alter_physical_tables, elapsed time: {}ms",
start.elapsed().as_millis()
);
// Extract logical table names from tables for metadata collection
let current_schema = bulk_ctx.query_ctx.current_schema();
let logical_tables: Vec<(String, String)> = tables
.iter()
.flat_map(|(ctx, table_map)| {
let schema = ctx.schema.as_deref().unwrap_or(&current_schema);
table_map
.keys()
.map(|table_name| (schema.to_string(), table_name.clone()))
})
.collect();
let start = Instant::now();
// Gather all region metadata for region 0 of physical tables.
let physical_region_metadata = batch_builder
.collect_physical_region_metadata(&logical_tables, &bulk_ctx.query_ctx)
.await?;
info!(
"collect_physical_region_metadata, elapsed time: {}ms",
start.elapsed().as_millis()
);
let start = Instant::now();
batch_builder
.append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)
.await?;
let record_batches = batch_builder.finish()?;
info!(
"append_rows_to_batch, elapsed time: {}ms",
start.elapsed().as_millis()
);
let physical_region_id_to_meta = physical_region_metadata
.into_iter()
.map(|(schema_name, tables)| {
let region_id_to_meta = tables
.into_values()
.map(|(_, physical_region_meta)| {
(physical_region_meta.region_id, physical_region_meta)
})
.collect::<HashMap<_, _>>();
(schema_name, region_id_to_meta)
})
.collect::<HashMap<_, _>>();
let start = Instant::now();
let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
let mut file_metas = vec![];
for (schema_name, schema_batches) in record_batches {
let tables_in_schema = tables_per_schema.entry(schema_name.clone()).or_insert(0);
*tables_in_schema += 1;
let schema_regions = physical_region_id_to_meta
.get(&schema_name)
.expect("physical region metadata not found");
for (physical_region_id, record_batches) in schema_batches {
let physical_region_metadata = schema_regions
.get(&physical_region_id)
.expect("physical region metadata not found");
for (rb, time_range) in record_batches {
let mut writer = bulk_ctx
.access_layer_factory
.create_sst_writer(
"greptime", //todo(hl): use the catalog name in query context.
&schema_name,
physical_region_metadata.clone(),
)
.await?;
writer.write_record_batch(&rb, Some(time_range)).await?;
let file_meta = writer.finish().await?;
file_metas.push(file_meta);
}
}
}
info!(
"upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}, file_metas: {:?}",
start.elapsed().as_millis(),
tables_per_schema.len(),
tables_per_schema, file_metas
);
Ok(())
}
} }
/// Builder for one table. /// Builder for one table.
#[derive(Debug)] #[derive(Debug, Clone)]
pub(crate) struct TableBuilder { pub struct TableBuilder {
/// Column schemas. /// Column schemas.
schema: Vec<ColumnSchema>, schema: Vec<ColumnSchema>,
/// Rows written. /// Rows written.
@@ -210,6 +317,13 @@ impl TableBuilder {
rows: Some(Rows { schema, rows }), rows: Some(Rows { schema, rows }),
} }
} }
pub(crate) fn tags(&self) -> impl Iterator<Item = &String> {
self.schema
.iter()
.filter(|v| v.semantic_type == SemanticType::Tag as i32)
.map(|c| &c.column_name)
}
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -28,8 +28,9 @@ use prost::DecodeError;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::OptionExt; use snafu::OptionExt;
use crate::error::InternalSnafu; use crate::error::{InternalSnafu, Result};
use crate::http::event::PipelineIngestRequest; use crate::http::event::PipelineIngestRequest;
use crate::http::prom_store::PromBulkContext;
use crate::http::PromValidationMode; use crate::http::PromValidationMode;
use crate::pipeline::run_pipeline; use crate::pipeline::run_pipeline;
use crate::prom_row_builder::{PromCtx, TablesBuilder}; use crate::prom_row_builder::{PromCtx, TablesBuilder};
@@ -283,7 +284,7 @@ pub(crate) fn decode_string(
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct PromWriteRequest { pub struct PromWriteRequest {
table_data: TablesBuilder, pub table_data: TablesBuilder,
series: PromTimeSeries, series: PromTimeSeries,
} }
@@ -352,6 +353,11 @@ impl PromWriteRequest {
Ok(()) Ok(())
} }
/// Converts the write request into record batches and resets the table data.
pub async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
self.table_data.as_record_batches(bulk_ctx).await
}
} }
/// A hook to be injected into the PromWriteRequest decoding process. /// A hook to be injected into the PromWriteRequest decoding process.

View File

@@ -28,6 +28,7 @@ use query::parser::PromQuery;
use query::query_engine::DescribeResult; use query::query_engine::DescribeResult;
use servers::error::{Error, Result}; use servers::error::{Error, Result};
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF}; use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PromStoreState;
use servers::http::test_helpers::TestClient; use servers::http::test_helpers::TestClient;
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode}; use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
use servers::prom_store; use servers::prom_store;
@@ -121,9 +122,16 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
}; };
let instance = Arc::new(DummyInstance { tx }); let instance = Arc::new(DummyInstance { tx });
let state = PromStoreState {
prom_store_handler: instance.clone(),
pipeline_handler: None,
prom_store_with_metric_engine: true,
prom_validation_mode: PromValidationMode::Unchecked,
bulk_state: None,
};
let server = HttpServerBuilder::new(http_opts) let server = HttpServerBuilder::new(http_opts)
.with_sql_handler(instance.clone()) .with_sql_handler(instance.clone())
.with_prom_handler(instance, None, true, PromValidationMode::Unchecked) .with_prom_handler(state)
.build(); .build();
server.build(server.make_app()).unwrap() server.build(server.make_app()).unwrap()
} }

View File

@@ -213,6 +213,14 @@ impl TableMeta {
.map(|(_, cs)| &cs.name) .map(|(_, cs)| &cs.name)
} }
/// Returns names of primary keys.
pub fn primary_key_names(&self) -> impl Iterator<Item = &String> {
let columns_schemas = self.schema.column_schemas();
self.primary_key_indices
.iter()
.map(|pk_idx| &columns_schemas[*pk_idx].name)
}
/// Returns the new [TableMetaBuilder] after applying given `alter_kind`. /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
/// ///
/// The returned builder would derive the next column id of this meta. /// The returned builder would derive the next column id of this meta.

View File

@@ -43,6 +43,7 @@ use object_store::ObjectStore;
use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig}; use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::prom_store::PromStoreState;
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode}; use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
use servers::metrics_handler::MetricsHandler; use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -534,15 +535,17 @@ pub async fn setup_test_prom_app_with_frontend(
..Default::default() ..Default::default()
}; };
let frontend_ref = instance.fe_instance().clone(); let frontend_ref = instance.fe_instance().clone();
let state = PromStoreState {
prom_store_handler: frontend_ref.clone(),
pipeline_handler: Some(frontend_ref.clone()),
prom_store_with_metric_engine: true,
prom_validation_mode: PromValidationMode::Strict,
bulk_state: None,
};
let http_server = HttpServerBuilder::new(http_opts) let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone())) .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
.with_logs_handler(instance.fe_instance().clone()) .with_logs_handler(instance.fe_instance().clone())
.with_prom_handler( .with_prom_handler(state)
frontend_ref.clone(),
Some(frontend_ref.clone()),
true,
PromValidationMode::Strict,
)
.with_prometheus_handler(frontend_ref) .with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build(); .build();