Compare commits

...

4 Commits

Author SHA1 Message Date
Zhenchi
01081ef97b refactor: unify function registry (Part 2)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-06-11 08:21:11 +00:00
localhost
01796c9cc0 chore: org cli sub command (#6265)
* chore: org cli sub command

* chore: make clippy happy

* chore: fix info command not support absolute path

* chore: fix cli test

* Apply suggestions from code review

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: reorganizing the cli tool

* chore: fix limit issue

* chore: add some doc for cli

* chore: format code

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
2025-06-11 03:34:56 +00:00
liyang
9469a8f8f2 ci: add signature information when updating downstream repository (#6282)
Signed-off-by: liyang <daviderli614@gmail.com>
2025-06-10 17:18:29 +00:00
Ruihang Xia
2fabe346a1 fix: null value handling on PromQL's join (#6289)
* fix: null value handling on PromQL's join

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-10 13:24:09 +00:00
22 changed files with 341 additions and 99 deletions

View File

@@ -30,7 +30,7 @@ update_helm_charts_version() {
     # Commit the changes.
     git add .
-    git commit -m "chore: Update GreptimeDB version to ${VERSION}"
+    git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
     git push origin $BRANCH_NAME

     # Create a Pull Request.

View File

@@ -26,7 +26,7 @@ update_homebrew_greptime_version() {
     # Commit the changes.
     git add .
-    git commit -m "chore: Update GreptimeDB version to ${VERSION}"
+    git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
    git push origin $BRANCH_NAME

     # Create a Pull Request.

Cargo.lock (generated)
View File

@@ -2336,6 +2336,7 @@ dependencies = [
  "num-traits",
  "once_cell",
  "paste",
+ "promql",
  "s2",
  "serde",
  "serde_json",
@@ -3252,7 +3253,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "arrow-array 54.2.1",
@@ -3303,7 +3304,7 @@ dependencies = [
 [[package]]
 name = "datafusion-catalog"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "async-trait",
@@ -3323,7 +3324,7 @@ dependencies = [
 [[package]]
 name = "datafusion-catalog-listing"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "arrow-schema 54.3.1",
@@ -3346,7 +3347,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "ahash 0.8.11",
  "arrow 54.2.1",
@@ -3371,7 +3372,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common-runtime"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "log",
  "tokio",
@@ -3380,12 +3381,12 @@ dependencies = [
 [[package]]
 name = "datafusion-doc"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"

 [[package]]
 name = "datafusion-execution"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "dashmap",
@@ -3403,7 +3404,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "chrono",
@@ -3423,7 +3424,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr-common"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "datafusion-common",
@@ -3434,7 +3435,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "arrow-buffer 54.3.1",
@@ -3463,7 +3464,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-aggregate"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "ahash 0.8.11",
  "arrow 54.2.1",
@@ -3484,7 +3485,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-aggregate-common"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "ahash 0.8.11",
  "arrow 54.2.1",
@@ -3496,7 +3497,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-nested"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "arrow-array 54.2.1",
@@ -3518,7 +3519,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-table"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "async-trait",
@@ -3533,7 +3534,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-window"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "datafusion-common",
  "datafusion-doc",
@@ -3549,7 +3550,7 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-window-common"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "datafusion-common",
  "datafusion-physical-expr-common",
@@ -3558,7 +3559,7 @@ dependencies = [
 [[package]]
 name = "datafusion-macros"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "datafusion-expr",
  "quote",
@@ -3568,7 +3569,7 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "chrono",
@@ -3586,7 +3587,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "ahash 0.8.11",
  "arrow 54.2.1",
@@ -3609,7 +3610,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr-common"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "ahash 0.8.11",
  "arrow 54.2.1",
@@ -3622,7 +3623,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-optimizer"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "arrow-schema 54.3.1",
@@ -3643,7 +3644,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-plan"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "ahash 0.8.11",
  "arrow 54.2.1",
@@ -3673,7 +3674,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "arrow 54.2.1",
  "arrow-array 54.2.1",
@@ -3691,7 +3692,7 @@ dependencies = [
 [[package]]
 name = "datafusion-substrait"
 version = "45.0.0"
-source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
 dependencies = [
  "async-recursion",
  "async-trait",

View File

@@ -116,15 +116,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"

View File

@@ -58,6 +58,7 @@ where
     info!("{desc}, average operation cost: {cost:.2} ms");
 }

+/// Command to benchmark table metadata operations.
 #[derive(Debug, Default, Parser)]
 pub struct BenchTableMetadataCommand {
     #[clap(long)]

View File

@@ -244,6 +244,18 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },

+    #[snafu(display("Unsupported memory backend"))]
+    UnsupportedMemoryBackend {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("File path invalid: {}", msg))]
+    InvalidFilePath {
+        msg: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -262,6 +274,8 @@ impl ErrorExt for Error {
             | Error::ConnectEtcd { .. }
             | Error::CreateDir { .. }
             | Error::EmptyResult { .. }
+            | Error::InvalidFilePath { .. }
+            | Error::UnsupportedMemoryBackend { .. }
             | Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,

             Error::StartProcedureManager { source, .. }

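The two new variants follow the crate's existing snafu conventions: the derive generates context selectors (`UnsupportedMemoryBackendSnafu`, `InvalidFilePathSnafu`) that turn a failure site into the typed error, which is how the meta_snapshot changes below consume them. A standalone sketch of that mechanism, assuming plain snafu and omitting the implicit `Location` field the real enum captures:

use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("File path invalid: {}", msg))]
    InvalidFilePath { msg: String },
}

fn file_name(path: &str) -> Result<&str, Error> {
    // `context` maps `None` into `Error::InvalidFilePath { msg: path }`.
    std::path::Path::new(path)
        .file_name()
        .and_then(|f| f.to_str())
        .context(InvalidFilePathSnafu { msg: path })
}

fn main() {
    assert_eq!(file_name("/backups/snapshot.fb").unwrap(), "snapshot.fb");
    assert!(file_name("..").is_err());
}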
View File

@@ -50,6 +50,7 @@ enum ExportTarget {
     All,
 }

+/// Command for exporting data from GreptimeDB.
 #[derive(Debug, Default, Parser)]
 pub struct ExportCommand {
     /// Server address to connect

View File

@@ -40,6 +40,7 @@ enum ImportTarget {
     All,
 }

+/// Command to import data from a directory into a GreptimeDB instance.
 #[derive(Debug, Default, Parser)]
 pub struct ImportCommand {
     /// Server address to connect

View File

@@ -20,7 +20,7 @@ mod import;
 mod meta_snapshot;

 use async_trait::async_trait;
-use clap::Parser;
+use clap::{Parser, Subcommand};
 use common_error::ext::BoxedError;
 pub use database::DatabaseClient;
 use error::Result;
@@ -28,7 +28,7 @@ use error::Result;
 pub use crate::bench::BenchTableMetadataCommand;
 pub use crate::export::ExportCommand;
 pub use crate::import::ImportCommand;
-pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
+pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand};

 #[async_trait]
 pub trait Tool: Send + Sync {
@@ -51,3 +51,19 @@ impl AttachCommand {
         unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
     }
 }
+
+/// Subcommand for data operations like export and import.
+#[derive(Subcommand)]
+pub enum DataCommand {
+    Export(ExportCommand),
+    Import(ImportCommand),
+}
+
+impl DataCommand {
+    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
+        match self {
+            DataCommand::Export(cmd) => cmd.build().await,
+            DataCommand::Import(cmd) => cmd.build().await,
+        }
+    }
+}

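`DataCommand::build` funnels both leaf commands into the same `Box<dyn Tool>`, so the binary can run any subcommand through one interface. A self-contained sketch of that dispatch shape, with a hypothetical `EchoTool` standing in for the real export/import tools:

use async_trait::async_trait;

#[async_trait]
trait Tool: Send + Sync {
    async fn do_work(&self) -> Result<(), String>;
}

struct EchoTool(&'static str);

#[async_trait]
impl Tool for EchoTool {
    async fn do_work(&self) -> Result<(), String> {
        println!("{}", self.0);
        Ok(())
    }
}

enum DataCommand {
    Export,
    Import,
}

impl DataCommand {
    // Mirrors the dispatch above: each arm builds its own boxed tool.
    async fn build(&self) -> Result<Box<dyn Tool>, String> {
        match self {
            DataCommand::Export => Ok(Box::new(EchoTool("export"))),
            DataCommand::Import => Ok(Box::new(EchoTool("import"))),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    DataCommand::Export.build().await?.do_work().await
}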
View File

@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::path::Path;
 use std::sync::Arc;

 use async_trait::async_trait;
-use clap::Parser;
+use clap::{Parser, Subcommand};
 use common_base::secrets::{ExposeSecret, SecretString};
 use common_error::ext::BoxedError;
 use common_meta::kv_backend::chroot::ChrootKvBackend;
@@ -26,10 +27,50 @@ use meta_srv::bootstrap::create_etcd_client;
 use meta_srv::metasrv::BackendImpl;
 use object_store::services::{Fs, S3};
 use object_store::ObjectStore;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};

-use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
+use crate::error::{
+    InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
+    UnsupportedMemoryBackendSnafu,
+};
 use crate::Tool;

+/// Subcommand for metadata snapshot management.
+#[derive(Subcommand)]
+pub enum MetaCommand {
+    #[clap(subcommand)]
+    Snapshot(MetaSnapshotCommand),
+}
+
+impl MetaCommand {
+    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
+        match self {
+            MetaCommand::Snapshot(cmd) => cmd.build().await,
+        }
+    }
+}
+
+/// Subcommand for metadata snapshot operations, such as save, restore, and info.
+#[derive(Subcommand)]
+pub enum MetaSnapshotCommand {
+    /// Export metadata snapshot tool.
+    Save(MetaSaveCommand),
+    /// Restore metadata snapshot tool.
+    Restore(MetaRestoreCommand),
+    /// Explore metadata from a metadata snapshot.
+    Info(MetaInfoCommand),
+}
+
+impl MetaSnapshotCommand {
+    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
+        match self {
+            MetaSnapshotCommand::Save(cmd) => cmd.build().await,
+            MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
+            MetaSnapshotCommand::Info(cmd) => cmd.build().await,
+        }
+    }
+}
+
 #[derive(Debug, Default, Parser)]
 struct MetaConnection {
     /// The endpoint of store. one of etcd, pg or mysql.
@@ -91,6 +132,9 @@ impl MetaConnection {
                 .await
                 .map_err(BoxedError::new)?)
             }
+            Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
+                .fail()
+                .map_err(BoxedError::new),
             _ => KvBackendNotSetSnafu { backend: "all" }
                 .fail()
                 .map_err(BoxedError::new),
@@ -170,7 +214,7 @@ impl S3Config {
 /// It will dump the metadata snapshot to local file or s3 bucket.
 /// The snapshot file will be in binary format.
 #[derive(Debug, Default, Parser)]
-pub struct MetaSnapshotCommand {
+pub struct MetaSaveCommand {
     /// The connection to the metadata store.
     #[clap(flatten)]
     connection: MetaConnection,
@@ -196,7 +240,7 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
     Ok(object_store)
 }

-impl MetaSnapshotCommand {
+impl MetaSaveCommand {
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
         let kvbackend = self.connection.build().await?;
         let output_dir = &self.output_dir;
@@ -327,3 +371,89 @@ impl Tool for MetaRestoreTool {
         }
     }
 }
+
+/// Explore metadata from a metadata snapshot.
+#[derive(Debug, Default, Parser)]
+pub struct MetaInfoCommand {
+    /// The s3 config.
+    #[clap(flatten)]
+    s3_config: S3Config,
+    /// The name of the target snapshot file. The file extension is added automatically.
+    #[clap(long, default_value = "metadata_snapshot")]
+    file_name: String,
+    /// The query string to filter the metadata.
+    #[clap(long, default_value = "*")]
+    inspect_key: String,
+    /// The limit of the metadata entries to query.
+    #[clap(long)]
+    limit: Option<usize>,
+}
+
+pub struct MetaInfoTool {
+    inner: ObjectStore,
+    source_file: String,
+    inspect_key: String,
+    limit: Option<usize>,
+}
+
+#[async_trait]
+impl Tool for MetaInfoTool {
+    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
+        let result = MetadataSnapshotManager::info(
+            &self.inner,
+            &self.source_file,
+            &self.inspect_key,
+            self.limit,
+        )
+        .await
+        .map_err(BoxedError::new)?;
+        for item in result {
+            println!("{}", item);
+        }
+        Ok(())
+    }
+}
+
+impl MetaInfoCommand {
+    fn decide_object_store_root_for_local_store(
+        file_path: &str,
+    ) -> Result<(&str, &str), BoxedError> {
+        let path = Path::new(file_path);
+        let parent = path
+            .parent()
+            .and_then(|p| p.to_str())
+            .context(InvalidFilePathSnafu { msg: file_path })
+            .map_err(BoxedError::new)?;
+        let file_name = path
+            .file_name()
+            .and_then(|f| f.to_str())
+            .context(InvalidFilePathSnafu { msg: file_path })
+            .map_err(BoxedError::new)?;
+        let root = if parent.is_empty() { "." } else { parent };
+        Ok((root, file_name))
+    }
+
+    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
+        let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
+        if let Some(store) = object_store {
+            let tool = MetaInfoTool {
+                inner: store,
+                source_file: self.file_name.clone(),
+                inspect_key: self.inspect_key.clone(),
+                limit: self.limit,
+            };
+            Ok(Box::new(tool))
+        } else {
+            let (root, file_name) =
+                Self::decide_object_store_root_for_local_store(&self.file_name)?;
+            let object_store = create_local_file_object_store(root)?;
+            let tool = MetaInfoTool {
+                inner: object_store,
+                source_file: file_name.to_string(),
+                inspect_key: self.inspect_key.clone(),
+                limit: self.limit,
+            };
+            Ok(Box::new(tool))
+        }
+    }
+}
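The `decide_object_store_root_for_local_store` helper is what makes absolute paths work (the "fix info command not support absolute path" item in the commit message): OpenDAL's `Fs` backend is rooted at a directory, so a user-supplied path has to be split into a root to mount and a file name inside it. A standalone sketch of the same computation, mirroring the code above:

use std::path::Path;

fn split_root_and_file(file_path: &str) -> Option<(&str, &str)> {
    let path = Path::new(file_path);
    let parent = path.parent().and_then(|p| p.to_str())?;
    let file_name = path.file_name().and_then(|f| f.to_str())?;
    // A bare file name has an empty parent; mount the current directory.
    let root = if parent.is_empty() { "." } else { parent };
    Some((root, file_name))
}

fn main() {
    assert_eq!(
        split_root_and_file("/backups/metadata_snapshot"),
        Some(("/backups", "metadata_snapshot"))
    );
    assert_eq!(
        split_root_and_file("metadata_snapshot"),
        Some((".", "metadata_snapshot"))
    );
}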

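Together with `DataCommand` earlier, the `#[clap(subcommand)]` nesting yields a two-level CLI tree, e.g. `cli meta snapshot {save|restore|info}`. A minimal sketch of how such nesting parses, using trimmed-down hypothetical enums rather than the full commands:

use clap::{Parser, Subcommand};

#[derive(Parser)]
enum SubCommand {
    #[clap(subcommand)]
    Meta(MetaCommand),
}

#[derive(Subcommand)]
enum MetaCommand {
    #[clap(subcommand)]
    Snapshot(MetaSnapshotCommand),
}

#[derive(Subcommand)]
enum MetaSnapshotCommand {
    Save,
    Restore,
    Info {
        #[clap(long, default_value = "metadata_snapshot")]
        file_name: String,
    },
}

fn main() {
    // Equivalent to running: `cli meta snapshot info`
    let cmd = SubCommand::parse_from(["cli", "meta", "snapshot", "info"]);
    match cmd {
        SubCommand::Meta(MetaCommand::Snapshot(MetaSnapshotCommand::Info { file_name })) => {
            assert_eq!(file_name, "metadata_snapshot");
        }
        _ => unreachable!(),
    }
}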
View File

@@ -146,6 +146,7 @@ mod tests {
         let output_dir = tempfile::tempdir().unwrap();
         let cli = cli::Command::parse_from([
             "cli",
+            "data",
             "export",
             "--addr",
             "127.0.0.1:4000",

View File

@@ -47,6 +47,7 @@ num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
paste.workspace = true
promql.workspace = true
s2 = { version = "0.0.12", optional = true }
serde.workspace = true
serde_json.workspace = true

View File

@@ -39,7 +39,7 @@ use crate::system::SystemFunction;
 #[derive(Default)]
 pub struct FunctionRegistry {
-    functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
+    scalar_functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
     async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
     aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
 }
@@ -48,7 +48,7 @@ impl FunctionRegistry {
     pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
         let func = func.into();
         let _ = self
-            .functions
+            .scalar_functions
             .write()
             .unwrap()
             .insert(func.name().to_string(), func);
@@ -87,13 +87,17 @@ impl FunctionRegistry {
             .collect()
     }

     #[cfg(test)]
-    pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
-        self.functions.read().unwrap().get(name).cloned()
+    pub fn get_scalar_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
+        self.scalar_functions.read().unwrap().get(name).cloned()
     }

     pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
-        self.functions.read().unwrap().values().cloned().collect()
+        self.scalar_functions
+            .read()
+            .unwrap()
+            .values()
+            .cloned()
+            .collect()
     }

     pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
@@ -144,6 +148,11 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
     // Approximate functions
     ApproximateFunction::register(&function_registry);

+    // PromQL aggregate functions
+    for aggr in promql::functions::aggr_funcs() {
+        function_registry.register_aggr(aggr);
+    }
+
     Arc::new(function_registry)
 });
@@ -156,10 +165,10 @@ mod tests {
     fn test_function_registry() {
         let registry = FunctionRegistry::default();

-        assert!(registry.get_function("test_and").is_none());
+        assert!(registry.get_scalar_function("test_and").is_none());
         assert!(registry.scalar_functions().is_empty());

         registry.register_scalar(TestAndFunction);

-        let _ = registry.get_function("test_and").unwrap();
+        let _ = registry.get_scalar_function("test_and").unwrap();
         assert_eq!(1, registry.scalar_functions().len());
     }
 }

View File

@@ -14,6 +14,7 @@
 pub mod file;

 use std::borrow::Cow;
 use std::fmt::{Display, Formatter};
+use std::path::{Path, PathBuf};
 use std::time::Instant;
@@ -271,6 +272,49 @@ impl MetadataSnapshotManager {
         Ok((filename.to_string(), num_keyvalues as u64))
     }

+    fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
+        format!("{} => {}", key, value)
+    }
+
+    pub async fn info(
+        object_store: &ObjectStore,
+        file_path: &str,
+        query_str: &str,
+        limit: Option<usize>,
+    ) -> Result<Vec<String>> {
+        let path = Path::new(file_path);
+        let file_name = path
+            .file_name()
+            .and_then(|s| s.to_str())
+            .context(InvalidFilePathSnafu { file_path })?;
+        let filename = FileName::try_from(file_name)?;
+        let data = object_store
+            .read(file_path)
+            .await
+            .context(ReadObjectSnafu { file_path })?;
+        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
+        let metadata_content = document.into_metadata_content()?.values();
+        let mut results = Vec::with_capacity(limit.unwrap_or(256));
+        for kv in metadata_content {
+            let key_str = String::from_utf8_lossy(&kv.key);
+            if let Some(prefix) = query_str.strip_suffix('*') {
+                if key_str.starts_with(prefix) {
+                    let value_str = String::from_utf8_lossy(&kv.value);
+                    results.push(Self::format_output(key_str, value_str));
+                }
+            } else if key_str == query_str {
+                let value_str = String::from_utf8_lossy(&kv.value);
+                results.push(Self::format_output(key_str, value_str));
+            }
+            if results.len() == limit.unwrap_or(usize::MAX) {
+                break;
+            }
+        }
+        Ok(results)
+    }
 }

 #[cfg(test)]
#[cfg(test)]

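The filter loop inside `info` treats a query ending in `*` as a prefix match and anything else as an exact key match, stopping early once `limit` entries are collected. The matching rule, distilled into a standalone sketch (the key names are made-up examples):

fn key_matches(query: &str, key: &str) -> bool {
    match query.strip_suffix('*') {
        Some(prefix) => key.starts_with(prefix),
        None => key == query,
    }
}

fn main() {
    assert!(key_matches("*", "any/key/at/all")); // the default query matches everything
    assert!(key_matches("table/*", "table/1024")); // trailing `*` switches to prefix match
    assert!(!key_matches("table/", "table/1024")); // otherwise only an exact match counts
}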
View File

@@ -111,6 +111,11 @@ impl MetadataContent {
     pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
         self.values.into_iter()
     }
+
+    /// Returns the key-value pairs as a vector.
+    pub fn values(self) -> Vec<KeyValue> {
+        self.values
+    }
 }

 /// The key-value pair of the backup file.

View File

@@ -13,20 +13,17 @@
 // limitations under the License.

 use clap::Parser;
-use cli::{
-    BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand,
-    MetaSnapshotCommand, Tool,
-};
+use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool};
 use common_error::ext::BoxedError;

 #[derive(Parser)]
 pub enum SubCommand {
     // Attach(AttachCommand),
     Bench(BenchTableMetadataCommand),
-    Export(ExportCommand),
-    Import(ImportCommand),
-    MetaSnapshot(MetaSnapshotCommand),
-    MetaRestore(MetaRestoreCommand),
+    #[clap(subcommand)]
+    Data(DataCommand),
+    #[clap(subcommand)]
+    Meta(MetaCommand),
 }

 impl SubCommand {
@@ -34,10 +31,8 @@ impl SubCommand {
         match self {
             // SubCommand::Attach(cmd) => cmd.build().await,
             SubCommand::Bench(cmd) => cmd.build().await,
-            SubCommand::Export(cmd) => cmd.build().await,
-            SubCommand::Import(cmd) => cmd.build().await,
-            SubCommand::MetaSnapshot(cmd) => cmd.build().await,
-            SubCommand::MetaRestore(cmd) => cmd.build().await,
+            SubCommand::Data(cmd) => cmd.build().await,
+            SubCommand::Meta(cmd) => cmd.build().await,
         }
     }
 }

View File

@@ -34,6 +34,7 @@ pub use changes::Changes;
 use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::ColumnarValue;
+use datafusion_expr::{AggregateUDF, ScalarUDF};
 pub use deriv::Deriv;
 pub use extrapolate_rate::{Delta, Increase, Rate};
 pub use holt_winters::HoltWinters;
@@ -44,6 +45,39 @@ pub use quantile_aggr::{quantile_udaf, QUANTILE_NAME};
 pub use resets::Resets;
 pub use round::Round;

+/// Range functions for PromQL.
+pub fn range_funcs() -> Vec<ScalarUDF> {
+    vec![
+        IDelta::<false>::scalar_udf(),
+        IDelta::<true>::scalar_udf(),
+        Rate::scalar_udf(),
+        Increase::scalar_udf(),
+        Delta::scalar_udf(),
+        Resets::scalar_udf(),
+        Changes::scalar_udf(),
+        Deriv::scalar_udf(),
+        Round::scalar_udf(),
+        AvgOverTime::scalar_udf(),
+        MinOverTime::scalar_udf(),
+        MaxOverTime::scalar_udf(),
+        SumOverTime::scalar_udf(),
+        CountOverTime::scalar_udf(),
+        LastOverTime::scalar_udf(),
+        AbsentOverTime::scalar_udf(),
+        PresentOverTime::scalar_udf(),
+        StddevOverTime::scalar_udf(),
+        StdvarOverTime::scalar_udf(),
+        QuantileOverTime::scalar_udf(),
+        PredictLinear::scalar_udf(),
+        HoltWinters::scalar_udf(),
+    ]
+}
+
+/// Aggregate functions for PromQL.
+pub fn aggr_funcs() -> Vec<AggregateUDF> {
+    vec![quantile_udaf()]
+}
+
 /// Extracts an array from a `ColumnarValue`.
 ///
 /// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.

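`range_funcs` and `aggr_funcs` exist so callers can register the whole PromQL function set in a loop instead of naming each UDF individually, which is exactly what the substrait decoder change below switches to. A hedged sketch of that consumption against a DataFusion `SessionContext`, assuming this `promql` crate as a dependency:

use datafusion::prelude::SessionContext;

fn register_promql_functions(ctx: &SessionContext) {
    // SessionContext takes ScalarUDF / AggregateUDF by value.
    for func in promql::functions::range_funcs() {
        ctx.register_udf(func);
    }
    for aggr in promql::functions::aggr_funcs() {
        ctx.register_udaf(aggr);
    }
}

fn main() {
    let ctx = SessionContext::new();
    register_promql_functions(&ctx);
}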
View File

@@ -40,8 +40,8 @@ pub struct QuantileAccumulator {

 /// Create a quantile `AggregateUDF` for PromQL quantile operator,
 /// which calculates φ-quantile (0 ≤ φ ≤ 1) over dimensions
-pub fn quantile_udaf() -> Arc<AggregateUDF> {
-    Arc::new(create_udaf(
+pub fn quantile_udaf() -> AggregateUDF {
+    create_udaf(
         QUANTILE_NAME,
         // Input type: (φ, values)
         vec![DataType::Float64, DataType::Float64],
@@ -63,7 +63,7 @@ pub fn quantile_udaf() -> Arc<AggregateUDF> {
         )]
         .into(),
     )]),
-    ))
+    )
 }

 impl QuantileAccumulator {

View File

@@ -1948,7 +1948,7 @@ impl PromPlanner {
             token::T_QUANTILE => {
                 let q = Self::get_param_value_as_f64(op, param)?;
                 non_col_args.push(lit(q));
-                quantile_udaf()
+                Arc::new(quantile_udaf())
             }
             token::T_AVG => avg_udaf(),
             token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
@@ -2444,7 +2444,7 @@ impl PromPlanner {
         LogicalPlanBuilder::from(left)
             .alias(left_table_ref)
             .context(DataFusionPlanningSnafu)?
-            .join(
+            .join_detailed(
                 right,
                 JoinType::Inner,
                 (
@@ -2458,6 +2458,7 @@ impl PromPlanner {
                     .collect::<Vec<_>>(),
                 ),
                 None,
+                true,
             )
             .context(DataFusionPlanningSnafu)?
             .build()

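The visible change is the trailing `true`: `LogicalPlanBuilder::join_detailed` takes a final `null_equals_null` flag, and enabling it makes NULL join keys compare equal, so series whose label columns are NULL can still pair up in PromQL binary operations (the sqlness diff below shows the effect). A runnable sketch with an assumed toy schema, not GreptimeDB's real tables:

use std::sync::Arc;

use datafusion::arrow::array::{Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::{JoinType, LogicalPlanBuilder};
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("label", DataType::Utf8, true),
        Field::new("v", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec![None::<&str>, Some("read")])),
            Arc::new(Int64Array::from(vec![1, 2])),
        ],
    )?;
    let ctx = SessionContext::new();
    ctx.register_batch("lhs", batch.clone())?;
    ctx.register_batch("rhs", batch)?;

    let left = ctx.table("lhs").await?.into_optimized_plan()?;
    let right = ctx.table("rhs").await?.into_optimized_plan()?;
    // The final `true` is `null_equals_null`: with `false` (the old plain
    // `join`) the NULL-labeled row drops out of the inner join; with `true`
    // it matches the NULL-labeled row on the other side.
    let plan = LogicalPlanBuilder::from(left)
        .join_detailed(right, JoinType::Inner, (vec!["label"], vec!["label"]), None, true)?
        .build()?;
    ctx.execute_logical_plan(plan).await?.show().await?;
    Ok(())
}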
View File

@@ -28,11 +28,6 @@ use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
 use datafusion::logical_expr::LogicalPlan;
 use datafusion_expr::UserDefinedLogicalNode;
 use greptime_proto::substrait_extension::MergeScan as PbMergeScan;
-use promql::functions::{
-    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, IDelta,
-    Increase, LastOverTime, MaxOverTime, MinOverTime, PresentOverTime, Rate, Resets, Round,
-    StddevOverTime, StdvarOverTime, SumOverTime,
-};
 use prost::Message;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
@@ -117,12 +112,15 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
         let mut session_state = SessionStateBuilder::new_from_existing(self.session_state.clone())
             .with_catalog_list(catalog_list)
             .build();
-        // Substrait decoder will look up the UDFs in SessionState, so we need to register them
-        // Note: the query context must be passed to set the timezone
+        // We MUST register the UDFs after we build the session state, otherwise the UDFs will be
+        // lost if they have the same name as the default UDFs or their aliases.
+        // e.g. the default UDF `to_char()` has an alias `date_format()`; if we register a UDF with
+        // the name `date_format()` before we build the session state, the UDF will be lost.

+        // Scalar functions
         for func in FUNCTION_REGISTRY.scalar_functions() {
             let udf = func.provide(FunctionContext {
                 query_ctx: self.query_ctx.clone(),
@@ -133,6 +131,15 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
                 .context(RegisterUdfSnafu { name: func.name() })?;
         }

+        // PromQL range functions
+        for func in promql::functions::range_funcs() {
+            let name = func.name().to_string();
+            session_state
+                .register_udf(Arc::new(func))
+                .context(RegisterUdfSnafu { name })?;
+        }
+
+        // Aggregate functions
         for func in FUNCTION_REGISTRY.aggregate_functions() {
             let name = func.name().to_string();
             session_state
@@ -140,29 +147,6 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
                 .context(RegisterUdfSnafu { name })?;
         }

-        let _ = session_state.register_udaf(quantile_udaf());
-        let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
-        let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
-        // TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round

         let logical_plan = DFLogicalSubstraitConvertor
             .decode(message, session_state)
             .await

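The rewritten decoder also keeps all registration after `SessionStateBuilder::build()`, per the comment above: defaults installed by `build()` would otherwise shadow same-named UDFs (e.g. `to_char`'s alias `date_format`). A sketch of that ordering rule; `install_udf` is a stand-in helper, not GreptimeDB code:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
use datafusion::logical_expr::ScalarUDF;

fn install_udf(udf: ScalarUDF) -> Result<()> {
    // Build first: this is where the default functions (and their aliases)
    // get installed.
    let mut state = SessionStateBuilder::new().with_default_features().build();
    // Register afterwards, so a name collision resolves in favor of our UDF;
    // `register_udf` returns the default it displaced, if any.
    if state.register_udf(Arc::new(udf))?.is_some() {
        println!("replaced a same-named default UDF");
    }
    Ok(())
}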
View File

@@ -675,11 +675,16 @@ insert into cache_miss_with_null_label values

 Affected Rows: 4

 -- SQLNESS SORT_RESULT 3 1
--- null!=null, so it will returns the empty set.
 tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);

-++
-++
++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
+| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
+| read | | 1970-01-01T00:00:03 | 0.5 |
+| read | | 1970-01-01T00:00:04 | 0.75 |
+| write | | 1970-01-01T00:00:03 | 0.5 |
+| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+

 -- SQLNESS SORT_RESULT 3 1
 tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);

View File

@@ -325,7 +325,6 @@ insert into cache_miss_with_null_label values
     (4000, "write", null, 2.0);

 -- SQLNESS SORT_RESULT 3 1
--- null!=null, so it will returns the empty set.
 tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);

 -- SQLNESS SORT_RESULT 3 1