feat: prefix option for timestamp index and value column (#7125)

* refactor: use GREPTIME_TIMESTAMP const

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* feat: add config for default ts col name

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: replace GREPTIME_TIMESTAMP with function get

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update config doc

* fix: test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove opts on flownode and metasrv

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add validation for ts column name

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: use get_or_init to avoid test error

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fmt

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update docs

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: using empty string to disable prefix

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update comment

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: address CR issues

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2025-10-27 16:00:03 +08:00
committed by GitHub
parent 0a3961927d
commit a20ac4f9e5
52 changed files with 305 additions and 163 deletions

4
Cargo.lock generated
View File

@@ -2004,9 +2004,11 @@ dependencies = [
"common-macro",
"common-test-util",
"futures",
"lazy_static",
"paste",
"pin-project",
"rand 0.9.1",
"regex",
"serde",
"snafu 0.8.6",
"tokio",
@@ -2454,6 +2456,7 @@ dependencies = [
"datafusion-expr",
"datatypes",
"futures-util",
"once_cell",
"serde",
"snafu 0.8.6",
"sqlparser",
@@ -7579,6 +7582,7 @@ dependencies = [
"common-decimal",
"common-error",
"common-macro",
"common-query",
"common-recordbatch",
"common-telemetry",
"common-time",

View File

@@ -13,6 +13,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
@@ -226,6 +227,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
@@ -440,6 +442,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `node_id` | Integer | Unset | The datanode identifier and should be unique in the cluster. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |

View File

@@ -2,6 +2,10 @@
## @toml2docs:none-default
node_id = 42
## The default column prefix for auto-created time index and value columns.
## @toml2docs:none-default
default_column_prefix = "greptime"
## Start services after regions have obtained leases.
## It will block the datanode start if it can't receive leases in the heartbeat from metasrv.
require_lease_before_startup = false

View File

@@ -2,6 +2,10 @@
## @toml2docs:none-default
default_timezone = "UTC"
## The default column prefix for auto-created time index and value columns.
## @toml2docs:none-default
default_column_prefix = "greptime"
## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"

View File

@@ -2,6 +2,10 @@
## @toml2docs:none-default
default_timezone = "UTC"
## The default column prefix for auto-created time index and value columns.
## @toml2docs:none-default
default_column_prefix = "greptime"
## Initialize all regions in the background during the startup.
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false

View File

@@ -25,11 +25,13 @@ use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
@@ -333,6 +335,9 @@ impl StartCommand {
.context(error::StartFrontendSnafu)?;
set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
set_default_prefix(opts.default_column_prefix.as_deref())
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
let meta_client_options = opts
.meta_client

View File

@@ -41,6 +41,7 @@ use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_procedure::ProcedureManagerRef;
use common_query::prelude::set_default_prefix;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_time::timezone::set_default_timezone;
@@ -355,6 +356,10 @@ impl StartCommand {
let mut plugins = Plugins::new();
let plugin_opts = opts.plugins;
let mut opts = opts.component;
set_default_prefix(opts.default_column_prefix.as_deref())
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
opts.grpc.detect_server_addr();
let fe_opts = opts.frontend_options();
let dn_opts = opts.datanode_options();

View File

@@ -48,6 +48,7 @@ fn test_load_datanode_example_config() {
let expected = GreptimeOptions::<DatanodeOptions> {
component: DatanodeOptions {
node_id: Some(42),
default_column_prefix: Some("greptime".to_string()),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
@@ -113,6 +114,7 @@ fn test_load_frontend_example_config() {
let expected = GreptimeOptions::<FrontendOptions> {
component: FrontendOptions {
default_timezone: Some("UTC".to_string()),
default_column_prefix: Some("greptime".to_string()),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
@@ -273,6 +275,7 @@ fn test_load_standalone_example_config() {
let expected = GreptimeOptions::<StandaloneOptions> {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
default_column_prefix: Some("greptime".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
sync_period: Some(Duration::from_secs(10)),

View File

@@ -18,9 +18,11 @@ bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
futures.workspace = true
lazy_static.workspace = true
paste.workspace = true
pin-project.workspace = true
rand.workspace = true
regex.workspace = true
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
tokio.workspace = true

View File

@@ -19,6 +19,7 @@ pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]
pub mod readable_size;
pub mod regex_pattern;
pub mod secrets;
pub mod serde;

View File

@@ -0,0 +1,22 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lazy_static::lazy_static;
use regex::Regex;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
lazy_static! {
pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}

View File

@@ -121,6 +121,7 @@ use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use bytes::Bytes;
use common_base::regex_pattern::NAME_PATTERN;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
};
@@ -164,7 +165,6 @@ use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
use crate::rpc::store::BatchDeleteRequest;
use crate::state_store::PoisonValue;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
@@ -269,10 +269,6 @@ pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
lazy_static! {
pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
}
lazy_static! {
pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
}

View File

@@ -14,6 +14,7 @@ workspace = true
api.workspace = true
async-trait.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
@@ -22,6 +23,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
once_cell.workspace = true
serde.workspace = true
snafu.workspace = true
sqlparser.workspace = true

View File

@@ -199,6 +199,9 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid character in prefix config: {}", prefix))]
InvalidColumnPrefix { prefix: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -227,7 +230,8 @@ impl ErrorExt for Error {
Error::UnsupportedInputDataType { .. }
| Error::TypeCast { .. }
| Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
| Error::InvalidFuncArgs { .. }
| Error::InvalidColumnPrefix { .. } => StatusCode::InvalidArguments,
Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),

View File

@@ -12,15 +12,61 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::regex_pattern::NAME_PATTERN_REG;
pub use datafusion_common::ScalarValue;
use once_cell::sync::OnceCell;
use snafu::ensure;
pub use crate::columnar_value::ColumnarValue;
use crate::error::{InvalidColumnPrefixSnafu, Result};
/// Default timestamp column name for Prometheus metrics.
pub const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
/// Default value column name for Prometheus metrics.
pub const GREPTIME_VALUE: &str = "greptime_value";
/// Default counter column name for OTLP metrics.
/// Default time index column name.
static GREPTIME_TIMESTAMP_CELL: OnceCell<String> = OnceCell::new();
/// Default value column name.
static GREPTIME_VALUE_CELL: OnceCell<String> = OnceCell::new();
pub fn set_default_prefix(prefix: Option<&str>) -> Result<()> {
match prefix {
None => {
// use default greptime prefix
GREPTIME_TIMESTAMP_CELL.get_or_init(|| GREPTIME_TIMESTAMP.to_string());
GREPTIME_VALUE_CELL.get_or_init(|| GREPTIME_VALUE.to_string());
}
Some(s) if s.trim().is_empty() => {
// use "" to disable prefix
GREPTIME_TIMESTAMP_CELL.get_or_init(|| "timestamp".to_string());
GREPTIME_VALUE_CELL.get_or_init(|| "value".to_string());
}
Some(x) => {
ensure!(
NAME_PATTERN_REG.is_match(x),
InvalidColumnPrefixSnafu { prefix: x }
);
GREPTIME_TIMESTAMP_CELL.get_or_init(|| format!("{}_timestamp", x));
GREPTIME_VALUE_CELL.get_or_init(|| format!("{}_value", x));
}
}
Ok(())
}
/// Get the default timestamp column name.
/// Returns the configured value, or `greptime_timestamp` if not set.
pub fn greptime_timestamp() -> &'static str {
GREPTIME_TIMESTAMP_CELL.get_or_init(|| GREPTIME_TIMESTAMP.to_string())
}
/// Get the default value column name.
/// Returns the configured value, or `greptime_value` if not set.
pub fn greptime_value() -> &'static str {
GREPTIME_VALUE_CELL.get_or_init(|| GREPTIME_VALUE.to_string())
}
/// Default timestamp column name constant for backward compatibility.
const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
/// Default value column name constant for backward compatibility.
const GREPTIME_VALUE: &str = "greptime_value";
/// Default counter column name for OTLP metrics (legacy mode).
pub const GREPTIME_COUNT: &str = "greptime_count";
/// Default physical table name
pub const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";

View File

@@ -66,6 +66,7 @@ impl Default for StorageConfig {
#[serde(default)]
pub struct DatanodeOptions {
pub node_id: Option<u64>,
pub default_column_prefix: Option<String>,
pub workload_types: Vec<DatanodeWorkloadType>,
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
@@ -119,6 +120,7 @@ impl Default for DatanodeOptions {
fn default() -> Self {
Self {
node_id: None,
default_column_prefix: None,
workload_types: vec![DatanodeWorkloadType::Hybrid],
require_lease_before_startup: false,
init_regions_in_background: false,

View File

@@ -27,6 +27,7 @@ use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::{error, info, warn};
use common_wal::config::DatanodeWalConfig;
@@ -59,9 +60,9 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
@@ -220,6 +221,9 @@ impl DatanodeBuilder {
pub async fn build(mut self) -> Result<Datanode> {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
set_default_prefix(self.opts.default_column_prefix.as_deref())
.map_err(BoxedError::new)
.context(BuildDatanodeSnafu)?;
let meta_client = self.meta_client.take();

View File

@@ -165,6 +165,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build datanode"))]
BuildDatanode {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to build http client"))]
BuildHttpClient {
#[snafu(implicit)]
@@ -429,7 +436,8 @@ impl ErrorExt for Error {
| MissingRequiredField { .. }
| RegionEngineNotFound { .. }
| ParseAddr { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,
| TomlFormat { .. }
| BuildDatanode { .. } => StatusCode::InvalidArguments,
PayloadNotExist { .. }
| Unexpected { .. }

View File

@@ -45,6 +45,7 @@ use crate::service_config::{
pub struct FrontendOptions {
pub node_id: Option<String>,
pub default_timezone: Option<String>,
pub default_column_prefix: Option<String>,
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub grpc: GrpcOptions,
@@ -77,6 +78,7 @@ impl Default for FrontendOptions {
Self {
node_id: None,
default_timezone: None,
default_column_prefix: None,
heartbeat: HeartbeatOptions::frontend_default(),
http: HttpOptions::default(),
grpc: GrpcOptions::default(),

View File

@@ -77,6 +77,7 @@ struct PersistRegionStat<'a> {
sst_size: u64,
write_bytes_delta: u64,
#[col(
// This col name is for the information schema table, so we don't touch it
name = "greptime_timestamp",
semantic = "Timestamp",
datatype = "TimestampMillisecond"

View File

@@ -240,6 +240,7 @@ impl DataRegion {
#[cfg(test)]
mod test {
use common_query::prelude::{greptime_timestamp, greptime_value};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -300,8 +301,8 @@ mod test {
.map(|c| &c.column_schema.name)
.collect::<Vec<_>>();
let expected = vec![
"greptime_timestamp",
"greptime_value",
greptime_timestamp(),
greptime_value(),
"__table_id",
"__tsid",
"job",

View File

@@ -224,6 +224,7 @@ mod test {
use api::v1::SemanticType;
use common_meta::ddl::test_util::assert_column_name_and_id;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use common_query::prelude::{greptime_timestamp, greptime_value};
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
@@ -295,7 +296,7 @@ mod test {
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
.column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
.await
.unwrap()
.unwrap();
@@ -305,8 +306,8 @@ mod test {
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
(greptime_timestamp(), 0),
(greptime_value(), 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),
@@ -364,8 +365,8 @@ mod test {
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
(greptime_timestamp(), 0),
(greptime_value(), 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),

View File

@@ -619,6 +619,7 @@ pub(crate) fn region_options_for_metadata_region(
mod test {
use common_meta::ddl::test_util::assert_column_name_and_id;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use common_query::prelude::{greptime_timestamp, greptime_value};
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::region_request::BatchRegionDdlRequest;
@@ -856,8 +857,8 @@ mod test {
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
(greptime_timestamp(), 0),
(greptime_value(), 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),

View File

@@ -110,6 +110,7 @@ mod tests {
use std::collections::HashMap;
use api::v1::SemanticType;
use common_query::prelude::greptime_timestamp;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -243,7 +244,7 @@ mod tests {
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
.column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
.await
.unwrap()
.unwrap();

View File

@@ -17,6 +17,7 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
use common_meta::ddl::utils::parse_column_metadatas;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::debug;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -132,7 +133,7 @@ impl TestEnv {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
@@ -141,7 +142,7 @@ impl TestEnv {
column_id: 1,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
false,
),
@@ -204,8 +205,8 @@ impl TestEnv {
assert_eq!(
column_names,
vec![
"greptime_timestamp",
"greptime_value",
greptime_timestamp(),
greptime_value(),
"__table_id",
"__tsid",
"job",
@@ -300,7 +301,7 @@ pub fn create_logical_region_request(
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
@@ -309,7 +310,7 @@ pub fn create_logical_region_request(
column_id: 1,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
false,
),
@@ -372,14 +373,14 @@ pub fn alter_logical_region_request(tags: &[&str]) -> RegionAlterRequest {
pub fn row_schema_with_tags(tags: &[&str]) -> Vec<PbColumnSchema> {
let mut schema = vec![
PbColumnSchema {
column_name: "greptime_timestamp".to_string(),
column_name: greptime_timestamp().to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
PbColumnSchema {
column_name: "greptime_value".to_string(),
column_name: greptime_value().to_string(),
datatype: ColumnDataType::Float64 as i32,
semantic_type: SemanticType::Field as _,
datatype_extension: None,

View File

@@ -15,6 +15,7 @@ common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true

View File

@@ -154,6 +154,7 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use common_query::prelude::{greptime_timestamp, greptime_value};
use datafusion_common::Column;
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
use datatypes::prelude::ConcreteDataType;
@@ -193,7 +194,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
false,
),
@@ -202,7 +203,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
),

View File

@@ -385,6 +385,7 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::schema::ColumnSchema;
@@ -461,7 +462,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
false,
),
@@ -470,7 +471,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
),

View File

@@ -384,6 +384,7 @@ mod tests {
use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
use api::v1::value::ValueData;
use api::v1::{Mutation, OpType, Rows, SemanticType};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_time::Timestamp;
use datafusion_common::Column;
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
@@ -694,7 +695,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
@@ -703,7 +704,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
true,
),

View File

@@ -37,7 +37,7 @@ use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use datatypes::schema::SkippingIndexOptions;
@@ -721,14 +721,14 @@ impl Inserter {
// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
column_name: greptime_timestamp().to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
column_name: greptime_value().to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,

View File

@@ -26,13 +26,13 @@ use api::v1::{
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_base::regex_pattern::NAME_PATTERN_REG;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::instruction::CacheIdent;
use common_meta::key::NAME_PATTERN;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
@@ -52,14 +52,12 @@ use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
@@ -96,10 +94,6 @@ use crate::expr_helper;
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;
lazy_static! {
pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}
impl StatementExecutor {
pub fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()

View File

@@ -24,7 +24,7 @@ use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
@@ -48,7 +48,6 @@ use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
/// fields not in the columns will be discarded
@@ -138,10 +137,7 @@ impl GreptimeTransformer {
let default = None;
let transform = Transform {
fields: Fields::one(Field::new(
DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
None,
)),
fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
type_,
default,
index: Some(Index::Time),
@@ -347,7 +343,7 @@ fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueDat
Channel::Prometheus => {
let ts = values
.as_object()
.and_then(|m| m.get(GREPTIME_TIMESTAMP))
.and_then(|m| m.get(greptime_timestamp()))
.and_then(|ts| ts.try_into_i64().ok())
.unwrap_or_default();
Ok(Some(ValueData::TimestampMillisecondValue(ts)))
@@ -395,7 +391,7 @@ pub(crate) fn values_to_row(
// skip ts column
let ts_column_name = custom_ts
.as_ref()
.map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
.map_or(greptime_timestamp(), |ts| ts.get_column_name());
let values = values.into_object().context(ValueMustBeMapSnafu)?;
@@ -416,7 +412,7 @@ pub(crate) fn values_to_row(
}
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
SemanticType::Tag as i32
} else {
SemanticType::Field as i32
@@ -563,7 +559,7 @@ fn identity_pipeline_inner(
schema_info.schema.push(ColumnSchema {
column_name: custom_ts
.map(|ts| ts.get_column_name().to_string())
.unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
.unwrap_or_else(|| greptime_timestamp().to_string()),
datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
if pipeline_ctx.channel == Channel::Prometheus {
ColumnDataType::TimestampMillisecond

View File

@@ -15,6 +15,7 @@
mod common;
use api::v1::ColumnSchema;
use common_query::prelude::greptime_timestamp;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
@@ -35,7 +36,7 @@ lazy_static! {
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -14,6 +14,7 @@
mod common;
use common_query::prelude::greptime_timestamp;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use pipeline::{PipelineContext, setup_pipeline};
@@ -51,7 +52,7 @@ transform:
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -99,7 +100,7 @@ transform:
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -142,7 +143,7 @@ transform:
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -185,7 +186,7 @@ transform:
make_string_column_schema("key3".to_string()),
make_string_column_schema("key5".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -236,7 +237,7 @@ transform:
let expected_schema = vec![
make_string_column_schema("key1".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -15,6 +15,7 @@
mod common;
use api::v1::ColumnSchema;
use common_query::prelude::greptime_timestamp;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
@@ -128,7 +129,7 @@ transform:
make_time_field("input_nanosecond", ColumnDataType::TimestampNanosecond),
make_time_field("input_nano", ColumnDataType::TimestampNanosecond),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -187,7 +188,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -238,7 +239,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::prelude::greptime_timestamp;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use lazy_static::lazy_static;
@@ -38,7 +39,7 @@ lazy_static! {
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -15,6 +15,7 @@
mod common;
use api::v1::ColumnSchema;
use common_query::prelude::greptime_timestamp;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
@@ -27,7 +28,7 @@ lazy_static! {
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -125,7 +126,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -175,7 +176,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::prelude::greptime_timestamp;
use greptime_proto::v1::value::ValueData::{U8Value, U16Value};
use greptime_proto::v1::{ColumnDataType, SemanticType};
@@ -46,7 +47,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -87,7 +88,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -123,7 +124,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -175,7 +176,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -15,6 +15,7 @@
mod common;
use api::v1::ColumnSchema;
use common_query::prelude::greptime_timestamp;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
@@ -27,7 +28,7 @@ lazy_static! {
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -156,7 +157,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -16,6 +16,7 @@ mod common;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
use common_query::prelude::greptime_timestamp;
use lazy_static::lazy_static;
lazy_static! {
@@ -26,7 +27,7 @@ lazy_static! {
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -14,6 +14,7 @@
mod common;
use common_query::prelude::greptime_timestamp;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
@@ -54,7 +55,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
@@ -100,7 +101,7 @@ transform:
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),

View File

@@ -22,7 +22,7 @@ use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::GREPTIME_VALUE;
use common_query::prelude::greptime_value;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
@@ -2576,7 +2576,7 @@ impl PromPlanner {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.reset_table_name_and_schema();
self.ctx.tag_columns = vec![];
self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
self.ctx.field_columns = vec![greptime_value().to_string()];
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
@@ -2584,7 +2584,7 @@ impl PromPlanner {
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
GREPTIME_VALUE.to_string(),
greptime_value().to_string(),
Some(lit),
)
.context(DataFusionPlanningSnafu)?,
@@ -3433,6 +3433,7 @@ mod test {
use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::greptime_timestamp;
use common_query::test_util::DummyDecoder;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -3543,14 +3544,14 @@ mod test {
}
columns.push(
ColumnSchema::new(
"greptime_timestamp".to_string(),
greptime_timestamp().to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
);
columns.push(ColumnSchema::new(
"greptime_value".to_string(),
greptime_value().to_string(),
ConcreteDataType::float64_datatype(),
true,
));

View File

@@ -34,7 +34,7 @@ use common_datasource::util::find_dir_and_filename;
use common_meta::SchemaOptions;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::Output;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::prelude::greptime_timestamp;
use common_recordbatch::RecordBatches;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_time::Timestamp;
@@ -1195,14 +1195,14 @@ pub fn file_column_schemas_to_table(
let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
let timestamp_column_schema = ColumnSchema::new(GREPTIME_TIMESTAMP, timestamp_type, false)
let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
.unwrap();
if let Some(column_schema) = column_schemas
.iter_mut()
.find(|column_schema| column_schema.name == GREPTIME_TIMESTAMP)
.find(|column_schema| column_schema.name == greptime_timestamp())
{
// Replace the column schema with the default one
*column_schema = timestamp_column_schema;
@@ -1210,7 +1210,7 @@ pub fn file_column_schemas_to_table(
column_schemas.push(timestamp_column_schema);
}
(column_schemas, GREPTIME_TIMESTAMP.to_string())
(column_schemas, greptime_timestamp().to_string())
}
/// This function checks if the column schemas from a file can be matched with

View File

@@ -26,7 +26,7 @@ use axum::extract::State;
use axum_extra::TypedHeader;
use bytes::Bytes;
use chrono::DateTime;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::prelude::greptime_timestamp;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use headers::ContentType;
@@ -73,7 +73,7 @@ const LINES_KEY: &str = "values";
lazy_static! {
static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
column_name: greptime_timestamp().to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
@@ -453,7 +453,7 @@ impl From<LokiMiddleItem<VrlValue>> for LokiPipeline {
let mut map = BTreeMap::new();
map.insert(
KeyString::from(GREPTIME_TIMESTAMP),
KeyString::from(greptime_timestamp()),
VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)),
);
map.insert(
@@ -586,7 +586,7 @@ impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiPipeline {
let mut map = BTreeMap::new();
map.insert(
KeyString::from(GREPTIME_TIMESTAMP),
KeyString::from(greptime_timestamp()),
VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)),
);
map.insert(

View File

@@ -15,7 +15,7 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::precision::Precision;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::prelude::greptime_timestamp;
use hyper::Request;
use influxdb_line_protocol::{FieldValue, parse_lines};
use snafu::ResultExt;
@@ -91,7 +91,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
// timestamp
row_writer::write_ts_to_nanos(
table_data,
GREPTIME_TIMESTAMP,
greptime_timestamp(),
ts,
precision,
&mut one_row,
@@ -117,6 +117,7 @@ fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};
use common_query::prelude::greptime_timestamp;
use crate::influxdb::InfluxdbRequest;
@@ -193,7 +194,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
}
}
}
"greptime_timestamp" => {
_ if column_schema.column_name == greptime_timestamp() => {
assert_eq!(
ColumnDataType::TimestampNanosecond as i32,
column_schema.datatype
@@ -268,7 +269,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
}
}
}
"greptime_timestamp" => {
_ if column_schema.column_name == greptime_timestamp() => {
assert_eq!(
ColumnDataType::TimestampNanosecond as i32,
column_schema.datatype

View File

@@ -16,7 +16,7 @@ pub mod codec;
use api::v1::RowInsertRequests;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{greptime_timestamp, greptime_value};
use self::codec::DataPoint;
use crate::error::Result;
@@ -42,11 +42,11 @@ pub fn data_point_to_grpc_row_insert_requests(
row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?;
// value
row_writer::write_f64(table_data, GREPTIME_VALUE, value, &mut one_row)?;
row_writer::write_f64(table_data, greptime_value(), value, &mut one_row)?;
// timestamp
row_writer::write_ts_to_millis(
table_data,
GREPTIME_TIMESTAMP,
greptime_timestamp(),
Some(timestamp),
Precision::Millisecond,
&mut one_row,

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest, SemanticType, column};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{greptime_timestamp, greptime_value};
use crate::error::{self, Result};
@@ -129,7 +129,7 @@ impl DataPoint {
let mut columns = Vec::with_capacity(2 + self.tags.len());
let ts_column = Column {
column_name: GREPTIME_TIMESTAMP.to_string(),
column_name: greptime_timestamp().to_string(),
values: Some(column::Values {
timestamp_millisecond_values: vec![self.ts_millis],
..Default::default()
@@ -141,7 +141,7 @@ impl DataPoint {
columns.push(ts_column);
let field_column = Column {
column_name: GREPTIME_VALUE.to_string(),
column_name: greptime_value().to_string(),
values: Some(column::Values {
f64_values: vec![self.value],
..Default::default()
@@ -267,7 +267,7 @@ mod test {
assert_eq!(row_count, 1);
assert_eq!(columns.len(), 4);
assert_eq!(columns[0].column_name, GREPTIME_TIMESTAMP);
assert_eq!(columns[0].column_name, greptime_timestamp());
assert_eq!(
columns[0]
.values
@@ -277,7 +277,7 @@ mod test {
vec![1000]
);
assert_eq!(columns[1].column_name, GREPTIME_VALUE);
assert_eq!(columns[1].column_name, greptime_value());
assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]);
assert_eq!(columns[2].column_name, "tagk1");

View File

@@ -15,7 +15,7 @@
use ahash::{HashMap, HashSet};
use api::v1::{RowInsertRequests, Value};
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{GREPTIME_COUNT, greptime_timestamp, greptime_value};
use lazy_static::lazy_static;
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
@@ -481,7 +481,7 @@ fn write_timestamp(
if legacy_mode {
row_writer::write_ts_to_nanos(
table,
GREPTIME_TIMESTAMP,
greptime_timestamp(),
Some(time_nano),
Precision::Nanosecond,
row,
@@ -489,7 +489,7 @@ fn write_timestamp(
} else {
row_writer::write_ts_to_millis(
table,
GREPTIME_TIMESTAMP,
greptime_timestamp(),
Some(time_nano / 1000000),
Precision::Millisecond,
row,
@@ -571,7 +571,7 @@ fn encode_gauge(
metric_ctx,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
table.add_row(row);
}
@@ -606,7 +606,7 @@ fn encode_sum(
data_point.time_unix_nano as i64,
metric_ctx,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
table.add_row(row);
}
@@ -680,7 +680,7 @@ fn encode_histogram(
accumulated_count += count;
row_writer::write_f64(
&mut bucket_table,
GREPTIME_VALUE,
greptime_value(),
accumulated_count as f64,
&mut bucket_row,
)?;
@@ -700,7 +700,7 @@ fn encode_histogram(
metric_ctx,
)?;
row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
row_writer::write_f64(&mut sum_table, greptime_value(), sum, &mut sum_row)?;
sum_table.add_row(sum_row);
}
@@ -717,7 +717,7 @@ fn encode_histogram(
row_writer::write_f64(
&mut count_table,
GREPTIME_VALUE,
greptime_value(),
data_point.count as f64,
&mut count_row,
)?;
@@ -807,7 +807,7 @@ fn encode_summary(
row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
row_writer::write_f64(
quantile_table,
GREPTIME_VALUE,
greptime_value(),
quantile.value,
&mut row,
)?;
@@ -833,7 +833,7 @@ fn encode_summary(
row_writer::write_f64(
count_table,
GREPTIME_VALUE,
greptime_value(),
data_point.count as f64,
&mut row,
)?;
@@ -858,7 +858,7 @@ fn encode_summary(
metric_ctx,
)?;
row_writer::write_f64(sum_table, GREPTIME_VALUE, data_point.sum, &mut row)?;
row_writer::write_f64(sum_table, greptime_value(), data_point.sum, &mut row)?;
sum_table.add_row(row);
}
@@ -1494,8 +1494,8 @@ mod tests {
vec![
"otel_scope_scope",
"host",
"greptime_timestamp",
"greptime_value"
greptime_timestamp(),
greptime_value()
]
);
}
@@ -1544,8 +1544,8 @@ mod tests {
vec![
"otel_scope_scope",
"host",
"greptime_timestamp",
"greptime_value"
greptime_timestamp(),
greptime_value()
]
);
}
@@ -1594,9 +1594,9 @@ mod tests {
vec![
"otel_scope_scope",
"host",
"greptime_timestamp",
greptime_timestamp(),
"quantile",
"greptime_value"
greptime_value()
]
);
@@ -1612,8 +1612,8 @@ mod tests {
vec![
"otel_scope_scope",
"host",
"greptime_timestamp",
"greptime_value"
greptime_timestamp(),
greptime_value()
]
);
@@ -1629,8 +1629,8 @@ mod tests {
vec![
"otel_scope_scope",
"host",
"greptime_timestamp",
"greptime_value"
greptime_timestamp(),
greptime_value()
]
);
}
@@ -1681,9 +1681,9 @@ mod tests {
vec![
"otel_scope_scope",
"host",
"greptime_timestamp",
greptime_timestamp(),
"le",
"greptime_value",
greptime_value(),
]
);
@@ -1699,8 +1699,8 @@ mod tests {
vec![
"otel_scope_scope",
"host",
"greptime_timestamp",
"greptime_value"
greptime_timestamp(),
greptime_value()
]
);
@@ -1716,8 +1716,8 @@ mod tests {
vec![
"otel_scope_scope",
"host",
"greptime_timestamp",
"greptime_value"
greptime_timestamp(),
greptime_value()
]
);
}

View File

@@ -20,7 +20,7 @@ use api::prom_store::remote::Sample;
use api::v1::helper::{field_column_schema, tag_column_schema, time_index_column_schema};
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{greptime_timestamp, greptime_value};
use pipeline::{ContextOpt, ContextReq};
use prost::DecodeError;
@@ -114,15 +114,18 @@ impl Default for TableBuilder {
impl TableBuilder {
pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
col_indexes.insert(GREPTIME_VALUE.to_string(), 1);
col_indexes.insert(greptime_timestamp().to_string(), 0);
col_indexes.insert(greptime_value().to_string(), 1);
let mut schema = Vec::with_capacity(cols);
schema.push(time_index_column_schema(
GREPTIME_TIMESTAMP,
greptime_timestamp(),
ColumnDataType::TimestampMillisecond,
));
schema.push(field_column_schema(GREPTIME_VALUE, ColumnDataType::Float64));
schema.push(field_column_schema(
greptime_value(),
ColumnDataType::Float64,
));
Self {
schema,

View File

@@ -22,7 +22,7 @@ use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
use api::v1::RowInsertRequests;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
use common_time::timestamp::TimeUnit;
@@ -111,8 +111,8 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
let mut conditions = Vec::with_capacity(label_matches.len() + 1);
conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
for m in label_matches {
let name = &m.name;
@@ -241,7 +241,8 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<Ti
));
for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
if column_schema.name == greptime_value() || column_schema.name == greptime_timestamp()
{
continue;
}
@@ -274,7 +275,7 @@ pub fn recordbatches_to_timeseries(
}
fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
error::InvalidPromRemoteReadQueryResultSnafu {
msg: "missing greptime_timestamp column in query result",
},
@@ -289,7 +290,7 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
}
);
let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
let field_column = recordbatch.column_by_name(greptime_value()).context(
error::InvalidPromRemoteReadQueryResultSnafu {
msg: "missing greptime_value column in query result",
},
@@ -381,14 +382,14 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR
// value
row_writer::write_f64(
table_data,
GREPTIME_VALUE,
greptime_value(),
series.samples[0].value,
&mut one_row,
)?;
// timestamp
row_writer::write_ts_to_millis(
table_data,
GREPTIME_TIMESTAMP,
greptime_timestamp(),
Some(series.samples[0].timestamp),
Precision::Millisecond,
&mut one_row,
@@ -403,11 +404,11 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR
let kvs = kvs.clone();
row_writer::write_tags(table_data, kvs, &mut one_row)?;
// value
row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
// timestamp
row_writer::write_ts_to_millis(
table_data,
GREPTIME_TIMESTAMP,
greptime_timestamp(),
Some(*timestamp),
Precision::Millisecond,
&mut one_row,
@@ -628,11 +629,11 @@ mod tests {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(
GREPTIME_TIMESTAMP,
greptime_timestamp(),
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
]));
@@ -655,10 +656,12 @@ mod tests {
let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
let display_string = format!("{}", plan.display_indent());
assert_eq!(
"Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n TableScan: ?table?",
display_string
let ts_col = greptime_timestamp();
let expected = format!(
"Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n TableScan: ?table?",
ts_col, ts_col
);
assert_eq!(expected, display_string);
let q = Query {
start_timestamp_ms: 1000,
@@ -687,22 +690,24 @@ mod tests {
let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
let display_string = format!("{}", plan.display_indent());
assert_eq!(
"Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?",
display_string
let ts_col = greptime_timestamp();
let expected = format!(
"Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?",
ts_col, ts_col
);
assert_eq!(expected, display_string);
}
fn column_schemas_with(
mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
) -> Vec<api::v1::ColumnSchema> {
kts_iter.push((
"greptime_value",
greptime_value(),
ColumnDataType::Float64,
SemanticType::Field,
));
kts_iter.push((
"greptime_timestamp",
greptime_timestamp(),
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
));
@@ -837,11 +842,11 @@ mod tests {
fn test_recordbatches_to_timeseries() {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(
GREPTIME_TIMESTAMP,
greptime_timestamp(),
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
]));

View File

@@ -19,7 +19,7 @@ use std::slice;
use api::prom_store::remote::Sample;
use bytes::{Buf, Bytes};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
use prost::DecodeError;
@@ -407,10 +407,10 @@ impl PromSeriesProcessor {
let timestamp = s.timestamp;
pipeline_map.insert(
KeyString::from(GREPTIME_TIMESTAMP),
KeyString::from(greptime_timestamp()),
VrlValue::Integer(timestamp),
);
pipeline_map.insert(KeyString::from(GREPTIME_VALUE), VrlValue::Float(value));
pipeline_map.insert(KeyString::from(greptime_value()), VrlValue::Float(value));
if one_sample {
vec_pipeline_map.push(VrlValue::Object(pipeline_map));
break;

View File

@@ -37,6 +37,7 @@ use servers::http::HttpOptions;
pub struct StandaloneOptions {
pub enable_telemetry: bool,
pub default_timezone: Option<String>,
pub default_column_prefix: Option<String>,
pub http: HttpOptions,
pub grpc: GrpcOptions,
pub mysql: MysqlOptions,
@@ -69,6 +70,7 @@ impl Default for StandaloneOptions {
Self {
enable_telemetry: true,
default_timezone: None,
default_column_prefix: None,
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
mysql: MysqlOptions::default(),