diff --git a/Cargo.lock b/Cargo.lock index c2bad3d971..f4ff29fb70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2004,9 +2004,11 @@ dependencies = [ "common-macro", "common-test-util", "futures", + "lazy_static", "paste", "pin-project", "rand 0.9.1", + "regex", "serde", "snafu 0.8.6", "tokio", @@ -2454,6 +2456,7 @@ dependencies = [ "datafusion-expr", "datatypes", "futures-util", + "once_cell", "serde", "snafu 0.8.6", "sqlparser", @@ -7579,6 +7582,7 @@ dependencies = [ "common-decimal", "common-error", "common-macro", + "common-query", "common-recordbatch", "common-telemetry", "common-time", diff --git a/config/config.md b/config/config.md index 72d48b5bcb..1c20e66540 100644 --- a/config/config.md +++ b/config/config.md @@ -13,6 +13,7 @@ | Key | Type | Default | Descriptions | | --- | -----| ------- | ----------- | | `default_timezone` | String | Unset | The default timezone of the server. | +| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. | | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | | `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. | @@ -226,6 +227,7 @@ | Key | Type | Default | Descriptions | | --- | -----| ------- | ----------- | | `default_timezone` | String | Unset | The default timezone of the server. | +| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. | | `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. | | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | @@ -440,6 +442,7 @@ | Key | Type | Default | Descriptions | | --- | -----| ------- | ----------- | | `node_id` | Integer | Unset | The datanode identifier and should be unique in the cluster. | +| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. | | `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.
It will block the datanode start if it can't receive leases in the heartbeat from metasrv. | | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 82ee07bd84..b232f5109f 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -2,6 +2,10 @@ ## @toml2docs:none-default node_id = 42 +## The default column prefix for auto-created time index and value columns. +## @toml2docs:none-default +default_column_prefix = "greptime" + ## Start services after regions have obtained leases. ## It will block the datanode start if it can't receive leases in the heartbeat from metasrv. require_lease_before_startup = false diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 9ffcdad540..70c61c82c7 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -2,6 +2,10 @@ ## @toml2docs:none-default default_timezone = "UTC" +## The default column prefix for auto-created time index and value columns. +## @toml2docs:none-default +default_column_prefix = "greptime" + ## The maximum in-flight write bytes. ## @toml2docs:none-default #+ max_in_flight_write_bytes = "500MB" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 744dbbe751..22f5574ef5 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -2,6 +2,10 @@ ## @toml2docs:none-default default_timezone = "UTC" +## The default column prefix for auto-created time index and value columns. +## @toml2docs:none-default +default_column_prefix = "greptime" + ## Initialize all regions in the background during the startup. ## By default, it provides services after all regions have been initialized. init_regions_in_background = false diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index fda6d968bf..89992eba37 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -25,11 +25,13 @@ use clap::Parser; use client::client_manager::NodeClients; use common_base::Plugins; use common_config::{Configurable, DEFAULT_DATA_HOME}; +use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_query::prelude::set_default_prefix; use common_stat::ResourceStatImpl; use common_telemetry::info; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; @@ -333,6 +335,9 @@ impl StartCommand { .context(error::StartFrontendSnafu)?; set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?; + set_default_prefix(opts.default_column_prefix.as_deref()) + .map_err(BoxedError::new) + .context(error::BuildCliSnafu)?; let meta_client_options = opts .meta_client diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 58602d0a39..bf5aff7825 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -41,6 +41,7 @@ use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator}; use common_procedure::ProcedureManagerRef; +use common_query::prelude::set_default_prefix; use common_telemetry::info; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; use common_time::timezone::set_default_timezone; @@ -355,6 +356,10 @@ impl StartCommand { let mut plugins = Plugins::new(); let plugin_opts = opts.plugins; let mut opts = opts.component; + set_default_prefix(opts.default_column_prefix.as_deref()) + .map_err(BoxedError::new) + .context(error::BuildCliSnafu)?; + opts.grpc.detect_server_addr(); let fe_opts = opts.frontend_options(); let dn_opts = opts.datanode_options(); diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index b92cf9631d..f4ee324b69 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -48,6 +48,7 @@ fn test_load_datanode_example_config() { let expected = GreptimeOptions:: { component: DatanodeOptions { node_id: Some(42), + default_column_prefix: Some("greptime".to_string()), meta_client: Some(MetaClientOptions { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], timeout: Duration::from_secs(3), @@ -113,6 +114,7 @@ fn test_load_frontend_example_config() { let expected = GreptimeOptions:: { component: FrontendOptions { default_timezone: Some("UTC".to_string()), + default_column_prefix: Some("greptime".to_string()), meta_client: Some(MetaClientOptions { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], timeout: Duration::from_secs(3), @@ -273,6 +275,7 @@ fn test_load_standalone_example_config() { let expected = GreptimeOptions:: { component: StandaloneOptions { default_timezone: Some("UTC".to_string()), + default_column_prefix: Some("greptime".to_string()), wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)), sync_period: Some(Duration::from_secs(10)), diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index ae2945b1f5..4a881990b4 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -18,9 +18,11 @@ bytes.workspace = true common-error.workspace = true common-macro.workspace = true futures.workspace = true +lazy_static.workspace = true paste.workspace = true pin-project.workspace = true rand.workspace = true +regex.workspace = true serde = { version = "1.0", features = ["derive"] } snafu.workspace = true tokio.workspace = true diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index cc5acdbf47..1f530c2753 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -19,6 +19,7 @@ pub mod plugins; pub mod range_read; #[allow(clippy::all)] pub mod readable_size; +pub mod regex_pattern; pub mod secrets; pub mod serde; diff --git a/src/common/base/src/regex_pattern.rs b/src/common/base/src/regex_pattern.rs new file mode 100644 index 0000000000..7ff46693ba --- /dev/null +++ b/src/common/base/src/regex_pattern.rs @@ -0,0 +1,22 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use regex::Regex; + +pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*"; + +lazy_static! { + pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap(); +} diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index a1d98db301..55dbc0ad01 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -121,6 +121,7 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use bytes::Bytes; +use common_base::regex_pattern::NAME_PATTERN; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, }; @@ -164,7 +165,6 @@ use crate::rpc::router::{LeaderState, RegionRoute, region_distribution}; use crate::rpc::store::BatchDeleteRequest; use crate::state_store::PoisonValue; -pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*"; pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*"; pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance"; pub const MAINTENANCE_KEY: &str = "__switches/maintenance"; @@ -269,10 +269,6 @@ pub type FlowId = u32; /// The partition of flow. pub type FlowPartitionId = u32; -lazy_static! { - pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap(); -} - lazy_static! { pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap(); } diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml index 7cdc5a8a45..48328ea612 100644 --- a/src/common/query/Cargo.toml +++ b/src/common/query/Cargo.toml @@ -14,6 +14,7 @@ workspace = true api.workspace = true async-trait.workspace = true bytes.workspace = true +common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-recordbatch.workspace = true @@ -22,6 +23,7 @@ datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datatypes.workspace = true +once_cell.workspace = true serde.workspace = true snafu.workspace = true sqlparser.workspace = true diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index 163efb30a7..618795bb4a 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -199,6 +199,9 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Invalid character in prefix config: {}", prefix))] + InvalidColumnPrefix { prefix: String }, } pub type Result = std::result::Result; @@ -227,7 +230,8 @@ impl ErrorExt for Error { Error::UnsupportedInputDataType { .. } | Error::TypeCast { .. } - | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments, + | Error::InvalidFuncArgs { .. } + | Error::InvalidColumnPrefix { .. } => StatusCode::InvalidArguments, Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(), diff --git a/src/common/query/src/prelude.rs b/src/common/query/src/prelude.rs index f467906402..c27b94294e 100644 --- a/src/common/query/src/prelude.rs +++ b/src/common/query/src/prelude.rs @@ -12,15 +12,61 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_base::regex_pattern::NAME_PATTERN_REG; pub use datafusion_common::ScalarValue; +use once_cell::sync::OnceCell; +use snafu::ensure; pub use crate::columnar_value::ColumnarValue; +use crate::error::{InvalidColumnPrefixSnafu, Result}; -/// Default timestamp column name for Prometheus metrics. -pub const GREPTIME_TIMESTAMP: &str = "greptime_timestamp"; -/// Default value column name for Prometheus metrics. -pub const GREPTIME_VALUE: &str = "greptime_value"; -/// Default counter column name for OTLP metrics. +/// Default time index column name. +static GREPTIME_TIMESTAMP_CELL: OnceCell = OnceCell::new(); + +/// Default value column name. +static GREPTIME_VALUE_CELL: OnceCell = OnceCell::new(); + +pub fn set_default_prefix(prefix: Option<&str>) -> Result<()> { + match prefix { + None => { + // use default greptime prefix + GREPTIME_TIMESTAMP_CELL.get_or_init(|| GREPTIME_TIMESTAMP.to_string()); + GREPTIME_VALUE_CELL.get_or_init(|| GREPTIME_VALUE.to_string()); + } + Some(s) if s.trim().is_empty() => { + // use "" to disable prefix + GREPTIME_TIMESTAMP_CELL.get_or_init(|| "timestamp".to_string()); + GREPTIME_VALUE_CELL.get_or_init(|| "value".to_string()); + } + Some(x) => { + ensure!( + NAME_PATTERN_REG.is_match(x), + InvalidColumnPrefixSnafu { prefix: x } + ); + GREPTIME_TIMESTAMP_CELL.get_or_init(|| format!("{}_timestamp", x)); + GREPTIME_VALUE_CELL.get_or_init(|| format!("{}_value", x)); + } + } + Ok(()) +} + +/// Get the default timestamp column name. +/// Returns the configured value, or `greptime_timestamp` if not set. +pub fn greptime_timestamp() -> &'static str { + GREPTIME_TIMESTAMP_CELL.get_or_init(|| GREPTIME_TIMESTAMP.to_string()) +} + +/// Get the default value column name. +/// Returns the configured value, or `greptime_value` if not set. +pub fn greptime_value() -> &'static str { + GREPTIME_VALUE_CELL.get_or_init(|| GREPTIME_VALUE.to_string()) +} + +/// Default timestamp column name constant for backward compatibility. +const GREPTIME_TIMESTAMP: &str = "greptime_timestamp"; +/// Default value column name constant for backward compatibility. +const GREPTIME_VALUE: &str = "greptime_value"; +/// Default counter column name for OTLP metrics (legacy mode). pub const GREPTIME_COUNT: &str = "greptime_count"; /// Default physical table name pub const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table"; diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 2f2fcd2697..e40a52bd6b 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -66,6 +66,7 @@ impl Default for StorageConfig { #[serde(default)] pub struct DatanodeOptions { pub node_id: Option, + pub default_column_prefix: Option, pub workload_types: Vec, pub require_lease_before_startup: bool, pub init_regions_in_background: bool, @@ -119,6 +120,7 @@ impl Default for DatanodeOptions { fn default() -> Self { Self { node_id: None, + default_column_prefix: None, workload_types: vec![DatanodeWorkloadType::Hybrid], require_lease_before_startup: false, init_regions_in_background: false, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index b9b8edcdba..50d0ef4076 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -27,6 +27,7 @@ use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; +use common_query::prelude::set_default_prefix; use common_stat::ResourceStatImpl; use common_telemetry::{error, info, warn}; use common_wal::config::DatanodeWalConfig; @@ -59,9 +60,9 @@ use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use crate::error::{ - self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, - MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, - ShutdownServerSnafu, StartServerSnafu, + self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, + GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, + ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu, }; use crate::event_listener::{ NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver, @@ -220,6 +221,9 @@ impl DatanodeBuilder { pub async fn build(mut self) -> Result { let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; + set_default_prefix(self.opts.default_column_prefix.as_deref()) + .map_err(BoxedError::new) + .context(BuildDatanodeSnafu)?; let meta_client = self.meta_client.take(); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a2e6f674e2..eda483a1e2 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -165,6 +165,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to build datanode"))] + BuildDatanode { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to build http client"))] BuildHttpClient { #[snafu(implicit)] @@ -429,7 +436,8 @@ impl ErrorExt for Error { | MissingRequiredField { .. } | RegionEngineNotFound { .. } | ParseAddr { .. } - | TomlFormat { .. } => StatusCode::InvalidArguments, + | TomlFormat { .. } + | BuildDatanode { .. } => StatusCode::InvalidArguments, PayloadNotExist { .. } | Unexpected { .. } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index bf2e7a0558..dce9ffd158 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -45,6 +45,7 @@ use crate::service_config::{ pub struct FrontendOptions { pub node_id: Option, pub default_timezone: Option, + pub default_column_prefix: Option, pub heartbeat: HeartbeatOptions, pub http: HttpOptions, pub grpc: GrpcOptions, @@ -77,6 +78,7 @@ impl Default for FrontendOptions { Self { node_id: None, default_timezone: None, + default_column_prefix: None, heartbeat: HeartbeatOptions::frontend_default(), http: HttpOptions::default(), grpc: GrpcOptions::default(), diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 1dc81f49eb..abc2fa3c3e 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -77,6 +77,7 @@ struct PersistRegionStat<'a> { sst_size: u64, write_bytes_delta: u64, #[col( + // This col name is for the information schema table, so we don't touch it name = "greptime_timestamp", semantic = "Timestamp", datatype = "TimestampMillisecond" diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 5056cd0352..beab78cd70 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -240,6 +240,7 @@ impl DataRegion { #[cfg(test)] mod test { + use common_query::prelude::{greptime_timestamp, greptime_value}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -300,8 +301,8 @@ mod test { .map(|c| &c.column_schema.name) .collect::>(); let expected = vec![ - "greptime_timestamp", - "greptime_value", + greptime_timestamp(), + greptime_value(), "__table_id", "__tsid", "job", diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 1c4cb93639..1ae63915e9 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -224,6 +224,7 @@ mod test { use api::v1::SemanticType; use common_meta::ddl::test_util::assert_column_name_and_id; use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions}; + use common_query::prelude::{greptime_timestamp, greptime_value}; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use store_api::region_engine::RegionEngine; use store_api::region_request::{ @@ -295,7 +296,7 @@ mod test { .unwrap(); assert_eq!(semantic_type, SemanticType::Tag); let timestamp_index = metadata_region - .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp") + .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp()) .await .unwrap() .unwrap(); @@ -305,8 +306,8 @@ mod test { assert_column_name_and_id( &column_metadatas, &[ - ("greptime_timestamp", 0), - ("greptime_value", 1), + (greptime_timestamp(), 0), + (greptime_value(), 1), ("__table_id", ReservedColumnId::table_id()), ("__tsid", ReservedColumnId::tsid()), ("job", 2), @@ -364,8 +365,8 @@ mod test { assert_column_name_and_id( &column_metadatas, &[ - ("greptime_timestamp", 0), - ("greptime_value", 1), + (greptime_timestamp(), 0), + (greptime_value(), 1), ("__table_id", ReservedColumnId::table_id()), ("__tsid", ReservedColumnId::tsid()), ("job", 2), diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index c506c0e2b4..2796d3652b 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -619,6 +619,7 @@ pub(crate) fn region_options_for_metadata_region( mod test { use common_meta::ddl::test_util::assert_column_name_and_id; use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions}; + use common_query::prelude::{greptime_timestamp, greptime_value}; use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; use store_api::region_request::BatchRegionDdlRequest; @@ -856,8 +857,8 @@ mod test { assert_column_name_and_id( &column_metadatas, &[ - ("greptime_timestamp", 0), - ("greptime_value", 1), + (greptime_timestamp(), 0), + (greptime_value(), 1), ("__table_id", ReservedColumnId::table_id()), ("__tsid", ReservedColumnId::tsid()), ("job", 2), diff --git a/src/metric-engine/src/engine/sync.rs b/src/metric-engine/src/engine/sync.rs index b62b138dab..741938f8d7 100644 --- a/src/metric-engine/src/engine/sync.rs +++ b/src/metric-engine/src/engine/sync.rs @@ -110,6 +110,7 @@ mod tests { use std::collections::HashMap; use api::v1::SemanticType; + use common_query::prelude::greptime_timestamp; use common_telemetry::info; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -243,7 +244,7 @@ mod tests { .unwrap(); assert_eq!(semantic_type, SemanticType::Tag); let timestamp_index = metadata_region - .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp") + .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp()) .await .unwrap() .unwrap(); diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index d594541d84..cc173e534c 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -17,6 +17,7 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value}; use common_meta::ddl::utils::parse_column_metadatas; +use common_query::prelude::{greptime_timestamp, greptime_value}; use common_telemetry::debug; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -132,7 +133,7 @@ impl TestEnv { column_id: 0, semantic_type: SemanticType::Timestamp, column_schema: ColumnSchema::new( - "greptime_timestamp", + greptime_timestamp(), ConcreteDataType::timestamp_millisecond_datatype(), false, ), @@ -141,7 +142,7 @@ impl TestEnv { column_id: 1, semantic_type: SemanticType::Field, column_schema: ColumnSchema::new( - "greptime_value", + greptime_value(), ConcreteDataType::float64_datatype(), false, ), @@ -204,8 +205,8 @@ impl TestEnv { assert_eq!( column_names, vec![ - "greptime_timestamp", - "greptime_value", + greptime_timestamp(), + greptime_value(), "__table_id", "__tsid", "job", @@ -300,7 +301,7 @@ pub fn create_logical_region_request( column_id: 0, semantic_type: SemanticType::Timestamp, column_schema: ColumnSchema::new( - "greptime_timestamp", + greptime_timestamp(), ConcreteDataType::timestamp_millisecond_datatype(), false, ), @@ -309,7 +310,7 @@ pub fn create_logical_region_request( column_id: 1, semantic_type: SemanticType::Field, column_schema: ColumnSchema::new( - "greptime_value", + greptime_value(), ConcreteDataType::float64_datatype(), false, ), @@ -372,14 +373,14 @@ pub fn alter_logical_region_request(tags: &[&str]) -> RegionAlterRequest { pub fn row_schema_with_tags(tags: &[&str]) -> Vec { let mut schema = vec![ PbColumnSchema { - column_name: "greptime_timestamp".to_string(), + column_name: greptime_timestamp().to_string(), datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as _, datatype_extension: None, options: None, }, PbColumnSchema { - column_name: "greptime_value".to_string(), + column_name: greptime_value().to_string(), datatype: ColumnDataType::Float64 as i32, semantic_type: SemanticType::Field as _, datatype_extension: None, diff --git a/src/mito-codec/Cargo.toml b/src/mito-codec/Cargo.toml index 99a46e8ac9..81808f2714 100644 --- a/src/mito-codec/Cargo.toml +++ b/src/mito-codec/Cargo.toml @@ -15,6 +15,7 @@ common-base.workspace = true common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true +common-query.workspace = true common-recordbatch.workspace = true common-telemetry.workspace = true common-time.workspace = true diff --git a/src/mito-codec/src/primary_key_filter.rs b/src/mito-codec/src/primary_key_filter.rs index e4d1ce5056..c71fafc974 100644 --- a/src/mito-codec/src/primary_key_filter.rs +++ b/src/mito-codec/src/primary_key_filter.rs @@ -154,6 +154,7 @@ mod tests { use std::sync::Arc; use api::v1::SemanticType; + use common_query::prelude::{greptime_timestamp, greptime_value}; use datafusion_common::Column; use datafusion_expr::{BinaryExpr, Expr, Literal, Operator}; use datatypes::prelude::ConcreteDataType; @@ -193,7 +194,7 @@ mod tests { }) .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( - "greptime_value", + greptime_value(), ConcreteDataType::float64_datatype(), false, ), @@ -202,7 +203,7 @@ mod tests { }) .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( - "greptime_timestamp", + greptime_timestamp(), ConcreteDataType::timestamp_nanosecond_datatype(), false, ), diff --git a/src/mito-codec/src/row_converter/sparse.rs b/src/mito-codec/src/row_converter/sparse.rs index edc26db8f0..191c2bd011 100644 --- a/src/mito-codec/src/row_converter/sparse.rs +++ b/src/mito-codec/src/row_converter/sparse.rs @@ -385,6 +385,7 @@ mod tests { use std::sync::Arc; use api::v1::SemanticType; + use common_query::prelude::{greptime_timestamp, greptime_value}; use common_time::Timestamp; use common_time::timestamp::TimeUnit; use datatypes::schema::ColumnSchema; @@ -461,7 +462,7 @@ mod tests { }) .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( - "greptime_value", + greptime_value(), ConcreteDataType::float64_datatype(), false, ), @@ -470,7 +471,7 @@ mod tests { }) .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( - "greptime_timestamp", + greptime_timestamp(), ConcreteDataType::timestamp_nanosecond_datatype(), false, ), diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index e404a5851e..31cadac4f1 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -384,6 +384,7 @@ mod tests { use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema}; use api::v1::value::ValueData; use api::v1::{Mutation, OpType, Rows, SemanticType}; + use common_query::prelude::{greptime_timestamp, greptime_value}; use common_time::Timestamp; use datafusion_common::Column; use datafusion_expr::{BinaryExpr, Expr, Literal, Operator}; @@ -694,7 +695,7 @@ mod tests { }) .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( - "greptime_timestamp", + greptime_timestamp(), ConcreteDataType::timestamp_millisecond_datatype(), false, ), @@ -703,7 +704,7 @@ mod tests { }) .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( - "greptime_value", + greptime_value(), ConcreteDataType::float64_datatype(), true, ), diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 59ab06c95e..9de4fb3fba 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -37,7 +37,7 @@ use common_meta::cache::TableFlownodeSetCacheRef; use common_meta::node_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_query::Output; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_query::prelude::{greptime_timestamp, greptime_value}; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{error, info, warn}; use datatypes::schema::SkippingIndexOptions; @@ -721,14 +721,14 @@ impl Inserter { // schema with timestamp and field column let default_schema = vec![ ColumnSchema { - column_name: GREPTIME_TIMESTAMP.to_string(), + column_name: greptime_timestamp().to_string(), datatype: ColumnDataType::TimestampMillisecond as _, semantic_type: SemanticType::Timestamp as _, datatype_extension: None, options: None, }, ColumnSchema { - column_name: GREPTIME_VALUE.to_string(), + column_name: greptime_value().to_string(), datatype: ColumnDataType::Float64 as _, semantic_type: SemanticType::Field as _, datatype_extension: None, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 3b626d13d0..295e33e43e 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -26,13 +26,13 @@ use api::v1::{ }; use catalog::CatalogManagerRef; use chrono::Utc; +use common_base::regex_pattern::NAME_PATTERN_REG; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema}; use common_catalog::{format_full_flow_name, format_full_table_name}; use common_error::ext::BoxedError; use common_meta::cache_invalidator::Context; use common_meta::ddl::create_flow::FlowType; use common_meta::instruction::CacheIdent; -use common_meta::key::NAME_PATTERN; use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::procedure_executor::ExecutorContext; #[cfg(feature = "enterprise")] @@ -52,14 +52,12 @@ use datafusion_expr::LogicalPlan; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{RawSchema, Schema}; use datatypes::value::Value; -use lazy_static::lazy_static; use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::multi_dim::MultiDimPartitionRule; use query::parser::QueryStatement; use query::plan::extract_and_rewrite_full_table_names; use query::query_engine::DefaultSerializer; use query::sql::create_table_stmt; -use regex::Regex; use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; use snafu::{OptionExt, ResultExt, ensure}; @@ -96,10 +94,6 @@ use crate::expr_helper; use crate::statement::StatementExecutor; use crate::statement::show::create_partitions_stmt; -lazy_static! { - pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap(); -} - impl StatementExecutor { pub fn catalog_manager(&self) -> CatalogManagerRef { self.catalog_manager.clone() diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index de98213972..6774842ef1 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -24,7 +24,7 @@ use api::v1::column_data_type_extension::TypeExt; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType}; use coerce::{coerce_columns, coerce_value}; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_query::prelude::{greptime_timestamp, greptime_value}; use common_telemetry::warn; use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue}; use itertools::Itertools; @@ -48,7 +48,6 @@ use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transforms}; use crate::{PipelineContext, truthy, unwrap_or_continue_if_err}; -const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp"; const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10; /// fields not in the columns will be discarded @@ -138,10 +137,7 @@ impl GreptimeTransformer { let default = None; let transform = Transform { - fields: Fields::one(Field::new( - DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(), - None, - )), + fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)), type_, default, index: Some(Index::Time), @@ -347,7 +343,7 @@ fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result { let ts = values .as_object() - .and_then(|m| m.get(GREPTIME_TIMESTAMP)) + .and_then(|m| m.get(greptime_timestamp())) .and_then(|ts| ts.try_into_i64().ok()) .unwrap_or_default(); Ok(Some(ValueData::TimestampMillisecondValue(ts))) @@ -395,7 +391,7 @@ pub(crate) fn values_to_row( // skip ts column let ts_column_name = custom_ts .as_ref() - .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name()); + .map_or(greptime_timestamp(), |ts| ts.get_column_name()); let values = values.into_object().context(ValueMustBeMapSnafu)?; @@ -416,7 +412,7 @@ pub(crate) fn values_to_row( } fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 { - if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE { + if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() { SemanticType::Tag as i32 } else { SemanticType::Field as i32 @@ -563,7 +559,7 @@ fn identity_pipeline_inner( schema_info.schema.push(ColumnSchema { column_name: custom_ts .map(|ts| ts.get_column_name().to_string()) - .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()), + .unwrap_or_else(|| greptime_timestamp().to_string()), datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| { if pipeline_ctx.channel == Channel::Prometheus { ColumnDataType::TimestampMillisecond diff --git a/src/pipeline/tests/date.rs b/src/pipeline/tests/date.rs index fc9e726b61..0164dd4c22 100644 --- a/src/pipeline/tests/date.rs +++ b/src/pipeline/tests/date.rs @@ -15,6 +15,7 @@ mod common; use api::v1::ColumnSchema; +use common_query::prelude::greptime_timestamp; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, SemanticType}; use lazy_static::lazy_static; @@ -35,7 +36,7 @@ lazy_static! { SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs index a24e374532..b948110511 100644 --- a/src/pipeline/tests/dissect.rs +++ b/src/pipeline/tests/dissect.rs @@ -14,6 +14,7 @@ mod common; +use common_query::prelude::greptime_timestamp; use greptime_proto::v1::value::ValueData::StringValue; use greptime_proto::v1::{ColumnDataType, SemanticType}; use pipeline::{PipelineContext, setup_pipeline}; @@ -51,7 +52,7 @@ transform: make_string_column_schema("a".to_string()), make_string_column_schema("b".to_string()), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -99,7 +100,7 @@ transform: make_string_column_schema("a".to_string()), make_string_column_schema("b".to_string()), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -142,7 +143,7 @@ transform: make_string_column_schema("a".to_string()), make_string_column_schema("b".to_string()), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -185,7 +186,7 @@ transform: make_string_column_schema("key3".to_string()), make_string_column_schema("key5".to_string()), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -236,7 +237,7 @@ transform: let expected_schema = vec![ make_string_column_schema("key1".to_string()), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/pipeline/tests/epoch.rs b/src/pipeline/tests/epoch.rs index 84662793b9..ead018ad42 100644 --- a/src/pipeline/tests/epoch.rs +++ b/src/pipeline/tests/epoch.rs @@ -15,6 +15,7 @@ mod common; use api::v1::ColumnSchema; +use common_query::prelude::greptime_timestamp; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, SemanticType}; @@ -128,7 +129,7 @@ transform: make_time_field("input_nanosecond", ColumnDataType::TimestampNanosecond), make_time_field("input_nano", ColumnDataType::TimestampNanosecond), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -187,7 +188,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -238,7 +239,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/pipeline/tests/join.rs b/src/pipeline/tests/join.rs index 3625160361..dbc966404f 100644 --- a/src/pipeline/tests/join.rs +++ b/src/pipeline/tests/join.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_query::prelude::greptime_timestamp; use greptime_proto::v1::value::ValueData::StringValue; use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; use lazy_static::lazy_static; @@ -38,7 +39,7 @@ lazy_static! { SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/pipeline/tests/letter.rs b/src/pipeline/tests/letter.rs index d6d9a2cccb..307da50867 100644 --- a/src/pipeline/tests/letter.rs +++ b/src/pipeline/tests/letter.rs @@ -15,6 +15,7 @@ mod common; use api::v1::ColumnSchema; +use common_query::prelude::greptime_timestamp; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, SemanticType}; use lazy_static::lazy_static; @@ -27,7 +28,7 @@ lazy_static! { SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -125,7 +126,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -175,7 +176,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/pipeline/tests/on_failure.rs b/src/pipeline/tests/on_failure.rs index 2662a3fa96..d7df1ad7fa 100644 --- a/src/pipeline/tests/on_failure.rs +++ b/src/pipeline/tests/on_failure.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_query::prelude::greptime_timestamp; use greptime_proto::v1::value::ValueData::{U8Value, U16Value}; use greptime_proto::v1::{ColumnDataType, SemanticType}; @@ -46,7 +47,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -87,7 +88,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -123,7 +124,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -175,7 +176,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/pipeline/tests/regex.rs b/src/pipeline/tests/regex.rs index a8a7daaf5c..a0a3944c8e 100644 --- a/src/pipeline/tests/regex.rs +++ b/src/pipeline/tests/regex.rs @@ -15,6 +15,7 @@ mod common; use api::v1::ColumnSchema; +use common_query::prelude::greptime_timestamp; use greptime_proto::v1::value::ValueData::StringValue; use greptime_proto::v1::{ColumnDataType, SemanticType}; use lazy_static::lazy_static; @@ -27,7 +28,7 @@ lazy_static! { SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -156,7 +157,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/pipeline/tests/simple_extract.rs b/src/pipeline/tests/simple_extract.rs index ee2fbcbcae..2a93e5d135 100644 --- a/src/pipeline/tests/simple_extract.rs +++ b/src/pipeline/tests/simple_extract.rs @@ -16,6 +16,7 @@ mod common; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, SemanticType}; +use common_query::prelude::greptime_timestamp; use lazy_static::lazy_static; lazy_static! { @@ -26,7 +27,7 @@ lazy_static! { SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/pipeline/tests/urlencoding.rs b/src/pipeline/tests/urlencoding.rs index dd0c4ffe9f..b8366aa044 100644 --- a/src/pipeline/tests/urlencoding.rs +++ b/src/pipeline/tests/urlencoding.rs @@ -14,6 +14,7 @@ mod common; +use common_query::prelude::greptime_timestamp; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, SemanticType}; @@ -54,7 +55,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), @@ -100,7 +101,7 @@ transform: SemanticType::Field, ), common::make_column_schema( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ColumnDataType::TimestampNanosecond, SemanticType::Timestamp, ), diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index a1dc1b640a..5cc26cee05 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -22,7 +22,7 @@ use catalog::table_source::DfTableSourceProvider; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_function::function::FunctionContext; -use common_query::prelude::GREPTIME_VALUE; +use common_query::prelude::greptime_value; use datafusion::common::DFSchemaRef; use datafusion::datasource::DefaultTableSource; use datafusion::functions_aggregate::average::avg_udaf; @@ -2576,7 +2576,7 @@ impl PromPlanner { self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); self.ctx.reset_table_name_and_schema(); self.ctx.tag_columns = vec![]; - self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()]; + self.ctx.field_columns = vec![greptime_value().to_string()]; Ok(LogicalPlan::Extension(Extension { node: Arc::new( EmptyMetric::new( @@ -2584,7 +2584,7 @@ impl PromPlanner { self.ctx.end, self.ctx.interval, SPECIAL_TIME_FUNCTION.to_string(), - GREPTIME_VALUE.to_string(), + greptime_value().to_string(), Some(lit), ) .context(DataFusionPlanningSnafu)?, @@ -3433,6 +3433,7 @@ mod test { use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager}; use common_base::Plugins; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_query::prelude::greptime_timestamp; use common_query::test_util::DummyDecoder; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; @@ -3543,14 +3544,14 @@ mod test { } columns.push( ColumnSchema::new( - "greptime_timestamp".to_string(), + greptime_timestamp().to_string(), ConcreteDataType::timestamp_millisecond_datatype(), false, ) .with_time_index(true), ); columns.push(ColumnSchema::new( - "greptime_value".to_string(), + greptime_value().to_string(), ConcreteDataType::float64_datatype(), true, )); diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 6b6ee2ed07..693b1aa068 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -34,7 +34,7 @@ use common_datasource::util::find_dir_and_filename; use common_meta::SchemaOptions; use common_meta::key::flow::flow_info::FlowInfoValue; use common_query::Output; -use common_query::prelude::GREPTIME_TIMESTAMP; +use common_query::prelude::greptime_timestamp; use common_recordbatch::RecordBatches; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_time::Timestamp; @@ -1195,14 +1195,14 @@ pub fn file_column_schemas_to_table( let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype(); let default_zero = Value::Timestamp(Timestamp::new_millisecond(0)); - let timestamp_column_schema = ColumnSchema::new(GREPTIME_TIMESTAMP, timestamp_type, false) + let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false) .with_time_index(true) .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero))) .unwrap(); if let Some(column_schema) = column_schemas .iter_mut() - .find(|column_schema| column_schema.name == GREPTIME_TIMESTAMP) + .find(|column_schema| column_schema.name == greptime_timestamp()) { // Replace the column schema with the default one *column_schema = timestamp_column_schema; @@ -1210,7 +1210,7 @@ pub fn file_column_schemas_to_table( column_schemas.push(timestamp_column_schema); } - (column_schemas, GREPTIME_TIMESTAMP.to_string()) + (column_schemas, greptime_timestamp().to_string()) } /// This function checks if the column schemas from a file can be matched with diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs index 45d6eadadd..f10ab53190 100644 --- a/src/servers/src/http/loki.rs +++ b/src/servers/src/http/loki.rs @@ -26,7 +26,7 @@ use axum::extract::State; use axum_extra::TypedHeader; use bytes::Bytes; use chrono::DateTime; -use common_query::prelude::GREPTIME_TIMESTAMP; +use common_query::prelude::greptime_timestamp; use common_query::{Output, OutputData}; use common_telemetry::{error, warn}; use headers::ContentType; @@ -73,7 +73,7 @@ const LINES_KEY: &str = "values"; lazy_static! { static ref LOKI_INIT_SCHEMAS: Vec = vec![ ColumnSchema { - column_name: GREPTIME_TIMESTAMP.to_string(), + column_name: greptime_timestamp().to_string(), datatype: ColumnDataType::TimestampNanosecond.into(), semantic_type: SemanticType::Timestamp.into(), datatype_extension: None, @@ -453,7 +453,7 @@ impl From> for LokiPipeline { let mut map = BTreeMap::new(); map.insert( - KeyString::from(GREPTIME_TIMESTAMP), + KeyString::from(greptime_timestamp()), VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)), ); map.insert( @@ -586,7 +586,7 @@ impl From>> for LokiPipeline { let mut map = BTreeMap::new(); map.insert( - KeyString::from(GREPTIME_TIMESTAMP), + KeyString::from(greptime_timestamp()), VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)), ); map.insert( diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 2ebfd9dd08..9bff0bbc6e 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -15,7 +15,7 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; use common_grpc::precision::Precision; -use common_query::prelude::GREPTIME_TIMESTAMP; +use common_query::prelude::greptime_timestamp; use hyper::Request; use influxdb_line_protocol::{FieldValue, parse_lines}; use snafu::ResultExt; @@ -91,7 +91,7 @@ impl TryFrom for RowInsertRequests { // timestamp row_writer::write_ts_to_nanos( table_data, - GREPTIME_TIMESTAMP, + greptime_timestamp(), ts, precision, &mut one_row, @@ -117,6 +117,7 @@ fn unwrap_or_default_precision(precision: Option) -> Precision { mod tests { use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType}; + use common_query::prelude::greptime_timestamp; use crate::influxdb::InfluxdbRequest; @@ -193,7 +194,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; } } } - "greptime_timestamp" => { + _ if column_schema.column_name == greptime_timestamp() => { assert_eq!( ColumnDataType::TimestampNanosecond as i32, column_schema.datatype @@ -268,7 +269,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; } } } - "greptime_timestamp" => { + _ if column_schema.column_name == greptime_timestamp() => { assert_eq!( ColumnDataType::TimestampNanosecond as i32, column_schema.datatype diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 9ae63c1b9e..203eef8c2b 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -16,7 +16,7 @@ pub mod codec; use api::v1::RowInsertRequests; use common_grpc::precision::Precision; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_query::prelude::{greptime_timestamp, greptime_value}; use self::codec::DataPoint; use crate::error::Result; @@ -42,11 +42,11 @@ pub fn data_point_to_grpc_row_insert_requests( row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?; // value - row_writer::write_f64(table_data, GREPTIME_VALUE, value, &mut one_row)?; + row_writer::write_f64(table_data, greptime_value(), value, &mut one_row)?; // timestamp row_writer::write_ts_to_millis( table_data, - GREPTIME_TIMESTAMP, + greptime_timestamp(), Some(timestamp), Precision::Millisecond, &mut one_row, diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 16aa9b6381..c4760aa74d 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest, SemanticType, column}; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_query::prelude::{greptime_timestamp, greptime_value}; use crate::error::{self, Result}; @@ -129,7 +129,7 @@ impl DataPoint { let mut columns = Vec::with_capacity(2 + self.tags.len()); let ts_column = Column { - column_name: GREPTIME_TIMESTAMP.to_string(), + column_name: greptime_timestamp().to_string(), values: Some(column::Values { timestamp_millisecond_values: vec![self.ts_millis], ..Default::default() @@ -141,7 +141,7 @@ impl DataPoint { columns.push(ts_column); let field_column = Column { - column_name: GREPTIME_VALUE.to_string(), + column_name: greptime_value().to_string(), values: Some(column::Values { f64_values: vec![self.value], ..Default::default() @@ -267,7 +267,7 @@ mod test { assert_eq!(row_count, 1); assert_eq!(columns.len(), 4); - assert_eq!(columns[0].column_name, GREPTIME_TIMESTAMP); + assert_eq!(columns[0].column_name, greptime_timestamp()); assert_eq!( columns[0] .values @@ -277,7 +277,7 @@ mod test { vec![1000] ); - assert_eq!(columns[1].column_name, GREPTIME_VALUE); + assert_eq!(columns[1].column_name, greptime_value()); assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]); assert_eq!(columns[2].column_name, "tagk1"); diff --git a/src/servers/src/otlp/metrics.rs b/src/servers/src/otlp/metrics.rs index 274a0ba41e..d89cd3f277 100644 --- a/src/servers/src/otlp/metrics.rs +++ b/src/servers/src/otlp/metrics.rs @@ -15,7 +15,7 @@ use ahash::{HashMap, HashSet}; use api::v1::{RowInsertRequests, Value}; use common_grpc::precision::Precision; -use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_query::prelude::{GREPTIME_COUNT, greptime_timestamp, greptime_value}; use lazy_static::lazy_static; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value}; @@ -481,7 +481,7 @@ fn write_timestamp( if legacy_mode { row_writer::write_ts_to_nanos( table, - GREPTIME_TIMESTAMP, + greptime_timestamp(), Some(time_nano), Precision::Nanosecond, row, @@ -489,7 +489,7 @@ fn write_timestamp( } else { row_writer::write_ts_to_millis( table, - GREPTIME_TIMESTAMP, + greptime_timestamp(), Some(time_nano / 1000000), Precision::Millisecond, row, @@ -571,7 +571,7 @@ fn encode_gauge( metric_ctx, )?; - write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?; + write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?; table.add_row(row); } @@ -606,7 +606,7 @@ fn encode_sum( data_point.time_unix_nano as i64, metric_ctx, )?; - write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?; + write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?; table.add_row(row); } @@ -680,7 +680,7 @@ fn encode_histogram( accumulated_count += count; row_writer::write_f64( &mut bucket_table, - GREPTIME_VALUE, + greptime_value(), accumulated_count as f64, &mut bucket_row, )?; @@ -700,7 +700,7 @@ fn encode_histogram( metric_ctx, )?; - row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?; + row_writer::write_f64(&mut sum_table, greptime_value(), sum, &mut sum_row)?; sum_table.add_row(sum_row); } @@ -717,7 +717,7 @@ fn encode_histogram( row_writer::write_f64( &mut count_table, - GREPTIME_VALUE, + greptime_value(), data_point.count as f64, &mut count_row, )?; @@ -807,7 +807,7 @@ fn encode_summary( row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?; row_writer::write_f64( quantile_table, - GREPTIME_VALUE, + greptime_value(), quantile.value, &mut row, )?; @@ -833,7 +833,7 @@ fn encode_summary( row_writer::write_f64( count_table, - GREPTIME_VALUE, + greptime_value(), data_point.count as f64, &mut row, )?; @@ -858,7 +858,7 @@ fn encode_summary( metric_ctx, )?; - row_writer::write_f64(sum_table, GREPTIME_VALUE, data_point.sum, &mut row)?; + row_writer::write_f64(sum_table, greptime_value(), data_point.sum, &mut row)?; sum_table.add_row(row); } @@ -1494,8 +1494,8 @@ mod tests { vec![ "otel_scope_scope", "host", - "greptime_timestamp", - "greptime_value" + greptime_timestamp(), + greptime_value() ] ); } @@ -1544,8 +1544,8 @@ mod tests { vec![ "otel_scope_scope", "host", - "greptime_timestamp", - "greptime_value" + greptime_timestamp(), + greptime_value() ] ); } @@ -1594,9 +1594,9 @@ mod tests { vec![ "otel_scope_scope", "host", - "greptime_timestamp", + greptime_timestamp(), "quantile", - "greptime_value" + greptime_value() ] ); @@ -1612,8 +1612,8 @@ mod tests { vec![ "otel_scope_scope", "host", - "greptime_timestamp", - "greptime_value" + greptime_timestamp(), + greptime_value() ] ); @@ -1629,8 +1629,8 @@ mod tests { vec![ "otel_scope_scope", "host", - "greptime_timestamp", - "greptime_value" + greptime_timestamp(), + greptime_value() ] ); } @@ -1681,9 +1681,9 @@ mod tests { vec![ "otel_scope_scope", "host", - "greptime_timestamp", + greptime_timestamp(), "le", - "greptime_value", + greptime_value(), ] ); @@ -1699,8 +1699,8 @@ mod tests { vec![ "otel_scope_scope", "host", - "greptime_timestamp", - "greptime_value" + greptime_timestamp(), + greptime_value() ] ); @@ -1716,8 +1716,8 @@ mod tests { vec![ "otel_scope_scope", "host", - "greptime_timestamp", - "greptime_value" + greptime_timestamp(), + greptime_value() ] ); } diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index a6591dbfe1..b17048a4dd 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -20,7 +20,7 @@ use api::prom_store::remote::Sample; use api::v1::helper::{field_column_schema, tag_column_schema, time_index_column_schema}; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value}; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_query::prelude::{greptime_timestamp, greptime_value}; use pipeline::{ContextOpt, ContextReq}; use prost::DecodeError; @@ -114,15 +114,18 @@ impl Default for TableBuilder { impl TableBuilder { pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self { let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default()); - col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0); - col_indexes.insert(GREPTIME_VALUE.to_string(), 1); + col_indexes.insert(greptime_timestamp().to_string(), 0); + col_indexes.insert(greptime_value().to_string(), 1); let mut schema = Vec::with_capacity(cols); schema.push(time_index_column_schema( - GREPTIME_TIMESTAMP, + greptime_timestamp(), ColumnDataType::TimestampMillisecond, )); - schema.push(field_column_schema(GREPTIME_VALUE, ColumnDataType::Float64)); + schema.push(field_column_schema( + greptime_value(), + ColumnDataType::Float64, + )); Self { schema, diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 9738bbc8a0..81268d8663 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -22,7 +22,7 @@ use api::prom_store::remote::label_matcher::Type as MatcherType; use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest}; use api::v1::RowInsertRequests; use common_grpc::precision::Precision; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_query::prelude::{greptime_timestamp, greptime_value}; use common_recordbatch::{RecordBatch, RecordBatches}; use common_telemetry::tracing; use common_time::timestamp::TimeUnit; @@ -111,8 +111,8 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result { let mut conditions = Vec::with_capacity(label_matches.len() + 1); - conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms))); - conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms))); + conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms))); + conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms))); for m in label_matches { let name = &m.name; @@ -241,7 +241,8 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec Result> { - let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context( + let ts_column = recordbatch.column_by_name(greptime_timestamp()).context( error::InvalidPromRemoteReadQueryResultSnafu { msg: "missing greptime_timestamp column in query result", }, @@ -289,7 +290,7 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result Result<(RowInsertR // value row_writer::write_f64( table_data, - GREPTIME_VALUE, + greptime_value(), series.samples[0].value, &mut one_row, )?; // timestamp row_writer::write_ts_to_millis( table_data, - GREPTIME_TIMESTAMP, + greptime_timestamp(), Some(series.samples[0].timestamp), Precision::Millisecond, &mut one_row, @@ -403,11 +404,11 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR let kvs = kvs.clone(); row_writer::write_tags(table_data, kvs, &mut one_row)?; // value - row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?; + row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?; // timestamp row_writer::write_ts_to_millis( table_data, - GREPTIME_TIMESTAMP, + greptime_timestamp(), Some(*timestamp), Precision::Millisecond, &mut one_row, @@ -628,11 +629,11 @@ mod tests { let schema = Arc::new(Schema::new(vec![ ColumnSchema::new( - GREPTIME_TIMESTAMP, + greptime_timestamp(), ConcreteDataType::timestamp_millisecond_datatype(), true, ), - ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true), + ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true), ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true), ColumnSchema::new("job", ConcreteDataType::string_datatype(), true), ])); @@ -655,10 +656,12 @@ mod tests { let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap(); let display_string = format!("{}", plan.display_indent()); - assert_eq!( - "Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n TableScan: ?table?", - display_string + let ts_col = greptime_timestamp(); + let expected = format!( + "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n TableScan: ?table?", + ts_col, ts_col ); + assert_eq!(expected, display_string); let q = Query { start_timestamp_ms: 1000, @@ -687,22 +690,24 @@ mod tests { let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap(); let display_string = format!("{}", plan.display_indent()); - assert_eq!( - "Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", - display_string + let ts_col = greptime_timestamp(); + let expected = format!( + "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", + ts_col, ts_col ); + assert_eq!(expected, display_string); } fn column_schemas_with( mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>, ) -> Vec { kts_iter.push(( - "greptime_value", + greptime_value(), ColumnDataType::Float64, SemanticType::Field, )); kts_iter.push(( - "greptime_timestamp", + greptime_timestamp(), ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, )); @@ -837,11 +842,11 @@ mod tests { fn test_recordbatches_to_timeseries() { let schema = Arc::new(Schema::new(vec![ ColumnSchema::new( - GREPTIME_TIMESTAMP, + greptime_timestamp(), ConcreteDataType::timestamp_millisecond_datatype(), true, ), - ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true), + ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true), ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true), ])); diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 564943a152..1ef01c443b 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -19,7 +19,7 @@ use std::slice; use api::prom_store::remote::Sample; use bytes::{Buf, Bytes}; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_query::prelude::{greptime_timestamp, greptime_value}; use common_telemetry::warn; use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition}; use prost::DecodeError; @@ -407,10 +407,10 @@ impl PromSeriesProcessor { let timestamp = s.timestamp; pipeline_map.insert( - KeyString::from(GREPTIME_TIMESTAMP), + KeyString::from(greptime_timestamp()), VrlValue::Integer(timestamp), ); - pipeline_map.insert(KeyString::from(GREPTIME_VALUE), VrlValue::Float(value)); + pipeline_map.insert(KeyString::from(greptime_value()), VrlValue::Float(value)); if one_sample { vec_pipeline_map.push(VrlValue::Object(pipeline_map)); break; diff --git a/src/standalone/src/options.rs b/src/standalone/src/options.rs index 20dbcbb850..abbfcf64e2 100644 --- a/src/standalone/src/options.rs +++ b/src/standalone/src/options.rs @@ -37,6 +37,7 @@ use servers::http::HttpOptions; pub struct StandaloneOptions { pub enable_telemetry: bool, pub default_timezone: Option, + pub default_column_prefix: Option, pub http: HttpOptions, pub grpc: GrpcOptions, pub mysql: MysqlOptions, @@ -69,6 +70,7 @@ impl Default for StandaloneOptions { Self { enable_telemetry: true, default_timezone: None, + default_column_prefix: None, http: HttpOptions::default(), grpc: GrpcOptions::default(), mysql: MysqlOptions::default(),