mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
Compare commits
11 Commits
flow/admin
...
v0.15.0-ni
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
69870e2762 | ||
|
|
f9f4ac1dca | ||
|
|
99e56af98c | ||
|
|
538b5abaae | ||
|
|
a2b3ad77df | ||
|
|
0eb9e97f79 | ||
|
|
06b1627da5 | ||
|
|
0d4f27a699 | ||
|
|
c4da8bb69d | ||
|
|
0bd8856e2f | ||
|
|
92c5a9f5f4 |
@@ -64,11 +64,11 @@ inputs:
|
||||
upload-max-retry-times:
|
||||
description: Max retry times for uploading artifacts to S3
|
||||
required: false
|
||||
default: "20"
|
||||
default: "30"
|
||||
upload-retry-timeout:
|
||||
description: Timeout for uploading artifacts to S3
|
||||
required: false
|
||||
default: "30" # minutes
|
||||
default: "120" # minutes
|
||||
runs:
|
||||
using: composite
|
||||
steps:
|
||||
|
||||
780
Cargo.lock
generated
780
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -30,6 +30,7 @@ members = [
|
||||
"src/common/recordbatch",
|
||||
"src/common/runtime",
|
||||
"src/common/session",
|
||||
"src/common/stat",
|
||||
"src/common/substrait",
|
||||
"src/common/telemetry",
|
||||
"src/common/test-util",
|
||||
@@ -132,7 +133,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
@@ -148,6 +149,7 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
|
||||
mockall = "0.13"
|
||||
moka = "0.12"
|
||||
nalgebra = "0.33"
|
||||
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
|
||||
notify = "8.0"
|
||||
num_cpus = "1.16"
|
||||
object_store_opendal = "0.50"
|
||||
@@ -287,6 +289,7 @@ query = { path = "src/query" }
|
||||
servers = { path = "src/servers" }
|
||||
session = { path = "src/session" }
|
||||
sql = { path = "src/sql" }
|
||||
stat = { path = "src/common/stat" }
|
||||
store-api = { path = "src/store-api" }
|
||||
substrait = { path = "src/common/substrait" }
|
||||
table = { path = "src/table" }
|
||||
|
||||
@@ -19,9 +19,11 @@ use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::{Parser, ValueEnum};
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use object_store::layers::LoggingLayer;
|
||||
use object_store::services::Oss;
|
||||
use object_store::{services, ObjectStore};
|
||||
use serde_json::Value;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -110,15 +112,15 @@ pub struct ExportCommand {
|
||||
#[clap(long)]
|
||||
s3: bool,
|
||||
|
||||
/// if both `s3_ddl_local_dir` and `s3` are set, `s3_ddl_local_dir` will be only used for
|
||||
/// exported SQL files, and the data will be exported to s3.
|
||||
/// if both `ddl_local_dir` and remote storage (s3/oss) are set, `ddl_local_dir` will be only used for
|
||||
/// exported SQL files, and the data will be exported to remote storage.
|
||||
///
|
||||
/// Note that `s3_ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
|
||||
/// direct access to s3.
|
||||
/// Note that `ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
|
||||
/// direct access to remote storage.
|
||||
///
|
||||
/// if `s3` is set but `s3_ddl_local_dir` is not set, both SQL&data will be exported to s3.
|
||||
/// if remote storage is set but `ddl_local_dir` is not set, both SQL&data will be exported to remote storage.
|
||||
#[clap(long)]
|
||||
s3_ddl_local_dir: Option<String>,
|
||||
ddl_local_dir: Option<String>,
|
||||
|
||||
/// The s3 bucket name
|
||||
/// if s3 is set, this is required
|
||||
@@ -149,6 +151,30 @@ pub struct ExportCommand {
|
||||
/// if s3 is set, this is required
|
||||
#[clap(long)]
|
||||
s3_region: Option<String>,
|
||||
|
||||
/// if export data to oss
|
||||
#[clap(long)]
|
||||
oss: bool,
|
||||
|
||||
/// The oss bucket name
|
||||
/// if oss is set, this is required
|
||||
#[clap(long)]
|
||||
oss_bucket: Option<String>,
|
||||
|
||||
/// The oss endpoint
|
||||
/// if oss is set, this is required
|
||||
#[clap(long)]
|
||||
oss_endpoint: Option<String>,
|
||||
|
||||
/// The oss access key id
|
||||
/// if oss is set, this is required
|
||||
#[clap(long)]
|
||||
oss_access_key_id: Option<String>,
|
||||
|
||||
/// The oss access key secret
|
||||
/// if oss is set, this is required
|
||||
#[clap(long)]
|
||||
oss_access_key_secret: Option<String>,
|
||||
}
|
||||
|
||||
impl ExportCommand {
|
||||
@@ -162,7 +188,7 @@ impl ExportCommand {
|
||||
{
|
||||
return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
|
||||
}
|
||||
if !self.s3 && self.output_dir.is_none() {
|
||||
if !self.s3 && !self.oss && self.output_dir.is_none() {
|
||||
return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
|
||||
}
|
||||
let (catalog, schema) =
|
||||
@@ -187,13 +213,32 @@ impl ExportCommand {
|
||||
start_time: self.start_time.clone(),
|
||||
end_time: self.end_time.clone(),
|
||||
s3: self.s3,
|
||||
s3_ddl_local_dir: self.s3_ddl_local_dir.clone(),
|
||||
ddl_local_dir: self.ddl_local_dir.clone(),
|
||||
s3_bucket: self.s3_bucket.clone(),
|
||||
s3_root: self.s3_root.clone(),
|
||||
s3_endpoint: self.s3_endpoint.clone(),
|
||||
s3_access_key: self.s3_access_key.clone(),
|
||||
s3_secret_key: self.s3_secret_key.clone(),
|
||||
// Wrap sensitive values in SecretString
|
||||
s3_access_key: self
|
||||
.s3_access_key
|
||||
.as_ref()
|
||||
.map(|k| SecretString::from(k.clone())),
|
||||
s3_secret_key: self
|
||||
.s3_secret_key
|
||||
.as_ref()
|
||||
.map(|k| SecretString::from(k.clone())),
|
||||
s3_region: self.s3_region.clone(),
|
||||
oss: self.oss,
|
||||
oss_bucket: self.oss_bucket.clone(),
|
||||
oss_endpoint: self.oss_endpoint.clone(),
|
||||
// Wrap sensitive values in SecretString
|
||||
oss_access_key_id: self
|
||||
.oss_access_key_id
|
||||
.as_ref()
|
||||
.map(|k| SecretString::from(k.clone())),
|
||||
oss_access_key_secret: self
|
||||
.oss_access_key_secret
|
||||
.as_ref()
|
||||
.map(|k| SecretString::from(k.clone())),
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -209,23 +254,30 @@ pub struct Export {
|
||||
start_time: Option<String>,
|
||||
end_time: Option<String>,
|
||||
s3: bool,
|
||||
s3_ddl_local_dir: Option<String>,
|
||||
ddl_local_dir: Option<String>,
|
||||
s3_bucket: Option<String>,
|
||||
s3_root: Option<String>,
|
||||
s3_endpoint: Option<String>,
|
||||
s3_access_key: Option<String>,
|
||||
s3_secret_key: Option<String>,
|
||||
// Changed to SecretString for sensitive data
|
||||
s3_access_key: Option<SecretString>,
|
||||
s3_secret_key: Option<SecretString>,
|
||||
s3_region: Option<String>,
|
||||
oss: bool,
|
||||
oss_bucket: Option<String>,
|
||||
oss_endpoint: Option<String>,
|
||||
// Changed to SecretString for sensitive data
|
||||
oss_access_key_id: Option<SecretString>,
|
||||
oss_access_key_secret: Option<SecretString>,
|
||||
}
|
||||
|
||||
impl Export {
|
||||
fn catalog_path(&self) -> PathBuf {
|
||||
if self.s3 {
|
||||
if self.s3 || self.oss {
|
||||
PathBuf::from(&self.catalog)
|
||||
} else if let Some(dir) = &self.output_dir {
|
||||
PathBuf::from(dir).join(&self.catalog)
|
||||
} else {
|
||||
unreachable!("catalog_path: output_dir must be set when not using s3")
|
||||
unreachable!("catalog_path: output_dir must be set when not using remote storage")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -427,7 +479,7 @@ impl Export {
|
||||
.await?;
|
||||
|
||||
// Create directory if needed for file system storage
|
||||
if !export_self.s3 {
|
||||
if !export_self.s3 && !export_self.oss {
|
||||
let db_dir = format!("{}/{}/", export_self.catalog, schema);
|
||||
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
|
||||
}
|
||||
@@ -473,6 +525,8 @@ impl Export {
|
||||
async fn build_operator(&self) -> Result<ObjectStore> {
|
||||
if self.s3 {
|
||||
self.build_s3_operator().await
|
||||
} else if self.oss {
|
||||
self.build_oss_operator().await
|
||||
} else {
|
||||
self.build_fs_operator().await
|
||||
}
|
||||
@@ -480,9 +534,8 @@ impl Export {
|
||||
|
||||
/// build operator with preference for file system
|
||||
async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
|
||||
// is under s3 mode and s3_ddl_dir is set, use it as root
|
||||
if self.s3 && self.s3_ddl_local_dir.is_some() {
|
||||
let root = self.s3_ddl_local_dir.as_ref().unwrap().clone();
|
||||
if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
|
||||
let root = self.ddl_local_dir.as_ref().unwrap().clone();
|
||||
let op = ObjectStore::new(services::Fs::default().root(&root))
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
@@ -490,6 +543,8 @@ impl Export {
|
||||
Ok(op)
|
||||
} else if self.s3 {
|
||||
self.build_s3_operator().await
|
||||
} else if self.oss {
|
||||
self.build_oss_operator().await
|
||||
} else {
|
||||
self.build_fs_operator().await
|
||||
}
|
||||
@@ -515,11 +570,35 @@ impl Export {
|
||||
}
|
||||
|
||||
if let Some(key_id) = self.s3_access_key.as_ref() {
|
||||
builder = builder.access_key_id(key_id);
|
||||
builder = builder.access_key_id(key_id.expose_secret());
|
||||
}
|
||||
|
||||
if let Some(secret_key) = self.s3_secret_key.as_ref() {
|
||||
builder = builder.secret_access_key(secret_key);
|
||||
builder = builder.secret_access_key(secret_key.expose_secret());
|
||||
}
|
||||
|
||||
let op = ObjectStore::new(builder)
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
Ok(op)
|
||||
}
|
||||
|
||||
async fn build_oss_operator(&self) -> Result<ObjectStore> {
|
||||
let mut builder = Oss::default()
|
||||
.bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
|
||||
.endpoint(
|
||||
self.oss_endpoint
|
||||
.as_ref()
|
||||
.expect("oss_endpoint must be set"),
|
||||
);
|
||||
|
||||
// Use expose_secret() to access the actual secret value
|
||||
if let Some(key_id) = self.oss_access_key_id.as_ref() {
|
||||
builder = builder.access_key_id(key_id.expose_secret());
|
||||
}
|
||||
if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
|
||||
builder = builder.access_key_secret(secret_key.expose_secret());
|
||||
}
|
||||
|
||||
let op = ObjectStore::new(builder)
|
||||
@@ -562,8 +641,8 @@ impl Export {
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
|
||||
// Create directory if not using S3
|
||||
if !export_self.s3 {
|
||||
// Create directory if not using remote storage
|
||||
if !export_self.s3 && !export_self.oss {
|
||||
let db_dir = format!("{}/{}/", export_self.catalog, schema);
|
||||
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
|
||||
}
|
||||
@@ -575,7 +654,11 @@ impl Export {
|
||||
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
|
||||
export_self.catalog, schema, path, with_options_clone, connection_part
|
||||
);
|
||||
info!("Executing sql: {sql}");
|
||||
|
||||
// Log SQL command but mask sensitive information
|
||||
let safe_sql = export_self.mask_sensitive_sql(&sql);
|
||||
info!("Executing sql: {}", safe_sql);
|
||||
|
||||
export_self.database_client.sql_in_public(&sql).await?;
|
||||
info!(
|
||||
"Finished exporting {}.{} data to {}",
|
||||
@@ -615,6 +698,29 @@ impl Export {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mask sensitive information in SQL commands for safe logging
|
||||
fn mask_sensitive_sql(&self, sql: &str) -> String {
|
||||
let mut masked_sql = sql.to_string();
|
||||
|
||||
// Mask S3 credentials
|
||||
if let Some(access_key) = &self.s3_access_key {
|
||||
masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
if let Some(secret_key) = &self.s3_secret_key {
|
||||
masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
|
||||
// Mask OSS credentials
|
||||
if let Some(access_key_id) = &self.oss_access_key_id {
|
||||
masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
if let Some(access_key_secret) = &self.oss_access_key_secret {
|
||||
masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
|
||||
masked_sql
|
||||
}
|
||||
|
||||
fn get_file_path(&self, schema: &str, file_name: &str) -> String {
|
||||
format!("{}/{}/{}", self.catalog, schema, file_name)
|
||||
}
|
||||
@@ -631,6 +737,13 @@ impl Export {
|
||||
},
|
||||
file_path
|
||||
)
|
||||
} else if self.oss {
|
||||
format!(
|
||||
"oss://{}/{}/{}",
|
||||
self.oss_bucket.as_ref().unwrap_or(&String::new()),
|
||||
self.catalog,
|
||||
file_path
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"{}/{}",
|
||||
@@ -675,15 +788,36 @@ impl Export {
|
||||
};
|
||||
|
||||
// Safety: All s3 options are required
|
||||
// Use expose_secret() to access the actual secret values
|
||||
let connection_options = format!(
|
||||
"ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
|
||||
self.s3_access_key.as_ref().unwrap(),
|
||||
self.s3_secret_key.as_ref().unwrap(),
|
||||
self.s3_access_key.as_ref().unwrap().expose_secret(),
|
||||
self.s3_secret_key.as_ref().unwrap().expose_secret(),
|
||||
self.s3_region.as_ref().unwrap(),
|
||||
endpoint_option
|
||||
);
|
||||
|
||||
(s3_path, format!(" CONNECTION ({})", connection_options))
|
||||
} else if self.oss {
|
||||
let oss_path = format!(
|
||||
"oss://{}/{}/{}/",
|
||||
self.oss_bucket.as_ref().unwrap(),
|
||||
self.catalog,
|
||||
schema
|
||||
);
|
||||
let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
|
||||
format!(", ENDPOINT='{}'", endpoint)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
let connection_options = format!(
|
||||
"ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
|
||||
self.oss_access_key_id.as_ref().unwrap().expose_secret(),
|
||||
self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
|
||||
endpoint_option
|
||||
);
|
||||
(oss_path, format!(" CONNECTION ({})", connection_options))
|
||||
} else {
|
||||
(
|
||||
self.catalog_path()
|
||||
|
||||
@@ -80,6 +80,7 @@ servers.workspace = true
|
||||
session.workspace = true
|
||||
similar-asserts.workspace = true
|
||||
snafu.workspace = true
|
||||
stat.workspace = true
|
||||
store-api.workspace = true
|
||||
substrait.workspace = true
|
||||
table.workspace = true
|
||||
|
||||
@@ -28,7 +28,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
|
||||
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
|
||||
use crate::log_versions;
|
||||
use crate::{create_resource_limit_metrics, log_versions};
|
||||
|
||||
/// Builder for Datanode instance.
|
||||
pub struct InstanceBuilder {
|
||||
@@ -68,6 +68,7 @@ impl InstanceBuilder {
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
|
||||
.await
|
||||
|
||||
@@ -45,7 +45,7 @@ use crate::error::{
|
||||
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
|
||||
};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
pub const APP_NAME: &str = "greptime-flownode";
|
||||
|
||||
@@ -246,7 +246,9 @@ impl StartCommand {
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Flownode start command: {:#?}", self);
|
||||
info!("Flownode options: {:#?}", opts);
|
||||
|
||||
@@ -44,7 +44,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
|
||||
|
||||
@@ -270,7 +270,9 @@ impl StartCommand {
|
||||
opts.component.node_id.clone(),
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Frontend start command: {:#?}", self);
|
||||
info!("Frontend options: {:#?}", opts);
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::{error, info};
|
||||
use stat::{get_cpu_limit, get_memory_limit};
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
@@ -31,6 +32,12 @@ pub mod standalone;
|
||||
lazy_static::lazy_static! {
|
||||
static ref APP_VERSION: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
|
||||
|
||||
static ref CPU_LIMIT: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_cpu_limit_in_millicores", "cpu limit in millicores", &["app"]).unwrap();
|
||||
|
||||
static ref MEMORY_LIMIT: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_memory_limit_in_bytes", "memory limit in bytes", &["app"]).unwrap();
|
||||
}
|
||||
|
||||
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM
|
||||
@@ -114,6 +121,24 @@ pub fn log_versions(version: &str, short_version: &str, app: &str) {
|
||||
log_env_flags();
|
||||
}
|
||||
|
||||
pub fn create_resource_limit_metrics(app: &str) {
|
||||
if let Some(cpu_limit) = get_cpu_limit() {
|
||||
info!(
|
||||
"GreptimeDB start with cpu limit in millicores: {}",
|
||||
cpu_limit
|
||||
);
|
||||
CPU_LIMIT.with_label_values(&[app]).set(cpu_limit);
|
||||
}
|
||||
|
||||
if let Some(memory_limit) = get_memory_limit() {
|
||||
info!(
|
||||
"GreptimeDB start with memory limit in bytes: {}",
|
||||
memory_limit
|
||||
);
|
||||
MEMORY_LIMIT.with_label_values(&[app]).set(memory_limit);
|
||||
}
|
||||
}
|
||||
|
||||
fn log_env_flags() {
|
||||
info!("command line arguments");
|
||||
for argument in std::env::args() {
|
||||
|
||||
@@ -29,7 +29,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
|
||||
|
||||
@@ -302,7 +302,9 @@ impl StartCommand {
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Metasrv start command: {:#?}", self);
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{Result, StartFlownodeSnafu};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{error, log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, error, log_versions, App};
|
||||
|
||||
pub const APP_NAME: &str = "greptime-standalone";
|
||||
|
||||
@@ -457,7 +457,9 @@ impl StartCommand {
|
||||
None,
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Standalone start command: {:#?}", self);
|
||||
info!("Standalone options: {opts:#?}");
|
||||
|
||||
@@ -13,7 +13,9 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod fs;
|
||||
pub mod oss;
|
||||
pub mod s3;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
@@ -25,10 +27,12 @@ use url::{ParseError, Url};
|
||||
use self::fs::build_fs_backend;
|
||||
use self::s3::build_s3_backend;
|
||||
use crate::error::{self, Result};
|
||||
use crate::object_store::oss::build_oss_backend;
|
||||
use crate::util::find_dir_and_filename;
|
||||
|
||||
pub const FS_SCHEMA: &str = "FS";
|
||||
pub const S3_SCHEMA: &str = "S3";
|
||||
pub const OSS_SCHEMA: &str = "OSS";
|
||||
|
||||
/// Returns `(schema, Option<host>, path)`
|
||||
pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
|
||||
@@ -64,6 +68,12 @@ pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<
|
||||
})?;
|
||||
Ok(build_s3_backend(&host, &root, connection)?)
|
||||
}
|
||||
OSS_SCHEMA => {
|
||||
let host = host.context(error::EmptyHostPathSnafu {
|
||||
url: url.to_string(),
|
||||
})?;
|
||||
Ok(build_oss_backend(&host, &root, connection)?)
|
||||
}
|
||||
FS_SCHEMA => Ok(build_fs_backend(&root)?),
|
||||
|
||||
_ => error::UnsupportedBackendProtocolSnafu {
|
||||
|
||||
118
src/common/datasource/src/object_store/oss.rs
Normal file
118
src/common/datasource/src/object_store/oss.rs
Normal file
@@ -0,0 +1,118 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use object_store::services::Oss;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
|
||||
const BUCKET: &str = "bucket";
|
||||
const ENDPOINT: &str = "endpoint";
|
||||
const ACCESS_KEY_ID: &str = "access_key_id";
|
||||
const ACCESS_KEY_SECRET: &str = "access_key_secret";
|
||||
const ROOT: &str = "root";
|
||||
const ALLOW_ANONYMOUS: &str = "allow_anonymous";
|
||||
|
||||
/// Check if the key is supported in OSS configuration.
|
||||
pub fn is_supported_in_oss(key: &str) -> bool {
|
||||
[
|
||||
ROOT,
|
||||
ALLOW_ANONYMOUS,
|
||||
BUCKET,
|
||||
ENDPOINT,
|
||||
ACCESS_KEY_ID,
|
||||
ACCESS_KEY_SECRET,
|
||||
]
|
||||
.contains(&key)
|
||||
}
|
||||
|
||||
/// Build an OSS backend using the provided bucket, root, and connection parameters.
|
||||
pub fn build_oss_backend(
|
||||
bucket: &str,
|
||||
root: &str,
|
||||
connection: &HashMap<String, String>,
|
||||
) -> Result<ObjectStore> {
|
||||
let mut builder = Oss::default().bucket(bucket).root(root);
|
||||
|
||||
if let Some(endpoint) = connection.get(ENDPOINT) {
|
||||
builder = builder.endpoint(endpoint);
|
||||
}
|
||||
|
||||
if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) {
|
||||
builder = builder.access_key_id(access_key_id);
|
||||
}
|
||||
|
||||
if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) {
|
||||
builder = builder.access_key_secret(access_key_secret);
|
||||
}
|
||||
|
||||
if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
|
||||
let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
|
||||
error::InvalidConnectionSnafu {
|
||||
msg: format!(
|
||||
"failed to parse the option {}={}, {}",
|
||||
ALLOW_ANONYMOUS, allow_anonymous, e
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
if allow {
|
||||
builder = builder.allow_anonymous();
|
||||
}
|
||||
}
|
||||
|
||||
let op = ObjectStore::new(builder)
|
||||
.context(error::BuildBackendSnafu)?
|
||||
.layer(object_store::layers::LoggingLayer::default())
|
||||
.layer(object_store::layers::TracingLayer)
|
||||
.layer(object_store::layers::build_prometheus_metrics_layer(true))
|
||||
.finish();
|
||||
|
||||
Ok(op)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_is_supported_in_oss() {
|
||||
assert!(is_supported_in_oss(ROOT));
|
||||
assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
|
||||
assert!(is_supported_in_oss(BUCKET));
|
||||
assert!(is_supported_in_oss(ENDPOINT));
|
||||
assert!(is_supported_in_oss(ACCESS_KEY_ID));
|
||||
assert!(is_supported_in_oss(ACCESS_KEY_SECRET));
|
||||
assert!(!is_supported_in_oss("foo"));
|
||||
assert!(!is_supported_in_oss("BAR"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_oss_backend_all_fields_valid() {
|
||||
let mut connection = HashMap::new();
|
||||
connection.insert(
|
||||
ENDPOINT.to_string(),
|
||||
"http://oss-ap-southeast-1.aliyuncs.com".to_string(),
|
||||
);
|
||||
connection.insert(ACCESS_KEY_ID.to_string(), "key_id".to_string());
|
||||
connection.insert(ACCESS_KEY_SECRET.to_string(), "key_secret".to_string());
|
||||
connection.insert(ALLOW_ANONYMOUS.to_string(), "true".to_string());
|
||||
|
||||
let result = build_oss_backend("my-bucket", "my-root", &connection);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
}
|
||||
@@ -1,90 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn adjust_signature() -> Signature {
|
||||
Signature::exact(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(), // flow name
|
||||
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
|
||||
ConcreteDataType::uint64_datatype(), // max filter number per query
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[admin_fn(
|
||||
name = AdjustFlowFunction,
|
||||
display_name = adjust_flow,
|
||||
sig_fn = adjust_signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn adjust_flow(
|
||||
flow_service_handler: &FlowServiceHandlerRef,
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 3, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
|
||||
(
|
||||
ValueRef::String(flow_name),
|
||||
ValueRef::UInt64(min_run_interval),
|
||||
ValueRef::UInt64(max_filter_num),
|
||||
) => (flow_name, min_run_interval, max_filter_num),
|
||||
_ => {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "adjust_flow",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.adjust(
|
||||
&catalog_name,
|
||||
&flow_name,
|
||||
min_run_interval,
|
||||
max_filter_num as usize,
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
@@ -26,7 +26,6 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||
|
||||
use crate::adjust_flow::AdjustFlowFunction;
|
||||
use crate::flush_flow::FlushFlowFunction;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -44,6 +43,5 @@ impl AdminFunction {
|
||||
registry.register_async(Arc::new(FlushTableFunction));
|
||||
registry.register_async(Arc::new(CompactTableFunction));
|
||||
registry.register_async(Arc::new(FlushFlowFunction));
|
||||
registry.register_async(Arc::new(AdjustFlowFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,19 +12,21 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use sql::parser::ParserContext;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn flush_signature() -> Signature {
|
||||
Signature::uniform(
|
||||
@@ -45,6 +47,20 @@ pub(crate) async fn flush_flow(
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
|
||||
fn parse_flush_flow(
|
||||
params: &[ValueRef<'_>],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
@@ -54,6 +70,7 @@ pub(crate) async fn flush_flow(
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let ValueRef::String(flow_name) = params[0] else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "flush_flow",
|
||||
@@ -61,14 +78,27 @@ pub(crate) async fn flush_flow(
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -124,7 +154,10 @@ mod test {
|
||||
("catalog.flow_name", ("catalog", "flow_name")),
|
||||
];
|
||||
for (input, expected) in testcases.iter() {
|
||||
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
|
||||
let args = vec![*input];
|
||||
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
|
||||
|
||||
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
|
||||
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,15 +87,6 @@ pub trait FlowServiceHandler: Send + Sync {
|
||||
flow: &str,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
|
||||
@@ -12,15 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
|
||||
use common_query::error::{InvalidInputTypeSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::cast::cast;
|
||||
use datatypes::value::ValueRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use sql::parser::ParserContext;
|
||||
|
||||
/// Create a function signature with oneof signatures of interleaving two arguments.
|
||||
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
||||
@@ -46,30 +43,3 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
|
||||
})
|
||||
.map(|v| v.as_u64())
|
||||
}
|
||||
|
||||
pub fn parse_catalog_flow(
|
||||
flow_name: &str,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
#![feature(let_chains)]
|
||||
#![feature(try_blocks)]
|
||||
|
||||
mod adjust_flow;
|
||||
mod admin;
|
||||
mod flush_flow;
|
||||
mod macros;
|
||||
|
||||
@@ -148,17 +148,6 @@ impl FunctionState {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
_catalog: &str,
|
||||
_flow: &str,
|
||||
_min_run_interval_secs: u64,
|
||||
_max_filter_num_per_query: usize,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
|
||||
@@ -18,6 +18,7 @@ pub mod create_table;
|
||||
pub mod datanode_handler;
|
||||
pub mod flownode_handler;
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
@@ -75,8 +76,6 @@ pub async fn create_logical_table(
|
||||
physical_table_id: TableId,
|
||||
table_name: &str,
|
||||
) -> TableId {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
let tasks = vec![test_create_logical_table_task(table_name)];
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
|
||||
@@ -188,7 +188,71 @@ pub const CACHE_KEY_PREFIXES: [&str; 5] = [
|
||||
NODE_ADDRESS_PREFIX,
|
||||
];
|
||||
|
||||
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
|
||||
/// A set of regions with the same role.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
|
||||
pub struct RegionRoleSet {
|
||||
/// Leader regions.
|
||||
pub leader_regions: Vec<RegionNumber>,
|
||||
/// Follower regions.
|
||||
pub follower_regions: Vec<RegionNumber>,
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for RegionRoleSet {
|
||||
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
#[derive(Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum RegionRoleSetOrLeaderOnly {
|
||||
Full {
|
||||
leader_regions: Vec<RegionNumber>,
|
||||
follower_regions: Vec<RegionNumber>,
|
||||
},
|
||||
LeaderOnly(Vec<RegionNumber>),
|
||||
}
|
||||
match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
|
||||
RegionRoleSetOrLeaderOnly::Full {
|
||||
leader_regions,
|
||||
follower_regions,
|
||||
} => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
|
||||
RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
|
||||
Ok(RegionRoleSet::new(leader_regions, vec![]))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RegionRoleSet {
|
||||
/// Create a new region role set.
|
||||
pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
|
||||
Self {
|
||||
leader_regions,
|
||||
follower_regions,
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a leader region to the set.
|
||||
pub fn add_leader_region(&mut self, region_number: RegionNumber) {
|
||||
self.leader_regions.push(region_number);
|
||||
}
|
||||
|
||||
/// Add a follower region to the set.
|
||||
pub fn add_follower_region(&mut self, region_number: RegionNumber) {
|
||||
self.follower_regions.push(region_number);
|
||||
}
|
||||
|
||||
/// Sort the regions.
|
||||
pub fn sort(&mut self) {
|
||||
self.follower_regions.sort();
|
||||
self.leader_regions.sort();
|
||||
}
|
||||
}
|
||||
|
||||
/// The distribution of regions.
|
||||
///
|
||||
/// The key is the datanode id, the value is the region role set.
|
||||
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
|
||||
|
||||
/// The id of flow.
|
||||
pub type FlowId = u32;
|
||||
@@ -1368,7 +1432,8 @@ mod tests {
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::key::{
|
||||
DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX,
|
||||
DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
|
||||
ViewInfoValue, TOPIC_REGION_PREFIX,
|
||||
};
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::kv_backend::KvBackend;
|
||||
@@ -1995,7 +2060,8 @@ mod tests {
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(got.regions, regions)
|
||||
assert_eq!(got.regions, regions.leader_regions);
|
||||
assert_eq!(got.follower_regions, regions.follower_regions);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2412,4 +2478,28 @@ mod tests {
|
||||
assert_eq!(current_view_info.columns, new_columns);
|
||||
assert_eq!(current_view_info.plan_columns, new_plan_columns);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_role_set_deserialize() {
|
||||
let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
|
||||
let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
|
||||
assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
|
||||
assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
|
||||
|
||||
let s = r#"[1, 2, 3]"#;
|
||||
let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
|
||||
assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
|
||||
assert!(region_role_set.follower_regions.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_distribution_deserialize() {
|
||||
let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
|
||||
let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
|
||||
assert_eq!(region_distribution.len(), 2);
|
||||
assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
|
||||
assert!(region_distribution[&1].follower_regions.is_empty());
|
||||
assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
|
||||
assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use table::metadata::TableId;
|
||||
use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
|
||||
use crate::key::table_route::PhysicalTableRouteValue;
|
||||
use crate::key::{
|
||||
MetadataKey, MetadataValue, RegionDistribution, DATANODE_TABLE_KEY_PATTERN,
|
||||
MetadataKey, MetadataValue, RegionDistribution, RegionRoleSet, DATANODE_TABLE_KEY_PATTERN,
|
||||
DATANODE_TABLE_KEY_PREFIX,
|
||||
};
|
||||
use crate::kv_backend::txn::{Txn, TxnOp};
|
||||
@@ -118,23 +118,31 @@ impl Display for DatanodeTableKey {
|
||||
pub struct DatanodeTableValue {
|
||||
pub table_id: TableId,
|
||||
pub regions: Vec<RegionNumber>,
|
||||
#[serde(default)]
|
||||
pub follower_regions: Vec<RegionNumber>,
|
||||
#[serde(flatten)]
|
||||
pub region_info: RegionInfo,
|
||||
version: u64,
|
||||
}
|
||||
|
||||
impl DatanodeTableValue {
|
||||
pub fn new(table_id: TableId, regions: Vec<RegionNumber>, region_info: RegionInfo) -> Self {
|
||||
pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
|
||||
let RegionRoleSet {
|
||||
leader_regions,
|
||||
follower_regions,
|
||||
} = region_role_set;
|
||||
|
||||
Self {
|
||||
table_id,
|
||||
regions,
|
||||
regions: leader_regions,
|
||||
follower_regions,
|
||||
region_info,
|
||||
version: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes `KeyValue` to ((),`DatanodeTableValue`)
|
||||
/// Decodes [`KeyValue`] to [`DatanodeTableValue`].
|
||||
pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
|
||||
DatanodeTableValue::try_from_raw_value(&kv.value)
|
||||
}
|
||||
@@ -373,10 +381,11 @@ mod tests {
|
||||
let value = DatanodeTableValue {
|
||||
table_id: 42,
|
||||
regions: vec![1, 2, 3],
|
||||
follower_regions: vec![],
|
||||
region_info: RegionInfo::default(),
|
||||
version: 1,
|
||||
};
|
||||
let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
|
||||
let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
|
||||
|
||||
let raw_value = value.try_as_raw_value().unwrap();
|
||||
assert_eq!(raw_value, literal);
|
||||
@@ -467,6 +476,7 @@ mod tests {
|
||||
let table_value = DatanodeTableValue {
|
||||
table_id: 1,
|
||||
regions: vec![],
|
||||
follower_regions: vec![],
|
||||
region_info,
|
||||
version: 1,
|
||||
};
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
pub mod flow_info;
|
||||
pub(crate) mod flow_name;
|
||||
pub(crate) mod flow_route;
|
||||
pub mod flow_route;
|
||||
pub mod flow_state;
|
||||
mod flownode_addr_helper;
|
||||
pub(crate) mod flownode_flow;
|
||||
|
||||
@@ -114,37 +114,37 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct FlowInfoValue {
|
||||
/// The source tables used by the flow.
|
||||
pub(crate) source_table_ids: Vec<TableId>,
|
||||
pub source_table_ids: Vec<TableId>,
|
||||
/// The sink table used by the flow.
|
||||
pub(crate) sink_table_name: TableName,
|
||||
pub sink_table_name: TableName,
|
||||
/// Which flow nodes this flow is running on.
|
||||
pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
|
||||
pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
|
||||
/// The catalog name.
|
||||
pub(crate) catalog_name: String,
|
||||
pub catalog_name: String,
|
||||
/// The query context used when create flow.
|
||||
/// Although flow doesn't belong to any schema, this query_context is needed to remember
|
||||
/// the query context when `create_flow` is executed
|
||||
/// for recovering flow using the same sql&query_context after db restart.
|
||||
/// if none, should use default query context
|
||||
#[serde(default)]
|
||||
pub(crate) query_context: Option<crate::rpc::ddl::QueryContext>,
|
||||
pub query_context: Option<crate::rpc::ddl::QueryContext>,
|
||||
/// The flow name.
|
||||
pub(crate) flow_name: String,
|
||||
pub flow_name: String,
|
||||
/// The raw sql.
|
||||
pub(crate) raw_sql: String,
|
||||
pub raw_sql: String,
|
||||
/// The expr of expire.
|
||||
/// Duration in seconds as `i64`.
|
||||
pub(crate) expire_after: Option<i64>,
|
||||
pub expire_after: Option<i64>,
|
||||
/// The comment.
|
||||
pub(crate) comment: String,
|
||||
pub comment: String,
|
||||
/// The options.
|
||||
pub(crate) options: HashMap<String, String>,
|
||||
pub options: HashMap<String, String>,
|
||||
/// The created time
|
||||
#[serde(default)]
|
||||
pub(crate) created_time: DateTime<Utc>,
|
||||
pub created_time: DateTime<Utc>,
|
||||
/// The updated time.
|
||||
#[serde(default)]
|
||||
pub(crate) updated_time: DateTime<Utc>,
|
||||
pub updated_time: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl FlowInfoValue {
|
||||
|
||||
@@ -40,17 +40,23 @@ pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution
|
||||
let mut regions_id_map = RegionDistribution::new();
|
||||
for route in region_routes.iter() {
|
||||
if let Some(peer) = route.leader_peer.as_ref() {
|
||||
let region_id = route.region.id.region_number();
|
||||
regions_id_map.entry(peer.id).or_default().push(region_id);
|
||||
let region_number = route.region.id.region_number();
|
||||
regions_id_map
|
||||
.entry(peer.id)
|
||||
.or_default()
|
||||
.add_leader_region(region_number);
|
||||
}
|
||||
for peer in route.follower_peers.iter() {
|
||||
let region_id = route.region.id.region_number();
|
||||
regions_id_map.entry(peer.id).or_default().push(region_id);
|
||||
let region_number = route.region.id.region_number();
|
||||
regions_id_map
|
||||
.entry(peer.id)
|
||||
.or_default()
|
||||
.add_follower_region(region_number);
|
||||
}
|
||||
}
|
||||
for (_, regions) in regions_id_map.iter_mut() {
|
||||
// id asc
|
||||
regions.sort()
|
||||
for (_, region_role_set) in regions_id_map.iter_mut() {
|
||||
// Sort the regions in ascending order.
|
||||
region_role_set.sort()
|
||||
}
|
||||
regions_id_map
|
||||
}
|
||||
@@ -455,6 +461,7 @@ impl From<PbPartition> for Partition {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::key::RegionRoleSet;
|
||||
|
||||
#[test]
|
||||
fn test_leader_is_downgraded() {
|
||||
@@ -611,8 +618,8 @@ mod tests {
|
||||
|
||||
let distribution = region_distribution(®ion_routes);
|
||||
assert_eq!(distribution.len(), 3);
|
||||
assert_eq!(distribution[&1], vec![1, 2]);
|
||||
assert_eq!(distribution[&2], vec![1, 2]);
|
||||
assert_eq!(distribution[&3], vec![1, 2]);
|
||||
assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
|
||||
assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
|
||||
assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +133,18 @@ pub enum Error {
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to downcast vector of type '{:?}' to type '{:?}'",
|
||||
from_type,
|
||||
to_type
|
||||
))]
|
||||
DowncastVector {
|
||||
from_type: ConcreteDataType,
|
||||
to_type: ConcreteDataType,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Error occurs when performing arrow computation"))]
|
||||
ArrowCompute {
|
||||
#[snafu(source)]
|
||||
@@ -192,6 +204,8 @@ impl ErrorExt for Error {
|
||||
| Error::PhysicalExpr { .. }
|
||||
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
|
||||
|
||||
Error::DowncastVector { .. } => StatusCode::Unexpected,
|
||||
|
||||
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
|
||||
|
||||
Error::ArrowCompute { .. } => StatusCode::IllegalState,
|
||||
|
||||
@@ -30,13 +30,16 @@ pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecord
|
||||
use datatypes::arrow::compute::SortOptions;
|
||||
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
|
||||
use datatypes::arrow::util::pretty;
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use datatypes::prelude::{ConcreteDataType, VectorRef};
|
||||
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::types::json_type_value_to_string;
|
||||
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
|
||||
use error::Result;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::{Stream, TryStreamExt};
|
||||
pub use recordbatch::RecordBatch;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
|
||||
fn name(&self) -> &str {
|
||||
@@ -58,6 +61,146 @@ pub struct OrderOption {
|
||||
pub options: SortOptions,
|
||||
}
|
||||
|
||||
/// A wrapper that maps a [RecordBatchStream] to a new [RecordBatchStream] by applying a function to each [RecordBatch].
|
||||
///
|
||||
/// The mapper function is applied to each [RecordBatch] in the stream.
|
||||
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
|
||||
/// The output ordering of the new [RecordBatchStream] is the same as the output ordering of the inner [RecordBatchStream].
|
||||
/// The metrics of the new [RecordBatchStream] is the same as the metrics of the inner [RecordBatchStream] if it is not `None`.
|
||||
pub struct SendableRecordBatchMapper {
|
||||
inner: SendableRecordBatchStream,
|
||||
/// The mapper function is applied to each [RecordBatch] in the stream.
|
||||
/// The original schema and the mapped schema are passed to the mapper function.
|
||||
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
|
||||
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
|
||||
schema: SchemaRef,
|
||||
/// Whether the mapper function is applied to each [RecordBatch] in the stream.
|
||||
apply_mapper: bool,
|
||||
}
|
||||
|
||||
/// Maps the json type to string in the batch.
|
||||
///
|
||||
/// The json type is mapped to string by converting the json value to string.
|
||||
/// The batch is updated to have the same number of columns as the original batch,
|
||||
/// but with the json type mapped to string.
|
||||
pub fn map_json_type_to_string(
|
||||
batch: RecordBatch,
|
||||
original_schema: &SchemaRef,
|
||||
mapped_schema: &SchemaRef,
|
||||
) -> Result<RecordBatch> {
|
||||
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
|
||||
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
|
||||
if let ConcreteDataType::Json(j) = schema.data_type {
|
||||
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
|
||||
let binary_vector = vector
|
||||
.as_any()
|
||||
.downcast_ref::<BinaryVector>()
|
||||
.with_context(|| error::DowncastVectorSnafu {
|
||||
from_type: schema.data_type.clone(),
|
||||
to_type: ConcreteDataType::binary_datatype(),
|
||||
})?;
|
||||
for value in binary_vector.iter_data() {
|
||||
let Some(value) = value else {
|
||||
string_vector_builder.push(None);
|
||||
continue;
|
||||
};
|
||||
let string_value =
|
||||
json_type_value_to_string(value, &j.format).with_context(|_| {
|
||||
error::CastVectorSnafu {
|
||||
from_type: schema.data_type.clone(),
|
||||
to_type: ConcreteDataType::string_datatype(),
|
||||
}
|
||||
})?;
|
||||
string_vector_builder.push(Some(string_value.as_str()));
|
||||
}
|
||||
|
||||
let string_vector = string_vector_builder.finish();
|
||||
vectors.push(Arc::new(string_vector) as VectorRef);
|
||||
} else {
|
||||
vectors.push(vector.clone());
|
||||
}
|
||||
}
|
||||
|
||||
RecordBatch::new(mapped_schema.clone(), vectors)
|
||||
}
|
||||
|
||||
/// Maps the json type to string in the schema.
|
||||
///
|
||||
/// The json type is mapped to string by converting the json value to string.
|
||||
/// The schema is updated to have the same number of columns as the original schema,
|
||||
/// but with the json type mapped to string.
|
||||
///
|
||||
/// Returns the new schema and whether the schema needs to be mapped to string.
|
||||
pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
|
||||
let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
|
||||
let mut apply_mapper = false;
|
||||
for column in schema.column_schemas() {
|
||||
if matches!(column.data_type, ConcreteDataType::Json(_)) {
|
||||
new_columns.push(ColumnSchema::new(
|
||||
column.name.to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
column.is_nullable(),
|
||||
));
|
||||
apply_mapper = true;
|
||||
} else {
|
||||
new_columns.push(column.clone());
|
||||
}
|
||||
}
|
||||
(Arc::new(Schema::new(new_columns)), apply_mapper)
|
||||
}
|
||||
|
||||
impl SendableRecordBatchMapper {
|
||||
/// Creates a new [SendableRecordBatchMapper] with the given inner [RecordBatchStream], mapper function, and schema mapper function.
|
||||
pub fn new(
|
||||
inner: SendableRecordBatchStream,
|
||||
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
|
||||
schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
|
||||
) -> Self {
|
||||
let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
|
||||
Self {
|
||||
inner,
|
||||
mapper,
|
||||
schema: mapped_schema,
|
||||
apply_mapper,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordBatchStream for SendableRecordBatchMapper {
|
||||
fn name(&self) -> &str {
|
||||
"SendableRecordBatchMapper"
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
||||
self.inner.output_ordering()
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
self.inner.metrics()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for SendableRecordBatchMapper {
|
||||
type Item = Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if self.apply_mapper {
|
||||
Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
|
||||
opt.map(|result| {
|
||||
result
|
||||
.and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
|
||||
})
|
||||
})
|
||||
} else {
|
||||
Pin::new(&mut self.inner).poll_next(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
|
||||
/// that will produce no results
|
||||
pub struct EmptyRecordBatchStream {
|
||||
|
||||
11
src/common/stat/Cargo.toml
Normal file
11
src/common/stat/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "stat"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
nix.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
183
src/common/stat/src/cgroups.rs
Normal file
183
src/common/stat/src/cgroups.rs
Normal file
@@ -0,0 +1,183 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use std::fs::read_to_string;
|
||||
use std::path::Path;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
use nix::sys::{statfs, statfs::statfs};
|
||||
|
||||
/// `MAX_VALUE` is used to indicate that the resource is unlimited.
|
||||
pub const MAX_VALUE: i64 = -1;
|
||||
|
||||
const CGROUP_UNIFIED_MOUNTPOINT: &str = "/sys/fs/cgroup";
|
||||
|
||||
const MEMORY_MAX_FILE_CGROUP_V2: &str = "memory.max";
|
||||
const MEMORY_MAX_FILE_CGROUP_V1: &str = "memory.limit_in_bytes";
|
||||
const CPU_MAX_FILE_CGROUP_V2: &str = "cpu.max";
|
||||
const CPU_QUOTA_FILE_CGROUP_V1: &str = "cpu.cfs_quota_us";
|
||||
const CPU_PERIOD_FILE_CGROUP_V1: &str = "cpu.cfs_period_us";
|
||||
|
||||
// `MAX_VALUE_CGROUP_V2` string in `/sys/fs/cgroup/cpu.max` and `/sys/fs/cgroup/memory.max` to indicate that the resource is unlimited.
|
||||
const MAX_VALUE_CGROUP_V2: &str = "max";
|
||||
|
||||
// For cgroup v1, if the memory is unlimited, it will return a very large value(different from platform) that close to 2^63.
|
||||
// For easier comparison, if the memory limit is larger than 1PB we consider it as unlimited.
|
||||
const MAX_MEMORY_IN_BYTES: i64 = 1125899906842624; // 1PB
|
||||
|
||||
/// Get the limit of memory in bytes.
|
||||
///
|
||||
/// - If the memory is unlimited, return `-1`.
|
||||
/// - Return `None` if it fails to read the memory limit or not on linux.
|
||||
pub fn get_memory_limit() -> Option<i64> {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let memory_max_file = if is_cgroup_v2()? {
|
||||
// Read `/sys/fs/cgroup/memory.max` to get the memory limit.
|
||||
MEMORY_MAX_FILE_CGROUP_V2
|
||||
} else {
|
||||
// Read `/sys/fs/cgroup/memory.limit_in_bytes` to get the memory limit.
|
||||
MEMORY_MAX_FILE_CGROUP_V1
|
||||
};
|
||||
|
||||
// For cgroup v1, it will return a very large value(different from platform) if the memory is unlimited.
|
||||
let memory_limit =
|
||||
read_value_from_file(Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(memory_max_file))?;
|
||||
|
||||
// If memory limit exceeds 1PB(cgroup v1), consider it as unlimited.
|
||||
if memory_limit > MAX_MEMORY_IN_BYTES {
|
||||
return Some(MAX_VALUE);
|
||||
}
|
||||
Some(memory_limit)
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
None
|
||||
}
|
||||
|
||||
/// Get the limit of cpu in millicores.
|
||||
///
|
||||
/// - If the cpu is unlimited, return `-1`.
|
||||
/// - Return `None` if it fails to read the cpu limit or not on linux.
|
||||
pub fn get_cpu_limit() -> Option<i64> {
|
||||
#[cfg(target_os = "linux")]
|
||||
if is_cgroup_v2()? {
|
||||
// Read `/sys/fs/cgroup/cpu.max` to get the cpu limit.
|
||||
get_cgroup_v2_cpu_limit(Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_MAX_FILE_CGROUP_V2))
|
||||
} else {
|
||||
// Read `/sys/fs/cgroup/cpu.cfs_quota_us` and `/sys/fs/cgroup/cpu.cfs_period_us` to get the cpu limit.
|
||||
let quota = read_value_from_file(
|
||||
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_QUOTA_FILE_CGROUP_V1),
|
||||
)?;
|
||||
|
||||
if quota == MAX_VALUE {
|
||||
return Some(MAX_VALUE);
|
||||
}
|
||||
|
||||
let period = read_value_from_file(
|
||||
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_PERIOD_FILE_CGROUP_V1),
|
||||
)?;
|
||||
|
||||
// Return the cpu limit in millicores.
|
||||
Some(quota * 1000 / period)
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
None
|
||||
}
|
||||
|
||||
// Check whether the cgroup is v2.
|
||||
// - Return `true` if the cgroup is v2, otherwise return `false`.
|
||||
// - Return `None` if the detection fails or not on linux.
|
||||
fn is_cgroup_v2() -> Option<bool> {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let path = Path::new(CGROUP_UNIFIED_MOUNTPOINT);
|
||||
let fs_stat = statfs(path).ok()?;
|
||||
Some(fs_stat.filesystem_type() == statfs::CGROUP2_SUPER_MAGIC)
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
None
|
||||
}
|
||||
|
||||
fn read_value_from_file<P: AsRef<Path>>(path: P) -> Option<i64> {
|
||||
let content = read_to_string(&path).ok()?;
|
||||
|
||||
// If the content starts with "max", return `MAX_VALUE`.
|
||||
if content.starts_with(MAX_VALUE_CGROUP_V2) {
|
||||
return Some(MAX_VALUE);
|
||||
}
|
||||
|
||||
content.trim().parse::<i64>().ok()
|
||||
}
|
||||
|
||||
fn get_cgroup_v2_cpu_limit<P: AsRef<Path>>(path: P) -> Option<i64> {
|
||||
let content = read_to_string(&path).ok()?;
|
||||
|
||||
let fields = content.trim().split(' ').collect::<Vec<&str>>();
|
||||
if fields.len() != 2 {
|
||||
return None;
|
||||
}
|
||||
|
||||
// If the cpu is unlimited, it will be `-1`.
|
||||
let quota = fields[0].trim();
|
||||
if quota == MAX_VALUE_CGROUP_V2 {
|
||||
return Some(MAX_VALUE);
|
||||
}
|
||||
|
||||
let quota = quota.parse::<i64>().ok()?;
|
||||
|
||||
let period = fields[1].trim();
|
||||
let period = period.parse::<i64>().ok()?;
|
||||
|
||||
// Return the cpu limit in millicores.
|
||||
Some(quota * 1000 / period)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_read_value_from_file() {
|
||||
assert_eq!(
|
||||
read_value_from_file(Path::new("testdata").join("memory.max")).unwrap(),
|
||||
100000
|
||||
);
|
||||
assert_eq!(
|
||||
read_value_from_file(Path::new("testdata").join("memory.max.unlimited")).unwrap(),
|
||||
MAX_VALUE
|
||||
);
|
||||
assert_eq!(read_value_from_file(Path::new("non_existent_file")), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_cgroup_v2_cpu_limit() {
|
||||
assert_eq!(
|
||||
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max")).unwrap(),
|
||||
1500
|
||||
);
|
||||
assert_eq!(
|
||||
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max.unlimited")).unwrap(),
|
||||
MAX_VALUE
|
||||
);
|
||||
assert_eq!(
|
||||
get_cgroup_v2_cpu_limit(Path::new("non_existent_file")),
|
||||
None
|
||||
);
|
||||
}
|
||||
}
|
||||
17
src/common/stat/src/lib.rs
Normal file
17
src/common/stat/src/lib.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod cgroups;
|
||||
|
||||
pub use cgroups::*;
|
||||
1
src/common/stat/testdata/cpu.max
vendored
Normal file
1
src/common/stat/testdata/cpu.max
vendored
Normal file
@@ -0,0 +1 @@
|
||||
150000 100000
|
||||
1
src/common/stat/testdata/cpu.max.unlimited
vendored
Normal file
1
src/common/stat/testdata/cpu.max.unlimited
vendored
Normal file
@@ -0,0 +1 @@
|
||||
max 100000
|
||||
1
src/common/stat/testdata/memory.max
vendored
Normal file
1
src/common/stat/testdata/memory.max
vendored
Normal file
@@ -0,0 +1 @@
|
||||
100000
|
||||
1
src/common/stat/testdata/memory.max.unlimited
vendored
Normal file
1
src/common/stat/testdata/memory.max.unlimited
vendored
Normal file
@@ -0,0 +1 @@
|
||||
max
|
||||
@@ -6,6 +6,7 @@ license.workspace = true
|
||||
|
||||
[features]
|
||||
testing = []
|
||||
enterprise = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -559,6 +559,8 @@ async fn open_all_regions(
|
||||
init_regions_parallelism: usize,
|
||||
) -> Result<()> {
|
||||
let mut regions = vec![];
|
||||
#[cfg(feature = "enterprise")]
|
||||
let mut follower_regions = vec![];
|
||||
for table_value in table_values {
|
||||
for region_number in table_value.regions {
|
||||
// Augments region options with wal options if a wal options is provided.
|
||||
@@ -576,6 +578,24 @@ async fn open_all_regions(
|
||||
region_options,
|
||||
));
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
for region_number in table_value.follower_regions {
|
||||
// Augments region options with wal options if a wal options is provided.
|
||||
let mut region_options = table_value.region_info.region_options.clone();
|
||||
prepare_wal_options(
|
||||
&mut region_options,
|
||||
RegionId::new(table_value.table_id, region_number),
|
||||
&table_value.region_info.region_wal_options,
|
||||
);
|
||||
|
||||
follower_regions.push((
|
||||
RegionId::new(table_value.table_id, region_number),
|
||||
table_value.region_info.engine.clone(),
|
||||
table_value.region_info.region_storage_path.clone(),
|
||||
region_options,
|
||||
));
|
||||
}
|
||||
}
|
||||
let num_regions = regions.len();
|
||||
info!("going to open {} region(s)", num_regions);
|
||||
@@ -617,6 +637,43 @@ async fn open_all_regions(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
if !follower_regions.is_empty() {
|
||||
info!(
|
||||
"going to open {} follower region(s)",
|
||||
follower_regions.len()
|
||||
);
|
||||
let mut region_requests = Vec::with_capacity(follower_regions.len());
|
||||
for (region_id, engine, store_path, options) in follower_regions {
|
||||
let region_dir = region_dir(&store_path, region_id);
|
||||
region_requests.push((
|
||||
region_id,
|
||||
RegionOpenRequest {
|
||||
engine,
|
||||
region_dir,
|
||||
options,
|
||||
skip_wal_replay: true,
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
let open_regions = region_server
|
||||
.handle_batch_open_requests(init_regions_parallelism, region_requests)
|
||||
.await?;
|
||||
|
||||
ensure!(
|
||||
open_regions.len() == num_regions,
|
||||
error::UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Expected to open {} of follower regions, only {} of regions has opened",
|
||||
num_regions,
|
||||
open_regions.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
info!("all regions are opened");
|
||||
|
||||
Ok(())
|
||||
@@ -632,6 +689,7 @@ mod tests {
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache::LayeredCacheRegistryBuilder;
|
||||
use common_meta::key::datanode_table::DatanodeTableManager;
|
||||
use common_meta::key::RegionRoleSet;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
@@ -651,7 +709,7 @@ mod tests {
|
||||
"foo/bar/weny",
|
||||
HashMap::from([("foo".to_string(), "bar".to_string())]),
|
||||
HashMap::default(),
|
||||
BTreeMap::from([(0, vec![0, 1, 2])]),
|
||||
BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -61,7 +61,6 @@ prost.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
session.workspace = true
|
||||
smallvec.workspace = true
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{
|
||||
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
};
|
||||
use api::v1::region::InsertRequests;
|
||||
use catalog::CatalogManager;
|
||||
@@ -32,7 +32,6 @@ use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
@@ -810,25 +809,6 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Options {
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
}
|
||||
let options: Options = serde_json::from_str(&options).with_context(|_| {
|
||||
common_meta::error::DeserializeFromJsonSnafu { input: options }
|
||||
})?;
|
||||
self.batching_engine
|
||||
.adjust_flow(
|
||||
flow_id.unwrap().id as u64,
|
||||
options.min_run_interval_secs,
|
||||
options.max_filter_num_per_query,
|
||||
)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(Default::default())
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
@@ -861,6 +841,93 @@ fn to_meta_err(
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl common_meta::node_manager::Flownode for StreamingEngine {
|
||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
||||
let query_ctx = request
|
||||
.header
|
||||
.and_then(|h| h.query_context)
|
||||
.map(|ctx| ctx.into());
|
||||
match request.body {
|
||||
Some(flow_request::Body::Create(CreateRequest {
|
||||
flow_id: Some(task_id),
|
||||
source_table_ids,
|
||||
sink_table_name: Some(sink_table_name),
|
||||
create_if_not_exists,
|
||||
expire_after,
|
||||
comment,
|
||||
sql,
|
||||
flow_options,
|
||||
or_replace,
|
||||
})) => {
|
||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
||||
let sink_table_name = [
|
||||
sink_table_name.catalog_name,
|
||||
sink_table_name.schema_name,
|
||||
sink_table_name.table_name,
|
||||
];
|
||||
let expire_after = expire_after.map(|e| e.value);
|
||||
let args = CreateFlowArgs {
|
||||
flow_id: task_id.id as u64,
|
||||
sink_table_name,
|
||||
source_table_ids,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
comment: Some(comment),
|
||||
sql: sql.clone(),
|
||||
flow_options,
|
||||
query_ctx,
|
||||
};
|
||||
let ret = self
|
||||
.create_flow(args)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.inc();
|
||||
Ok(FlowResponse {
|
||||
affected_flows: ret
|
||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
||||
.into_iter()
|
||||
.collect_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Drop(DropRequest {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
self.remove_flow(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.dec();
|
||||
Ok(Default::default())
|
||||
}
|
||||
Some(flow_request::Body::Flush(FlushFlow {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
let row = self
|
||||
.flush_flow_inner(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(FlowResponse {
|
||||
affected_flows: vec![flow_id],
|
||||
affected_rows: row as u64,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
||||
self.handle_inserts_inner(request)
|
||||
.await
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for StreamingEngine {
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
self.create_flow_inner(args).await
|
||||
|
||||
@@ -388,20 +388,6 @@ impl BatchingEngine {
|
||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||
self.tasks.read().await.contains_key(&flow_id)
|
||||
}
|
||||
|
||||
pub async fn adjust_flow(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
) -> Result<(), Error> {
|
||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
|
||||
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
|
||||
task.adjust(min_run_interval_secs, max_filter_num_per_query);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for BatchingEngine {
|
||||
|
||||
@@ -32,7 +32,6 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -53,11 +52,6 @@ pub struct TaskState {
|
||||
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
||||
/// Task handle
|
||||
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
|
||||
/// min run interval in seconds
|
||||
pub(crate) min_run_interval: Option<u64>,
|
||||
/// max filter number per query
|
||||
pub(crate) max_filter_num: Option<usize>,
|
||||
}
|
||||
impl TaskState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
@@ -69,8 +63,6 @@ impl TaskState {
|
||||
exec_state: ExecState::Idle,
|
||||
shutdown_rx,
|
||||
task_handle: None,
|
||||
min_run_interval: None,
|
||||
max_filter_num: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,17 +87,20 @@ impl TaskState {
|
||||
pub fn get_next_start_query_time(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
_time_window_size: &Option<Duration>,
|
||||
time_window_size: &Option<Duration>,
|
||||
max_timeout: Option<Duration>,
|
||||
) -> Instant {
|
||||
let next_duration = max_timeout
|
||||
let last_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration)
|
||||
.max(
|
||||
self.min_run_interval
|
||||
.map(|s| Duration::from_secs(s))
|
||||
.unwrap_or(MIN_REFRESH_DURATION),
|
||||
);
|
||||
.max(MIN_REFRESH_DURATION);
|
||||
|
||||
let next_duration = time_window_size
|
||||
.map(|t| {
|
||||
let half = t / 2;
|
||||
half.max(last_duration)
|
||||
})
|
||||
.unwrap_or(last_duration);
|
||||
|
||||
// if have dirty time window, execute immediately to clean dirty time window
|
||||
if self.dirty_time_windows.windows.is_empty() {
|
||||
@@ -251,10 +246,6 @@ impl DirtyTimeWindows {
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(first_nth.len() as f64);
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.observe(self.windows.len() as f64);
|
||||
|
||||
let full_time_range = first_nth
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
|
||||
@@ -144,12 +144,6 @@ impl BatchingTask {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
|
||||
let mut state = self.state.write().unwrap();
|
||||
state.min_run_interval = Some(min_run_interval_secs);
|
||||
state.max_filter_num = Some(max_filter_num_per_query);
|
||||
}
|
||||
|
||||
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
||||
///
|
||||
/// useful for flush_flow to flush dirty time windows range
|
||||
@@ -586,20 +580,19 @@ impl BatchingTask {
|
||||
),
|
||||
})?;
|
||||
|
||||
let expr = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let max_window_cnt = state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||
state.dirty_time_windows.gen_filter_exprs(
|
||||
let expr = self
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
max_window_cnt,
|
||||
DirtyTimeWindows::MAX_FILTER_NUM,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?
|
||||
};
|
||||
)?;
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
|
||||
@@ -42,14 +42,6 @@ lazy_static! {
|
||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_stalled_query_window_cnt",
|
||||
"flow batching engine stalled query time window count",
|
||||
&["flow_id"],
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_window_cnt",
|
||||
|
||||
@@ -375,7 +375,7 @@ impl HeartbeatMailbox {
|
||||
|
||||
/// Parses the [Instruction] from [MailboxMessage].
|
||||
#[cfg(test)]
|
||||
pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
|
||||
pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
|
||||
let Payload::Json(payload) =
|
||||
msg.payload
|
||||
.as_ref()
|
||||
|
||||
@@ -40,7 +40,7 @@ pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>)
|
||||
.unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
|
||||
}
|
||||
|
||||
pub(crate) struct TimeBuckets([i64; 7]);
|
||||
pub(crate) struct TimeBuckets([i64; 5]);
|
||||
|
||||
impl TimeBuckets {
|
||||
/// Fits a given time span into time bucket by find the minimum bucket that can cover the span.
|
||||
@@ -71,13 +71,11 @@ impl TimeBuckets {
|
||||
|
||||
/// A set of predefined time buckets.
|
||||
pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([
|
||||
60 * 60, // one hour
|
||||
2 * 60 * 60, // two hours
|
||||
12 * 60 * 60, // twelve hours
|
||||
24 * 60 * 60, // one day
|
||||
7 * 24 * 60 * 60, // one week
|
||||
365 * 24 * 60 * 60, // one year
|
||||
10 * 365 * 24 * 60 * 60, // ten years
|
||||
60 * 60, // one hour
|
||||
2 * 60 * 60, // two hours
|
||||
12 * 60 * 60, // twelve hours
|
||||
24 * 60 * 60, // one day
|
||||
7 * 24 * 60 * 60, // one week
|
||||
]);
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -107,7 +105,7 @@ mod tests {
|
||||
TIME_BUCKETS.get(3),
|
||||
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1)
|
||||
);
|
||||
assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX));
|
||||
assert_eq!(TIME_BUCKETS.get(4), TIME_BUCKETS.fit_time_bucket(i64::MAX));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -61,6 +61,30 @@ async fn put_and_flush(
|
||||
assert_eq!(0, result.affected_rows);
|
||||
}
|
||||
|
||||
async fn flush(engine: &MitoEngine, region_id: RegionId) {
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Flush(RegionFlushRequest {
|
||||
row_group_size: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(0, result.affected_rows);
|
||||
}
|
||||
|
||||
async fn compact(engine: &MitoEngine, region_id: RegionId) {
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
}
|
||||
|
||||
async fn delete_and_flush(
|
||||
engine: &MitoEngine,
|
||||
region_id: RegionId,
|
||||
@@ -147,14 +171,7 @@ async fn test_compaction_region() {
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
// Input:
|
||||
@@ -179,6 +196,136 @@ async fn test_compaction_region() {
|
||||
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_infer_compaction_time_window() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
.register_region_table_info(
|
||||
region_id.table_id(),
|
||||
"test_table",
|
||||
"test_catalog",
|
||||
"test_schema",
|
||||
None,
|
||||
env.get_kv_backend(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.build();
|
||||
|
||||
let column_schemas = request
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(column_metadata_to_column_schema)
|
||||
.collect::<Vec<_>>();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
// time window should be absent
|
||||
assert!(engine
|
||||
.get_region(region_id)
|
||||
.unwrap()
|
||||
.version_control
|
||||
.current()
|
||||
.version
|
||||
.compaction_time_window
|
||||
.is_none());
|
||||
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1..2).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 2..3).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 3..4).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 4..5).await;
|
||||
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
1,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
Duration::from_secs(3600),
|
||||
engine
|
||||
.get_region(region_id)
|
||||
.unwrap()
|
||||
.version_control
|
||||
.current()
|
||||
.version
|
||||
.compaction_time_window
|
||||
.unwrap()
|
||||
);
|
||||
|
||||
// write two rows to trigger another flush.
|
||||
// note: this two rows still use the original part_duration (1day by default), so they are written
|
||||
// to the same time partition and flushed to one file.
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 3601, 3602, 0),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 7201, 7202, 0),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
// this flush should update part_duration in TimePartitions.
|
||||
flush(&engine, region_id).await;
|
||||
compact(&engine, region_id).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
2,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
);
|
||||
|
||||
// These data should use new part_duration in TimePartitions and get written to two different
|
||||
// time partitions so we end up with 4 ssts.
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 3601, 3602, 0),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 7201, 7202, 0),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
flush(&engine, region_id).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
4,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_overlapping_files() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -216,14 +363,7 @@ async fn test_compaction_overlapping_files() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 30..40).await;
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
@@ -282,15 +422,7 @@ async fn test_compaction_region_with_overlapping() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 3600..10800).await; // window 10800
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
compact(&engine, region_id).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let vec = collect_stream_ts(stream).await;
|
||||
@@ -336,15 +468,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 0..10800).await; // window 10800
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
compact(&engine, region_id).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
2,
|
||||
@@ -477,15 +601,7 @@ async fn test_compaction_update_time_window() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2700).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 2700..3600).await; // window 3600
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
compact(&engine, region_id).await;
|
||||
assert_eq!(
|
||||
engine
|
||||
.get_region(region_id)
|
||||
@@ -572,13 +688,7 @@ async fn test_change_region_compaction_window() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
|
||||
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
// Put window 7200
|
||||
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await;
|
||||
@@ -623,13 +733,7 @@ async fn test_change_region_compaction_window() {
|
||||
|
||||
// Compaction again. It should compacts window 3600 and 7200
|
||||
// into 7200.
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
compact(&engine, region_id).await;
|
||||
// Check compaction window.
|
||||
{
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
@@ -709,13 +813,7 @@ async fn test_open_overwrite_compaction_window() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
|
||||
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
// Check compaction window.
|
||||
{
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#![feature(result_flattening)]
|
||||
#![feature(int_roundings)]
|
||||
#![feature(debug_closure_helpers)]
|
||||
#![feature(duration_constructors)]
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
#[cfg_attr(feature = "test", allow(unused))]
|
||||
|
||||
@@ -40,25 +40,22 @@ use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::version::SmallMemtableVec;
|
||||
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
|
||||
|
||||
/// Initial time window if not specified.
|
||||
const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1);
|
||||
|
||||
/// A partition holds rows with timestamps between `[min, max)`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimePartition {
|
||||
/// Memtable of the partition.
|
||||
memtable: MemtableRef,
|
||||
/// Time range of the partition. `min` is inclusive and `max` is exclusive.
|
||||
/// `None` means there is no time range. The time
|
||||
/// range is `None` if and only if the [TimePartitions::part_duration] is `None`.
|
||||
time_range: Option<PartTimeRange>,
|
||||
time_range: PartTimeRange,
|
||||
}
|
||||
|
||||
impl TimePartition {
|
||||
/// Returns whether the `ts` belongs to the partition.
|
||||
fn contains_timestamp(&self, ts: Timestamp) -> bool {
|
||||
let Some(range) = self.time_range else {
|
||||
return true;
|
||||
};
|
||||
|
||||
range.contains_timestamp(ts)
|
||||
self.time_range.contains_timestamp(ts)
|
||||
}
|
||||
|
||||
/// Write rows to the part.
|
||||
@@ -72,14 +69,11 @@ impl TimePartition {
|
||||
}
|
||||
|
||||
/// Write a partial [BulkPart] according to [TimePartition::time_range].
|
||||
fn write_record_batch_partial(&self, part: &BulkPart) -> error::Result<()> {
|
||||
let Some(range) = self.time_range else {
|
||||
unreachable!("TimePartition must have explicit time range when a bulk request involves multiple time partition")
|
||||
};
|
||||
fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
|
||||
let Some(filtered) = filter_record_batch(
|
||||
part,
|
||||
range.min_timestamp.value(),
|
||||
range.max_timestamp.value(),
|
||||
self.time_range.min_timestamp.value(),
|
||||
self.time_range.max_timestamp.value(),
|
||||
)?
|
||||
else {
|
||||
return Ok(());
|
||||
@@ -209,10 +203,7 @@ pub struct TimePartitions {
|
||||
/// Mutable data of partitions.
|
||||
inner: Mutex<PartitionsInner>,
|
||||
/// Duration of a partition.
|
||||
///
|
||||
/// `None` means there is only one partition and the [TimePartition::time_range] is
|
||||
/// also `None`.
|
||||
part_duration: Option<Duration>,
|
||||
part_duration: Duration,
|
||||
/// Metadata of the region.
|
||||
metadata: RegionMetadataRef,
|
||||
/// Builder of memtables.
|
||||
@@ -229,26 +220,10 @@ impl TimePartitions {
|
||||
next_memtable_id: MemtableId,
|
||||
part_duration: Option<Duration>,
|
||||
) -> Self {
|
||||
let mut inner = PartitionsInner::new(next_memtable_id);
|
||||
if part_duration.is_none() {
|
||||
// If `part_duration` is None, then we create a partition with `None` time
|
||||
// range so we will write all rows to that partition.
|
||||
let memtable = builder.build(inner.alloc_memtable_id(), &metadata);
|
||||
debug!(
|
||||
"Creates a time partition for all timestamps, region: {}, memtable_id: {}",
|
||||
metadata.region_id,
|
||||
memtable.id(),
|
||||
);
|
||||
let part = TimePartition {
|
||||
memtable,
|
||||
time_range: None,
|
||||
};
|
||||
inner.parts.push(part);
|
||||
}
|
||||
|
||||
let inner = PartitionsInner::new(next_memtable_id);
|
||||
Self {
|
||||
inner: Mutex::new(inner),
|
||||
part_duration,
|
||||
part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
|
||||
metadata,
|
||||
builder,
|
||||
}
|
||||
@@ -329,19 +304,18 @@ impl TimePartitions {
|
||||
part_start: Timestamp,
|
||||
inner: &mut MutexGuard<PartitionsInner>,
|
||||
) -> Result<TimePartition> {
|
||||
let part_duration = self.part_duration.unwrap();
|
||||
let part_pos = match inner
|
||||
.parts
|
||||
.iter()
|
||||
.position(|part| part.time_range.unwrap().min_timestamp == part_start)
|
||||
.position(|part| part.time_range.min_timestamp == part_start)
|
||||
{
|
||||
Some(pos) => pos,
|
||||
None => {
|
||||
let range = PartTimeRange::from_start_duration(part_start, part_duration)
|
||||
let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
|
||||
.with_context(|| InvalidRequestSnafu {
|
||||
region_id: self.metadata.region_id,
|
||||
reason: format!(
|
||||
"Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
|
||||
"Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
|
||||
),
|
||||
})?;
|
||||
let memtable = self
|
||||
@@ -351,14 +325,14 @@ impl TimePartitions {
|
||||
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
|
||||
range,
|
||||
self.metadata.region_id,
|
||||
part_duration,
|
||||
self.part_duration,
|
||||
memtable.id(),
|
||||
inner.parts.len() + 1
|
||||
);
|
||||
let pos = inner.parts.len();
|
||||
inner.parts.push(TimePartition {
|
||||
memtable,
|
||||
time_range: Some(range),
|
||||
time_range: range,
|
||||
});
|
||||
pos
|
||||
}
|
||||
@@ -396,13 +370,13 @@ impl TimePartitions {
|
||||
/// Forks latest partition and updates the partition duration if `part_duration` is Some.
|
||||
pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
|
||||
// Fall back to the existing partition duration.
|
||||
let part_duration = part_duration.or(self.part_duration);
|
||||
let part_duration = part_duration.unwrap_or(self.part_duration);
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let latest_part = inner
|
||||
.parts
|
||||
.iter()
|
||||
.max_by_key(|part| part.time_range.map(|range| range.min_timestamp))
|
||||
.max_by_key(|part| part.time_range.min_timestamp)
|
||||
.cloned();
|
||||
|
||||
let Some(old_part) = latest_part else {
|
||||
@@ -411,33 +385,31 @@ impl TimePartitions {
|
||||
metadata.clone(),
|
||||
self.builder.clone(),
|
||||
inner.next_memtable_id,
|
||||
part_duration,
|
||||
Some(part_duration),
|
||||
);
|
||||
};
|
||||
|
||||
let old_stats = old_part.memtable.stats();
|
||||
// Use the max timestamp to compute the new time range for the memtable.
|
||||
// If `part_duration` is None, the new range will be None.
|
||||
let new_time_range =
|
||||
old_stats
|
||||
.time_range()
|
||||
.zip(part_duration)
|
||||
.and_then(|(range, bucket)| {
|
||||
partition_start_timestamp(range.1, bucket)
|
||||
.and_then(|start| PartTimeRange::from_start_duration(start, bucket))
|
||||
});
|
||||
// Forks the latest partition, but compute the time range based on the new duration.
|
||||
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
|
||||
let new_part = TimePartition {
|
||||
memtable,
|
||||
time_range: new_time_range,
|
||||
};
|
||||
let partitions_inner = old_stats
|
||||
.time_range()
|
||||
.and_then(|(_, old_stats_end_timestamp)| {
|
||||
partition_start_timestamp(old_stats_end_timestamp, part_duration)
|
||||
.and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
|
||||
})
|
||||
.map(|part_time_range| {
|
||||
// Forks the latest partition, but compute the time range based on the new duration.
|
||||
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
|
||||
let part = TimePartition {
|
||||
memtable,
|
||||
time_range: part_time_range,
|
||||
};
|
||||
PartitionsInner::with_partition(part, inner.next_memtable_id)
|
||||
})
|
||||
.unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));
|
||||
|
||||
Self {
|
||||
inner: Mutex::new(PartitionsInner::with_partition(
|
||||
new_part,
|
||||
inner.next_memtable_id,
|
||||
)),
|
||||
inner: Mutex::new(partitions_inner),
|
||||
part_duration,
|
||||
metadata: metadata.clone(),
|
||||
builder: self.builder.clone(),
|
||||
@@ -445,7 +417,7 @@ impl TimePartitions {
|
||||
}
|
||||
|
||||
/// Returns partition duration.
|
||||
pub(crate) fn part_duration(&self) -> Option<Duration> {
|
||||
pub(crate) fn part_duration(&self) -> Duration {
|
||||
self.part_duration
|
||||
}
|
||||
|
||||
@@ -490,7 +462,7 @@ impl TimePartitions {
|
||||
self.metadata.clone(),
|
||||
self.builder.clone(),
|
||||
self.next_memtable_id(),
|
||||
part_duration.or(self.part_duration),
|
||||
Some(part_duration.unwrap_or(self.part_duration)),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -514,11 +486,7 @@ impl TimePartitions {
|
||||
let mut present = HashSet::new();
|
||||
// First find any existing partitions that overlap
|
||||
for part in existing_parts {
|
||||
let Some(part_time_range) = part.time_range.as_ref() else {
|
||||
matching.push(part);
|
||||
return Ok((matching, Vec::new()));
|
||||
};
|
||||
|
||||
let part_time_range = &part.time_range;
|
||||
if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
|
||||
matching.push(part);
|
||||
present.insert(part_time_range.min_timestamp.value());
|
||||
@@ -526,7 +494,7 @@ impl TimePartitions {
|
||||
}
|
||||
|
||||
// safety: self.part_duration can only be present when reach here.
|
||||
let part_duration = self.part_duration.unwrap();
|
||||
let part_duration = self.part_duration_or_default();
|
||||
let timestamp_unit = self.metadata.time_index_type().unit();
|
||||
|
||||
let part_duration_sec = part_duration.as_secs() as i64;
|
||||
@@ -621,12 +589,13 @@ impl TimePartitions {
|
||||
Ok((matching, missing))
|
||||
}
|
||||
|
||||
/// Returns partition duration, or use default 1day duration is not present.
|
||||
fn part_duration_or_default(&self) -> Duration {
|
||||
self.part_duration
|
||||
}
|
||||
|
||||
/// Write to multiple partitions.
|
||||
fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
|
||||
// If part duration is `None` then there is always one partition and all rows
|
||||
// will be put in that partition before invoking this method.
|
||||
debug_assert!(self.part_duration.is_some());
|
||||
|
||||
let mut parts_to_write = HashMap::new();
|
||||
let mut missing_parts = HashMap::new();
|
||||
for kv in kvs.iter() {
|
||||
@@ -635,9 +604,8 @@ impl TimePartitions {
|
||||
let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
|
||||
for part in parts {
|
||||
if part.contains_timestamp(ts) {
|
||||
// Safety: Since part duration is `Some` so all time range should be `Some`.
|
||||
parts_to_write
|
||||
.entry(part.time_range.unwrap().min_timestamp)
|
||||
.entry(part.time_range.min_timestamp)
|
||||
.or_insert_with(|| PartitionToWrite {
|
||||
partition: part.clone(),
|
||||
key_values: Vec::new(),
|
||||
@@ -652,7 +620,7 @@ impl TimePartitions {
|
||||
if !part_found {
|
||||
// We need to write it to a new part.
|
||||
// Safety: `new()` ensures duration is always Some if we do to this method.
|
||||
let part_duration = self.part_duration.unwrap();
|
||||
let part_duration = self.part_duration_or_default();
|
||||
let part_start =
|
||||
partition_start_timestamp(ts, part_duration).with_context(|| {
|
||||
InvalidRequestSnafu {
|
||||
@@ -787,7 +755,7 @@ mod tests {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
|
||||
assert_eq!(1, partitions.num_partitions());
|
||||
assert_eq!(0, partitions.num_partitions());
|
||||
assert!(partitions.is_empty());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
@@ -849,14 +817,15 @@ mod tests {
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(0),
|
||||
parts[0].time_range.unwrap().min_timestamp
|
||||
parts[0].time_range.min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(10000),
|
||||
parts[0].time_range.unwrap().max_timestamp
|
||||
parts[0].time_range.max_timestamp
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions =
|
||||
@@ -900,11 +869,11 @@ mod tests {
|
||||
assert_eq!(0, parts[0].memtable.id());
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(0),
|
||||
parts[0].time_range.unwrap().min_timestamp
|
||||
parts[0].time_range.min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(5000),
|
||||
parts[0].time_range.unwrap().max_timestamp
|
||||
parts[0].time_range.max_timestamp
|
||||
);
|
||||
assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
|
||||
let iter = parts[1].memtable.iter(None, None, None).unwrap();
|
||||
@@ -913,11 +882,11 @@ mod tests {
|
||||
assert_eq!(&[5000, 7000], ×tamps[..]);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(5000),
|
||||
parts[1].time_range.unwrap().min_timestamp
|
||||
parts[1].time_range.min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(10000),
|
||||
parts[1].time_range.unwrap().max_timestamp
|
||||
parts[1].time_range.max_timestamp
|
||||
);
|
||||
}
|
||||
|
||||
@@ -928,26 +897,26 @@ mod tests {
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
|
||||
|
||||
let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
// Won't update the duration if it's None.
|
||||
let new_parts = new_parts.new_with_part_duration(None);
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
// Don't need to create new memtables.
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration());
|
||||
// Don't need to create new memtables.
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
|
||||
// Need to build a new memtable as duration is still None.
|
||||
let new_parts = partitions.new_with_part_duration(None);
|
||||
assert!(new_parts.part_duration().is_none());
|
||||
assert_eq!(2, new_parts.next_memtable_id());
|
||||
assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -957,28 +926,28 @@ mod tests {
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
|
||||
partitions.freeze().unwrap();
|
||||
let new_parts = partitions.fork(&metadata, None);
|
||||
assert!(new_parts.part_duration().is_none());
|
||||
assert_eq!(1, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(2, new_parts.next_memtable_id());
|
||||
assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
|
||||
assert!(new_parts.list_partitions().is_empty());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(3, new_parts.next_memtable_id());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
assert!(new_parts.list_partitions().is_empty());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, None);
|
||||
// Won't update the duration.
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(4, new_parts.next_memtable_id());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
assert!(new_parts.list_partitions().is_empty());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
assert_eq!(4, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(5, new_parts.next_memtable_id());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration());
|
||||
assert!(new_parts.list_partitions().is_empty());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -990,14 +959,14 @@ mod tests {
|
||||
// Won't update the duration.
|
||||
let new_parts = partitions.fork(&metadata, None);
|
||||
assert!(new_parts.is_empty());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(3, new_parts.next_memtable_id());
|
||||
|
||||
// Although we don't fork a memtable multiple times, we still add a test for it.
|
||||
let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
|
||||
assert!(new_parts.is_empty());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration());
|
||||
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(4, new_parts.next_memtable_id());
|
||||
}
|
||||
@@ -1018,9 +987,9 @@ mod tests {
|
||||
Timestamp::new_millisecond(2000),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 1);
|
||||
assert!(missing.is_empty());
|
||||
assert!(matching[0].time_range.is_none());
|
||||
assert_eq!(matching.len(), 0);
|
||||
assert_eq!(missing.len(), 1);
|
||||
assert_eq!(missing[0], Timestamp::new_millisecond(0));
|
||||
|
||||
// Case 2: With time range partitioning
|
||||
let partitions = TimePartitions::new(
|
||||
@@ -1052,7 +1021,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 1);
|
||||
assert!(missing.is_empty());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
|
||||
// Test case 2b: Query spanning multiple existing partitions
|
||||
let (matching, missing) = partitions
|
||||
@@ -1065,8 +1034,8 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 2);
|
||||
assert!(missing.is_empty());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
|
||||
|
||||
// Test case 2c: Query requiring new partition
|
||||
let (matching, missing) = partitions
|
||||
@@ -1092,8 +1061,8 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 2);
|
||||
assert!(missing.is_empty());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
|
||||
|
||||
// Test case 2e: Corner case
|
||||
let (matching, missing) = partitions
|
||||
@@ -1106,8 +1075,8 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 2);
|
||||
assert!(missing.is_empty());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
|
||||
|
||||
// Test case 2f: Corner case with
|
||||
let (matching, missing) = partitions
|
||||
@@ -1120,7 +1089,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 1);
|
||||
assert_eq!(1, missing.len());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
|
||||
assert_eq!(missing[0].value(), 10000);
|
||||
|
||||
// Test case 2g: Cross 0
|
||||
@@ -1133,7 +1102,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 1);
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(1, missing.len());
|
||||
assert_eq!(missing[0].value(), -5000);
|
||||
|
||||
@@ -1151,8 +1120,8 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(2, matching.len());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
|
||||
assert_eq!(2, missing.len());
|
||||
assert_eq!(missing[0].value(), -100000000000);
|
||||
assert_eq!(missing[1].value(), 100000000000);
|
||||
@@ -1162,10 +1131,7 @@ mod tests {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"ts",
|
||||
arrow::datatypes::DataType::Timestamp(
|
||||
arrow::datatypes::TimeUnit::Millisecond,
|
||||
None,
|
||||
),
|
||||
DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
Field::new("val", DataType::Utf8, true),
|
||||
|
||||
@@ -76,7 +76,7 @@ impl MemtableVersion {
|
||||
) -> Result<Option<MemtableVersion>> {
|
||||
if self.mutable.is_empty() {
|
||||
// No need to freeze the mutable memtable, but we need to check the time window.
|
||||
if self.mutable.part_duration() == time_window {
|
||||
if Some(self.mutable.part_duration()) == time_window {
|
||||
// If the time window is the same, we don't need to update it.
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -98,7 +98,7 @@ impl MemtableVersion {
|
||||
// soft limit.
|
||||
self.mutable.freeze()?;
|
||||
// Fork the memtable.
|
||||
if self.mutable.part_duration() != time_window {
|
||||
if Some(self.mutable.part_duration()) != time_window {
|
||||
common_telemetry::debug!(
|
||||
"Fork memtable, update partition duration from {:?}, to {:?}",
|
||||
self.mutable.part_duration(),
|
||||
|
||||
@@ -142,7 +142,7 @@ impl VersionControl {
|
||||
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
|
||||
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
|
||||
let version = self.current().version;
|
||||
let part_duration = version.memtables.mutable.part_duration();
|
||||
let part_duration = Some(version.memtables.mutable.part_duration());
|
||||
let next_memtable_id = version.memtables.mutable.next_memtable_id();
|
||||
let new_mutable = Arc::new(TimePartitions::new(
|
||||
version.metadata.clone(),
|
||||
@@ -166,7 +166,7 @@ impl VersionControl {
|
||||
/// new schema. Memtables of the version must be empty.
|
||||
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
|
||||
let version = self.current().version;
|
||||
let part_duration = version.memtables.mutable.part_duration();
|
||||
let part_duration = Some(version.memtables.mutable.part_duration());
|
||||
let next_memtable_id = version.memtables.mutable.next_memtable_id();
|
||||
let new_mutable = Arc::new(TimePartitions::new(
|
||||
metadata.clone(),
|
||||
@@ -202,7 +202,7 @@ impl VersionControl {
|
||||
version.metadata.clone(),
|
||||
memtable_builder.clone(),
|
||||
next_memtable_id,
|
||||
part_duration,
|
||||
Some(part_duration),
|
||||
));
|
||||
let new_version = Arc::new(
|
||||
VersionBuilder::new(version.metadata.clone(), new_mutable)
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
|
||||
use api::v1::flow::FlowRequestHeader;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::handlers::FlowServiceHandler;
|
||||
@@ -22,7 +22,6 @@ use common_query::error::Result;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use serde_json::json;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
@@ -58,96 +57,9 @@ impl FlowServiceHandler for FlowServiceOperator {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.flush_inner(catalog, flow, ctx).await
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.adjust_inner(
|
||||
catalog,
|
||||
flow,
|
||||
min_run_interval_secs,
|
||||
max_filter_num_per_query,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowServiceOperator {
|
||||
async fn adjust_inner(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
let id = self
|
||||
.flow_metadata_manager
|
||||
.flow_name_manager()
|
||||
.get(catalog, flow)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.context(common_meta::error::FlowNotFoundSnafu {
|
||||
flow_name: format!("{}.{}", catalog, flow),
|
||||
})
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.flow_id();
|
||||
|
||||
let all_flownode_peers = self
|
||||
.flow_metadata_manager
|
||||
.flow_route_manager()
|
||||
.routes(id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?;
|
||||
|
||||
// order of flownodes doesn't matter here
|
||||
let all_flow_nodes = FuturesUnordered::from_iter(
|
||||
all_flownode_peers
|
||||
.iter()
|
||||
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
|
||||
)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
// TODO(discord9): use proper type for flow options
|
||||
let options = json!({
|
||||
"min_run_interval_secs": min_run_interval_secs,
|
||||
"max_filter_num_per_query": max_filter_num_per_query,
|
||||
});
|
||||
|
||||
for node in all_flow_nodes {
|
||||
let _res = {
|
||||
use api::v1::flow::{flow_request, FlowRequest};
|
||||
let flush_req = FlowRequest {
|
||||
header: Some(FlowRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
query_context: Some(
|
||||
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
|
||||
),
|
||||
}),
|
||||
body: Some(flow_request::Body::Adjust(AdjustFlow {
|
||||
flow_id: Some(api::v1::FlowId { id }),
|
||||
options: options.to_string(),
|
||||
})),
|
||||
};
|
||||
node.handle(flush_req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
};
|
||||
}
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
/// Flush the flownodes according to the flow id.
|
||||
async fn flush_inner(
|
||||
&self,
|
||||
|
||||
@@ -25,7 +25,10 @@ use common_datasource::object_store::{build_backend, parse_url};
|
||||
use common_datasource::util::find_dir_and_filename;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_recordbatch::{
|
||||
map_json_type_to_string, map_json_type_to_string_schema, RecordBatchStream,
|
||||
SendableRecordBatchMapper, SendableRecordBatchStream,
|
||||
};
|
||||
use common_telemetry::{debug, tracing};
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion_common::TableReference as DfTableReference;
|
||||
@@ -57,6 +60,11 @@ impl StatementExecutor {
|
||||
) -> Result<usize> {
|
||||
let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;
|
||||
|
||||
let stream = Box::pin(SendableRecordBatchMapper::new(
|
||||
stream,
|
||||
map_json_type_to_string,
|
||||
map_json_type_to_string_schema,
|
||||
));
|
||||
match format {
|
||||
Format::Csv(_) => stream_to_csv(
|
||||
Box::pin(DfRecordBatchStreamAdapter::new(stream)),
|
||||
|
||||
@@ -187,24 +187,42 @@ impl StatementExecutor {
|
||||
}
|
||||
);
|
||||
|
||||
// Check if is creating logical table
|
||||
if create_table.engine == METRIC_ENGINE_NAME
|
||||
&& create_table
|
||||
.table_options
|
||||
.contains_key(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
return self
|
||||
.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
||||
// Create logical tables
|
||||
ensure!(
|
||||
partitions.is_none(),
|
||||
InvalidPartitionRuleSnafu {
|
||||
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
|
||||
}
|
||||
);
|
||||
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.next()
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: "expected to create a logical table",
|
||||
});
|
||||
violated: "expected to create logical tables",
|
||||
})
|
||||
} else {
|
||||
// Create other normal table
|
||||
self.create_non_logic_table(create_table, partitions, query_ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn create_non_logic_table(
|
||||
&self,
|
||||
create_table: &mut CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<TableRef> {
|
||||
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
|
||||
|
||||
// Check if schema exists
|
||||
let schema = self
|
||||
.table_metadata_manager
|
||||
.schema_manager()
|
||||
@@ -214,7 +232,6 @@ impl StatementExecutor {
|
||||
))
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
ensure!(
|
||||
schema.is_some(),
|
||||
SchemaNotFoundSnafu {
|
||||
|
||||
@@ -59,6 +59,7 @@ sql.workspace = true
|
||||
table.workspace = true
|
||||
tokio.workspace = true
|
||||
urlencoding = "2.1"
|
||||
vrl = "0.24"
|
||||
yaml-rust = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -24,9 +24,9 @@ fn processor_mut(
|
||||
let mut result = Vec::with_capacity(input_values.len());
|
||||
|
||||
for v in input_values {
|
||||
let mut payload = json_to_map(v).unwrap();
|
||||
let payload = json_to_map(v).unwrap();
|
||||
let r = pipeline
|
||||
.exec_mut(&mut payload)?
|
||||
.exec_mut(payload)?
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
result.push(r.0);
|
||||
|
||||
@@ -20,10 +20,10 @@ use crate::error::{
|
||||
Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu,
|
||||
ValueRequiredForDispatcherRuleSnafu,
|
||||
};
|
||||
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
|
||||
use crate::{PipelineMap, Value};
|
||||
|
||||
const FIELD: &str = "field";
|
||||
const TABLE_SUFFIX: &str = "table_suffix";
|
||||
const PIPELINE: &str = "pipeline";
|
||||
const VALUE: &str = "value";
|
||||
const RULES: &str = "rules";
|
||||
@@ -80,7 +80,7 @@ impl TryFrom<&Yaml> for Dispatcher {
|
||||
rules
|
||||
.iter()
|
||||
.map(|rule| {
|
||||
let table_part = rule[TABLE_SUFFIX]
|
||||
let table_part = rule[TABLE_SUFFIX_KEY]
|
||||
.as_str()
|
||||
.map(|s| s.to_string())
|
||||
.context(TableSuffixRequiredForDispatcherRuleSnafu)?;
|
||||
|
||||
@@ -411,13 +411,6 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display(
|
||||
"At least one timestamp-related processor is required to use auto transform"
|
||||
))]
|
||||
TransformNoTimestampProcessor {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display(
|
||||
"Illegal to set multiple timestamp Index columns, please set only one: {columns}"
|
||||
))]
|
||||
@@ -433,7 +426,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Exactly one timestamp value is required to use auto transform"))]
|
||||
#[snafu(display("Exactly one time-related processor and one timestamp value is required to use auto transform"))]
|
||||
AutoTransformOneTimestamp {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
@@ -686,6 +679,54 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to compile VRL, {}", msg))]
|
||||
CompileVrl {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute VRL, {}", msg))]
|
||||
ExecuteVrl {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Float is not a number: {}", input_float))]
|
||||
FloatNaN {
|
||||
input_float: f64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid timestamp value: {}", input))]
|
||||
InvalidTimestamp {
|
||||
input: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert bytes to utf8"))]
|
||||
BytesToUtf8 {
|
||||
#[snafu(source)]
|
||||
error: std::string::FromUtf8Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Please don't use regex in Vrl script"))]
|
||||
VrlRegexValue {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Vrl script should return `.` in the end"))]
|
||||
VrlReturnValue {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to cast type, msg: {}", msg))]
|
||||
CastType {
|
||||
msg: String,
|
||||
@@ -832,7 +873,6 @@ impl ErrorExt for Error {
|
||||
| TransformTypeMustBeSet { .. }
|
||||
| TransformColumnNameMustBeUnique { .. }
|
||||
| TransformMultipleTimestampIndex { .. }
|
||||
| TransformNoTimestampProcessor { .. }
|
||||
| TransformTimestampIndexCount { .. }
|
||||
| AutoTransformOneTimestamp { .. }
|
||||
| CoerceUnsupportedNullType { .. }
|
||||
@@ -866,6 +906,13 @@ impl ErrorExt for Error {
|
||||
| ReachedMaxNestedLevels { .. }
|
||||
| RequiredTableSuffixTemplate
|
||||
| InvalidTableSuffixTemplate { .. }
|
||||
| CompileVrl { .. }
|
||||
| ExecuteVrl { .. }
|
||||
| FloatNaN { .. }
|
||||
| BytesToUtf8 { .. }
|
||||
| InvalidTimestamp { .. }
|
||||
| VrlRegexValue { .. }
|
||||
| VrlReturnValue { .. }
|
||||
| PipelineMissing { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,12 +30,13 @@ use yaml_rust::YamlLoader;
|
||||
|
||||
use crate::dispatcher::{Dispatcher, Rule};
|
||||
use crate::error::{
|
||||
InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
|
||||
TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
|
||||
AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
|
||||
YamlLoadSnafu, YamlParseSnafu,
|
||||
};
|
||||
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
|
||||
use crate::etl::processor::ProcessorKind;
|
||||
use crate::tablesuffix::TableSuffixTemplate;
|
||||
use crate::GreptimeTransformer;
|
||||
use crate::{ContextOpt, GreptimeTransformer};
|
||||
|
||||
const DESCRIPTION: &str = "description";
|
||||
const PROCESSORS: &str = "processors";
|
||||
@@ -80,16 +81,14 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
|
||||
// check processors have at least one timestamp-related processor
|
||||
let cnt = processors
|
||||
.iter()
|
||||
.filter(|p| {
|
||||
matches!(
|
||||
p,
|
||||
ProcessorKind::Date(_)
|
||||
| ProcessorKind::Timestamp(_)
|
||||
| ProcessorKind::Epoch(_)
|
||||
)
|
||||
.filter_map(|p| match p {
|
||||
ProcessorKind::Date(d) => Some(d.target_count()),
|
||||
ProcessorKind::Timestamp(t) => Some(t.target_count()),
|
||||
ProcessorKind::Epoch(e) => Some(e.target_count()),
|
||||
_ => None,
|
||||
})
|
||||
.count();
|
||||
ensure!(cnt > 0, TransformNoTimestampProcessorSnafu);
|
||||
.sum::<usize>();
|
||||
ensure!(cnt == 1, AutoTransformOneTimestampSnafu);
|
||||
None
|
||||
} else {
|
||||
Some(GreptimeTransformer::new(transformers)?)
|
||||
@@ -156,14 +155,15 @@ impl DispatchedTo {
|
||||
pub enum PipelineExecOutput {
|
||||
Transformed(TransformedOutput),
|
||||
AutoTransform(AutoTransformOutput),
|
||||
DispatchedTo(DispatchedTo),
|
||||
DispatchedTo(DispatchedTo, PipelineMap),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TransformedOutput {
|
||||
pub opt: String,
|
||||
pub opt: ContextOpt,
|
||||
pub row: Row,
|
||||
pub table_suffix: Option<String>,
|
||||
pub pipeline_map: PipelineMap,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -171,6 +171,7 @@ pub struct AutoTransformOutput {
|
||||
pub table_suffix: Option<String>,
|
||||
// ts_column_name -> unit
|
||||
pub ts_unit_map: HashMap<String, TimeUnit>,
|
||||
pub pipeline_map: PipelineMap,
|
||||
}
|
||||
|
||||
impl PipelineExecOutput {
|
||||
@@ -188,7 +189,7 @@ impl PipelineExecOutput {
|
||||
|
||||
// Note: This is a test only function, do not use it in production.
|
||||
pub fn into_dispatched(self) -> Option<DispatchedTo> {
|
||||
if let Self::DispatchedTo(d) = self {
|
||||
if let Self::DispatchedTo(d, _) = self {
|
||||
Some(d)
|
||||
} else {
|
||||
None
|
||||
@@ -231,30 +232,38 @@ pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Pip
|
||||
}
|
||||
|
||||
impl Pipeline {
|
||||
pub fn exec_mut(&self, val: &mut PipelineMap) -> Result<PipelineExecOutput> {
|
||||
pub fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineExecOutput> {
|
||||
// process
|
||||
for processor in self.processors.iter() {
|
||||
processor.exec_mut(val)?;
|
||||
val = processor.exec_mut(val)?;
|
||||
}
|
||||
|
||||
// dispatch, fast return if matched
|
||||
if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(val)) {
|
||||
return Ok(PipelineExecOutput::DispatchedTo(rule.into()));
|
||||
if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
|
||||
return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
|
||||
}
|
||||
|
||||
// do transform
|
||||
if let Some(transformer) = self.transformer() {
|
||||
let (opt, row) = transformer.transform_mut(val)?;
|
||||
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
|
||||
let (mut opt, row) = transformer.transform_mut(&mut val)?;
|
||||
let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
|
||||
|
||||
Ok(PipelineExecOutput::Transformed(TransformedOutput {
|
||||
opt,
|
||||
row,
|
||||
table_suffix,
|
||||
pipeline_map: val,
|
||||
}))
|
||||
} else {
|
||||
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
|
||||
// check table suffix var
|
||||
let table_suffix = val
|
||||
.remove(TABLE_SUFFIX_KEY)
|
||||
.map(|f| f.to_str_value())
|
||||
.or_else(|| self.tablesuffix.as_ref().and_then(|t| t.apply(&val)));
|
||||
|
||||
let mut ts_unit_map = HashMap::with_capacity(4);
|
||||
// get all ts values
|
||||
for (k, v) in val {
|
||||
for (k, v) in val.iter() {
|
||||
if let Value::Timestamp(ts) = v {
|
||||
if !ts_unit_map.contains_key(k) {
|
||||
ts_unit_map.insert(k.clone(), ts.get_unit());
|
||||
@@ -264,6 +273,7 @@ impl Pipeline {
|
||||
Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput {
|
||||
table_suffix,
|
||||
ts_unit_map,
|
||||
pipeline_map: val,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -318,9 +328,9 @@ transform:
|
||||
type: uint32
|
||||
"#;
|
||||
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
|
||||
let mut payload = json_to_map(input_value).unwrap();
|
||||
let payload = json_to_map(input_value).unwrap();
|
||||
let result = pipeline
|
||||
.exec_mut(&mut payload)
|
||||
.exec_mut(payload)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
@@ -371,7 +381,7 @@ transform:
|
||||
let mut payload = PipelineMap::new();
|
||||
payload.insert("message".to_string(), Value::String(message));
|
||||
let result = pipeline
|
||||
.exec_mut(&mut payload)
|
||||
.exec_mut(payload)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
@@ -446,9 +456,9 @@ transform:
|
||||
"#;
|
||||
|
||||
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
|
||||
let mut payload = json_to_map(input_value).unwrap();
|
||||
let payload = json_to_map(input_value).unwrap();
|
||||
let result = pipeline
|
||||
.exec_mut(&mut payload)
|
||||
.exec_mut(payload)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
@@ -488,10 +498,10 @@ transform:
|
||||
|
||||
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
|
||||
let schema = pipeline.schemas().unwrap().clone();
|
||||
let mut result = json_to_map(input_value).unwrap();
|
||||
let result = json_to_map(input_value).unwrap();
|
||||
|
||||
let row = pipeline
|
||||
.exec_mut(&mut result)
|
||||
.exec_mut(result)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
|
||||
@@ -13,69 +13,145 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::hash_map::IntoIter;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
|
||||
use itertools::Itertools;
|
||||
use session::context::{QueryContext, QueryContextRef};
|
||||
|
||||
use crate::tablesuffix::TableSuffixTemplate;
|
||||
use crate::PipelineMap;
|
||||
|
||||
const DEFAULT_OPT: &str = "";
|
||||
const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
|
||||
const GREPTIME_TTL: &str = "greptime_ttl";
|
||||
const GREPTIME_APPEND_MODE: &str = "greptime_append_mode";
|
||||
const GREPTIME_MERGE_MODE: &str = "greptime_merge_mode";
|
||||
const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
|
||||
const GREPTIME_SKIP_WAL: &str = "greptime_skip_wal";
|
||||
const GREPTIME_TABLE_SUFFIX: &str = "greptime_table_suffix";
|
||||
|
||||
pub const PIPELINE_HINT_KEYS: [&str; 6] = [
|
||||
"greptime_auto_create_table",
|
||||
"greptime_ttl",
|
||||
"greptime_append_mode",
|
||||
"greptime_merge_mode",
|
||||
"greptime_physical_table",
|
||||
"greptime_skip_wal",
|
||||
pub(crate) const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
|
||||
pub(crate) const TTL_KEY: &str = "ttl";
|
||||
pub(crate) const APPEND_MODE_KEY: &str = "append_mode";
|
||||
pub(crate) const MERGE_MODE_KEY: &str = "merge_mode";
|
||||
pub(crate) const PHYSICAL_TABLE_KEY: &str = "physical_table";
|
||||
pub(crate) const SKIP_WAL_KEY: &str = "skip_wal";
|
||||
pub(crate) const TABLE_SUFFIX_KEY: &str = "table_suffix";
|
||||
|
||||
pub const PIPELINE_HINT_KEYS: [&str; 7] = [
|
||||
GREPTIME_AUTO_CREATE_TABLE,
|
||||
GREPTIME_TTL,
|
||||
GREPTIME_APPEND_MODE,
|
||||
GREPTIME_MERGE_MODE,
|
||||
GREPTIME_PHYSICAL_TABLE,
|
||||
GREPTIME_SKIP_WAL,
|
||||
GREPTIME_TABLE_SUFFIX,
|
||||
];
|
||||
|
||||
const PIPELINE_HINT_PREFIX: &str = "greptime_";
|
||||
|
||||
// Remove hints from the pipeline context and form a option string
|
||||
// e.g: skip_wal=true,ttl=1d
|
||||
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> String {
|
||||
let mut btreemap = BTreeMap::new();
|
||||
for k in PIPELINE_HINT_KEYS {
|
||||
if let Some(v) = pipeline_map.remove(k) {
|
||||
btreemap.insert(k, v.to_str_value());
|
||||
/// ContextOpt is a collection of options(including table options and pipeline options)
|
||||
/// that should be extracted during the pipeline execution.
|
||||
///
|
||||
/// The options are set in the format of hint keys. See [`PIPELINE_HINT_KEYS`].
|
||||
/// It's is used as the key in [`ContextReq`] for grouping the row insert requests.
|
||||
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct ContextOpt {
|
||||
// table options, that need to be set in the query context before making row insert requests
|
||||
auto_create_table: Option<String>,
|
||||
ttl: Option<String>,
|
||||
append_mode: Option<String>,
|
||||
merge_mode: Option<String>,
|
||||
physical_table: Option<String>,
|
||||
skip_wal: Option<String>,
|
||||
|
||||
// pipeline options, not set in query context
|
||||
// can be removed before the end of the pipeline execution
|
||||
table_suffix: Option<String>,
|
||||
}
|
||||
|
||||
impl ContextOpt {
|
||||
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> Self {
|
||||
let mut opt = Self::default();
|
||||
for k in PIPELINE_HINT_KEYS {
|
||||
if let Some(v) = pipeline_map.remove(k) {
|
||||
match k {
|
||||
GREPTIME_AUTO_CREATE_TABLE => {
|
||||
opt.auto_create_table = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_TTL => {
|
||||
opt.ttl = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_APPEND_MODE => {
|
||||
opt.append_mode = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_MERGE_MODE => {
|
||||
opt.merge_mode = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_PHYSICAL_TABLE => {
|
||||
opt.physical_table = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_SKIP_WAL => {
|
||||
opt.skip_wal = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_TABLE_SUFFIX => {
|
||||
opt.table_suffix = Some(v.to_str_value());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
opt
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_table_suffix(
|
||||
&mut self,
|
||||
table_suffix: Option<&TableSuffixTemplate>,
|
||||
pipeline_map: &PipelineMap,
|
||||
) -> Option<String> {
|
||||
self.table_suffix
|
||||
.take()
|
||||
.or_else(|| table_suffix.and_then(|s| s.apply(pipeline_map)))
|
||||
}
|
||||
|
||||
pub fn set_query_context(self, ctx: &mut QueryContext) {
|
||||
if let Some(auto_create_table) = &self.auto_create_table {
|
||||
ctx.set_extension(AUTO_CREATE_TABLE_KEY, auto_create_table);
|
||||
}
|
||||
if let Some(ttl) = &self.ttl {
|
||||
ctx.set_extension(TTL_KEY, ttl);
|
||||
}
|
||||
if let Some(append_mode) = &self.append_mode {
|
||||
ctx.set_extension(APPEND_MODE_KEY, append_mode);
|
||||
}
|
||||
if let Some(merge_mode) = &self.merge_mode {
|
||||
ctx.set_extension(MERGE_MODE_KEY, merge_mode);
|
||||
}
|
||||
if let Some(physical_table) = &self.physical_table {
|
||||
ctx.set_extension(PHYSICAL_TABLE_KEY, physical_table);
|
||||
}
|
||||
if let Some(skip_wal) = &self.skip_wal {
|
||||
ctx.set_extension(SKIP_WAL_KEY, skip_wal);
|
||||
}
|
||||
}
|
||||
btreemap
|
||||
.into_iter()
|
||||
.map(|(k, v)| format!("{}={}", k.replace(PIPELINE_HINT_PREFIX, ""), v))
|
||||
.join(",")
|
||||
}
|
||||
|
||||
// split the option string back to a map
|
||||
fn from_opt_to_map(opt: &str) -> HashMap<&str, &str> {
|
||||
opt.split(',')
|
||||
.filter_map(|s| {
|
||||
s.split_once("=")
|
||||
.filter(|(k, v)| !k.is_empty() && !v.is_empty())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// ContextReq is a collection of row insert requests with different options.
|
||||
// The default option is empty string.
|
||||
// Because options are set in query context, we have to split them into sequential calls
|
||||
// e.g:
|
||||
// {
|
||||
// "skip_wal=true,ttl=1d": [RowInsertRequest],
|
||||
// "ttl=1d": [RowInsertRequest],
|
||||
// }
|
||||
/// ContextReq is a collection of row insert requests with different options.
|
||||
/// The default option is all empty.
|
||||
/// Because options are set in query context, we have to split them into sequential calls
|
||||
/// The key is a [`ContextOpt`] struct for strong type.
|
||||
/// e.g:
|
||||
/// {
|
||||
/// "skip_wal=true,ttl=1d": [RowInsertRequest],
|
||||
/// "ttl=1d": [RowInsertRequest],
|
||||
/// }
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ContextReq {
|
||||
req: HashMap<String, Vec<RowInsertRequest>>,
|
||||
req: HashMap<ContextOpt, Vec<RowInsertRequest>>,
|
||||
}
|
||||
|
||||
impl ContextReq {
|
||||
pub fn from_opt_map(opt_map: HashMap<String, Rows>, table_name: String) -> Self {
|
||||
pub fn from_opt_map(opt_map: HashMap<ContextOpt, Rows>, table_name: String) -> Self {
|
||||
Self {
|
||||
req: opt_map
|
||||
.into_iter()
|
||||
@@ -88,17 +164,17 @@ impl ContextReq {
|
||||
}],
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<String, Vec<RowInsertRequest>>>(),
|
||||
.collect::<HashMap<ContextOpt, Vec<RowInsertRequest>>>(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
|
||||
let mut req_map = HashMap::new();
|
||||
req_map.insert(DEFAULT_OPT.to_string(), reqs);
|
||||
req_map.insert(ContextOpt::default(), reqs);
|
||||
Self { req: req_map }
|
||||
}
|
||||
|
||||
pub fn add_rows(&mut self, opt: String, req: RowInsertRequest) {
|
||||
pub fn add_rows(&mut self, opt: ContextOpt, req: RowInsertRequest) {
|
||||
self.req.entry(opt).or_default().push(req);
|
||||
}
|
||||
|
||||
@@ -131,7 +207,7 @@ impl ContextReq {
|
||||
// It will clone the query context for each option and set the options to the context.
|
||||
// Then it will return the context and the row insert requests for actual insert.
|
||||
pub struct ContextReqIter {
|
||||
opt_req: IntoIter<String, Vec<RowInsertRequest>>,
|
||||
opt_req: IntoIter<ContextOpt, Vec<RowInsertRequest>>,
|
||||
ctx_template: QueryContext,
|
||||
}
|
||||
|
||||
@@ -140,13 +216,8 @@ impl Iterator for ContextReqIter {
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let (opt, req_vec) = self.opt_req.next()?;
|
||||
|
||||
let opt_map = from_opt_to_map(&opt);
|
||||
|
||||
let mut ctx = self.ctx_template.clone();
|
||||
for (k, v) in opt_map {
|
||||
ctx.set_extension(k, v);
|
||||
}
|
||||
opt.set_query_context(&mut ctx);
|
||||
|
||||
Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ pub mod select;
|
||||
pub mod simple_extract;
|
||||
pub mod timestamp;
|
||||
pub mod urlencoding;
|
||||
pub mod vrl;
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -58,6 +59,7 @@ use crate::etl::field::{Field, Fields};
|
||||
use crate::etl::processor::json_parse::JsonParseProcessor;
|
||||
use crate::etl::processor::select::SelectProcessor;
|
||||
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
|
||||
use crate::etl::processor::vrl::VrlProcessor;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
const FIELD_NAME: &str = "field";
|
||||
@@ -123,7 +125,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
|
||||
fn ignore_missing(&self) -> bool;
|
||||
|
||||
/// Execute the processor on a vector which be preprocessed by the pipeline
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()>;
|
||||
fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -146,6 +148,7 @@ pub enum ProcessorKind {
|
||||
Decolorize(DecolorizeProcessor),
|
||||
Digest(DigestProcessor),
|
||||
Select(SelectProcessor),
|
||||
Vrl(VrlProcessor),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -227,6 +230,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
|
||||
json_parse::PROCESSOR_JSON_PARSE => {
|
||||
ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
|
||||
}
|
||||
vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
|
||||
select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
|
||||
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
|
||||
};
|
||||
|
||||
@@ -249,7 +249,7 @@ impl Processor for CmcdProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let name = field.input_field();
|
||||
|
||||
@@ -277,7 +277,7 @@ impl Processor for CmcdProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -189,7 +189,7 @@ impl Processor for CsvProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let name = field.input_field();
|
||||
|
||||
@@ -216,7 +216,7 @@ impl Processor for CsvProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -163,6 +163,10 @@ pub struct DateProcessor {
|
||||
}
|
||||
|
||||
impl DateProcessor {
|
||||
pub(crate) fn target_count(&self) -> usize {
|
||||
self.fields.len()
|
||||
}
|
||||
|
||||
fn parse(&self, val: &str) -> Result<Timestamp> {
|
||||
let mut tz = Tz::UTC;
|
||||
if let Some(timezone) = &self.timezone {
|
||||
@@ -194,7 +198,7 @@ impl Processor for DateProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -221,7 +225,7 @@ impl Processor for DateProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -122,7 +122,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -201,7 +201,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -221,7 +221,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -601,7 +601,7 @@ impl Processor for DissectProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -629,7 +629,7 @@ impl Processor for DissectProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -111,6 +111,10 @@ impl EpochProcessor {
|
||||
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn target_count(&self) -> usize {
|
||||
self.fields.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
|
||||
@@ -163,7 +167,7 @@ impl Processor for EpochProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -183,7 +187,7 @@ impl Processor for EpochProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -118,7 +118,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -138,7 +138,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -95,7 +95,7 @@ impl Processor for JoinProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -123,7 +123,7 @@ impl Processor for JoinProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -97,7 +97,7 @@ impl Processor for JsonParseProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -117,7 +117,7 @@ impl Processor for JsonParseProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -125,7 +125,7 @@ impl Processor for JsonPathProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -145,7 +145,7 @@ impl Processor for JsonPathProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -126,7 +126,7 @@ impl Processor for LetterProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -154,7 +154,7 @@ impl Processor for LetterProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ impl Processor for RegexProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
let prefix = field.target_or_input_field();
|
||||
@@ -220,7 +220,7 @@ impl Processor for RegexProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -96,7 +96,7 @@ impl Processor for SelectProcessor {
|
||||
true
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
match self.select_type {
|
||||
SelectType::Include => {
|
||||
let mut include_key_set = HashSet::with_capacity(val.len());
|
||||
@@ -121,7 +121,7 @@ impl Processor for SelectProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,8 +142,9 @@ mod test {
|
||||
p.insert("hello".to_string(), Value::String("world".to_string()));
|
||||
p.insert("hello2".to_string(), Value::String("world2".to_string()));
|
||||
|
||||
let result = processor.exec_mut(&mut p);
|
||||
let result = processor.exec_mut(p);
|
||||
assert!(result.is_ok());
|
||||
let p = result.unwrap();
|
||||
assert_eq!(p.len(), 1);
|
||||
assert_eq!(p.get("hello"), Some(&Value::String("world".to_string())));
|
||||
}
|
||||
@@ -159,8 +160,9 @@ mod test {
|
||||
p.insert("hello".to_string(), Value::String("world".to_string()));
|
||||
p.insert("hello2".to_string(), Value::String("world2".to_string()));
|
||||
|
||||
let result = processor.exec_mut(&mut p);
|
||||
let result = processor.exec_mut(p);
|
||||
assert!(result.is_ok());
|
||||
let p = result.unwrap();
|
||||
assert_eq!(p.len(), 1);
|
||||
assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string())));
|
||||
}
|
||||
@@ -176,8 +178,9 @@ mod test {
|
||||
p.insert("hello".to_string(), Value::String("world".to_string()));
|
||||
p.insert("hello2".to_string(), Value::String("world2".to_string()));
|
||||
|
||||
let result = processor.exec_mut(&mut p);
|
||||
let result = processor.exec_mut(p);
|
||||
assert!(result.is_ok());
|
||||
let p = result.unwrap();
|
||||
assert_eq!(p.len(), 1);
|
||||
assert_eq!(p.get("hello"), None);
|
||||
assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string())));
|
||||
|
||||
@@ -98,7 +98,7 @@ impl Processor for SimpleExtractProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -118,7 +118,7 @@ impl Processor for SimpleExtractProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -205,6 +205,10 @@ impl TimestampProcessor {
|
||||
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn target_count(&self) -> usize {
|
||||
self.fields.len()
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
|
||||
@@ -298,7 +302,7 @@ impl Processor for TimestampProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -318,7 +322,7 @@ impl Processor for TimestampProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -126,7 +126,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -153,7 +153,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
319
src/pipeline/src/etl/processor/vrl.rs
Normal file
319
src/pipeline/src/etl/processor/vrl.rs
Normal file
@@ -0,0 +1,319 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use chrono_tz::Tz;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use vrl::compiler::runtime::Runtime;
|
||||
use vrl::compiler::{compile, Program, TargetValue};
|
||||
use vrl::diagnostic::Formatter;
|
||||
use vrl::prelude::{Bytes, NotNan, TimeZone};
|
||||
use vrl::value::{KeyString, Kind, Secrets, Value as VrlValue};
|
||||
|
||||
use crate::error::{
|
||||
BytesToUtf8Snafu, CompileVrlSnafu, Error, ExecuteVrlSnafu, FloatNaNSnafu,
|
||||
InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu,
|
||||
};
|
||||
use crate::etl::processor::yaml_string;
|
||||
use crate::{PipelineMap, Value as PipelineValue};
|
||||
|
||||
pub(crate) const PROCESSOR_VRL: &str = "vrl";
|
||||
const SOURCE: &str = "source";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct VrlProcessor {
|
||||
source: String,
|
||||
program: Program,
|
||||
}
|
||||
|
||||
impl VrlProcessor {
|
||||
pub fn new(source: String) -> Result<Self> {
|
||||
let fns = vrl::stdlib::all();
|
||||
|
||||
let compile_result = compile(&source, &fns).map_err(|e| {
|
||||
CompileVrlSnafu {
|
||||
msg: Formatter::new(&source, e).to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
let program = compile_result.program;
|
||||
|
||||
// check if the return value is have regex
|
||||
let result_def = program.final_type_info().result;
|
||||
let kind = result_def.kind();
|
||||
if !kind.is_object() {
|
||||
return VrlReturnValueSnafu.fail();
|
||||
}
|
||||
check_regex_output(kind)?;
|
||||
|
||||
Ok(Self { source, program })
|
||||
}
|
||||
|
||||
pub fn resolve(&self, m: PipelineMap) -> Result<PipelineValue> {
|
||||
let pipeline_vrl = m
|
||||
.into_iter()
|
||||
.map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
|
||||
.collect::<Result<BTreeMap<_, _>>>()?;
|
||||
|
||||
let mut target = TargetValue {
|
||||
value: VrlValue::Object(pipeline_vrl),
|
||||
metadata: VrlValue::Object(BTreeMap::new()),
|
||||
secrets: Secrets::default(),
|
||||
};
|
||||
|
||||
let timezone = TimeZone::Named(Tz::UTC);
|
||||
let mut runtime = Runtime::default();
|
||||
let re = runtime
|
||||
.resolve(&mut target, &self.program, &timezone)
|
||||
.map_err(|e| {
|
||||
ExecuteVrlSnafu {
|
||||
msg: e.get_expression_error().to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
vrl_value_to_pipeline_value(re)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
|
||||
let mut source = String::new();
|
||||
for (k, v) in value.iter() {
|
||||
let key = k
|
||||
.as_str()
|
||||
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
|
||||
if key == SOURCE {
|
||||
source = yaml_string(v, SOURCE)?;
|
||||
}
|
||||
}
|
||||
let processor = VrlProcessor::new(source)?;
|
||||
Ok(processor)
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::etl::processor::Processor for VrlProcessor {
|
||||
fn kind(&self) -> &str {
|
||||
PROCESSOR_VRL
|
||||
}
|
||||
|
||||
fn ignore_missing(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
|
||||
let val = self.resolve(val)?;
|
||||
|
||||
if let PipelineValue::Map(m) = val {
|
||||
Ok(m.values)
|
||||
} else {
|
||||
VrlRegexValueSnafu.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn pipeline_value_to_vrl_value(v: PipelineValue) -> Result<VrlValue> {
|
||||
match v {
|
||||
PipelineValue::Null => Ok(VrlValue::Null),
|
||||
PipelineValue::Int8(x) => Ok(VrlValue::Integer(x as i64)),
|
||||
PipelineValue::Int16(x) => Ok(VrlValue::Integer(x as i64)),
|
||||
PipelineValue::Int32(x) => Ok(VrlValue::Integer(x as i64)),
|
||||
PipelineValue::Int64(x) => Ok(VrlValue::Integer(x)),
|
||||
PipelineValue::Uint8(x) => Ok(VrlValue::Integer(x as i64)),
|
||||
PipelineValue::Uint16(x) => Ok(VrlValue::Integer(x as i64)),
|
||||
PipelineValue::Uint32(x) => Ok(VrlValue::Integer(x as i64)),
|
||||
PipelineValue::Uint64(x) => Ok(VrlValue::Integer(x as i64)),
|
||||
PipelineValue::Float32(x) => NotNan::new(x as f64)
|
||||
.map_err(|_| FloatNaNSnafu { input_float: x }.build())
|
||||
.map(VrlValue::Float),
|
||||
PipelineValue::Float64(x) => NotNan::new(x)
|
||||
.map_err(|_| FloatNaNSnafu { input_float: x }.build())
|
||||
.map(VrlValue::Float),
|
||||
PipelineValue::Boolean(x) => Ok(VrlValue::Boolean(x)),
|
||||
PipelineValue::String(x) => Ok(VrlValue::Bytes(Bytes::copy_from_slice(x.as_bytes()))),
|
||||
PipelineValue::Timestamp(x) => x
|
||||
.to_datetime()
|
||||
.context(InvalidTimestampSnafu {
|
||||
input: x.to_string(),
|
||||
})
|
||||
.map(VrlValue::Timestamp),
|
||||
PipelineValue::Array(array) => Ok(VrlValue::Array(
|
||||
array
|
||||
.into_iter()
|
||||
.map(pipeline_value_to_vrl_value)
|
||||
.collect::<Result<Vec<_>>>()?,
|
||||
)),
|
||||
PipelineValue::Map(m) => {
|
||||
let values = m
|
||||
.values
|
||||
.into_iter()
|
||||
.map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
|
||||
.collect::<Result<BTreeMap<_, _>>>()?;
|
||||
Ok(VrlValue::Object(values))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn vrl_value_to_pipeline_value(v: VrlValue) -> Result<PipelineValue> {
|
||||
match v {
|
||||
VrlValue::Bytes(bytes) => String::from_utf8(bytes.to_vec())
|
||||
.context(BytesToUtf8Snafu)
|
||||
.map(PipelineValue::String),
|
||||
VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
|
||||
VrlValue::Integer(x) => Ok(PipelineValue::Int64(x)),
|
||||
VrlValue::Float(not_nan) => Ok(PipelineValue::Float64(not_nan.into_inner())),
|
||||
VrlValue::Boolean(b) => Ok(PipelineValue::Boolean(b)),
|
||||
VrlValue::Timestamp(date_time) => crate::etl::value::Timestamp::from_datetime(date_time)
|
||||
.context(InvalidTimestampSnafu {
|
||||
input: date_time.to_string(),
|
||||
})
|
||||
.map(PipelineValue::Timestamp),
|
||||
VrlValue::Object(bm) => {
|
||||
let b = bm
|
||||
.into_iter()
|
||||
.map(|(k, v)| vrl_value_to_pipeline_value(v).map(|v| (k.to_string(), v)))
|
||||
.collect::<Result<BTreeMap<String, PipelineValue>>>()?;
|
||||
Ok(PipelineValue::Map(b.into()))
|
||||
}
|
||||
VrlValue::Array(values) => {
|
||||
let a = values
|
||||
.into_iter()
|
||||
.map(vrl_value_to_pipeline_value)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
Ok(PipelineValue::Array(a.into()))
|
||||
}
|
||||
VrlValue::Null => Ok(PipelineValue::Null),
|
||||
}
|
||||
}
|
||||
|
||||
fn check_regex_output(output_kind: &Kind) -> Result<()> {
|
||||
if output_kind.is_regex() {
|
||||
return VrlRegexValueSnafu.fail();
|
||||
}
|
||||
|
||||
if let Some(arr) = output_kind.as_array() {
|
||||
let k = arr.known();
|
||||
for v in k.values() {
|
||||
check_regex_output(v)?
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(obj) = output_kind.as_object() {
|
||||
let k = obj.known();
|
||||
for v in k.values() {
|
||||
check_regex_output(v)?
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::etl::value::Timestamp;
|
||||
use crate::Map;
|
||||
|
||||
#[test]
|
||||
fn test_vrl() {
|
||||
let source = r#"
|
||||
.name.a = .user_info.name
|
||||
.name.b = .user_info.name
|
||||
del(.user_info)
|
||||
.timestamp = now()
|
||||
.
|
||||
"#;
|
||||
|
||||
let v = VrlProcessor::new(source.to_string());
|
||||
assert!(v.is_ok());
|
||||
let v = v.unwrap();
|
||||
|
||||
let mut n = PipelineMap::new();
|
||||
n.insert(
|
||||
"name".to_string(),
|
||||
PipelineValue::String("certain_name".to_string()),
|
||||
);
|
||||
|
||||
let mut m = PipelineMap::new();
|
||||
m.insert(
|
||||
"user_info".to_string(),
|
||||
PipelineValue::Map(Map { values: n }),
|
||||
);
|
||||
|
||||
let re = v.resolve(m);
|
||||
assert!(re.is_ok());
|
||||
let re = re.unwrap();
|
||||
|
||||
assert!(matches!(re, PipelineValue::Map(_)));
|
||||
assert!(re.get("name").is_some());
|
||||
let name = re.get("name").unwrap();
|
||||
assert!(matches!(name.get("a").unwrap(), PipelineValue::String(x) if x == "certain_name"));
|
||||
assert!(matches!(name.get("b").unwrap(), PipelineValue::String(x) if x == "certain_name"));
|
||||
assert!(re.get("timestamp").is_some());
|
||||
let timestamp = re.get("timestamp").unwrap();
|
||||
assert!(matches!(
|
||||
timestamp,
|
||||
PipelineValue::Timestamp(Timestamp::Nanosecond(_))
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_yaml_to_vrl() {
|
||||
let yaml = r#"
|
||||
processors:
|
||||
- vrl:
|
||||
source: |
|
||||
.name.a = .user_info.name
|
||||
.name.b = .user_info.name
|
||||
del(.user_info)
|
||||
.timestamp = now()
|
||||
.
|
||||
"#;
|
||||
let y = yaml_rust::YamlLoader::load_from_str(yaml).unwrap();
|
||||
let vrl_processor_yaml = y
|
||||
.first()
|
||||
.and_then(|x| x.as_hash())
|
||||
.and_then(|x| x.get(&yaml_rust::Yaml::String("processors".to_string())))
|
||||
.and_then(|x| x.as_vec())
|
||||
.and_then(|x| x.first())
|
||||
.and_then(|x| x.as_hash())
|
||||
.and_then(|x| x.get(&yaml_rust::Yaml::String("vrl".to_string())))
|
||||
.and_then(|x| x.as_hash())
|
||||
.unwrap();
|
||||
|
||||
let vrl = VrlProcessor::try_from(vrl_processor_yaml);
|
||||
assert!(vrl.is_ok());
|
||||
let vrl = vrl.unwrap();
|
||||
|
||||
assert_eq!(vrl.source, ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_regex() {
|
||||
let source = r#"
|
||||
.re = r'(?i)^Hello, World!$'
|
||||
del(.re)
|
||||
.re = r'(?i)^Hello, World!$'
|
||||
.
|
||||
"#;
|
||||
|
||||
let v = VrlProcessor::new(source.to_string());
|
||||
assert!(v.is_err());
|
||||
}
|
||||
}
|
||||
@@ -88,9 +88,10 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
|
||||
let mut transforms = Vec::with_capacity(100);
|
||||
let mut all_output_keys: Vec<String> = Vec::with_capacity(100);
|
||||
let mut all_required_keys = Vec::with_capacity(100);
|
||||
let mut transforms = Vec::with_capacity(32);
|
||||
let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
|
||||
let mut all_required_keys = Vec::with_capacity(32);
|
||||
|
||||
for doc in docs {
|
||||
let transform_builder: Transform = doc
|
||||
.as_hash()
|
||||
@@ -123,15 +124,10 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Transform {
|
||||
pub fields: Fields,
|
||||
|
||||
pub type_: Value,
|
||||
|
||||
pub default: Option<Value>,
|
||||
|
||||
pub index: Option<Index>,
|
||||
|
||||
pub tag: bool,
|
||||
|
||||
pub on_failure: Option<OnFailure>,
|
||||
}
|
||||
|
||||
|
||||
@@ -35,12 +35,13 @@ use crate::error::{
|
||||
TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
|
||||
TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
|
||||
};
|
||||
use crate::etl::ctx_req::ContextOpt;
|
||||
use crate::etl::field::{Field, Fields};
|
||||
use crate::etl::transform::index::Index;
|
||||
use crate::etl::transform::{Transform, Transforms};
|
||||
use crate::etl::value::{Timestamp, Value};
|
||||
use crate::etl::PipelineMap;
|
||||
use crate::{from_pipeline_map_to_opt, PipelineContext};
|
||||
use crate::PipelineContext;
|
||||
|
||||
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
|
||||
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
|
||||
@@ -185,8 +186,8 @@ impl GreptimeTransformer {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(String, Row)> {
|
||||
let opt = from_pipeline_map_to_opt(pipeline_map);
|
||||
pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> {
|
||||
let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map);
|
||||
|
||||
let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
|
||||
let mut output_index = 0;
|
||||
@@ -519,7 +520,7 @@ fn resolve_value(
|
||||
fn identity_pipeline_inner(
|
||||
pipeline_maps: Vec<PipelineMap>,
|
||||
pipeline_ctx: &PipelineContext<'_>,
|
||||
) -> Result<(SchemaInfo, HashMap<String, Vec<Row>>)> {
|
||||
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
|
||||
let mut schema_info = SchemaInfo::default();
|
||||
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
|
||||
|
||||
@@ -544,7 +545,7 @@ fn identity_pipeline_inner(
|
||||
let len = pipeline_maps.len();
|
||||
|
||||
for mut pipeline_map in pipeline_maps {
|
||||
let opt = from_pipeline_map_to_opt(&mut pipeline_map);
|
||||
let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map);
|
||||
let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;
|
||||
|
||||
opt_map
|
||||
@@ -578,7 +579,7 @@ pub fn identity_pipeline(
|
||||
array: Vec<PipelineMap>,
|
||||
table: Option<Arc<table::Table>>,
|
||||
pipeline_ctx: &PipelineContext<'_>,
|
||||
) -> Result<HashMap<String, Rows>> {
|
||||
) -> Result<HashMap<ContextOpt, Rows>> {
|
||||
let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
|
||||
array
|
||||
.into_iter()
|
||||
@@ -609,7 +610,7 @@ pub fn identity_pipeline(
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<String, Rows>>()
|
||||
.collect::<HashMap<ContextOpt, Rows>>()
|
||||
})
|
||||
}
|
||||
|
||||
@@ -761,7 +762,7 @@ mod tests {
|
||||
assert!(rows.is_ok());
|
||||
let mut rows = rows.unwrap();
|
||||
assert!(rows.len() == 1);
|
||||
let rows = rows.remove("").unwrap();
|
||||
let rows = rows.remove(&ContextOpt::default()).unwrap();
|
||||
assert_eq!(rows.schema.len(), 8);
|
||||
assert_eq!(rows.rows.len(), 2);
|
||||
assert_eq!(8, rows.rows[0].values.len());
|
||||
@@ -799,7 +800,7 @@ mod tests {
|
||||
}
|
||||
|
||||
assert!(rows.len() == 1);
|
||||
let rows = rows.remove("").unwrap();
|
||||
let rows = rows.remove(&ContextOpt::default()).unwrap();
|
||||
|
||||
Rows {
|
||||
schema: schema.schema,
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use common_time::timestamp::TimeUnit;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
@@ -104,6 +105,19 @@ impl Timestamp {
|
||||
Timestamp::Second(_) => TimeUnit::Second,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_datetime(&self) -> Option<DateTime<Utc>> {
|
||||
match self {
|
||||
Timestamp::Nanosecond(v) => Some(DateTime::from_timestamp_nanos(*v)),
|
||||
Timestamp::Microsecond(v) => DateTime::from_timestamp_micros(*v),
|
||||
Timestamp::Millisecond(v) => DateTime::from_timestamp_millis(*v),
|
||||
Timestamp::Second(v) => DateTime::from_timestamp(*v, 0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_datetime(dt: DateTime<Utc>) -> Option<Self> {
|
||||
dt.timestamp_nanos_opt().map(Timestamp::Nanosecond)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Timestamp {
|
||||
|
||||
@@ -19,7 +19,7 @@ mod manager;
|
||||
mod metrics;
|
||||
mod tablesuffix;
|
||||
|
||||
pub use etl::ctx_req::{from_pipeline_map_to_opt, ContextReq};
|
||||
pub use etl::ctx_req::{ContextOpt, ContextReq};
|
||||
pub use etl::processor::Processor;
|
||||
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
|
||||
pub use etl::transform::transformer::identity_pipeline;
|
||||
|
||||
@@ -29,9 +29,9 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
|
||||
match input_value {
|
||||
serde_json::Value::Array(array) => {
|
||||
for value in array {
|
||||
let mut intermediate_status = json_to_map(value).unwrap();
|
||||
let intermediate_status = json_to_map(value).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut intermediate_status)
|
||||
.exec_mut(intermediate_status)
|
||||
.expect("failed to exec pipeline")
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -39,9 +39,9 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
|
||||
}
|
||||
}
|
||||
serde_json::Value::Object(_) => {
|
||||
let mut intermediate_status = json_to_map(input_value).unwrap();
|
||||
let intermediate_status = json_to_map(input_value).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut intermediate_status)
|
||||
.exec_mut(intermediate_status)
|
||||
.expect("failed to exec pipeline")
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
|
||||
@@ -274,9 +274,9 @@ transform:
|
||||
let yaml_content = pipeline::Content::Yaml(pipeline_yaml);
|
||||
let pipeline: pipeline::Pipeline =
|
||||
pipeline::parse(&yaml_content).expect("failed to parse pipeline");
|
||||
let mut result = json_to_map(input_value).unwrap();
|
||||
let result = json_to_map(input_value).unwrap();
|
||||
|
||||
let row = pipeline.exec_mut(&mut result);
|
||||
let row = pipeline.exec_mut(result);
|
||||
|
||||
assert!(row.is_err());
|
||||
assert_eq!(row.err().unwrap().to_string(), "No matching pattern found");
|
||||
|
||||
@@ -419,10 +419,10 @@ transform:
|
||||
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
|
||||
let mut stats = json_to_map(input_value).unwrap();
|
||||
let stats = json_to_map(input_value).unwrap();
|
||||
|
||||
let row = pipeline
|
||||
.exec_mut(&mut stats)
|
||||
.exec_mut(stats)
|
||||
.expect("failed to exec pipeline")
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -488,9 +488,9 @@ transform:
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = json_to_map(input_value).unwrap();
|
||||
let status = json_to_map(input_value).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut status)
|
||||
.exec_mut(status)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -597,9 +597,9 @@ transform:
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = json_to_map(input_value).unwrap();
|
||||
let status = json_to_map(input_value).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut status)
|
||||
.exec_mut(status)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -663,9 +663,9 @@ transform:
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = json_to_map(input_value).unwrap();
|
||||
let status = json_to_map(input_value).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut status)
|
||||
.exec_mut(status)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -703,10 +703,9 @@ transform:
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = json_to_map(input_value).unwrap();
|
||||
|
||||
let status = json_to_map(input_value).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut status)
|
||||
.exec_mut(status)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -763,9 +762,9 @@ transform:
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = json_to_map(input_value).unwrap();
|
||||
let status = json_to_map(input_value).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut status)
|
||||
.exec_mut(status)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -804,9 +803,9 @@ transform:
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = json_to_map(input_value).unwrap();
|
||||
let status = json_to_map(input_value).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut status)
|
||||
.exec_mut(status)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -866,18 +865,18 @@ transform:
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = json_to_map(input_value1).unwrap();
|
||||
let status = json_to_map(input_value1).unwrap();
|
||||
let dispatched_to = pipeline
|
||||
.exec_mut(&mut status)
|
||||
.exec_mut(status)
|
||||
.unwrap()
|
||||
.into_dispatched()
|
||||
.expect("expect dispatched result ");
|
||||
assert_eq!(dispatched_to.table_suffix, "http");
|
||||
assert_eq!(dispatched_to.pipeline.unwrap(), "access_log_pipeline");
|
||||
|
||||
let mut status = json_to_map(input_value2).unwrap();
|
||||
let status = json_to_map(input_value2).unwrap();
|
||||
let row = pipeline
|
||||
.exec_mut(&mut status)
|
||||
.exec_mut(status)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
@@ -930,8 +929,8 @@ table_suffix: _${logger}
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).unwrap();
|
||||
|
||||
let mut status = json_to_map(input_value).unwrap();
|
||||
let exec_re = pipeline.exec_mut(&mut status).unwrap();
|
||||
let status = json_to_map(input_value).unwrap();
|
||||
let exec_re = pipeline.exec_mut(status).unwrap();
|
||||
|
||||
let (row, table_name) = exec_re.into_transformed().unwrap();
|
||||
let values = row.values;
|
||||
|
||||
@@ -569,6 +569,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Convert SQL value error"))]
|
||||
ConvertSqlValue {
|
||||
source: datatypes::error::Error,
|
||||
|
||||
@@ -120,9 +120,9 @@ async fn run_custom_pipeline(
|
||||
let mut auto_map = HashMap::new();
|
||||
let mut auto_map_ts_keys = HashMap::new();
|
||||
|
||||
for mut pipeline_map in pipeline_maps {
|
||||
for pipeline_map in pipeline_maps {
|
||||
let r = pipeline
|
||||
.exec_mut(&mut pipeline_map)
|
||||
.exec_mut(pipeline_map)
|
||||
.inspect_err(|_| {
|
||||
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
|
||||
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
|
||||
@@ -135,6 +135,7 @@ async fn run_custom_pipeline(
|
||||
opt,
|
||||
row,
|
||||
table_suffix,
|
||||
pipeline_map: _val,
|
||||
}) => {
|
||||
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
|
||||
push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
|
||||
@@ -142,6 +143,7 @@ async fn run_custom_pipeline(
|
||||
PipelineExecOutput::AutoTransform(AutoTransformOutput {
|
||||
table_suffix,
|
||||
ts_unit_map,
|
||||
pipeline_map,
|
||||
}) => {
|
||||
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
|
||||
push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
|
||||
@@ -150,8 +152,8 @@ async fn run_custom_pipeline(
|
||||
.or_insert_with(HashMap::new)
|
||||
.extend(ts_unit_map);
|
||||
}
|
||||
PipelineExecOutput::DispatchedTo(dispatched_to) => {
|
||||
push_to_map!(dispatched, dispatched_to, pipeline_map, arr_len);
|
||||
PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
|
||||
push_to_map!(dispatched, dispatched_to, val, arr_len);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -391,13 +391,16 @@ fn parse_string_options(parser: &mut Parser) -> std::result::Result<(String, Str
|
||||
parser.expect_token(&Token::Eq)?;
|
||||
let value = if parser.parse_keyword(Keyword::NULL) {
|
||||
"".to_string()
|
||||
} else if let Ok(v) = parser.parse_literal_string() {
|
||||
v
|
||||
} else {
|
||||
return Err(ParserError::ParserError(format!(
|
||||
"Unexpected option value for alter table statements, expect string literal or NULL, got: `{}`",
|
||||
parser.next_token()
|
||||
)));
|
||||
let next_token = parser.peek_token();
|
||||
if let Token::Number(number_as_string, _) = next_token.token {
|
||||
parser.advance_token();
|
||||
number_as_string
|
||||
} else {
|
||||
parser.parse_literal_string().map_err(|_|{
|
||||
ParserError::ParserError(format!("Unexpected option value for alter table statements, expect string literal, numeric literal or NULL, got: `{}`", next_token))
|
||||
})?
|
||||
}
|
||||
};
|
||||
Ok((name, value))
|
||||
}
|
||||
@@ -1088,4 +1091,38 @@ mod tests {
|
||||
)
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_alter_with_numeric_value() {
|
||||
for sql in [
|
||||
"ALTER TABLE test SET 'compaction.twcs.trigger_file_num'=8;",
|
||||
"ALTER TABLE test SET 'compaction.twcs.trigger_file_num'='8';",
|
||||
] {
|
||||
let mut result = ParserContext::create_with_dialect(
|
||||
sql,
|
||||
&GreptimeDbDialect {},
|
||||
ParseOptions::default(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(1, result.len());
|
||||
|
||||
let statement = result.remove(0);
|
||||
assert_matches!(statement, Statement::AlterTable { .. });
|
||||
match statement {
|
||||
Statement::AlterTable(alter_table) => {
|
||||
let alter_operation = alter_table.alter_operation();
|
||||
assert_matches!(alter_operation, AlterTableOperation::SetTableOptions { .. });
|
||||
match alter_operation {
|
||||
AlterTableOperation::SetTableOptions { options } => {
|
||||
assert_eq!(options.len(), 1);
|
||||
assert_eq!(options[0].key, "compaction.twcs.trigger_file_num");
|
||||
assert_eq!(options[0].value, "8");
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,6 +211,12 @@ impl ColumnExtensions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Partition on columns or values.
|
||||
///
|
||||
/// - `column_list` is the list of columns in `PARTITION ON COLUMNS` clause.
|
||||
/// - `exprs` is the list of expressions in `PARTITION ON VALUES` clause, like
|
||||
/// `host <= 'host1'`, `host > 'host1' and host <= 'host2'` or `host > 'host2'`.
|
||||
/// Each expression stands for a partition.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
|
||||
pub struct Partitions {
|
||||
pub column_list: Vec<Ident>,
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::fmt;
|
||||
use std::str::FromStr;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_datasource::object_store::oss::is_supported_in_oss;
|
||||
use common_datasource::object_store::s3::is_supported_in_s3;
|
||||
use common_query::AddColumnLocation;
|
||||
use common_time::range::TimestampRange;
|
||||
@@ -70,6 +71,10 @@ pub fn validate_table_option(key: &str) -> bool {
|
||||
return true;
|
||||
}
|
||||
|
||||
if is_supported_in_oss(key) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if is_mito_engine_option_key(key) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -857,7 +857,7 @@ CREATE TABLE {table_name} (
|
||||
.expect("physical table route"),
|
||||
)
|
||||
.iter()
|
||||
.map(|(k, v)| (v[0], *k))
|
||||
.map(|(k, v)| (v.leader_regions[0], *k))
|
||||
.collect::<HashMap<u32, u64>>();
|
||||
assert!(region_to_dn_map.len() <= instance.datanodes().len());
|
||||
|
||||
|
||||
@@ -224,7 +224,7 @@ mod tests {
|
||||
.expect("region routes should be physical"),
|
||||
)
|
||||
.iter()
|
||||
.map(|(k, v)| (v[0], *k))
|
||||
.map(|(k, v)| (v.leader_regions[0], *k))
|
||||
.collect::<HashMap<u32, u64>>();
|
||||
assert!(region_to_dn_map.len() <= instance.datanodes().len());
|
||||
|
||||
|
||||
@@ -1458,9 +1458,12 @@ async fn test_insert_with_default_value_for_type(instance: Arc<Instance>, type_n
|
||||
.data;
|
||||
assert!(matches!(output, OutputData::AffectedRows(1)));
|
||||
|
||||
let output = execute_sql(&instance, &format!("select host, cpu from {table_name}"))
|
||||
.await
|
||||
.data;
|
||||
let output = execute_sql(
|
||||
&instance,
|
||||
&format!("select host, cpu from {table_name} order by host"),
|
||||
)
|
||||
.await
|
||||
.data;
|
||||
let expected = "\
|
||||
+-------+-----+
|
||||
| host | cpu |
|
||||
@@ -1757,7 +1760,12 @@ async fn test_execute_copy_from_orc_with_cast(instance: Arc<dyn MockInstance>) {
|
||||
|
||||
assert!(matches!(output, OutputData::AffectedRows(5)));
|
||||
|
||||
let output = execute_sql(&instance, "select * from demo;").await.data;
|
||||
let output = execute_sql(
|
||||
&instance,
|
||||
"select * from demo order by timestamp_simple asc;",
|
||||
)
|
||||
.await
|
||||
.data;
|
||||
let expected = r#"+-------------------------------+----------------------------+-------------------------+----------------------------+
|
||||
| bigint_direct | bigint_neg_direct | bigint_other | timestamp_simple |
|
||||
+-------------------------------+----------------------------+-------------------------+----------------------------+
|
||||
|
||||
@@ -520,7 +520,7 @@ async fn insert_and_assert(db: &Database) {
|
||||
|
||||
// select
|
||||
let output = db
|
||||
.sql("SELECT host, cpu, memory, ts FROM demo")
|
||||
.sql("SELECT host, cpu, memory, ts FROM demo order by host")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -105,6 +105,8 @@ macro_rules! http_tests {
|
||||
test_pipeline_dispatcher,
|
||||
test_pipeline_suffix_template,
|
||||
test_pipeline_context,
|
||||
test_pipeline_with_vrl,
|
||||
test_pipeline_with_hint_vrl,
|
||||
|
||||
test_otlp_metrics,
|
||||
test_otlp_traces_v0,
|
||||
@@ -2066,7 +2068,8 @@ table_suffix: _${type}
|
||||
"type": "http",
|
||||
"time": "2024-05-25 20:16:37.217",
|
||||
"log": "ClusterAdapter:enter sendTextDataToCluster\\n",
|
||||
"greptime_ttl": "1d"
|
||||
"greptime_ttl": "1d",
|
||||
"greptime_skip_wal": "true"
|
||||
},
|
||||
{
|
||||
"id1": "2436",
|
||||
@@ -2116,12 +2119,13 @@ table_suffix: _${type}
|
||||
// CREATE TABLE IF NOT EXISTS "d_table_http" (
|
||||
// ... ignore
|
||||
// )
|
||||
// ENGINE=mito
|
||||
// ENGINE=mito
|
||||
// WITH(
|
||||
// append_mode = 'true',
|
||||
// skip_wal = 'true',
|
||||
// ttl = '1day'
|
||||
// )
|
||||
let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n ttl = '1day'\\n)\"]]";
|
||||
let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n skip_wal = 'true',\\n ttl = '1day'\\n)\"]]";
|
||||
validate_data(
|
||||
"test_pipeline_context_http",
|
||||
&client,
|
||||
@@ -2133,6 +2137,141 @@ table_suffix: _${type}
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_pipeline_with_vrl(storage_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
setup_test_http_app_with_frontend(storage_type, "test_pipeline_with_vrl").await;
|
||||
|
||||
// handshake
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
let pipeline = r#"
|
||||
processors:
|
||||
- date:
|
||||
field: time
|
||||
formats:
|
||||
- "%Y-%m-%d %H:%M:%S%.3f"
|
||||
ignore_missing: true
|
||||
- vrl:
|
||||
source: |
|
||||
.log_id = .id
|
||||
del(.id)
|
||||
.
|
||||
|
||||
transform:
|
||||
- fields:
|
||||
- log_id
|
||||
type: int32
|
||||
- field: time
|
||||
type: time
|
||||
index: timestamp
|
||||
"#;
|
||||
|
||||
// 1. create pipeline
|
||||
let res = client
|
||||
.post("/v1/events/pipelines/root")
|
||||
.header("Content-Type", "application/x-yaml")
|
||||
.body(pipeline)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// 2. write data
|
||||
let data_body = r#"
|
||||
[
|
||||
{
|
||||
"id": "2436",
|
||||
"time": "2024-05-25 20:16:37.217"
|
||||
}
|
||||
]
|
||||
"#;
|
||||
let res = client
|
||||
.post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
|
||||
.header("Content-Type", "application/json")
|
||||
.body(data_body)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
validate_data(
|
||||
"test_pipeline_with_vrl",
|
||||
&client,
|
||||
"select * from d_table",
|
||||
"[[2436,1716668197217000000]]",
|
||||
)
|
||||
.await;
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_pipeline_with_hint_vrl(storage_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
setup_test_http_app_with_frontend(storage_type, "test_pipeline_with_hint_vrl").await;
|
||||
|
||||
// handshake
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
let pipeline = r#"
|
||||
processors:
|
||||
- date:
|
||||
field: time
|
||||
formats:
|
||||
- "%Y-%m-%d %H:%M:%S%.3f"
|
||||
ignore_missing: true
|
||||
- vrl:
|
||||
source: |
|
||||
.greptime_table_suffix, err = "_" + .id
|
||||
.
|
||||
|
||||
transform:
|
||||
- fields:
|
||||
- id
|
||||
type: int32
|
||||
- field: time
|
||||
type: time
|
||||
index: timestamp
|
||||
"#;
|
||||
|
||||
// 1. create pipeline
|
||||
let res = client
|
||||
.post("/v1/events/pipelines/root")
|
||||
.header("Content-Type", "application/x-yaml")
|
||||
.body(pipeline)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// 2. write data
|
||||
let data_body = r#"
|
||||
[
|
||||
{
|
||||
"id": "2436",
|
||||
"time": "2024-05-25 20:16:37.217"
|
||||
}
|
||||
]
|
||||
"#;
|
||||
let res = client
|
||||
.post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
|
||||
.header("Content-Type", "application/json")
|
||||
.body(data_body)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
validate_data(
|
||||
"test_pipeline_with_hint_vrl",
|
||||
&client,
|
||||
"show tables",
|
||||
"[[\"d_table_2436\"],[\"demo\"],[\"numbers\"]]",
|
||||
)
|
||||
.await;
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use client::{OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_meta::key::{RegionDistribution, TableMetadataManagerRef};
|
||||
use common_meta::key::{RegionDistribution, RegionRoleSet, TableMetadataManagerRef};
|
||||
use common_meta::peer::Peer;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
@@ -166,7 +166,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
|
||||
to_regions
|
||||
);
|
||||
|
||||
let region_id = RegionId::new(table_id, from_regions[0]);
|
||||
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
|
||||
// Trigger region migration.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
@@ -180,7 +180,12 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
|
||||
info!("Started region procedure: {}!", procedure.unwrap());
|
||||
|
||||
// Prepares expected region distribution.
|
||||
to_regions.extend(from_regions);
|
||||
to_regions
|
||||
.leader_regions
|
||||
.extend(from_regions.leader_regions);
|
||||
to_regions
|
||||
.follower_regions
|
||||
.extend(from_regions.follower_regions);
|
||||
// Keeps asc order.
|
||||
to_regions.sort();
|
||||
distribution.insert(to_peer_id, to_regions);
|
||||
@@ -300,10 +305,10 @@ pub async fn test_metric_table_region_migration_by_sql(
|
||||
let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
|
||||
info!(
|
||||
"Selecting from peer: {from_peer_id}, and regions: {:?}",
|
||||
from_regions[0]
|
||||
from_regions.leader_regions[0]
|
||||
);
|
||||
let to_peer_id = (from_peer_id + 1) % 3;
|
||||
let region_id = RegionId::new(table_id, from_regions[0]);
|
||||
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
|
||||
// Trigger region migration.
|
||||
let procedure_id =
|
||||
trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;
|
||||
@@ -436,7 +441,7 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
|
||||
to_regions
|
||||
);
|
||||
|
||||
let region_id = RegionId::new(table_id, from_regions[0]);
|
||||
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
|
||||
// Trigger region migration.
|
||||
let procedure_id =
|
||||
trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;
|
||||
@@ -558,12 +563,12 @@ pub async fn test_region_migration_multiple_regions(
|
||||
let (peer_2, peer_2_regions) = distribution.pop_first().unwrap();
|
||||
|
||||
// Picks the peer only contains as from peer.
|
||||
let ((from_peer_id, from_regions), (to_peer_id, mut to_regions)) = if peer_1_regions.len() == 1
|
||||
{
|
||||
((peer_1, peer_1_regions), (peer_2, peer_2_regions))
|
||||
} else {
|
||||
((peer_2, peer_2_regions), (peer_1, peer_1_regions))
|
||||
};
|
||||
let ((from_peer_id, from_regions), (to_peer_id, mut to_regions)) =
|
||||
if peer_1_regions.leader_regions.len() == 1 {
|
||||
((peer_1, peer_1_regions), (peer_2, peer_2_regions))
|
||||
} else {
|
||||
((peer_2, peer_2_regions), (peer_1, peer_1_regions))
|
||||
};
|
||||
|
||||
info!(
|
||||
"Selecting from peer: {from_peer_id}, and regions: {:?}",
|
||||
@@ -574,7 +579,7 @@ pub async fn test_region_migration_multiple_regions(
|
||||
to_regions
|
||||
);
|
||||
|
||||
let region_id = RegionId::new(table_id, from_regions[0]);
|
||||
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
|
||||
// Trigger region migration.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
@@ -588,7 +593,12 @@ pub async fn test_region_migration_multiple_regions(
|
||||
info!("Started region procedure: {}!", procedure.unwrap());
|
||||
|
||||
// Prepares expected region distribution.
|
||||
to_regions.extend(from_regions);
|
||||
to_regions
|
||||
.leader_regions
|
||||
.extend(from_regions.leader_regions);
|
||||
to_regions
|
||||
.follower_regions
|
||||
.extend(from_regions.follower_regions);
|
||||
// Keeps asc order.
|
||||
to_regions.sort();
|
||||
distribution.insert(to_peer_id, to_regions);
|
||||
@@ -699,7 +709,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
|
||||
let region_migration_manager = cluster.metasrv.region_migration_manager();
|
||||
let (from_peer_id, mut from_regions) = distribution.pop_first().unwrap();
|
||||
let to_peer_id = 1;
|
||||
let mut to_regions = Vec::new();
|
||||
let mut to_regions = RegionRoleSet::default();
|
||||
info!(
|
||||
"Selecting from peer: {from_peer_id}, and regions: {:?}",
|
||||
from_regions
|
||||
@@ -709,7 +719,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
|
||||
to_regions
|
||||
);
|
||||
|
||||
let region_id = RegionId::new(table_id, from_regions[0]);
|
||||
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
|
||||
// Trigger region migration.
|
||||
let procedure = region_migration_manager
|
||||
.submit_procedure(RegionMigrationProcedureTask::new(
|
||||
@@ -723,7 +733,9 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
|
||||
info!("Started region procedure: {}!", procedure.unwrap());
|
||||
|
||||
// Prepares expected region distribution.
|
||||
to_regions.push(from_regions.remove(0));
|
||||
to_regions
|
||||
.leader_regions
|
||||
.push(from_regions.leader_regions.remove(0));
|
||||
// Keeps asc order.
|
||||
to_regions.sort();
|
||||
distribution.insert(to_peer_id, to_regions);
|
||||
@@ -1120,7 +1132,7 @@ async fn find_region_distribution_by_sql(
|
||||
distribution
|
||||
.entry(datanode_id)
|
||||
.or_default()
|
||||
.push(region_id.region_number());
|
||||
.add_leader_region(region_id.region_number());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ INSERT INTO t1 (ts, val, host) VALUES
|
||||
|
||||
Affected Rows: 6
|
||||
|
||||
SELECT * FROM t1;
|
||||
SELECT * FROM t1 ORDER BY ts ASC;
|
||||
|
||||
+-------------+---------------------+------+
|
||||
| host | ts | val |
|
||||
@@ -159,7 +159,7 @@ ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SELECT * FROM t1;
|
||||
SELECT * FROM t1 ORDER BY ts ASC;
|
||||
|
||||
+-------------+---+---------------------+------+
|
||||
| host | k | ts | val |
|
||||
|
||||
@@ -52,11 +52,11 @@ INSERT INTO t1 (ts, val, host) VALUES
|
||||
('2022-01-02 00:00:00', 4.56, 'example.com'),
|
||||
('2022-01-03 00:00:00', 7.89, 'example.com');
|
||||
|
||||
SELECT * FROM t1;
|
||||
SELECT * FROM t1 ORDER BY ts ASC;
|
||||
|
||||
ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY;
|
||||
|
||||
SELECT * FROM t1;
|
||||
SELECT * FROM t1 ORDER BY ts ASC;
|
||||
|
||||
DROP TABLE t1;
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ insert into foo (host) values ('host3');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
select * from foo;
|
||||
select * from foo order by ts;
|
||||
|
||||
+-------+---------------------+-----+
|
||||
| host | ts | cpu |
|
||||
@@ -141,7 +141,7 @@ SELECT * FROM system_metrics;
|
||||
| host2 | idc_a | 80.0 | 70.3 | 90.0 | 2022-11-03T03:39:57.450 |
|
||||
+-------+-------+----------+-------------+-----------+-------------------------+
|
||||
|
||||
select * from foo;
|
||||
select * from foo order by host asc;
|
||||
|
||||
+-------+---------------------+-----+
|
||||
| host | ts | cpu |
|
||||
@@ -151,12 +151,12 @@ select * from foo;
|
||||
| host3 | 2023-04-29T00:00:00 | 0.0 |
|
||||
+-------+---------------------+-----+
|
||||
|
||||
SELECT * from t1;
|
||||
SELECT * from t1 order by ts desc;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
SELECT * from t2;
|
||||
SELECT * from t2 order by ts desc;
|
||||
|
||||
+------+-------------------------+-----+
|
||||
| job | ts | val |
|
||||
|
||||
@@ -37,7 +37,7 @@ insert into foo (host, cpu) values ('host2', 2.2);
|
||||
|
||||
insert into foo (host) values ('host3');
|
||||
|
||||
select * from foo;
|
||||
select * from foo order by ts;
|
||||
|
||||
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");
|
||||
|
||||
@@ -63,11 +63,11 @@ INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
|
||||
|
||||
SELECT * FROM system_metrics;
|
||||
|
||||
select * from foo;
|
||||
select * from foo order by host asc;
|
||||
|
||||
SELECT * from t1;
|
||||
SELECT * from t1 order by ts desc;
|
||||
|
||||
SELECT * from t2;
|
||||
SELECT * from t2 order by ts desc;
|
||||
|
||||
DROP TABLE t1;
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user