Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-06 05:12:54 +00:00)

Compare commits: 17 commits, feat/geo-f ... v0.10.0-ni
| Author | SHA1 | Date |
|---|---|---|
| | f0e2d6e663 | |
| | 306bd25c64 | |
| | ddafcc678c | |
| | 2564b5daee | |
| | 37dcf34bb9 | |
| | 8eda36bfe3 | |
| | 68b59e0e5e | |
| | a37aeb2814 | |
| | f641c562c2 | |
| | 9286e963e7 | |
| | 8ea4f67e4b | |
| | 5e4bac2633 | |
| | d45b04180c | |
| | 8c8499ce53 | |
| | 79f40a762b | |
| | b062d8515d | |
| | 9f9c1dab60 | |
@@ -42,7 +42,7 @@ runs:
    - name: Install rust toolchain
      uses: actions-rust-lang/setup-rust-toolchain@v1
      with:
        targets: ${{ inputs.arch }}
        target: ${{ inputs.arch }}

    - name: Start etcd # For integration tests.
      if: ${{ inputs.disable-run-tests == 'false' }}

@@ -27,7 +27,7 @@ runs:
    - name: Install rust toolchain
      uses: actions-rust-lang/setup-rust-toolchain@v1
      with:
        targets: ${{ inputs.arch }}
        target: ${{ inputs.arch }}
        components: llvm-tools-preview

    - name: Rust Cache
.github/actions/start-runner/action.yml (vendored, 2 changed lines)

@@ -38,7 +38,7 @@ runs:
  steps:
    - name: Configure AWS credentials
      if: startsWith(inputs.runner, 'ec2')
      uses: aws-actions/configure-aws-credentials@v2
      uses: aws-actions/configure-aws-credentials@v4
      with:
        aws-access-key-id: ${{ inputs.aws-access-key-id }}
        aws-secret-access-key: ${{ inputs.aws-secret-access-key }}

.github/actions/stop-runner/action.yml (vendored, 2 changed lines)

@@ -25,7 +25,7 @@ runs:
  steps:
    - name: Configure AWS credentials
      if: ${{ inputs.label && inputs.ec2-instance-id }}
      uses: aws-actions/configure-aws-credentials@v2
      uses: aws-actions/configure-aws-credentials@v4
      with:
        aws-access-key-id: ${{ inputs.aws-access-key-id }}
        aws-secret-access-key: ${{ inputs.aws-secret-access-key }}
.github/workflows/release.yml (vendored, 19 changed lines)

@@ -33,6 +33,7 @@ on:
        description: The runner uses to build linux-arm64 artifacts
        default: ec2-c6g.4xlarge-arm64
        options:
          - ubuntu-2204-32-cores-arm
          - ec2-c6g.xlarge-arm64 # 4C8G
          - ec2-c6g.2xlarge-arm64 # 8C16G
          - ec2-c6g.4xlarge-arm64 # 16C32G

@@ -98,16 +99,6 @@ permissions:
  contents: write # Allows the action to create a release.

jobs:
  check-builder-rust-version:
    name: Check rust version in builder
    runs-on: ubuntu-20.04
    steps:
      - uses: actions/checkout@v4
      - name: Check Rust toolchain version
        shell: bash
        run: |
          ./scripts/check-builder-rust-version.sh

  allocate-runners:
    name: Allocate runners
    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}

@@ -132,6 +123,11 @@ jobs:
        with:
          fetch-depth: 0

      - name: Check Rust toolchain version
        shell: bash
        run: |
          ./scripts/check-builder-rust-version.sh

      # The create-version will create a global variable named 'version' in the global workflows.
      # - If it's a tag push release, the version is the tag name(${{ github.ref_name }});
      # - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nigthly-20230313;

@@ -256,7 +252,8 @@ jobs:
          cargo-profile: ${{ env.CARGO_PROFILE }}
          features: ${{ matrix.features }}
          version: ${{ needs.allocate-runners.outputs.version }}
          disable-run-tests: ${{ env.DISABLE_RUN_TESTS }}
          # We decide to disable the integration tests on macOS because it's unnecessary and time-consuming.
          disable-run-tests: true
          artifacts-dir: ${{ matrix.artifacts-dir-prefix }}-${{ needs.allocate-runners.outputs.version }}

      - name: Set build macos result
Cargo.lock (generated, 52 changed lines)
@@ -1950,8 +1950,6 @@ dependencies = [
|
||||
"common-version",
|
||||
"datafusion",
|
||||
"datatypes",
|
||||
"geohash",
|
||||
"h3o",
|
||||
"num",
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
@@ -3815,12 +3813,6 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "float_eq"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853"
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.9.2"
|
||||
@@ -4219,27 +4211,6 @@ dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "geo-types"
|
||||
version = "0.7.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ff16065e5720f376fbced200a5ae0f47ace85fd70b7e54269790281353b6d61"
|
||||
dependencies = [
|
||||
"approx",
|
||||
"num-traits",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "geohash"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fb94b1a65401d6cbf22958a9040aa364812c26674f841bee538b12c135db1e6"
|
||||
dependencies = [
|
||||
"geo-types",
|
||||
"libm",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gethostname"
|
||||
version = "0.2.3"
|
||||
@@ -4330,25 +4301,6 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h3o"
|
||||
version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0de3592e1f699692aa0525c42ff7879ec3ee7e36329af20967bc910a1cdc39c7"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"either",
|
||||
"float_eq",
|
||||
"h3o-bit",
|
||||
"libm",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h3o-bit"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fb45e8060378c0353781abf67e1917b545a6b710d0342d85b70c125af7ef320"
|
||||
|
||||
[[package]]
|
||||
name = "half"
|
||||
version = "1.8.3"
|
||||
@@ -4765,7 +4717,7 @@ dependencies = [
|
||||
"httpdate",
|
||||
"itoa",
|
||||
"pin-project-lite",
|
||||
"socket2 0.5.7",
|
||||
"socket2 0.4.10",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
@@ -8560,7 +8512,7 @@ dependencies = [
|
||||
"indoc",
|
||||
"libc",
|
||||
"memoffset 0.9.1",
|
||||
"parking_lot 0.12.3",
|
||||
"parking_lot 0.11.2",
|
||||
"portable-atomic",
|
||||
"pyo3-build-config",
|
||||
"pyo3-ffi",
|
||||
|
||||
@@ -15,6 +15,8 @@
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `default_timezone` | String | `None` | The default timezone of the server. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -8,6 +8,13 @@ enable_telemetry = true
## +toml2docs:none-default
default_timezone = "UTC"

## Initialize all regions in the background during the startup.
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false

## Parallelism of initializing regions.
init_regions_parallelism = 16

## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
@@ -313,7 +313,7 @@ struct SystemCatalog {
    catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    // system_schema_provier for default catalog
    // system_schema_provider for default catalog
    information_schema_provider: Arc<InformationSchemaProvider>,
    pg_catalog_provider: Arc<PGCatalogProvider>,
    backend: KvBackendRef,
@@ -21,6 +21,8 @@ mod export;
mod helper;

// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
#[allow(unused)]
mod repl;

@@ -32,6 +34,7 @@ pub use repl::Repl;
use tracing_appender::non_blocking::WorkerGuard;

use self::export::ExportCommand;
use crate::cli::import::ImportCommand;
use crate::error::Result;
use crate::options::GlobalOptions;
use crate::App;

@@ -114,6 +117,7 @@ enum SubCommand {
    // Attach(AttachCommand),
    Bench(BenchTableMetadataCommand),
    Export(ExportCommand),
    Import(ImportCommand),
}

impl SubCommand {

@@ -122,6 +126,7 @@ impl SubCommand {
            // SubCommand::Attach(cmd) => cmd.build().await,
            SubCommand::Bench(cmd) => cmd.build(guard).await,
            SubCommand::Export(cmd) => cmd.build(guard).await,
            SubCommand::Import(cmd) => cmd.build(guard).await,
        }
    }
}
src/cmd/src/cli/database.rs (new file, 119 lines)

@@ -0,0 +1,119 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use serde_json::Value;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;

use crate::error::{HttpQuerySqlSnafu, Result, SerdeJsonSnafu};

pub(crate) struct DatabaseClient {
    addr: String,
    catalog: String,
    auth_header: Option<String>,
}

impl DatabaseClient {
    pub fn new(addr: String, catalog: String, auth_basic: Option<String>) -> Self {
        let auth_header = if let Some(basic) = auth_basic {
            let encoded = general_purpose::STANDARD.encode(basic);
            Some(format!("basic {}", encoded))
        } else {
            None
        };

        Self {
            addr,
            catalog,
            auth_header,
        }
    }

    pub async fn sql_in_public(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
        self.sql(sql, DEFAULT_SCHEMA_NAME).await
    }

    /// Execute sql query.
    pub async fn sql(&self, sql: &str, schema: &str) -> Result<Option<Vec<Vec<Value>>>> {
        let url = format!("http://{}/v1/sql", self.addr);
        let params = [
            ("db", format!("{}-{}", self.catalog, schema)),
            ("sql", sql.to_string()),
        ];
        let mut request = reqwest::Client::new()
            .post(&url)
            .form(&params)
            .header("Content-Type", "application/x-www-form-urlencoded");
        if let Some(ref auth) = self.auth_header {
            request = request.header("Authorization", auth);
        }

        let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
            reason: format!("bad url: {}", url),
        })?;
        let response = response
            .error_for_status()
            .with_context(|_| HttpQuerySqlSnafu {
                reason: format!("query failed: {}", sql),
            })?;

        let text = response.text().await.with_context(|_| HttpQuerySqlSnafu {
            reason: "cannot get response text".to_string(),
        })?;

        let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
        Ok(body.output().first().and_then(|output| match output {
            GreptimeQueryOutput::Records(records) => Some(records.rows().clone()),
            GreptimeQueryOutput::AffectedRows(_) => None,
        }))
    }
}

/// Split at `-`.
pub(crate) fn split_database(database: &str) -> Result<(String, Option<String>)> {
    let (catalog, schema) = match database.split_once('-') {
        Some((catalog, schema)) => (catalog, schema),
        None => (DEFAULT_CATALOG_NAME, database),
    };

    if schema == "*" {
        Ok((catalog.to_string(), None))
    } else {
        Ok((catalog.to_string(), Some(schema.to_string())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_split_database() {
        let result = split_database("catalog-schema").unwrap();
        assert_eq!(result, ("catalog".to_string(), Some("schema".to_string())));

        let result = split_database("schema").unwrap();
        assert_eq!(result, ("greptime".to_string(), Some("schema".to_string())));

        let result = split_database("catalog-*").unwrap();
        assert_eq!(result, ("catalog".to_string(), None));

        let result = split_database("*").unwrap();
        assert_eq!(result, ("greptime".to_string(), None));
    }
}
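For orientation, here is a minimal sketch of how this client is meant to be driven. It is not part of the diff: the address, catalog name, and credentials below are placeholder assumptions, and only the call shapes follow the signatures shown above.

use crate::cli::database::DatabaseClient;
use crate::error::Result;

// Hypothetical helper inside the cmd crate; "127.0.0.1:4000" and "greptime"
// are assumed values for a locally running GreptimeDB HTTP endpoint.
async fn list_databases() -> Result<()> {
    let client = DatabaseClient::new(
        "127.0.0.1:4000".to_string(), // HTTP address of the server
        "greptime".to_string(),       // catalog to query
        None,                         // or Some("user:password".into()) for basic auth
    );
    // Runs in the default `public` schema; `None` means the statement
    // produced affected rows instead of a record set.
    if let Some(rows) = client.sql_in_public("SHOW DATABASES").await? {
        for row in rows {
            println!("{:?}", row);
        }
    }
    Ok(())
}

The reworked export and import tools below construct the client the same way through `DatabaseClient::new` and issue their statements with `sql_in_public`.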
@@ -17,26 +17,19 @@ use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use base64::engine::general_purpose;
|
||||
use base64::Engine;
|
||||
use clap::{Parser, ValueEnum};
|
||||
use client::DEFAULT_SCHEMA_NAME;
|
||||
use common_catalog::consts::DEFAULT_CATALOG_NAME;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use serde_json::Value;
|
||||
use servers::http::greptime_result_v1::GreptimedbV1Response;
|
||||
use servers::http::GreptimeQueryOutput;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::time::Instant;
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::cli::{Instance, Tool};
|
||||
use crate::error::{
|
||||
EmptyResultSnafu, Error, FileIoSnafu, HttpQuerySqlSnafu, Result, SerdeJsonSnafu,
|
||||
};
|
||||
use crate::cli::database::DatabaseClient;
|
||||
use crate::cli::{database, Instance, Tool};
|
||||
use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result};
|
||||
|
||||
type TableReference = (String, String, String);
|
||||
|
||||
@@ -94,26 +87,21 @@ pub struct ExportCommand {
|
||||
|
||||
impl ExportCommand {
|
||||
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
|
||||
let (catalog, schema) = split_database(&self.database)?;
|
||||
let (catalog, schema) = database::split_database(&self.database)?;
|
||||
|
||||
let auth_header = if let Some(basic) = &self.auth_basic {
|
||||
let encoded = general_purpose::STANDARD.encode(basic);
|
||||
Some(format!("basic {}", encoded))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let database_client =
|
||||
DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
|
||||
|
||||
Ok(Instance::new(
|
||||
Box::new(Export {
|
||||
addr: self.addr.clone(),
|
||||
catalog,
|
||||
schema,
|
||||
database_client,
|
||||
output_dir: self.output_dir.clone(),
|
||||
parallelism: self.export_jobs,
|
||||
target: self.target.clone(),
|
||||
start_time: self.start_time.clone(),
|
||||
end_time: self.end_time.clone(),
|
||||
auth_header,
|
||||
}),
|
||||
guard,
|
||||
))
|
||||
@@ -121,78 +109,43 @@ impl ExportCommand {
|
||||
}
|
||||
|
||||
pub struct Export {
|
||||
addr: String,
|
||||
catalog: String,
|
||||
schema: Option<String>,
|
||||
database_client: DatabaseClient,
|
||||
output_dir: String,
|
||||
parallelism: usize,
|
||||
target: ExportTarget,
|
||||
start_time: Option<String>,
|
||||
end_time: Option<String>,
|
||||
auth_header: Option<String>,
|
||||
}
|
||||
|
||||
impl Export {
|
||||
/// Execute one single sql query.
|
||||
async fn sql(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
|
||||
let url = format!(
|
||||
"http://{}/v1/sql?db={}-{}&sql={}",
|
||||
self.addr,
|
||||
self.catalog,
|
||||
self.schema.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME),
|
||||
sql
|
||||
);
|
||||
|
||||
let mut request = reqwest::Client::new()
|
||||
.get(&url)
|
||||
.header("Content-Type", "application/x-www-form-urlencoded");
|
||||
if let Some(ref auth) = self.auth_header {
|
||||
request = request.header("Authorization", auth);
|
||||
async fn get_db_names(&self) -> Result<Vec<String>> {
|
||||
if let Some(schema) = &self.schema {
|
||||
Ok(vec![schema.clone()])
|
||||
} else {
|
||||
self.all_db_names().await
|
||||
}
|
||||
|
||||
let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
|
||||
reason: format!("bad url: {}", url),
|
||||
})?;
|
||||
let response = response
|
||||
.error_for_status()
|
||||
.with_context(|_| HttpQuerySqlSnafu {
|
||||
reason: format!("query failed: {}", sql),
|
||||
})?;
|
||||
|
||||
let text = response.text().await.with_context(|_| HttpQuerySqlSnafu {
|
||||
reason: "cannot get response text".to_string(),
|
||||
})?;
|
||||
|
||||
let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
|
||||
Ok(body.output().first().and_then(|output| match output {
|
||||
GreptimeQueryOutput::Records(records) => Some(records.rows().clone()),
|
||||
GreptimeQueryOutput::AffectedRows(_) => None,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Iterate over all db names.
|
||||
///
|
||||
/// Newbie: `db_name` is catalog + schema.
|
||||
async fn iter_db_names(&self) -> Result<Vec<(String, String)>> {
|
||||
if let Some(schema) = &self.schema {
|
||||
Ok(vec![(self.catalog.clone(), schema.clone())])
|
||||
} else {
|
||||
let result = self.sql("SHOW DATABASES").await?;
|
||||
let Some(records) = result else {
|
||||
EmptyResultSnafu.fail()?
|
||||
async fn all_db_names(&self) -> Result<Vec<String>> {
|
||||
let result = self.database_client.sql_in_public("SHOW DATABASES").await?;
|
||||
let records = result.context(EmptyResultSnafu)?;
|
||||
let mut result = Vec::with_capacity(records.len());
|
||||
for value in records {
|
||||
let Value::String(schema) = &value[0] else {
|
||||
unreachable!()
|
||||
};
|
||||
let mut result = Vec::with_capacity(records.len());
|
||||
for value in records {
|
||||
let Value::String(schema) = &value[0] else {
|
||||
unreachable!()
|
||||
};
|
||||
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
|
||||
continue;
|
||||
}
|
||||
result.push((self.catalog.clone(), schema.clone()));
|
||||
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
|
||||
continue;
|
||||
}
|
||||
Ok(result)
|
||||
if schema == common_catalog::consts::PG_CATALOG_NAME {
|
||||
continue;
|
||||
}
|
||||
result.push(schema.clone());
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Return a list of [`TableReference`] to be exported.
|
||||
@@ -201,7 +154,11 @@ impl Export {
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
) -> Result<(Vec<TableReference>, Vec<TableReference>)> {
|
||||
) -> Result<(
|
||||
Vec<TableReference>,
|
||||
Vec<TableReference>,
|
||||
Vec<TableReference>,
|
||||
)> {
|
||||
// Puts all metric table first
|
||||
let sql = format!(
|
||||
"SELECT table_catalog, table_schema, table_name \
|
||||
@@ -210,15 +167,13 @@ impl Export {
|
||||
and table_catalog = \'{catalog}\' \
|
||||
and table_schema = \'{schema}\'"
|
||||
);
|
||||
let result = self.sql(&sql).await?;
|
||||
let Some(records) = result else {
|
||||
EmptyResultSnafu.fail()?
|
||||
};
|
||||
let result = self.database_client.sql_in_public(&sql).await?;
|
||||
let records = result.context(EmptyResultSnafu)?;
|
||||
let mut metric_physical_tables = HashSet::with_capacity(records.len());
|
||||
for value in records {
|
||||
let mut t = Vec::with_capacity(3);
|
||||
for v in &value {
|
||||
let serde_json::Value::String(value) = v else {
|
||||
let Value::String(value) = v else {
|
||||
unreachable!()
|
||||
};
|
||||
t.push(value);
|
||||
@@ -228,54 +183,63 @@ impl Export {
|
||||
|
||||
// TODO: SQL injection hurts
|
||||
let sql = format!(
|
||||
"SELECT table_catalog, table_schema, table_name \
|
||||
"SELECT table_catalog, table_schema, table_name, table_type \
|
||||
FROM information_schema.tables \
|
||||
WHERE table_type = \'BASE TABLE\' \
|
||||
WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
|
||||
and table_catalog = \'{catalog}\' \
|
||||
and table_schema = \'{schema}\'",
|
||||
);
|
||||
let result = self.sql(&sql).await?;
|
||||
let Some(records) = result else {
|
||||
EmptyResultSnafu.fail()?
|
||||
};
|
||||
let result = self.database_client.sql_in_public(&sql).await?;
|
||||
let records = result.context(EmptyResultSnafu)?;
|
||||
|
||||
debug!("Fetched table list: {:?}", records);
|
||||
debug!("Fetched table/view list: {:?}", records);
|
||||
|
||||
if records.is_empty() {
|
||||
return Ok((vec![], vec![]));
|
||||
return Ok((vec![], vec![], vec![]));
|
||||
}
|
||||
|
||||
let mut remaining_tables = Vec::with_capacity(records.len());
|
||||
let mut views = Vec::new();
|
||||
for value in records {
|
||||
let mut t = Vec::with_capacity(3);
|
||||
let mut t = Vec::with_capacity(4);
|
||||
for v in &value {
|
||||
let serde_json::Value::String(value) = v else {
|
||||
let Value::String(value) = v else {
|
||||
unreachable!()
|
||||
};
|
||||
t.push(value);
|
||||
}
|
||||
let table = (t[0].clone(), t[1].clone(), t[2].clone());
|
||||
let table_type = t[3].as_str();
|
||||
// Ignores the physical table
|
||||
if !metric_physical_tables.contains(&table) {
|
||||
remaining_tables.push(table);
|
||||
if table_type == "VIEW" {
|
||||
views.push(table);
|
||||
} else {
|
||||
remaining_tables.push(table);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
metric_physical_tables.into_iter().collect(),
|
||||
remaining_tables,
|
||||
views,
|
||||
))
|
||||
}
|
||||
|
||||
async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
|
||||
async fn show_create(
|
||||
&self,
|
||||
show_type: &str,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
) -> Result<String> {
|
||||
let sql = format!(
|
||||
r#"SHOW CREATE TABLE "{}"."{}"."{}""#,
|
||||
catalog, schema, table
|
||||
r#"SHOW CREATE {} "{}"."{}"."{}""#,
|
||||
show_type, catalog, schema, table
|
||||
);
|
||||
let result = self.sql(&sql).await?;
|
||||
let Some(records) = result else {
|
||||
EmptyResultSnafu.fail()?
|
||||
};
|
||||
let result = self.database_client.sql_in_public(&sql).await?;
|
||||
let records = result.context(EmptyResultSnafu)?;
|
||||
let Value::String(create_table) = &records[0][1] else {
|
||||
unreachable!()
|
||||
};
|
||||
@@ -286,18 +250,19 @@ impl Export {
|
||||
async fn export_create_table(&self) -> Result<()> {
|
||||
let timer = Instant::now();
|
||||
let semaphore = Arc::new(Semaphore::new(self.parallelism));
|
||||
let db_names = self.iter_db_names().await?;
|
||||
let db_names = self.get_db_names().await?;
|
||||
let db_count = db_names.len();
|
||||
let mut tasks = Vec::with_capacity(db_names.len());
|
||||
for (catalog, schema) in db_names {
|
||||
for schema in db_names {
|
||||
let semaphore_moved = semaphore.clone();
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
let (metric_physical_tables, remaining_tables) =
|
||||
self.get_table_list(&catalog, &schema).await?;
|
||||
let table_count = metric_physical_tables.len() + remaining_tables.len();
|
||||
let (metric_physical_tables, remaining_tables, views) =
|
||||
self.get_table_list(&self.catalog, &schema).await?;
|
||||
let table_count =
|
||||
metric_physical_tables.len() + remaining_tables.len() + views.len();
|
||||
let output_dir = Path::new(&self.output_dir)
|
||||
.join(&catalog)
|
||||
.join(&self.catalog)
|
||||
.join(format!("{schema}/"));
|
||||
tokio::fs::create_dir_all(&output_dir)
|
||||
.await
|
||||
@@ -305,7 +270,7 @@ impl Export {
|
||||
let output_file = Path::new(&output_dir).join("create_tables.sql");
|
||||
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
|
||||
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
|
||||
match self.show_create_table(&c, &s, &t).await {
|
||||
match self.show_create("TABLE", &c, &s, &t).await {
|
||||
Err(e) => {
|
||||
error!(e; r#"Failed to export table "{}"."{}"."{}""#, c, s, t)
|
||||
}
|
||||
@@ -316,9 +281,22 @@ impl Export {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (c, s, v) in views {
|
||||
match self.show_create("VIEW", &c, &s, &v).await {
|
||||
Err(e) => {
|
||||
error!(e; r#"Failed to export view "{}"."{}"."{}""#, c, s, v)
|
||||
}
|
||||
Ok(create_view) => {
|
||||
file.write_all(create_view.as_bytes())
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Finished exporting {catalog}.{schema} with {table_count} table schemas to path: {}",
|
||||
"Finished exporting {}.{schema} with {table_count} table schemas to path: {}",
|
||||
self.catalog,
|
||||
output_dir.to_string_lossy()
|
||||
);
|
||||
|
||||
@@ -332,7 +310,7 @@ impl Export {
|
||||
.filter(|r| match r {
|
||||
Ok(_) => true,
|
||||
Err(e) => {
|
||||
error!(e; "export job failed");
|
||||
error!(e; "export schema job failed");
|
||||
false
|
||||
}
|
||||
})
|
||||
@@ -347,15 +325,15 @@ impl Export {
|
||||
async fn export_database_data(&self) -> Result<()> {
|
||||
let timer = Instant::now();
|
||||
let semaphore = Arc::new(Semaphore::new(self.parallelism));
|
||||
let db_names = self.iter_db_names().await?;
|
||||
let db_names = self.get_db_names().await?;
|
||||
let db_count = db_names.len();
|
||||
let mut tasks = Vec::with_capacity(db_names.len());
|
||||
for (catalog, schema) in db_names {
|
||||
let mut tasks = Vec::with_capacity(db_count);
|
||||
for schema in db_names {
|
||||
let semaphore_moved = semaphore.clone();
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
let output_dir = Path::new(&self.output_dir)
|
||||
.join(&catalog)
|
||||
.join(&self.catalog)
|
||||
.join(format!("{schema}/"));
|
||||
tokio::fs::create_dir_all(&output_dir)
|
||||
.await
|
||||
@@ -379,7 +357,7 @@ impl Export {
|
||||
|
||||
let sql = format!(
|
||||
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
|
||||
catalog,
|
||||
self.catalog,
|
||||
schema,
|
||||
output_dir.to_str().unwrap(),
|
||||
with_options
|
||||
@@ -387,10 +365,11 @@ impl Export {
|
||||
|
||||
info!("Executing sql: {sql}");
|
||||
|
||||
self.sql(&sql).await?;
|
||||
self.database_client.sql_in_public(&sql).await?;
|
||||
|
||||
info!(
|
||||
"Finished exporting {catalog}.{schema} data into path: {}",
|
||||
"Finished exporting {}.{schema} data into path: {}",
|
||||
self.catalog,
|
||||
output_dir.to_string_lossy()
|
||||
);
|
||||
|
||||
@@ -400,7 +379,7 @@ impl Export {
|
||||
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
|
||||
let copy_database_from_sql = format!(
|
||||
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
|
||||
catalog,
|
||||
self.catalog,
|
||||
schema,
|
||||
output_dir.to_str().unwrap()
|
||||
);
|
||||
@@ -410,7 +389,7 @@ impl Export {
|
||||
.context(FileIoSnafu)?;
|
||||
writer.flush().await.context(FileIoSnafu)?;
|
||||
|
||||
info!("Finished exporting {catalog}.{schema} copy_from.sql");
|
||||
info!("Finished exporting {}.{schema} copy_from.sql", self.catalog);
|
||||
|
||||
Ok::<(), Error>(())
|
||||
})
|
||||
@@ -429,13 +408,12 @@ impl Export {
|
||||
.count();
|
||||
let elapsed = timer.elapsed();
|
||||
|
||||
info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);
|
||||
info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
#[async_trait]
|
||||
impl Tool for Export {
|
||||
async fn do_work(&self) -> Result<()> {
|
||||
@@ -450,20 +428,6 @@ impl Tool for Export {
|
||||
}
|
||||
}
|
||||
|
||||
/// Split at `-`.
|
||||
fn split_database(database: &str) -> Result<(String, Option<String>)> {
|
||||
let (catalog, schema) = match database.split_once('-') {
|
||||
Some((catalog, schema)) => (catalog, schema),
|
||||
None => (DEFAULT_CATALOG_NAME, database),
|
||||
};
|
||||
|
||||
if schema == "*" {
|
||||
Ok((catalog.to_string(), None))
|
||||
} else {
|
||||
Ok((catalog.to_string(), Some(schema.to_string())))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use clap::Parser;
|
||||
@@ -471,26 +435,10 @@ mod tests {
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_telemetry::logging::LoggingOptions;
|
||||
|
||||
use crate::cli::export::split_database;
|
||||
use crate::error::Result as CmdResult;
|
||||
use crate::options::GlobalOptions;
|
||||
use crate::{cli, standalone, App};
|
||||
|
||||
#[test]
|
||||
fn test_split_database() {
|
||||
let result = split_database("catalog-schema").unwrap();
|
||||
assert_eq!(result, ("catalog".to_string(), Some("schema".to_string())));
|
||||
|
||||
let result = split_database("schema").unwrap();
|
||||
assert_eq!(result, ("greptime".to_string(), Some("schema".to_string())));
|
||||
|
||||
let result = split_database("catalog-*").unwrap();
|
||||
assert_eq!(result, ("catalog".to_string(), None));
|
||||
|
||||
let result = split_database("*").unwrap();
|
||||
assert_eq!(result, ("greptime".to_string(), None));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
|
||||
let output_dir = tempfile::tempdir().unwrap();
|
||||
|
||||
src/cmd/src/cli/import.rs (new file, 204 lines)
@@ -0,0 +1,204 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::{Parser, ValueEnum};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::time::Instant;
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::cli::database::DatabaseClient;
|
||||
use crate::cli::{database, Instance, Tool};
|
||||
use crate::error::{Error, FileIoSnafu, Result};
|
||||
|
||||
#[derive(Debug, Default, Clone, ValueEnum)]
|
||||
enum ImportTarget {
|
||||
/// Import all table schemas into the database.
|
||||
Schema,
|
||||
/// Import all table data into the database.
|
||||
Data,
|
||||
/// Export all table schemas and data at once.
|
||||
#[default]
|
||||
All,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ImportCommand {
|
||||
/// Server address to connect
|
||||
#[clap(long)]
|
||||
addr: String,
|
||||
|
||||
/// Directory of the data. E.g.: /tmp/greptimedb-backup
|
||||
#[clap(long)]
|
||||
input_dir: String,
|
||||
|
||||
/// The name of the catalog to import.
|
||||
#[clap(long, default_value = "greptime-*")]
|
||||
database: String,
|
||||
|
||||
/// Parallelism of the import.
|
||||
#[clap(long, short = 'j', default_value = "1")]
|
||||
import_jobs: usize,
|
||||
|
||||
/// Max retry times for each job.
|
||||
#[clap(long, default_value = "3")]
|
||||
max_retry: usize,
|
||||
|
||||
/// Things to export
|
||||
#[clap(long, short = 't', value_enum, default_value = "all")]
|
||||
target: ImportTarget,
|
||||
|
||||
/// The basic authentication for connecting to the server
|
||||
#[clap(long)]
|
||||
auth_basic: Option<String>,
|
||||
}
|
||||
|
||||
impl ImportCommand {
|
||||
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
|
||||
let (catalog, schema) = database::split_database(&self.database)?;
|
||||
let database_client =
|
||||
DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
|
||||
|
||||
Ok(Instance::new(
|
||||
Box::new(Import {
|
||||
catalog,
|
||||
schema,
|
||||
database_client,
|
||||
input_dir: self.input_dir.clone(),
|
||||
parallelism: self.import_jobs,
|
||||
target: self.target.clone(),
|
||||
}),
|
||||
guard,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Import {
|
||||
catalog: String,
|
||||
schema: Option<String>,
|
||||
database_client: DatabaseClient,
|
||||
input_dir: String,
|
||||
parallelism: usize,
|
||||
target: ImportTarget,
|
||||
}
|
||||
|
||||
impl Import {
|
||||
async fn import_create_table(&self) -> Result<()> {
|
||||
self.do_sql_job("create_tables.sql").await
|
||||
}
|
||||
|
||||
async fn import_database_data(&self) -> Result<()> {
|
||||
self.do_sql_job("copy_from.sql").await
|
||||
}
|
||||
|
||||
async fn do_sql_job(&self, filename: &str) -> Result<()> {
|
||||
let timer = Instant::now();
|
||||
let semaphore = Arc::new(Semaphore::new(self.parallelism));
|
||||
let db_names = self.get_db_names().await?;
|
||||
let db_count = db_names.len();
|
||||
let mut tasks = Vec::with_capacity(db_count);
|
||||
for schema in db_names {
|
||||
let semaphore_moved = semaphore.clone();
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
let database_input_dir = self.catalog_path().join(&schema);
|
||||
let sql_file = database_input_dir.join(filename);
|
||||
let sql = tokio::fs::read_to_string(sql_file)
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
if sql.is_empty() {
|
||||
info!("Empty `{filename}` {database_input_dir:?}");
|
||||
} else {
|
||||
self.database_client.sql(&sql, &schema).await?;
|
||||
info!("Imported `{filename}` for database {schema}");
|
||||
}
|
||||
|
||||
Ok::<(), Error>(())
|
||||
})
|
||||
}
|
||||
|
||||
let success = futures::future::join_all(tasks)
|
||||
.await
|
||||
.into_iter()
|
||||
.filter(|r| match r {
|
||||
Ok(_) => true,
|
||||
Err(e) => {
|
||||
error!(e; "import {filename} job failed");
|
||||
false
|
||||
}
|
||||
})
|
||||
.count();
|
||||
let elapsed = timer.elapsed();
|
||||
info!("Success {success}/{db_count} `{filename}` jobs, cost: {elapsed:?}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn catalog_path(&self) -> PathBuf {
|
||||
PathBuf::from(&self.input_dir).join(&self.catalog)
|
||||
}
|
||||
|
||||
async fn get_db_names(&self) -> Result<Vec<String>> {
|
||||
if let Some(schema) = &self.schema {
|
||||
Ok(vec![schema.clone()])
|
||||
} else {
|
||||
self.all_db_names().await
|
||||
}
|
||||
}
|
||||
|
||||
// Get all database names in the input directory.
|
||||
// The directory structure should be like:
|
||||
// /tmp/greptimedb-backup
|
||||
// ├── greptime-1
|
||||
// │ ├── db1
|
||||
// │ └── db2
|
||||
async fn all_db_names(&self) -> Result<Vec<String>> {
|
||||
let mut db_names = vec![];
|
||||
let path = self.catalog_path();
|
||||
let mut entries = tokio::fs::read_dir(path).await.context(FileIoSnafu)?;
|
||||
while let Some(entry) = entries.next_entry().await.context(FileIoSnafu)? {
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
let db_name = match path.file_name() {
|
||||
Some(name) => name.to_string_lossy().to_string(),
|
||||
None => {
|
||||
warn!("Failed to get the file name of {:?}", path);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
db_names.push(db_name);
|
||||
}
|
||||
}
|
||||
Ok(db_names)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for Import {
|
||||
async fn do_work(&self) -> Result<()> {
|
||||
match self.target {
|
||||
ImportTarget::Schema => self.import_create_table().await,
|
||||
ImportTarget::Data => self.import_database_data().await,
|
||||
ImportTarget::All => {
|
||||
self.import_create_table().await?;
|
||||
self.import_database_data().await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -141,6 +141,8 @@ pub struct StandaloneOptions {
    pub region_engine: Vec<RegionEngineConfig>,
    pub export_metrics: ExportMetricsOption,
    pub tracing: TracingOptions,
    pub init_regions_in_background: bool,
    pub init_regions_parallelism: usize,
}

impl Default for StandaloneOptions {

@@ -168,6 +170,8 @@ impl Default for StandaloneOptions {
                RegionEngineConfig::File(FileEngineConfig::default()),
            ],
            tracing: TracingOptions::default(),
            init_regions_in_background: false,
            init_regions_parallelism: 16,
        }
    }
}

@@ -218,6 +222,9 @@ impl StandaloneOptions {
            storage: cloned_opts.storage,
            region_engine: cloned_opts.region_engine,
            grpc: cloned_opts.grpc,
            init_regions_in_background: cloned_opts.init_regions_in_background,
            init_regions_parallelism: cloned_opts.init_regions_parallelism,
            mode: Mode::Standalone,
            ..Default::default()
        }
    }

@@ -81,6 +81,7 @@ fn test_load_datanode_example_config() {
        sst_meta_cache_size: ReadableSize::mb(128),
        vector_cache_size: ReadableSize::mb(512),
        page_cache_size: ReadableSize::mb(512),
        selector_result_cache_size: ReadableSize::mb(512),
        max_background_jobs: 4,
        experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
        ..Default::default()
@@ -7,10 +7,6 @@ license.workspace = true
[lints]
workspace = true

[features]
default = ["geo"]
geo = ["geohash", "h3o"]

[dependencies]
api.workspace = true
arc-swap = "1.0"

@@ -27,8 +23,6 @@ common-time.workspace = true
common-version.workspace = true
datafusion.workspace = true
datatypes.workspace = true
geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true

@@ -116,10 +116,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
    SystemFunction::register(&function_registry);
    TableFunction::register(&function_registry);

    // Geo functions
    #[cfg(feature = "geo")]
    crate::scalars::geo::GeoFunctions::register(&function_registry);

    Arc::new(function_registry)
});

@@ -15,8 +15,6 @@
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
#[cfg(feature = "geo")]
pub mod geo;
pub mod matches;
pub mod math;
pub mod numpy;
@@ -1,31 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
mod geohash;
|
||||
mod h3;
|
||||
|
||||
use geohash::GeohashFunction;
|
||||
use h3::H3Function;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
pub(crate) struct GeoFunctions;
|
||||
|
||||
impl GeoFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(GeohashFunction));
|
||||
registry.register(Arc::new(H3Function));
|
||||
}
|
||||
}
|
||||
@@ -1,135 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use common_error::ext::{BoxedError, PlainError};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature};
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
|
||||
use geohash::Coord;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
/// Function that return geohash string for a given geospatial coordinate.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct GeohashFunction;
|
||||
|
||||
const NAME: &str = "geohash";
|
||||
|
||||
impl Function for GeohashFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::string_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
let mut signatures = Vec::new();
|
||||
for coord_type in &[
|
||||
ConcreteDataType::float32_datatype(),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
] {
|
||||
for resolution_type in &[
|
||||
ConcreteDataType::int8_datatype(),
|
||||
ConcreteDataType::int16_datatype(),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
ConcreteDataType::uint16_datatype(),
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
] {
|
||||
signatures.push(TypeSignature::Exact(vec![
|
||||
// latitude
|
||||
coord_type.clone(),
|
||||
// longitude
|
||||
coord_type.clone(),
|
||||
// resolution
|
||||
resolution_type.clone(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
Signature::one_of(signatures, Volatility::Stable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 3, provided : {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let lat_vec = &columns[0];
|
||||
let lon_vec = &columns[1];
|
||||
let resolution_vec = &columns[2];
|
||||
|
||||
let size = lat_vec.len();
|
||||
let mut results = StringVectorBuilder::with_capacity(size);
|
||||
|
||||
for i in 0..size {
|
||||
let lat = lat_vec.get(i).as_f64_lossy();
|
||||
let lon = lon_vec.get(i).as_f64_lossy();
|
||||
let r = match resolution_vec.get(i) {
|
||||
Value::Int8(v) => v as usize,
|
||||
Value::Int16(v) => v as usize,
|
||||
Value::Int32(v) => v as usize,
|
||||
Value::Int64(v) => v as usize,
|
||||
Value::UInt8(v) => v as usize,
|
||||
Value::UInt16(v) => v as usize,
|
||||
Value::UInt32(v) => v as usize,
|
||||
Value::UInt64(v) => v as usize,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let result = match (lat, lon) {
|
||||
(Some(lat), Some(lon)) => {
|
||||
let coord = Coord { x: lon, y: lat };
|
||||
let encoded = geohash::encode(coord, r)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(PlainError::new(
|
||||
format!("Geohash error: {}", e),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
))
|
||||
})
|
||||
.context(error::ExecuteSnafu)?;
|
||||
Some(encoded)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
results.push(result.as_deref());
|
||||
}
|
||||
|
||||
Ok(results.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for GeohashFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", NAME)
|
||||
}
|
||||
}
|
||||
@@ -1,143 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use common_error::ext::{BoxedError, PlainError};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature};
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
|
||||
use h3o::{LatLng, Resolution};
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
/// Function that return h3 encoding string for a given geospatial coordinate.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct H3Function;
|
||||
|
||||
const NAME: &str = "h3";
|
||||
|
||||
impl Function for H3Function {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::string_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
let mut signatures = Vec::new();
|
||||
for coord_type in &[
|
||||
ConcreteDataType::float32_datatype(),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
] {
|
||||
for resolution_type in &[
|
||||
ConcreteDataType::int8_datatype(),
|
||||
ConcreteDataType::int16_datatype(),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
ConcreteDataType::uint16_datatype(),
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
] {
|
||||
signatures.push(TypeSignature::Exact(vec![
|
||||
// latitude
|
||||
coord_type.clone(),
|
||||
// longitude
|
||||
coord_type.clone(),
|
||||
// resolution
|
||||
resolution_type.clone(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
Signature::one_of(signatures, Volatility::Stable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 3, provided : {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let lat_vec = &columns[0];
|
||||
let lon_vec = &columns[1];
|
||||
let resolution_vec = &columns[2];
|
||||
|
||||
let size = lat_vec.len();
|
||||
let mut results = StringVectorBuilder::with_capacity(size);
|
||||
|
||||
for i in 0..size {
|
||||
let lat = lat_vec.get(i).as_f64_lossy();
|
||||
let lon = lon_vec.get(i).as_f64_lossy();
|
||||
let r = match resolution_vec.get(i) {
|
||||
Value::Int8(v) => v as u8,
|
||||
Value::Int16(v) => v as u8,
|
||||
Value::Int32(v) => v as u8,
|
||||
Value::Int64(v) => v as u8,
|
||||
Value::UInt8(v) => v,
|
||||
Value::UInt16(v) => v as u8,
|
||||
Value::UInt32(v) => v as u8,
|
||||
Value::UInt64(v) => v as u8,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let result = match (lat, lon) {
|
||||
(Some(lat), Some(lon)) => {
|
||||
let coord = LatLng::new(lat, lon)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(PlainError::new(
|
||||
format!("H3 error: {}", e),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
))
|
||||
})
|
||||
.context(error::ExecuteSnafu)?;
|
||||
let r = Resolution::try_from(r as u8)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(PlainError::new(
|
||||
format!("H3 error: {}", e),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
))
|
||||
})
|
||||
.context(error::ExecuteSnafu)?;
|
||||
let encoded = coord.to_cell(r).to_string();
|
||||
Some(encoded)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
results.push(result.as_deref());
|
||||
}
|
||||
|
||||
Ok(results.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for H3Function {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", NAME)
|
||||
}
|
||||
}
|
||||
@@ -89,6 +89,19 @@ impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
    }
}

impl From<SchemaNameValue> for HashMap<String, String> {
    fn from(value: SchemaNameValue) -> Self {
        let mut opts = HashMap::new();
        if let Some(ttl) = value.ttl {
            opts.insert(
                OPT_KEY_TTL.to_string(),
                format!("{}", humantime::format_duration(ttl)),
            );
        }
        opts
    }
}

impl<'a> SchemaNameKey<'a> {
    pub fn new(catalog: &'a str, schema: &'a str) -> Self {
        Self { catalog, schema }
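As a side note, the `format!("{}", humantime::format_duration(ttl))` call above stores the TTL in humantime's text form. A tiny standalone sketch of what that produces (not from the diff, assuming the `humantime` crate is available):

use std::time::Duration;

fn main() {
    // 90 seconds is rendered as "1m 30s", which is the string written
    // under OPT_KEY_TTL by the From<SchemaNameValue> impl above.
    let ttl = Duration::from_secs(90);
    assert_eq!(humantime::format_duration(ttl).to_string(), "1m 30s");
}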
@@ -172,13 +172,12 @@ impl ErrorExt for Error {

            Error::DataTypes { .. }
            | Error::CreateRecordBatches { .. }
            | Error::PollStream { .. }
            | Error::Format { .. }
            | Error::ToArrowScalar { .. }
            | Error::ProjectArrowRecordBatch { .. }
            | Error::PhysicalExpr { .. } => StatusCode::Internal,

            Error::PollStream { .. } => StatusCode::EngineExecuteQuery,

            Error::ArrowCompute { .. } => StatusCode::IllegalState,

            Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
@@ -234,7 +234,13 @@ pub fn init_global_logging(
            opentelemetry_otlp::new_exporter().tonic().with_endpoint(
                opts.otlp_endpoint
                    .as_ref()
                    .map(|e| format!("http://{}", e))
                    .map(|e| {
                        if e.starts_with("http") {
                            e.to_string()
                        } else {
                            format!("http://{}", e)
                        }
                    })
                    .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
            ),
        )
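The effect of the new closure is to accept both bare hosts and full URLs for the OTLP endpoint. A small illustrative sketch (the function name and sample endpoints are assumptions, not from the diff):

// Mirrors the mapping above: keep an explicit http/https scheme, otherwise
// fall back to plain HTTP in front of the configured host.
fn normalize_otlp_endpoint(endpoint: &str) -> String {
    if endpoint.starts_with("http") {
        endpoint.to_string()
    } else {
        format!("http://{}", endpoint)
    }
}

fn main() {
    assert_eq!(normalize_otlp_endpoint("localhost:4317"), "http://localhost:4317");
    assert_eq!(
        normalize_otlp_endpoint("https://otel.example.com:4317"),
        "https://otel.example.com:4317"
    );
}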
@@ -37,7 +37,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::metrics;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;

pub(crate) mod handler;

@@ -231,10 +231,12 @@ impl HeartbeatTask {
                        mailbox_message: Some(message),
                        ..Default::default()
                    };
                    HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
                    Some(req)
                }
                Err(e) => {
                    error!(e; "Failed to encode mailbox messages!");
                    HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
                    None
                }
            }

@@ -304,6 +306,8 @@ impl HeartbeatTask {
                        error!(e; "Failed to reconnect to metasrv!");
                    }
                }
            } else {
                HEARTBEAT_SENT_COUNT.inc();
            }
        }
    }
@@ -54,4 +54,17 @@ lazy_static! {
        &[REGION_ROLE]
    )
    .unwrap();
    /// The number of heartbeats send by datanode.
    pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
        "greptime_datanode_heartbeat_send_count",
        "datanode heartbeat sent",
    )
    .unwrap();
    /// The number of heartbeats received by datanode, labeled with result type.
    pub static ref HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!(
        "greptime_datanode_heartbeat_recv_count",
        "datanode heartbeat received",
        &["result"]
    )
    .unwrap();
}
@@ -860,7 +860,7 @@ impl RegionServerInner {
        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
        //
        // The walkaround is to put the async functions in the `common_runtime::spawn_global`. Or like
        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
        // it here, collect the values first then use later separately.

        let regions = self
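A generic sketch of the "collect the values first, then use them later" workaround that the comment describes; the types and method names here are illustrative only and are not GreptimeDB APIs:

struct Region {
    id: u64,
}

struct Server {
    regions: Vec<Region>,
}

impl Server {
    async fn handle(&self, region_id: u64) {
        // per-region async work would go here
        let _ = region_id;
    }

    async fn handle_all(&self) {
        // Collect plain values up front so the later awaits do not have to
        // borrow through iterator or stream adapters, which is the situation
        // that tends to trigger the "higher-ranked lifetime error" above.
        let ids: Vec<u64> = self.regions.iter().map(|r| r.id).collect();
        for id in ids {
            self.handle(id).await;
        }
    }
}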
@@ -268,23 +268,6 @@ impl Value {
        }
    }

    /// Cast Value to f32. Return None if it's not castable;
    pub fn as_f64_lossy(&self) -> Option<f64> {
        match self {
            Value::Float32(v) => Some(v.0 as _),
            Value::Float64(v) => Some(v.0),
            Value::Int8(v) => Some(*v as _),
            Value::Int16(v) => Some(*v as _),
            Value::Int32(v) => Some(*v as _),
            Value::Int64(v) => Some(*v as _),
            Value::UInt8(v) => Some(*v as _),
            Value::UInt16(v) => Some(*v as _),
            Value::UInt32(v) => Some(*v as _),
            Value::UInt64(v) => Some(*v as _),
            _ => None,
        }
    }

    /// Returns the logical type of the value.
    pub fn logical_type_id(&self) -> LogicalTypeId {
        match self {
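Based on the match arms shown above, a short sketch of the cast's behaviour (not part of the diff; it assumes the `datatypes` crate is in scope):

use datatypes::value::Value;

fn main() {
    // Integer and float variants are widened (possibly lossily) to f64.
    assert_eq!(Value::Int32(42).as_f64_lossy(), Some(42.0));
    assert_eq!(Value::UInt64(7).as_f64_lossy(), Some(7.0));
    // Variants outside the numeric arms fall through to None.
    assert_eq!(Value::Boolean(true).as_f64_lossy(), None);
}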
@@ -49,13 +49,13 @@ use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::metrics::{
    METRIC_FLOW_INPUT_BUF_SIZE, METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS,
};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
use crate::transform::sql_to_flow_plan;

mod flownode_impl;
mod parse_expr;

@@ -28,7 +28,7 @@ use super::state::Scheduler;
use crate::compute::state::DataflowState;
use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, GlobalId, LocalId};
use crate::expr::{self, Batch, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow};
@@ -87,9 +87,38 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
}
|
||||
|
||||
impl<'referred, 'df> Context<'referred, 'df> {
|
||||
/// Interpret and execute plan
|
||||
/// Like `render_plan` but in Batch Mode
|
||||
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
|
||||
match plan.plan {
|
||||
Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
|
||||
Plan::Get { .. } => NotImplementedSnafu {
|
||||
reason: "Get is still WIP in batchmode",
|
||||
}
|
||||
.fail(),
|
||||
Plan::Let { .. } => NotImplementedSnafu {
|
||||
reason: "Let is still WIP in batchmode",
|
||||
}
|
||||
.fail(),
|
||||
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
|
||||
Plan::Reduce {
|
||||
input,
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
} => self.render_reduce_batch(input, &key_val_plan, &reduce_plan, &plan.schema.typ),
|
||||
Plan::Join { .. } => NotImplementedSnafu {
|
||||
reason: "Join is still WIP",
|
||||
}
|
||||
.fail(),
|
||||
Plan::Union { .. } => NotImplementedSnafu {
|
||||
reason: "Union is still WIP",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Interpret plan to dataflow and prepare them for execution
|
||||
///
|
||||
/// return the output of this plan
|
||||
/// return the output handler of this plan
|
||||
pub fn render_plan(&mut self, plan: TypedPlan) -> Result<CollectionBundle, Error> {
|
||||
match plan.plan {
|
||||
Plan::Constant { rows } => Ok(self.render_constant(rows)),
|
||||
@@ -112,17 +141,61 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
}
|
||||
}
|
||||
|
||||
/// render Constant, take all rows that have a timestamp not greater than the current time
|
||||
/// This function is primarily used for testing
|
||||
/// Always assume input is sorted by timestamp
|
||||
pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
|
||||
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
|
||||
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
|
||||
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
|
||||
per_time.entry(key).or_default().extend(group);
|
||||
}
|
||||
|
||||
let now = self.compute_state.current_time_ref();
|
||||
// TODO(discord9): better way to schedule future run
|
||||
let scheduler = self.compute_state.get_scheduler();
|
||||
let scheduler_inner = scheduler.clone();
|
||||
let err_collector = self.err_collector.clone();
|
||||
|
||||
let subgraph_id =
|
||||
self.df
|
||||
.add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
|
||||
// find the first timestamp that is greater than now
|
||||
// `split_off` keeps those future rows; after the swap below, `after` holds everything due now
|
||||
|
||||
let mut after = per_time.split_off(&(*now.borrow() + 1));
|
||||
// swap
|
||||
std::mem::swap(&mut per_time, &mut after);
|
||||
let not_great_than_now = after;
|
||||
|
||||
not_great_than_now.into_iter().for_each(|(_ts, rows)| {
|
||||
err_collector.run(|| {
|
||||
let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
|
||||
let batch = Batch::try_from_rows(rows)?;
|
||||
send_port.give(vec![batch]);
|
||||
Ok(())
|
||||
});
|
||||
});
|
||||
// schedule the next run
|
||||
if let Some(next_run_time) = per_time.keys().next().copied() {
|
||||
scheduler_inner.schedule_at(next_run_time);
|
||||
}
|
||||
});
|
||||
scheduler.set_cur_subgraph(subgraph_id);
|
||||
|
||||
CollectionBundle::from_collection(Collection::from_port(recv_port))
|
||||
}
|
||||
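The core trick in `render_constant_batch` is the `split_off(&(now + 1))` / swap pair: it partitions the buffered rows into "due now" and "future", and the smallest remaining key tells the scheduler when to run again. A minimal std-only sketch of that logic, with illustrative names rather than the flow crate's types:

```rust
use std::collections::BTreeMap;

type Timestamp = i64;

/// Emit everything with ts <= now, keep the rest, and report when to run next.
fn emit_due(
    per_time: &mut BTreeMap<Timestamp, Vec<&'static str>>,
    now: Timestamp,
) -> (Vec<&'static str>, Option<Timestamp>) {
    // `split_off(&(now + 1))` leaves keys <= now in `per_time` and returns keys > now.
    let mut future = per_time.split_off(&(now + 1));
    // Swap so `per_time` retains the future rows and `due` holds everything due now.
    std::mem::swap(per_time, &mut future);
    let due = future;

    let emitted: Vec<&'static str> = due.into_values().flatten().collect();
    let next_run = per_time.keys().next().copied();
    (emitted, next_run)
}

fn main() {
    let mut per_time = BTreeMap::from([
        (1, vec!["a"]),
        (3, vec!["b", "c"]),
        (7, vec!["d"]),
    ]);
    let (emitted, next) = emit_due(&mut per_time, 3);
    assert_eq!(emitted, vec!["a", "b", "c"]);
    assert_eq!(next, Some(7)); // schedule the subgraph to run again at ts 7
}
```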
|
||||
/// render Constant, take all rows that have a timestamp not greater than the current time
|
||||
///
|
||||
/// Always assume input is sorted by timestamp
|
||||
pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
|
||||
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
|
||||
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = rows
|
||||
.into_iter()
|
||||
.group_by(|(_row, ts, _diff)| *ts)
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, v.into_iter().collect_vec()))
|
||||
.collect();
|
||||
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
|
||||
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
|
||||
per_time.entry(key).or_default().extend(group);
|
||||
}
|
||||
|
||||
let now = self.compute_state.current_time_ref();
|
||||
// TODO(discord9): better way to schedule future run
|
||||
let scheduler = self.compute_state.get_scheduler();
|
||||
|
||||
@@ -23,12 +23,59 @@ use crate::compute::render::Context;
|
||||
use crate::compute::state::Scheduler;
|
||||
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
|
||||
use crate::error::{Error, PlanSnafu};
|
||||
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
|
||||
use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
|
||||
use crate::plan::TypedPlan;
|
||||
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
|
||||
use crate::utils::ArrangeHandler;
|
||||
|
||||
impl<'referred, 'df> Context<'referred, 'df> {
|
||||
/// Like `render_mfp` but in batch mode
|
||||
pub fn render_mfp_batch(
|
||||
&mut self,
|
||||
input: Box<TypedPlan>,
|
||||
mfp: MapFilterProject,
|
||||
) -> Result<CollectionBundle<Batch>, Error> {
|
||||
let input = self.render_plan_batch(*input)?;
|
||||
|
||||
let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff<Batch>>("mfp_batch");
|
||||
|
||||
// This closure capture following variables:
|
||||
let mfp_plan = MfpPlan::create_from(mfp)?;
|
||||
|
||||
let err_collector = self.err_collector.clone();
|
||||
|
||||
// TODO(discord9): better way to schedule future run
|
||||
let scheduler = self.compute_state.get_scheduler();
|
||||
|
||||
let subgraph = self.df.add_subgraph_in_out(
|
||||
"mfp_batch",
|
||||
input.collection.into_inner(),
|
||||
out_send_port,
|
||||
move |_ctx, recv, send| {
|
||||
// mfp only need to passively receive updates from recvs
|
||||
let src_data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
|
||||
|
||||
let output_batches = src_data
|
||||
.filter_map(|mut input_batch| {
|
||||
err_collector.run(|| {
|
||||
let res_batch = mfp_plan.mfp.eval_batch_into(&mut input_batch)?;
|
||||
Ok(res_batch)
|
||||
})
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
send.give(output_batches);
|
||||
},
|
||||
);
|
||||
|
||||
// register current subgraph in scheduler for future scheduling
|
||||
scheduler.set_cur_subgraph(subgraph);
|
||||
|
||||
let bundle =
|
||||
CollectionBundle::from_collection(Collection::<Batch>::from_port(out_recv_port));
|
||||
Ok(bundle)
|
||||
}
|
||||
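For intuition about the batch-mode MFP operator above, here is a dependency-free sketch of map/filter/project evaluated over whole columns at once; `ToyBatch` and the hard-coded expressions are assumptions for illustration, not the `Batch`/`MfpPlan` API:

```rust
/// A toy columnar batch: each inner Vec is one column, all the same length.
type Column = Vec<i64>;

struct ToyBatch {
    columns: Vec<Column>,
}

/// Map: append a derived column; Filter: keep rows where a predicate holds;
/// Project: keep only the listed columns. This mirrors the eval-whole-batch
/// shape of `render_mfp_batch`, not its actual expression language.
fn toy_mfp(batch: &ToyBatch, keep: &[usize]) -> ToyBatch {
    // Map: derive col2 = col0 + col1.
    let derived: Column = batch.columns[0]
        .iter()
        .zip(&batch.columns[1])
        .map(|(a, b)| a + b)
        .collect();
    let mut mapped = batch.columns.clone();
    mapped.push(derived);

    // Filter: keep rows where the derived column is even.
    let mask: Vec<bool> = mapped.last().unwrap().iter().map(|v| v % 2 == 0).collect();
    let filtered: Vec<Column> = mapped
        .iter()
        .map(|col| {
            col.iter()
                .zip(&mask)
                .filter(|(_, m)| **m)
                .map(|(v, _)| *v)
                .collect()
        })
        .collect();

    // Project: select only the requested column indices.
    ToyBatch {
        columns: keep.iter().map(|&i| filtered[i].clone()).collect(),
    }
}

fn main() {
    let batch = ToyBatch {
        columns: vec![vec![1, 2, 3, 4], vec![1, 1, 2, 2]],
    };
    let out = toy_mfp(&batch, &[2]); // keep only the derived column
    assert_eq!(out.columns, vec![vec![2, 6]]);
}
```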
|
||||
/// render MapFilterProject, will only emit the `rows` once. Assumes every incoming row's sys time is `now` and ignores the row's stated sys time
|
||||
/// TODO(discord9): schedule mfp operator to run when temporal filter need
|
||||
///
|
||||
|
||||
@@ -14,23 +14,247 @@
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::value::{ListValue, Value};
|
||||
use datatypes::vectors::NullVector;
|
||||
use hydroflow::scheduled::graph_ext::GraphExt;
|
||||
use itertools::Itertools;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::compute::render::{Context, SubgraphArg};
|
||||
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
|
||||
use crate::error::{Error, PlanSnafu};
|
||||
use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
|
||||
use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
|
||||
use crate::expr::{EvalError, ScalarExpr};
|
||||
use crate::expr::{Batch, EvalError, ScalarExpr};
|
||||
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
|
||||
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
|
||||
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
|
||||
|
||||
impl<'referred, 'df> Context<'referred, 'df> {
|
||||
const REDUCE_BATCH: &'static str = "reduce_batch";
|
||||
/// Like `render_reduce`, but for batch mode; only a barebones implementation with
/// no support for distinct aggregation for now
// There is a false positive in using `Vec<ScalarExpr>` as key due to `Value` having a `bytes` variant
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
pub fn render_reduce_batch(
|
||||
&mut self,
|
||||
input: Box<TypedPlan>,
|
||||
key_val_plan: &KeyValPlan,
|
||||
reduce_plan: &ReducePlan,
|
||||
output_type: &RelationType,
|
||||
) -> Result<CollectionBundle<Batch>, Error> {
|
||||
let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
|
||||
if !accum_plan.distinct_aggrs.is_empty() {
|
||||
NotImplementedSnafu {
|
||||
reason: "Distinct aggregation is not supported in batch mode",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
accum_plan.clone()
|
||||
} else {
|
||||
NotImplementedSnafu {
|
||||
reason: "Only accumulable reduce plan is supported in batch mode",
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
|
||||
let input = self.render_plan_batch(*input)?;
|
||||
|
||||
// first assemble key & val into separate key and val columns (since this is batch mode)
|
||||
// Then stream kvs through a reduce operator
|
||||
|
||||
// the output is concat from key and val
|
||||
let output_key_arity = key_val_plan.key_plan.output_arity();
|
||||
|
||||
// TODO(discord9): config global expire time from self
|
||||
let arrange_handler = self.compute_state.new_arrange(None);
|
||||
|
||||
if let (Some(time_index), Some(expire_after)) =
|
||||
(output_type.time_index, self.compute_state.expire_after())
|
||||
{
|
||||
let expire_man =
|
||||
KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
|
||||
arrange_handler.write().set_expire_state(expire_man);
|
||||
}
|
||||
|
||||
// reduce need full arrangement to be able to query all keys
|
||||
let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
|
||||
reason: "No write is expected at this point",
|
||||
})?;
|
||||
let key_val_plan = key_val_plan.clone();
|
||||
|
||||
let now = self.compute_state.current_time_ref();
|
||||
|
||||
let err_collector = self.err_collector.clone();
|
||||
|
||||
// TODO(discord9): better way to schedule future run
|
||||
let scheduler = self.compute_state.get_scheduler();
|
||||
|
||||
let (out_send_port, out_recv_port) =
|
||||
self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);
|
||||
|
||||
let subgraph =
|
||||
self.df.add_subgraph_in_out(
|
||||
Self::REDUCE_BATCH,
|
||||
input.collection.into_inner(),
|
||||
out_send_port,
|
||||
move |_ctx, recv, send| {
|
||||
let now = *(now.borrow());
|
||||
let arrange = arrange_handler_inner.clone();
|
||||
// mfp only need to passively receive updates from recvs
|
||||
let src_data = recv
|
||||
.take_inner()
|
||||
.into_iter()
|
||||
.flat_map(|v| v.into_iter())
|
||||
.collect_vec();
|
||||
|
||||
let mut key_to_many_vals = BTreeMap::<Row, Batch>::new();
|
||||
for batch in src_data {
|
||||
err_collector.run(|| {
|
||||
let (key_batch, val_batch) =
|
||||
batch_split_by_key_val(&batch, &key_val_plan, &err_collector);
|
||||
ensure!(
|
||||
key_batch.row_count() == val_batch.row_count(),
|
||||
InternalSnafu {
|
||||
reason: format!(
|
||||
"Key and val batch should have the same row count, found {} and {}",
|
||||
key_batch.row_count(),
|
||||
val_batch.row_count()
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
for row_idx in 0..key_batch.row_count() {
|
||||
let key_row = key_batch.get_row(row_idx).unwrap();
|
||||
let val_row = val_batch.slice(row_idx, 1)?;
|
||||
let val_batch =
|
||||
key_to_many_vals.entry(Row::new(key_row)).or_default();
|
||||
val_batch.append_batch(val_row)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
// write lock the arrange for the rest of the function body
|
||||
// to prevent weird race conditions
|
||||
let mut arrange = arrange.write();
|
||||
let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len());
|
||||
let mut all_output_rows = Vec::with_capacity(key_to_many_vals.len());
|
||||
|
||||
for (key, val_batch) in key_to_many_vals {
|
||||
err_collector.run(|| -> Result<(), _> {
|
||||
let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
|
||||
let accum_list = from_accum_values_to_live_accums(
|
||||
accums.unpack(),
|
||||
accum_plan.simple_aggrs.len(),
|
||||
)?;
|
||||
|
||||
let mut accum_output = AccumOutput::new();
|
||||
for AggrWithIndex {
|
||||
expr,
|
||||
input_idx,
|
||||
output_idx,
|
||||
} in accum_plan.simple_aggrs.iter()
|
||||
{
|
||||
let cur_old_accum = accum_list.get(*output_idx).cloned().unwrap_or_default();
|
||||
// if batch is empty, input null instead
|
||||
let cur_input = val_batch.batch().get(*input_idx).cloned().unwrap_or_else(||Arc::new(NullVector::new(val_batch.row_count())));
|
||||
|
||||
let (output, new_accum) =
|
||||
expr.func.eval_batch(cur_old_accum, cur_input, None)?;
|
||||
|
||||
accum_output.insert_accum(*output_idx, new_accum);
|
||||
accum_output.insert_output(*output_idx, output);
|
||||
}
|
||||
|
||||
let (new_accums, res_val_row) = accum_output.into_accum_output()?;
|
||||
|
||||
let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1);
|
||||
all_arrange_updates.push(arrange_update);
|
||||
|
||||
let mut key_val = key;
|
||||
key_val.extend(res_val_row);
|
||||
all_output_rows.push((key_val, now, 1));
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
err_collector.run(|| {
|
||||
arrange.apply_updates(now, all_arrange_updates)?;
|
||||
arrange.compact_to(now)
|
||||
});
|
||||
|
||||
// this output part is not supposed to be resource intensive
|
||||
// (because each batch usually doesn't produce that many output rows),
|
||||
// so we can do some costly operation here
|
||||
let output_types = all_output_rows.first().map(|(row, _, _)| {
|
||||
row.iter()
|
||||
.map(|v| v.data_type())
|
||||
.collect::<Vec<ConcreteDataType>>()
|
||||
});
|
||||
|
||||
if let Some(output_types) = output_types {
|
||||
err_collector.run(|| {
|
||||
let column_cnt = output_types.len();
|
||||
let row_cnt = all_output_rows.len();
|
||||
|
||||
let mut output_builder = output_types
|
||||
.into_iter()
|
||||
.map(|t| t.create_mutable_vector(row_cnt))
|
||||
.collect_vec();
|
||||
|
||||
for (row, _, _) in all_output_rows {
|
||||
for (i, v) in row.into_iter().enumerate() {
|
||||
output_builder
|
||||
.get_mut(i)
|
||||
.context(InternalSnafu{
|
||||
reason: format!(
|
||||
"Output builder should have the same length as the row, expected at most {} but got {}",
|
||||
column_cnt-1,
|
||||
i
|
||||
)
|
||||
})?
|
||||
.try_push_value_ref(v.as_value_ref())
|
||||
.context(DataTypeSnafu {
|
||||
msg: "Failed to push value",
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
let output_columns = output_builder
|
||||
.into_iter()
|
||||
.map(|mut b| b.to_vector())
|
||||
.collect_vec();
|
||||
|
||||
let output_batch = Batch::try_new(output_columns, row_cnt)?;
|
||||
send.give(vec![output_batch]);
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
scheduler.set_cur_subgraph(subgraph);
|
||||
|
||||
// by default the key of output arrange
|
||||
let arranged = BTreeMap::from([(
|
||||
(0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
|
||||
Arranged::new(arrange_handler),
|
||||
)]);
|
||||
|
||||
let bundle = CollectionBundle {
|
||||
collection: Collection::from_port(out_recv_port),
|
||||
arranged,
|
||||
};
|
||||
Ok(bundle)
|
||||
}
|
||||
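`render_reduce_batch` above boils down to: group the incoming batch by key, fold each group into the per-key accumulator kept in the arrangement, then emit `key ++ aggregate` for the touched keys. A stripped-down sketch of that state machine with a single SUM aggregate and no expiry (all names here are illustrative, not the flow crate's API):

```rust
use std::collections::BTreeMap;

type Key = Vec<i64>;

/// Per-key running state for a single SUM aggregate.
#[derive(Default)]
struct SumState {
    sum: i64,
}

/// Apply one incoming batch of (key, value) pairs and return the output rows
/// (key followed by the aggregate) for every key touched this tick.
fn reduce_tick(
    arrangement: &mut BTreeMap<Key, SumState>,
    batch: &[(Key, i64)],
) -> Vec<(Key, i64)> {
    // Group this batch's values by key, like `key_to_many_vals` above.
    let mut grouped: BTreeMap<Key, Vec<i64>> = BTreeMap::new();
    for (key, val) in batch {
        grouped.entry(key.clone()).or_default().push(*val);
    }

    let mut output = Vec::with_capacity(grouped.len());
    for (key, vals) in grouped {
        // Read the old accumulator, fold the new values in, write it back.
        let state = arrangement.entry(key.clone()).or_default();
        state.sum += vals.iter().sum::<i64>();
        output.push((key, state.sum));
    }
    output
}

fn main() {
    let mut arrangement = BTreeMap::new();
    let out1 = reduce_tick(&mut arrangement, &[(vec![1], 10), (vec![1], 5), (vec![2], 7)]);
    assert_eq!(out1, vec![(vec![1], 15), (vec![2], 7)]);

    // The next tick only re-emits keys that received new data.
    let out2 = reduce_tick(&mut arrangement, &[(vec![1], 1)]);
    assert_eq!(out2, vec![(vec![1], 16)]);
}
```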
|
||||
const REDUCE: &'static str = "reduce";
|
||||
/// render `Plan::Reduce` into executable dataflow
|
||||
// There is a false positive in using `Vec<ScalarExpr>` as key due to `Value` having a `bytes` variant
|
||||
@@ -151,6 +375,18 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
}
|
||||
}
|
||||
|
||||
fn from_accum_values_to_live_accums(
|
||||
accums: Vec<Value>,
|
||||
len: usize,
|
||||
) -> Result<Vec<Vec<Value>>, EvalError> {
|
||||
let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?;
|
||||
let mut accum_list = vec![];
|
||||
for range in accum_ranges.iter() {
|
||||
accum_list.push(accums.get(range.clone()).unwrap_or_default().to_vec());
|
||||
}
|
||||
Ok(accum_list)
|
||||
}
|
||||
|
||||
/// All arrange(aka state) used in reduce operator
|
||||
pub struct ReduceArrange {
|
||||
/// The output arrange of reduce operator
|
||||
@@ -160,33 +396,40 @@ pub struct ReduceArrange {
|
||||
distinct_input: Option<Vec<ArrangeHandler>>,
|
||||
}
|
||||
|
||||
/// split a row into key and val by evaluate the key and val plan
|
||||
fn split_row_to_key_val(
|
||||
row: Row,
|
||||
sys_time: repr::Timestamp,
|
||||
diff: repr::Diff,
|
||||
fn batch_split_by_key_val(
|
||||
batch: &Batch,
|
||||
key_val_plan: &KeyValPlan,
|
||||
row_buf: &mut Row,
|
||||
) -> Result<Option<KeyValDiffRow>, EvalError> {
|
||||
if let Some(key) = key_val_plan
|
||||
.key_plan
|
||||
.evaluate_into(&mut row.inner.clone(), row_buf)?
|
||||
{
|
||||
// val_plan is not supported to carry any filter predicate,
|
||||
let val = key_val_plan
|
||||
.val_plan
|
||||
.evaluate_into(&mut row.inner.clone(), row_buf)?
|
||||
.context(InternalSnafu {
|
||||
reason: "val_plan should not contain any filter predicate",
|
||||
})?;
|
||||
Ok(Some(((key, val), sys_time, diff)))
|
||||
} else {
|
||||
Ok(None)
|
||||
err_collector: &ErrCollector,
|
||||
) -> (Batch, Batch) {
|
||||
let row_count = batch.row_count();
|
||||
let mut key_batch = Batch::empty();
|
||||
let mut val_batch = Batch::empty();
|
||||
|
||||
err_collector.run(|| {
|
||||
if key_val_plan.key_plan.output_arity() != 0 {
|
||||
key_batch = key_val_plan.key_plan.eval_batch_into(&mut batch.clone())?;
|
||||
}
|
||||
|
||||
if key_val_plan.val_plan.output_arity() != 0 {
|
||||
val_batch = key_val_plan.val_plan.eval_batch_into(&mut batch.clone())?;
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
// deal with empty key or val
|
||||
if key_batch.row_count() == 0 && key_batch.column_count() == 0 {
|
||||
key_batch.set_row_count(row_count);
|
||||
}
|
||||
|
||||
if val_batch.row_count() == 0 && val_batch.column_count() == 0 {
|
||||
val_batch.set_row_count(row_count);
|
||||
}
|
||||
|
||||
(key_batch, val_batch)
|
||||
}
|
||||
|
||||
/// split a row into key and val by evaluate the key and val plan
|
||||
fn batch_split_rows_to_key_val(
|
||||
fn split_rows_to_key_val(
|
||||
rows: impl IntoIterator<Item = DiffRow>,
|
||||
key_val_plan: KeyValPlan,
|
||||
err_collector: ErrCollector,
|
||||
@@ -235,7 +478,7 @@ fn reduce_subgraph(
|
||||
send,
|
||||
}: SubgraphArg,
|
||||
) {
|
||||
let key_val = batch_split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
|
||||
let key_val = split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
|
||||
// from here for distinct reduce and accum reduce, things are drastically different
|
||||
// for distinct reduce the arrange store the output,
|
||||
// but for accum reduce the arrange store the accum state, and output is
|
||||
@@ -1127,6 +1370,105 @@ mod test {
|
||||
run_and_check(&mut state, &mut df, 6..7, expected, output);
|
||||
}
|
||||
|
||||
/// Batch Mode Reduce Evaluation
|
||||
/// SELECT SUM(col) FROM table
|
||||
///
|
||||
/// table schema:
|
||||
/// | name | type |
|
||||
/// |------|-------|
|
||||
/// | col | Int64 |
|
||||
#[test]
|
||||
fn test_basic_batch_reduce_accum() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut state = DataflowState::default();
|
||||
let now = state.current_time_ref();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
let rows = vec![
|
||||
(Row::new(vec![1i64.into()]), 1, 1),
|
||||
(Row::new(vec![2i64.into()]), 2, 1),
|
||||
(Row::new(vec![3i64.into()]), 3, 1),
|
||||
(Row::new(vec![1i64.into()]), 4, 1),
|
||||
(Row::new(vec![2i64.into()]), 5, 1),
|
||||
(Row::new(vec![3i64.into()]), 6, 1),
|
||||
];
|
||||
let input_plan = Plan::Constant { rows: rows.clone() };
|
||||
|
||||
let typ = RelationType::new(vec![ColumnType::new_nullable(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)]);
|
||||
let key_val_plan = KeyValPlan {
|
||||
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
|
||||
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
|
||||
};
|
||||
|
||||
let simple_aggrs = vec![AggrWithIndex::new(
|
||||
AggregateExpr {
|
||||
func: AggregateFunc::SumInt64,
|
||||
expr: ScalarExpr::Column(0),
|
||||
distinct: false,
|
||||
},
|
||||
0,
|
||||
0,
|
||||
)];
|
||||
let accum_plan = AccumulablePlan {
|
||||
full_aggrs: vec![AggregateExpr {
|
||||
func: AggregateFunc::SumInt64,
|
||||
expr: ScalarExpr::Column(0),
|
||||
distinct: false,
|
||||
}],
|
||||
simple_aggrs,
|
||||
distinct_aggrs: vec![],
|
||||
};
|
||||
|
||||
let reduce_plan = ReducePlan::Accumulable(accum_plan);
|
||||
let bundle = ctx
|
||||
.render_reduce_batch(
|
||||
Box::new(input_plan.with_types(typ.into_unnamed())),
|
||||
&key_val_plan,
|
||||
&reduce_plan,
|
||||
&RelationType::empty(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let now_inner = now.clone();
|
||||
let expected = BTreeMap::<i64, Vec<i64>>::from([
|
||||
(1, vec![1i64]),
|
||||
(2, vec![3i64]),
|
||||
(3, vec![6i64]),
|
||||
(4, vec![7i64]),
|
||||
(5, vec![9i64]),
|
||||
(6, vec![12i64]),
|
||||
]);
|
||||
let collection = bundle.collection;
|
||||
ctx.df
|
||||
.add_subgraph_sink("test_sink", collection.into_inner(), move |_ctx, recv| {
|
||||
let now = *now_inner.borrow();
|
||||
let data = recv.take_inner();
|
||||
let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
|
||||
|
||||
if let Some(expected) = expected.get(&now) {
|
||||
let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
|
||||
let batch = Batch::try_from_rows(vec![batch.into()]).unwrap();
|
||||
assert_eq!(res.first(), Some(&batch));
|
||||
}
|
||||
});
|
||||
drop(ctx);
|
||||
|
||||
for now in 1..7 {
|
||||
state.set_current_ts(now);
|
||||
state.run_available_with_schedule(&mut df);
|
||||
if !state.get_err_collector().is_empty() {
|
||||
panic!(
|
||||
"Errors occur: {:?}",
|
||||
state.get_err_collector().get_all_blocking()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// SELECT SUM(col) FROM table
|
||||
///
|
||||
/// table schema:
|
||||
|
||||
@@ -27,11 +27,67 @@ use crate::compute::render::Context;
|
||||
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
|
||||
use crate::error::{Error, PlanSnafu};
|
||||
use crate::expr::error::InternalSnafu;
|
||||
use crate::expr::EvalError;
|
||||
use crate::expr::{Batch, EvalError};
|
||||
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
|
||||
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
impl<'referred, 'df> Context<'referred, 'df> {
|
||||
/// simply send the batch to downstream, without fancy features like buffering
|
||||
pub fn render_source_batch(
|
||||
&mut self,
|
||||
mut src_recv: broadcast::Receiver<Batch>,
|
||||
) -> Result<CollectionBundle<Batch>, Error> {
|
||||
debug!("Rendering Source Batch");
|
||||
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("source_batch");
|
||||
|
||||
let schd = self.compute_state.get_scheduler();
|
||||
let inner_schd = schd.clone();
|
||||
let now = self.compute_state.current_time_ref();
|
||||
let err_collector = self.err_collector.clone();
|
||||
|
||||
let sub = self
|
||||
.df
|
||||
.add_subgraph_source("source_batch", send_port, move |_ctx, send| {
|
||||
loop {
|
||||
match src_recv.try_recv() {
|
||||
Ok(batch) => {
|
||||
send.give(vec![batch]);
|
||||
}
|
||||
Err(TryRecvError::Empty) => {
|
||||
break;
|
||||
}
|
||||
Err(TryRecvError::Lagged(lag_offset)) => {
|
||||
// use `err_collector` instead of `error!` to locate which operator caused the error
|
||||
err_collector.run(|| -> Result<(), EvalError> {
|
||||
InternalSnafu {
|
||||
reason: format!("Flow missing {} rows behind", lag_offset),
|
||||
}
|
||||
.fail()
|
||||
});
|
||||
break;
|
||||
}
|
||||
Err(TryRecvError::Closed) => {
|
||||
err_collector.run(|| -> Result<(), EvalError> {
|
||||
InternalSnafu {
|
||||
reason: "Source Batch Channel is closed".to_string(),
|
||||
}
|
||||
.fail()
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let now = *now.borrow();
|
||||
// always schedule source to run at now so we can
|
||||
// repeatedly run source if needed
|
||||
inner_schd.schedule_at(now);
|
||||
});
|
||||
schd.set_cur_subgraph(sub);
|
||||
let bundle = CollectionBundle::from_collection(Collection::<Batch>::from_port(recv_port));
|
||||
Ok(bundle)
|
||||
}
|
||||
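The source operator above drains a `tokio::sync::broadcast` receiver without blocking and distinguishes the three `try_recv` outcomes. The same pattern in isolation, assuming a dependency on the `tokio` crate with the `sync` feature:

```rust
use tokio::sync::broadcast::{self, error::TryRecvError};

/// Drain whatever is currently buffered, reporting (but tolerating) lag.
fn drain<T: Clone>(rx: &mut broadcast::Receiver<T>) -> Vec<T> {
    let mut out = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(item) => out.push(item),
            // Nothing buffered right now; come back later.
            Err(TryRecvError::Empty) => break,
            // The receiver fell behind and `n` messages were dropped;
            // report it and keep draining what is left.
            Err(TryRecvError::Lagged(n)) => eprintln!("missing {n} messages"),
            // All senders are gone; no more data will ever arrive.
            Err(TryRecvError::Closed) => break,
        }
    }
    out
}

fn main() {
    let (tx, mut rx) = broadcast::channel(16);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx);
    assert_eq!(drain(&mut rx), vec![1, 2]);
    // After the channel is closed and drained, try_recv returns Closed.
    assert!(matches!(rx.try_recv(), Err(TryRecvError::Closed)));
}
```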
|
||||
/// Render a source which comes from a broadcast channel into the dataflow
|
||||
/// will immediately send updates not greater than `now` and buffer the rest in arrangement
|
||||
pub fn render_source(
|
||||
@@ -114,6 +170,32 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn render_unbounded_sink_batch(
|
||||
&mut self,
|
||||
bundle: CollectionBundle<Batch>,
|
||||
sender: mpsc::UnboundedSender<Batch>,
|
||||
) {
|
||||
let CollectionBundle {
|
||||
collection,
|
||||
arranged: _,
|
||||
} = bundle;
|
||||
|
||||
let _sink = self.df.add_subgraph_sink(
|
||||
"UnboundedSinkBatch",
|
||||
collection.into_inner(),
|
||||
move |_ctx, recv| {
|
||||
let data = recv.take_inner();
|
||||
for batch in data.into_iter().flat_map(|i| i.into_iter()) {
|
||||
// if the sender is closed unexpectedly, stop sending
|
||||
if sender.is_closed() || sender.send(batch).is_err() {
|
||||
common_telemetry::error!("UnboundedSinkBatch is closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
pub fn render_unbounded_sink(
|
||||
&mut self,
|
||||
bundle: CollectionBundle,
|
||||
|
||||
@@ -105,11 +105,13 @@ impl Arranged {
|
||||
/// This type maintains the invariant that it does contain at least one(or both) valid
|
||||
/// source of data, either a collection or at least one arrangement. This is for convenience
|
||||
/// of reading the data from the collection.
|
||||
pub struct CollectionBundle {
|
||||
///
|
||||
// TODO(discord9): make T default to Batch and obsolete the Row Mode
|
||||
pub struct CollectionBundle<T: 'static = DiffRow> {
|
||||
/// This is useful for passively reading the new updates from the collection
|
||||
///
|
||||
/// Invariant: the timestamp of the updates should always not greater than now, since future updates should be stored in the arrangement
|
||||
pub collection: Collection<DiffRow>,
|
||||
pub collection: Collection<T>,
|
||||
/// the key [`ScalarExpr`] indicate how the keys(also a [`Row`]) used in Arranged is extract from collection's [`Row`]
|
||||
/// So it is the "index" of the arrangement
|
||||
///
|
||||
@@ -121,13 +123,16 @@ pub struct CollectionBundle {
|
||||
pub arranged: BTreeMap<Vec<ScalarExpr>, Arranged>,
|
||||
}
|
||||
|
||||
impl CollectionBundle {
|
||||
pub fn from_collection(collection: Collection<DiffRow>) -> Self {
|
||||
impl<T: 'static> CollectionBundle<T> {
|
||||
pub fn from_collection(collection: Collection<T>) -> Self {
|
||||
Self {
|
||||
collection,
|
||||
arranged: BTreeMap::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static + Clone> CollectionBundle<T> {
|
||||
pub fn clone(&self, df: &mut Hydroflow) -> Self {
|
||||
Self {
|
||||
collection: self.collection.clone(df),
|
||||
|
||||
604
src/flow/src/df_optimizer.rs
Normal file
@@ -0,0 +1,604 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Datafusion optimizer for flow plan
|
||||
|
||||
#![warn(unused)]
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::debug;
|
||||
use datafusion::config::ConfigOptions;
|
||||
use datafusion::error::DataFusionError;
|
||||
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
|
||||
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
|
||||
use datafusion::optimizer::optimize_projections::OptimizeProjections;
|
||||
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
|
||||
use datafusion::optimizer::unwrap_cast_in_comparison::UnwrapCastInComparison;
|
||||
use datafusion::optimizer::utils::NamePreserver;
|
||||
use datafusion::optimizer::{Analyzer, AnalyzerRule, Optimizer, OptimizerContext};
|
||||
use datafusion_common::tree_node::{
|
||||
Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
|
||||
};
|
||||
use datafusion_common::{Column, DFSchema, ScalarValue};
|
||||
use datafusion_expr::aggregate_function::AggregateFunction;
|
||||
use datafusion_expr::expr::AggregateFunctionDefinition;
|
||||
use datafusion_expr::utils::merge_schema;
|
||||
use datafusion_expr::{
|
||||
BinaryExpr, Expr, Operator, Projection, ScalarUDFImpl, Signature, TypeSignature, Volatility,
|
||||
};
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::plan::LogicalPlan;
|
||||
use query::query_engine::DefaultSerializer;
|
||||
use query::QueryEngine;
|
||||
use snafu::ResultExt;
|
||||
/// note here we are using the `substrait_proto_df` crate from the `substrait` module and
|
||||
/// rename it to `substrait_proto`
|
||||
use substrait::DFLogicalSubstraitConvertor;
|
||||
|
||||
use crate::adapter::FlownodeContext;
|
||||
use crate::error::{DatafusionSnafu, Error, ExternalSnafu, UnexpectedSnafu};
|
||||
use crate::expr::{TUMBLE_END, TUMBLE_START};
|
||||
use crate::plan::TypedPlan;
|
||||
|
||||
// TODO(discord9): use `Analyzer` to manage rules if more `AnalyzerRule` is needed
|
||||
pub async fn apply_df_optimizer(
|
||||
plan: datafusion_expr::LogicalPlan,
|
||||
) -> Result<datafusion_expr::LogicalPlan, Error> {
|
||||
let cfg = ConfigOptions::new();
|
||||
let analyzer = Analyzer::with_rules(vec![
|
||||
Arc::new(AvgExpandRule::new()),
|
||||
Arc::new(TumbleExpandRule::new()),
|
||||
Arc::new(CheckGroupByRule::new()),
|
||||
Arc::new(TypeCoercion::new()),
|
||||
]);
|
||||
let plan = analyzer
|
||||
.execute_and_check(plan, &cfg, |p, r| {
|
||||
debug!("After apply rule {}, get plan: \n{:?}", r.name(), p);
|
||||
})
|
||||
.context(DatafusionSnafu {
|
||||
context: "Fail to apply analyzer",
|
||||
})?;
|
||||
|
||||
let ctx = OptimizerContext::new();
|
||||
let optimizer = Optimizer::with_rules(vec![
|
||||
Arc::new(OptimizeProjections::new()),
|
||||
Arc::new(CommonSubexprEliminate::new()),
|
||||
Arc::new(SimplifyExpressions::new()),
|
||||
Arc::new(UnwrapCastInComparison::new()),
|
||||
]);
|
||||
let plan = optimizer
|
||||
.optimize(plan, &ctx, |_, _| {})
|
||||
.context(DatafusionSnafu {
|
||||
context: "Fail to apply optimizer",
|
||||
})?;
|
||||
|
||||
Ok(plan)
|
||||
}
|
||||
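Structurally, `apply_df_optimizer` is an ordered list of plan-to-plan rewrites with a debug hook after each pass. A dependency-free sketch of that shape, with a toy string `Plan` standing in for DataFusion's `LogicalPlan` (the passes and names below are made up for illustration):

```rust
/// Toy plan: a SQL-ish string is enough to show the pass-pipeline shape.
#[derive(Debug, Clone)]
struct Plan(String);

type Pass = fn(Plan) -> Result<Plan, String>;

fn run_pipeline(mut plan: Plan, passes: &[(&str, Pass)]) -> Result<Plan, String> {
    for (name, pass) in passes {
        plan = pass(plan)?;
        // Mirrors the `|p, r| debug!(...)` observer hook in `apply_df_optimizer`.
        println!("after pass {name}: {plan:?}");
    }
    Ok(plan)
}

fn expand_avg(p: Plan) -> Result<Plan, String> {
    Ok(Plan(p.0.replace("avg(x)", "sum(x) / count(x)")))
}

fn simplify(p: Plan) -> Result<Plan, String> {
    Ok(Plan(p.0.replace("1 = 1 AND ", "")))
}

fn main() -> Result<(), String> {
    let plan = Plan("SELECT avg(x) FROM t WHERE 1 = 1 AND x > 0".into());
    let out = run_pipeline(plan, &[("expand_avg", expand_avg), ("simplify", simplify)])?;
    assert_eq!(out.0, "SELECT sum(x) / count(x) FROM t WHERE x > 0");
    Ok(())
}
```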
|
||||
/// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
|
||||
/// then to a substrait plan, and finally to a flow plan.
|
||||
pub async fn sql_to_flow_plan(
|
||||
ctx: &mut FlownodeContext,
|
||||
engine: &Arc<dyn QueryEngine>,
|
||||
sql: &str,
|
||||
) -> Result<TypedPlan, Error> {
|
||||
let query_ctx = ctx.query_context.clone().ok_or_else(|| {
|
||||
UnexpectedSnafu {
|
||||
reason: "Query context is missing",
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(stmt, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let LogicalPlan::DfPlan(plan) = plan;
|
||||
|
||||
let opted_plan = apply_df_optimizer(plan).await?;
|
||||
|
||||
// TODO(discord9): add df optimization
|
||||
let sub_plan = DFLogicalSubstraitConvertor {}
|
||||
.to_sub_plan(&opted_plan, DefaultSerializer)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan).await?;
|
||||
|
||||
Ok(flow_plan)
|
||||
}
|
||||
|
||||
struct AvgExpandRule {}
|
||||
|
||||
impl AvgExpandRule {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl AnalyzerRule for AvgExpandRule {
|
||||
fn analyze(
|
||||
&self,
|
||||
plan: datafusion_expr::LogicalPlan,
|
||||
_config: &ConfigOptions,
|
||||
) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
|
||||
let transformed = plan
|
||||
.transform_up_with_subqueries(expand_avg_analyzer)?
|
||||
.data
|
||||
.transform_down_with_subqueries(put_aggr_to_proj_analyzer)?
|
||||
.data;
|
||||
Ok(transformed)
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"avg_expand"
|
||||
}
|
||||
}
|
||||
|
||||
/// lift aggr's composite aggr_expr to outer proj, and leave aggr only with simple direct aggr expr
|
||||
/// i.e.
|
||||
/// ```ignore
|
||||
/// proj: avg(x)
|
||||
/// -- aggr: [sum(x)/count(x) as avg(x)]
|
||||
/// ```
|
||||
/// becomes:
|
||||
/// ```ignore
|
||||
/// proj: sum(x)/count(x) as avg(x)
|
||||
/// -- aggr: [sum(x), count(x)]
|
||||
/// ```
|
||||
fn put_aggr_to_proj_analyzer(
|
||||
plan: datafusion_expr::LogicalPlan,
|
||||
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
|
||||
if let datafusion_expr::LogicalPlan::Projection(proj) = &plan {
|
||||
if let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref() {
|
||||
let mut replace_old_proj_exprs = HashMap::new();
|
||||
let mut expanded_aggr_exprs = vec![];
|
||||
for aggr_expr in &aggr.aggr_expr {
|
||||
let mut is_composite = false;
|
||||
if let Expr::AggregateFunction(_) = &aggr_expr {
|
||||
expanded_aggr_exprs.push(aggr_expr.clone());
|
||||
} else {
|
||||
let old_name = aggr_expr.name_for_alias()?;
|
||||
let new_proj_expr = aggr_expr
|
||||
.clone()
|
||||
.transform(|ch| {
|
||||
if let Expr::AggregateFunction(_) = &ch {
|
||||
is_composite = true;
|
||||
expanded_aggr_exprs.push(ch.clone());
|
||||
Ok(Transformed::yes(Expr::Column(Column::from_qualified_name(
|
||||
ch.name_for_alias()?,
|
||||
))))
|
||||
} else {
|
||||
Ok(Transformed::no(ch))
|
||||
}
|
||||
})?
|
||||
.data;
|
||||
replace_old_proj_exprs.insert(old_name, new_proj_expr);
|
||||
}
|
||||
}
|
||||
|
||||
if expanded_aggr_exprs.len() > aggr.aggr_expr.len() {
|
||||
let mut aggr = aggr.clone();
|
||||
aggr.aggr_expr = expanded_aggr_exprs;
|
||||
let mut aggr_plan = datafusion_expr::LogicalPlan::Aggregate(aggr);
|
||||
// important to recompute schema after changing aggr_expr
|
||||
aggr_plan = aggr_plan.recompute_schema()?;
|
||||
|
||||
// reconstruct proj with new proj_exprs
|
||||
let mut new_proj_exprs = proj.expr.clone();
|
||||
for proj_expr in new_proj_exprs.iter_mut() {
|
||||
if let Some(new_proj_expr) =
|
||||
replace_old_proj_exprs.get(&proj_expr.name_for_alias()?)
|
||||
{
|
||||
*proj_expr = new_proj_expr.clone();
|
||||
}
|
||||
*proj_expr = proj_expr
|
||||
.clone()
|
||||
.transform(|expr| {
|
||||
if let Some(new_expr) =
|
||||
replace_old_proj_exprs.get(&expr.name_for_alias()?)
|
||||
{
|
||||
Ok(Transformed::yes(new_expr.clone()))
|
||||
} else {
|
||||
Ok(Transformed::no(expr))
|
||||
}
|
||||
})?
|
||||
.data;
|
||||
}
|
||||
let proj = datafusion_expr::LogicalPlan::Projection(Projection::try_new(
|
||||
new_proj_exprs,
|
||||
Arc::new(aggr_plan),
|
||||
)?);
|
||||
return Ok(Transformed::yes(proj));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Transformed::no(plan))
|
||||
}
|
||||
|
||||
/// expand the `avg(<expr>)` function into `cast(sum(<expr>) AS f64) / count(<expr>)`
|
||||
fn expand_avg_analyzer(
|
||||
plan: datafusion_expr::LogicalPlan,
|
||||
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
|
||||
let mut schema = merge_schema(plan.inputs());
|
||||
|
||||
if let datafusion_expr::LogicalPlan::TableScan(ts) = &plan {
|
||||
let source_schema =
|
||||
DFSchema::try_from_qualified_schema(ts.table_name.clone(), &ts.source.schema())?;
|
||||
schema.merge(&source_schema);
|
||||
}
|
||||
|
||||
let mut expr_rewrite = ExpandAvgRewriter::new(&schema);
|
||||
|
||||
let name_preserver = NamePreserver::new(&plan);
|
||||
// apply coercion rewrite all expressions in the plan individually
|
||||
plan.map_expressions(|expr| {
|
||||
let original_name = name_preserver.save(&expr)?;
|
||||
expr.rewrite(&mut expr_rewrite)?
|
||||
.map_data(|expr| original_name.restore(expr))
|
||||
})?
|
||||
.map_data(|plan| plan.recompute_schema())
|
||||
}
|
||||
|
||||
/// rewrite the `avg(<expr>)` function into `CASE WHEN count(<expr>) != 0 THEN cast(sum(<expr>) AS avg_return_type) / count(<expr>) ELSE NULL END`
|
||||
///
|
||||
/// TODO(discord9): support avg return type decimal128
|
||||
///
|
||||
/// see impl details at https://github.com/apache/datafusion/blob/4ad4f90d86c57226a4e0fb1f79dfaaf0d404c273/datafusion/expr/src/type_coercion/aggregates.rs#L457-L462
|
||||
pub(crate) struct ExpandAvgRewriter<'a> {
|
||||
/// schema of the plan
|
||||
#[allow(unused)]
|
||||
pub(crate) schema: &'a DFSchema,
|
||||
}
|
||||
|
||||
impl<'a> ExpandAvgRewriter<'a> {
|
||||
fn new(schema: &'a DFSchema) -> Self {
|
||||
Self { schema }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TreeNodeRewriter for ExpandAvgRewriter<'a> {
|
||||
type Node = Expr;
|
||||
|
||||
fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>, DataFusionError> {
|
||||
if let Expr::AggregateFunction(aggr_func) = &expr {
|
||||
if let AggregateFunctionDefinition::BuiltIn(AggregateFunction::Avg) =
|
||||
&aggr_func.func_def
|
||||
{
|
||||
let sum_expr = {
|
||||
let mut tmp = aggr_func.clone();
|
||||
tmp.func_def = AggregateFunctionDefinition::BuiltIn(AggregateFunction::Sum);
|
||||
Expr::AggregateFunction(tmp)
|
||||
};
|
||||
let sum_cast = {
|
||||
let mut tmp = sum_expr.clone();
|
||||
tmp = Expr::Cast(datafusion_expr::Cast {
|
||||
expr: Box::new(tmp),
|
||||
data_type: arrow_schema::DataType::Float64,
|
||||
});
|
||||
tmp
|
||||
};
|
||||
|
||||
let count_expr = {
|
||||
let mut tmp = aggr_func.clone();
|
||||
tmp.func_def = AggregateFunctionDefinition::BuiltIn(AggregateFunction::Count);
|
||||
|
||||
Expr::AggregateFunction(tmp)
|
||||
};
|
||||
let count_expr_ref =
|
||||
Expr::Column(Column::from_qualified_name(count_expr.name_for_alias()?));
|
||||
|
||||
let div =
|
||||
BinaryExpr::new(Box::new(sum_cast), Operator::Divide, Box::new(count_expr));
|
||||
let div_expr = Box::new(Expr::BinaryExpr(div));
|
||||
|
||||
let zero = Box::new(Expr::Literal(ScalarValue::Int64(Some(0))));
|
||||
let not_zero =
|
||||
BinaryExpr::new(Box::new(count_expr_ref), Operator::NotEq, zero.clone());
|
||||
let not_zero = Box::new(Expr::BinaryExpr(not_zero));
|
||||
let null = Box::new(Expr::Literal(ScalarValue::Null));
|
||||
|
||||
let case_when =
|
||||
datafusion_expr::Case::new(None, vec![(not_zero, div_expr)], Some(null));
|
||||
let case_when_expr = Expr::Case(case_when);
|
||||
|
||||
return Ok(Transformed::yes(case_when_expr));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Transformed::no(expr))
|
||||
}
|
||||
}
|
||||
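At runtime the rewritten expression behaves like "NULL for an empty group, otherwise sum/count". A small semantics-only sketch (plain Rust, with `None` standing in for SQL NULL; this is not the generated DataFusion expression):

```rust
/// What `CASE WHEN count(x) != 0 THEN cast(sum(x) AS f64) / count(x) ELSE NULL END`
/// computes for one group.
fn expanded_avg(group: &[i64]) -> Option<f64> {
    let count = group.len() as i64;
    if count != 0 {
        let sum: i64 = group.iter().sum();
        Some(sum as f64 / count as f64)
    } else {
        None
    }
}

fn main() {
    assert_eq!(expanded_avg(&[1, 2, 3, 4]), Some(2.5));
    // Empty group: avg is NULL, not a division-by-zero error.
    assert_eq!(expanded_avg(&[]), None);
}
```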
|
||||
/// expand tumble in aggr expr to tumble_start and tumble_end with column name like `window_start`
|
||||
struct TumbleExpandRule {}
|
||||
|
||||
impl TumbleExpandRule {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl AnalyzerRule for TumbleExpandRule {
|
||||
fn analyze(
|
||||
&self,
|
||||
plan: datafusion_expr::LogicalPlan,
|
||||
_config: &ConfigOptions,
|
||||
) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
|
||||
let transformed = plan
|
||||
.transform_up_with_subqueries(expand_tumble_analyzer)?
|
||||
.data;
|
||||
Ok(transformed)
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"tumble_expand"
|
||||
}
|
||||
}
|
||||
|
||||
/// expand `tumble` in aggr expr to `tumble_start` and `tumble_end`, also expand related alias and column ref
|
||||
///
|
||||
/// will add `tumble_start` and `tumble_end` to outer projection if not exist before
|
||||
fn expand_tumble_analyzer(
|
||||
plan: datafusion_expr::LogicalPlan,
|
||||
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
|
||||
if let datafusion_expr::LogicalPlan::Projection(proj) = &plan {
|
||||
if let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref() {
|
||||
let mut new_group_expr = vec![];
|
||||
let mut alias_to_expand = HashMap::new();
|
||||
let mut encountered_tumble = false;
|
||||
for expr in aggr.group_expr.iter() {
|
||||
match expr {
|
||||
datafusion_expr::Expr::ScalarFunction(func) if func.name() == "tumble" => {
|
||||
encountered_tumble = true;
|
||||
|
||||
let tumble_start = TumbleExpand::new(TUMBLE_START);
|
||||
let tumble_start = datafusion_expr::expr::ScalarFunction::new_udf(
|
||||
Arc::new(tumble_start.into()),
|
||||
func.args.clone(),
|
||||
);
|
||||
let tumble_start = datafusion_expr::Expr::ScalarFunction(tumble_start);
|
||||
let start_col_name = tumble_start.name_for_alias()?;
|
||||
new_group_expr.push(tumble_start);
|
||||
|
||||
let tumble_end = TumbleExpand::new(TUMBLE_END);
|
||||
let tumble_end = datafusion_expr::expr::ScalarFunction::new_udf(
|
||||
Arc::new(tumble_end.into()),
|
||||
func.args.clone(),
|
||||
);
|
||||
let tumble_end = datafusion_expr::Expr::ScalarFunction(tumble_end);
|
||||
let end_col_name = tumble_end.name_for_alias()?;
|
||||
new_group_expr.push(tumble_end);
|
||||
|
||||
alias_to_expand
|
||||
.insert(expr.name_for_alias()?, (start_col_name, end_col_name));
|
||||
}
|
||||
_ => new_group_expr.push(expr.clone()),
|
||||
}
|
||||
}
|
||||
if !encountered_tumble {
|
||||
return Ok(Transformed::no(plan));
|
||||
}
|
||||
let mut new_aggr = aggr.clone();
|
||||
new_aggr.group_expr = new_group_expr;
|
||||
let new_aggr = datafusion_expr::LogicalPlan::Aggregate(new_aggr).recompute_schema()?;
|
||||
// replace alias in projection if needed, and add new column ref if necessary
|
||||
let mut new_proj_expr = vec![];
|
||||
let mut have_expanded = false;
|
||||
|
||||
for proj_expr in proj.expr.iter() {
|
||||
if let Some((start_col_name, end_col_name)) =
|
||||
alias_to_expand.get(&proj_expr.name_for_alias()?)
|
||||
{
|
||||
let start_col = Column::from_qualified_name(start_col_name);
|
||||
let end_col = Column::from_qualified_name(end_col_name);
|
||||
new_proj_expr.push(datafusion_expr::Expr::Column(start_col));
|
||||
new_proj_expr.push(datafusion_expr::Expr::Column(end_col));
|
||||
have_expanded = true;
|
||||
} else {
|
||||
new_proj_expr.push(proj_expr.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// append to end of projection if not exist
|
||||
if !have_expanded {
|
||||
for (start_col_name, end_col_name) in alias_to_expand.values() {
|
||||
let start_col = Column::from_qualified_name(start_col_name);
|
||||
let end_col = Column::from_qualified_name(end_col_name);
|
||||
new_proj_expr
|
||||
.push(datafusion_expr::Expr::Column(start_col).alias("window_start"));
|
||||
new_proj_expr.push(datafusion_expr::Expr::Column(end_col).alias("window_end"));
|
||||
}
|
||||
}
|
||||
|
||||
let new_proj = datafusion_expr::LogicalPlan::Projection(Projection::try_new(
|
||||
new_proj_expr,
|
||||
Arc::new(new_aggr),
|
||||
)?);
|
||||
return Ok(Transformed::yes(new_proj));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Transformed::no(plan))
|
||||
}
|
||||
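For reference, `tumble_start`/`tumble_end` name the fixed-size, non-overlapping bucket containing a row's timestamp. A standalone sketch of that arithmetic over millisecond timestamps, assuming the usual alignment-to-origin definition (the real functions operate on the engine's timestamp and interval types):

```rust
/// Compute the [start, end) tumble window containing `ts`, where windows are
/// `window_size` ms wide and aligned to `origin` (the optional start_time arg).
fn tumble_window(ts: i64, window_size: i64, origin: i64) -> (i64, i64) {
    // Euclidean remainder keeps the alignment correct for timestamps before `origin`.
    let start = ts - (ts - origin).rem_euclid(window_size);
    (start, start + window_size)
}

fn main() {
    // 1-hour windows aligned to the epoch.
    let hour = 3_600_000;
    assert_eq!(tumble_window(5_400_000, hour, 0), (3_600_000, 7_200_000));
    // With an explicit start time, the buckets shift accordingly.
    assert_eq!(tumble_window(5_400_000, hour, 1_800_000), (5_400_000, 9_000_000));
}
```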
|
||||
/// This is a placeholder for the tumble_start and tumble_end functions, so that datafusion can
/// recognize them as scalar functions
|
||||
#[derive(Debug)]
|
||||
pub struct TumbleExpand {
|
||||
signature: Signature,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl TumbleExpand {
|
||||
pub fn new(name: &str) -> Self {
|
||||
Self {
|
||||
signature: Signature::new(TypeSignature::UserDefined, Volatility::Immutable),
|
||||
name: name.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarUDFImpl for TumbleExpand {
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
/// elide the signature for now
|
||||
fn signature(&self) -> &Signature {
|
||||
&self.signature
|
||||
}
|
||||
|
||||
fn coerce_types(
|
||||
&self,
|
||||
arg_types: &[arrow_schema::DataType],
|
||||
) -> datafusion_common::Result<Vec<arrow_schema::DataType>> {
|
||||
match (arg_types.first(), arg_types.get(1), arg_types.get(2)) {
|
||||
(Some(ts), Some(window), opt) => {
|
||||
use arrow_schema::DataType::*;
|
||||
if !matches!(ts, Date32 | Date64 | Timestamp(_, _)) {
|
||||
return Err(DataFusionError::Plan(
|
||||
format!("Expect timestamp column as first arg for tumble_start, found {:?}", ts)
|
||||
));
|
||||
}
|
||||
if !matches!(window, Utf8 | Interval(_)) {
|
||||
return Err(DataFusionError::Plan(
|
||||
format!("Expect second arg for window size's type being interval for tumble_start, found {:?}", window),
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(start_time) = opt{
|
||||
if !matches!(start_time, Utf8 | Date32 | Date64 | Timestamp(_, _)){
|
||||
return Err(DataFusionError::Plan(
|
||||
format!("Expect start_time to either be date, timestampe or string, found {:?}", start_time)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(arg_types.to_vec())
|
||||
}
|
||||
_ => Err(DataFusionError::Plan(
|
||||
"Expect tumble function have at least two arg(timestamp column and window size) and a third optional arg for starting time".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn return_type(
|
||||
&self,
|
||||
arg_types: &[arrow_schema::DataType],
|
||||
) -> Result<arrow_schema::DataType, DataFusionError> {
|
||||
arg_types.first().cloned().ok_or_else(|| {
|
||||
DataFusionError::Plan(
|
||||
"Expect tumble function have at least two arg(timestamp column and window size)"
|
||||
.to_string(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn invoke(
|
||||
&self,
|
||||
_args: &[datafusion_expr::ColumnarValue],
|
||||
) -> Result<datafusion_expr::ColumnarValue, DataFusionError> {
|
||||
Err(DataFusionError::Plan(
|
||||
"This function should not be executed by datafusion".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// This rule checks all group by exprs and makes sure they also appear in the select clause of an aggr query
|
||||
struct CheckGroupByRule {}
|
||||
|
||||
impl CheckGroupByRule {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl AnalyzerRule for CheckGroupByRule {
|
||||
fn analyze(
|
||||
&self,
|
||||
plan: datafusion_expr::LogicalPlan,
|
||||
_config: &ConfigOptions,
|
||||
) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
|
||||
let transformed = plan
|
||||
.transform_up_with_subqueries(check_group_by_analyzer)?
|
||||
.data;
|
||||
Ok(transformed)
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"check_groupby"
|
||||
}
|
||||
}
|
||||
|
||||
/// make sure everything in group by's expr is in select
|
||||
fn check_group_by_analyzer(
|
||||
plan: datafusion_expr::LogicalPlan,
|
||||
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
|
||||
if let datafusion_expr::LogicalPlan::Projection(proj) = &plan {
|
||||
if let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref() {
|
||||
let mut found_column_used = FindColumn::new();
|
||||
proj.expr
|
||||
.iter()
|
||||
.map(|i| i.visit(&mut found_column_used))
|
||||
.count();
|
||||
for expr in aggr.group_expr.iter() {
|
||||
if !found_column_used
|
||||
.names_for_alias
|
||||
.contains(&expr.name_for_alias()?)
|
||||
{
|
||||
return Err(DataFusionError::Plan(format!("Expect {} expr in group by also exist in select list, but select list only contain {:?}",expr.name_for_alias()?, found_column_used.names_for_alias)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Transformed::no(plan))
|
||||
}
|
||||
|
||||
/// Find all column names in a plan
|
||||
#[derive(Debug, Default)]
|
||||
struct FindColumn {
|
||||
names_for_alias: HashSet<String>,
|
||||
}
|
||||
|
||||
impl FindColumn {
|
||||
fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl TreeNodeVisitor<'_> for FindColumn {
|
||||
type Node = datafusion_expr::Expr;
|
||||
fn f_down(
|
||||
&mut self,
|
||||
node: &datafusion_expr::Expr,
|
||||
) -> Result<TreeNodeRecursion, DataFusionError> {
|
||||
if let datafusion_expr::Expr::Column(_) = node {
|
||||
self.names_for_alias.insert(node.name_for_alias()?);
|
||||
}
|
||||
Ok(TreeNodeRecursion::Continue)
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,7 @@ mod scalar;
|
||||
mod signature;
|
||||
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
|
||||
pub(crate) use error::{EvalError, InvalidArgumentSnafu};
|
||||
@@ -37,42 +38,168 @@ use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::expr::error::DataTypeSnafu;
|
||||
|
||||
pub const TUMBLE_START: &str = "tumble_start";
|
||||
pub const TUMBLE_END: &str = "tumble_end";
|
||||
|
||||
/// A batch of vectors with the same length but without schema, only useful in dataflow
|
||||
///
|
||||
/// somewhat cheap to clone since it just contains a list of VectorRef (which is an `Arc`).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Batch {
|
||||
batch: Vec<VectorRef>,
|
||||
row_count: usize,
|
||||
/// describes whether each corresponding row in the batch is an insert or a delete; None means all rows are inserts
|
||||
diffs: Option<VectorRef>,
|
||||
}
|
||||
|
||||
impl PartialEq for Batch {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
let mut batch_eq = true;
|
||||
if self.batch.len() != other.batch.len() {
|
||||
return false;
|
||||
}
|
||||
for (left, right) in self.batch.iter().zip(other.batch.iter()) {
|
||||
batch_eq = batch_eq
|
||||
&& <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array());
|
||||
}
|
||||
|
||||
let diff_eq = match (&self.diffs, &other.diffs) {
|
||||
(Some(left), Some(right)) => {
|
||||
<dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array())
|
||||
}
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
};
|
||||
batch_eq && diff_eq && self.row_count == other.row_count
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for Batch {}
|
||||
|
||||
impl Default for Batch {
|
||||
fn default() -> Self {
|
||||
Self::empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl Batch {
|
||||
pub fn new(batch: Vec<VectorRef>, row_count: usize) -> Self {
|
||||
Self { batch, row_count }
|
||||
pub fn try_from_rows(rows: Vec<crate::repr::Row>) -> Result<Self, EvalError> {
|
||||
if rows.is_empty() {
|
||||
return Ok(Self::empty());
|
||||
}
|
||||
let len = rows.len();
|
||||
let mut builder = rows
|
||||
.first()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|v| v.data_type().create_mutable_vector(len))
|
||||
.collect_vec();
|
||||
for row in rows {
|
||||
ensure!(
|
||||
row.len() == builder.len(),
|
||||
InvalidArgumentSnafu {
|
||||
reason: format!(
|
||||
"row length not match, expect {}, found {}",
|
||||
builder.len(),
|
||||
row.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
for (idx, value) in row.iter().enumerate() {
|
||||
builder[idx]
|
||||
.try_push_value_ref(value.as_value_ref())
|
||||
.context(DataTypeSnafu {
|
||||
msg: "Failed to convert rows to columns",
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
|
||||
let batch = Self::try_new(columns, len)?;
|
||||
Ok(batch)
|
||||
}
|
||||
|
||||
pub fn empty() -> Self {
|
||||
Self {
|
||||
batch: vec![],
|
||||
row_count: 0,
|
||||
diffs: None,
|
||||
}
|
||||
}
|
||||
pub fn try_new(batch: Vec<VectorRef>, row_count: usize) -> Result<Self, EvalError> {
|
||||
ensure!(
|
||||
batch.iter().map(|v| v.len()).all_equal()
|
||||
&& batch.first().map(|v| v.len() == row_count).unwrap_or(true),
|
||||
InvalidArgumentSnafu {
|
||||
reason: "All columns should have same length".to_string()
|
||||
}
|
||||
);
|
||||
Ok(Self {
|
||||
batch,
|
||||
row_count,
|
||||
diffs: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_unchecked(batch: Vec<VectorRef>, row_count: usize) -> Self {
|
||||
Self {
|
||||
batch,
|
||||
row_count,
|
||||
diffs: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn batch(&self) -> &[VectorRef] {
|
||||
&self.batch
|
||||
}
|
||||
|
||||
pub fn batch_mut(&mut self) -> &mut Vec<VectorRef> {
|
||||
&mut self.batch
|
||||
}
|
||||
|
||||
pub fn row_count(&self) -> usize {
|
||||
self.row_count
|
||||
}
|
||||
|
||||
pub fn set_row_count(&mut self, row_count: usize) {
|
||||
self.row_count = row_count;
|
||||
}
|
||||
|
||||
pub fn column_count(&self) -> usize {
|
||||
self.batch.len()
|
||||
}
|
||||
|
||||
pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
|
||||
ensure!(
|
||||
idx < self.row_count,
|
||||
InvalidArgumentSnafu {
|
||||
reason: format!(
|
||||
"Expect row index to be less than {}, found {}",
|
||||
self.row_count, idx
|
||||
)
|
||||
}
|
||||
);
|
||||
Ok(self.batch.iter().map(|v| v.get(idx)).collect_vec())
|
||||
}
|
||||
|
||||
/// Slices the `Batch`, returning a new `Batch`.
|
||||
///
|
||||
/// # Panics
|
||||
/// This function panics if `offset + length > self.row_count()`.
|
||||
pub fn slice(&self, offset: usize, length: usize) -> Batch {
|
||||
pub fn slice(&self, offset: usize, length: usize) -> Result<Batch, EvalError> {
|
||||
let batch = self
|
||||
.batch()
|
||||
.iter()
|
||||
.map(|v| v.slice(offset, length))
|
||||
.collect_vec();
|
||||
Batch::new(batch, length)
|
||||
Batch::try_new(batch, length)
|
||||
}
|
||||
|
||||
/// append another batch to self
|
||||
///
|
||||
/// NOTE: This is expensive since it will create new vectors for each column
|
||||
pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
|
||||
ensure!(
|
||||
self.batch.len() == other.batch.len(),
|
||||
self.batch.len() == other.batch.len()
|
||||
|| self.batch.is_empty()
|
||||
|| other.batch.is_empty(),
|
||||
InvalidArgumentSnafu {
|
||||
reason: format!(
|
||||
"Expect two batch to have same numbers of column, found {} and {} columns",
|
||||
@@ -82,21 +209,31 @@ impl Batch {
|
||||
}
|
||||
);
|
||||
|
||||
let batch_builders = self
|
||||
.batch
|
||||
if self.batch.is_empty() {
|
||||
self.batch = other.batch;
|
||||
self.row_count = other.row_count;
|
||||
return Ok(());
|
||||
} else if other.batch.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let dts = if self.batch.is_empty() {
|
||||
other.batch.iter().map(|v| v.data_type()).collect_vec()
|
||||
} else {
|
||||
self.batch.iter().map(|v| v.data_type()).collect_vec()
|
||||
};
|
||||
|
||||
let batch_builders = dts
|
||||
.iter()
|
||||
.map(|v| {
|
||||
v.data_type()
|
||||
.create_mutable_vector(self.row_count() + other.row_count())
|
||||
})
|
||||
.map(|dt| dt.create_mutable_vector(self.row_count() + other.row_count()))
|
||||
.collect_vec();
|
||||
|
||||
let mut result = vec![];
|
||||
let zelf_row_count = self.row_count();
|
||||
let self_row_count = self.row_count();
|
||||
let other_row_count = other.row_count();
|
||||
for (idx, mut builder) in batch_builders.into_iter().enumerate() {
|
||||
builder
|
||||
.extend_slice_of(self.batch()[idx].as_ref(), 0, zelf_row_count)
|
||||
.extend_slice_of(self.batch()[idx].as_ref(), 0, self_row_count)
|
||||
.context(DataTypeSnafu {
|
||||
msg: "Failed to extend vector",
|
||||
})?;
|
||||
@@ -108,7 +245,7 @@ impl Batch {
|
||||
result.push(builder.to_vector());
|
||||
}
|
||||
self.batch = result;
|
||||
self.row_count = zelf_row_count + other_row_count;
|
||||
self.row_count = self_row_count + other_row_count;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
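`Batch::try_from_rows` plus `append_batch` are essentially a checked transpose from rows to columns. The same logic over plain `Vec`s, without the typed mutable-vector builders (a sketch, not the crate's API):

```rust
/// Transpose rows into columns, rejecting rows whose length doesn't match the first.
fn rows_to_columns(rows: &[Vec<i64>]) -> Result<Vec<Vec<i64>>, String> {
    let Some(first) = rows.first() else {
        return Ok(Vec::new()); // empty input -> empty batch
    };
    let arity = first.len();
    let mut columns = vec![Vec::with_capacity(rows.len()); arity];
    for row in rows {
        if row.len() != arity {
            return Err(format!(
                "row length not match, expect {}, found {}",
                arity,
                row.len()
            ));
        }
        for (col, value) in columns.iter_mut().zip(row) {
            col.push(*value);
        }
    }
    Ok(columns)
}

fn main() {
    let rows = vec![vec![1, 10], vec![2, 20], vec![3, 30]];
    assert_eq!(
        rows_to_columns(&rows).unwrap(),
        vec![vec![1, 2, 3], vec![10, 20, 30]]
    );
    // Mismatched arity is rejected, mirroring the `ensure!` in `try_from_rows`.
    assert!(rows_to_columns(&[vec![1], vec![2, 3]]).is_err());
}
```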
|
||||
@@ -35,13 +35,13 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
use strum::{EnumIter, IntoEnumIterator};
|
||||
use substrait::df_logical_plan::consumer::name_to_op;
|
||||
|
||||
use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu};
|
||||
use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu, UnexpectedSnafu};
|
||||
use crate::expr::error::{
|
||||
ArrowSnafu, CastValueSnafu, DataTypeSnafu, DivisionByZeroSnafu, EvalError, OverflowSnafu,
|
||||
TryFromValueSnafu, TypeMismatchSnafu,
|
||||
};
|
||||
use crate::expr::signature::{GenericFn, Signature};
|
||||
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr};
|
||||
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr, TUMBLE_END, TUMBLE_START};
|
||||
use crate::repr::{self, value_to_internal_ts};
|
||||
|
||||
/// UnmaterializableFunc is a function that can't be eval independently,
|
||||
@@ -87,42 +87,10 @@ impl UnmaterializableFunc {
|
||||
}
|
||||
|
||||
/// Create a UnmaterializableFunc from a string of the function name
|
||||
pub fn from_str_args(name: &str, args: Vec<TypedExpr>) -> Result<Self, Error> {
|
||||
pub fn from_str_args(name: &str, _args: Vec<TypedExpr>) -> Result<Self, Error> {
|
||||
match name.to_lowercase().as_str() {
|
||||
"now" => Ok(Self::Now),
|
||||
"current_schema" => Ok(Self::CurrentSchema),
|
||||
"tumble" => {
|
||||
let ts = args.first().context(InvalidQuerySnafu {
|
||||
reason: "Tumble window function requires a timestamp argument",
|
||||
})?;
|
||||
let window_size = args
|
||||
.get(1)
|
||||
.and_then(|expr| expr.expr.as_literal())
|
||||
.context(InvalidQuerySnafu {
|
||||
reason: "Tumble window function requires a window size argument"
|
||||
})?.as_string() // TODO(discord9): since df to substrait convertor does not support interval type yet, we need to take a string and cast it to interval instead
|
||||
.map(|s|cast(Value::from(s), &ConcreteDataType::interval_month_day_nano_datatype())).transpose().map_err(BoxedError::new).context(
|
||||
ExternalSnafu
|
||||
)?.and_then(|v|v.as_interval())
|
||||
.with_context(||InvalidQuerySnafu {
|
||||
reason: format!("Tumble window function requires window size argument to be a string describe a interval, found {:?}", args.get(1))
|
||||
})?;
|
||||
let start_time = match args.get(2) {
|
||||
Some(start_time) => start_time.expr.as_literal(),
|
||||
None => None,
|
||||
}
|
||||
.map(|s| cast(s.clone(), &ConcreteDataType::datetime_datatype())).transpose().map_err(BoxedError::new).context(ExternalSnafu)?.map(|v|v.as_datetime().with_context(
|
||||
||InvalidQuerySnafu {
|
||||
reason: format!("Tumble window function requires start time argument to be a datetime describe in string, found {:?}", args.get(2))
|
||||
}
|
||||
)).transpose()?;
|
||||
|
||||
Ok(Self::TumbleWindow {
|
||||
ts: Box::new(ts.clone()),
|
||||
window_size,
|
||||
start_time,
|
||||
})
|
||||
}
|
||||
_ => InvalidQuerySnafu {
|
||||
reason: format!("Unknown unmaterializable function: {}", name),
|
||||
}
|
||||
@@ -347,6 +315,96 @@ impl UnaryFunc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_tumble_func(name: &str, args: &[TypedExpr]) -> Result<(Self, TypedExpr), Error> {
|
||||
match name.to_lowercase().as_str() {
|
||||
TUMBLE_START | TUMBLE_END => {
|
||||
let ts = args.first().context(InvalidQuerySnafu {
|
||||
reason: "Tumble window function requires a timestamp argument",
|
||||
})?;
|
||||
let window_size = {
|
||||
let window_size_untyped = args
|
||||
.get(1)
|
||||
.and_then(|expr| expr.expr.as_literal())
|
||||
.context(InvalidQuerySnafu {
|
||||
reason: "Tumble window function requires a window size argument",
|
||||
})?;
|
||||
if let Some(window_size) = window_size_untyped.as_string() {
|
||||
// cast as interval
|
||||
cast(
|
||||
Value::from(window_size),
|
||||
&ConcreteDataType::interval_month_day_nano_datatype(),
|
||||
)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.as_interval()
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Expect window size arg to be interval after successful cast"
|
||||
.to_string(),
|
||||
})?
|
||||
} else if let Some(interval) = window_size_untyped.as_interval() {
|
||||
interval
|
||||
} else {
|
||||
InvalidQuerySnafu {
|
||||
reason: format!(
|
||||
"Tumble window function requires window size argument to be either a interval or a string describe a interval, found {:?}",
|
||||
window_size_untyped
|
||||
)
|
||||
}.fail()?
|
||||
}
|
||||
};
|
||||
|
||||
// start time argument is optional
|
||||
let start_time = match args.get(2) {
|
||||
Some(start_time) => {
|
||||
if let Some(value) = start_time.expr.as_literal() {
|
||||
// cast as DateTime
|
||||
let ret = cast(value, &ConcreteDataType::datetime_datatype())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.as_datetime()
|
||||
.context(UnexpectedSnafu {
|
||||
reason:
|
||||
"Expect start time arg to be datetime after successful cast"
|
||||
.to_string(),
|
||||
})?;
|
||||
Some(ret)
|
||||
} else {
|
||||
UnexpectedSnafu {
|
||||
reason: "Expect start time arg to be literal",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
if name == TUMBLE_START {
|
||||
Ok((
|
||||
Self::TumbleWindowFloor {
|
||||
window_size,
|
||||
start_time,
|
||||
},
|
||||
ts.clone(),
|
||||
))
|
||||
} else if name == TUMBLE_END {
|
||||
Ok((
|
||||
Self::TumbleWindowCeiling {
|
||||
window_size,
|
||||
start_time,
|
||||
},
|
||||
ts.clone(),
|
||||
))
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
_ => crate::error::InternalSnafu {
|
||||
reason: format!("Unknown tumble kind function: {}", name),
|
||||
}
|
||||
.fail()?,
|
||||
}
|
||||
}
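// Hedged usage sketch (standalone, not part of this impl): both window bounds are resolved
// from the same arguments; `tumble_start` yields `TumbleWindowFloor` and `tumble_end` yields
// `TumbleWindowCeiling`, each paired with the timestamp expression it should be applied to, e.g.:
//
//     let (floor_fn, ts) = UnaryFunc::from_tumble_func(TUMBLE_START, &args)?;
//     let (ceil_fn, _) = UnaryFunc::from_tumble_func(TUMBLE_END, &args)?;
//     let window_start = ts.expr.call_unary(floor_fn);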
|
||||
|
||||
/// Evaluate the function with given values and expression
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -712,8 +770,8 @@ impl BinaryFunc {
|
||||
t1 == t2,
|
||||
InvalidQuerySnafu {
|
||||
reason: format!(
|
||||
"Binary function {:?} requires both arguments to have the same type",
|
||||
generic
|
||||
"Binary function {:?} requires both arguments to have the same type, left={:?}, right={:?}",
|
||||
generic, t1, t2
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
@@ -16,13 +16,18 @@
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use arrow::array::BooleanArray;
|
||||
use arrow::compute::FilterBuilder;
|
||||
use common_telemetry::debug;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use snafu::ensure;
|
||||
use datatypes::vectors::{BooleanVector, Helper};
|
||||
use itertools::Itertools;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{Error, InvalidQuerySnafu};
|
||||
use crate::expr::error::{EvalError, InternalSnafu};
|
||||
use crate::expr::{InvalidArgumentSnafu, ScalarExpr};
|
||||
use crate::expr::error::{ArrowSnafu, DataTypeSnafu, EvalError, InternalSnafu, TypeMismatchSnafu};
|
||||
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr};
|
||||
use crate::repr::{self, value_to_internal_ts, Diff, Row};
|
||||
|
||||
/// A compound operator that can be applied row-by-row.
|
||||
@@ -473,6 +478,85 @@ impl SafeMfpPlan {
|
||||
self.mfp.permute(map, new_arity)
|
||||
}
|
||||
|
||||
/// Similar to [`MapFilterProject::evaluate_into`], but evaluated in batch; rows that fail the predicates are not included in the output.
///
/// Hence the output is not guaranteed to have the same number of rows as the input.
|
||||
pub fn eval_batch_into(&self, batch: &mut Batch) -> Result<Batch, EvalError> {
|
||||
ensure!(
|
||||
batch.column_count() == self.mfp.input_arity,
|
||||
InvalidArgumentSnafu {
|
||||
reason: format!(
|
||||
"batch column length {} is not equal to input_arity {}",
|
||||
batch.column_count(),
|
||||
self.mfp.input_arity
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let passed_predicates = self.eval_batch_inner(batch)?;
|
||||
let filter = FilterBuilder::new(passed_predicates.as_boolean_array());
|
||||
let pred = filter.build();
|
||||
let mut result = vec![];
|
||||
for col in batch.batch() {
|
||||
let filtered = pred
|
||||
.filter(col.to_arrow_array().as_ref())
|
||||
.context(ArrowSnafu {
|
||||
context: format!("failed to filter column for mfp operator {:?}", self),
|
||||
})?;
|
||||
result.push(Helper::try_into_vector(filtered).context(DataTypeSnafu {
|
||||
msg: "Failed to convert arrow array to vector",
|
||||
})?);
|
||||
}
|
||||
let projected = self
|
||||
.mfp
|
||||
.projection
|
||||
.iter()
|
||||
.map(|c| result[*c].clone())
|
||||
.collect_vec();
|
||||
let row_count = pred.count();
|
||||
|
||||
Batch::try_new(projected, row_count)
|
||||
}
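// Hedged usage sketch (standalone, not part of this impl): rows failing the predicates are
// filtered out, so the returned Batch may contain fewer rows than the input, e.g.:
//
//     let filtered: Batch = safe_mfp.eval_batch_into(&mut batch)?;
//     assert!(filtered.row_count() <= input_row_count);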
|
||||
|
||||
/// Similar to [`MapFilterProject::evaluate_into`], but evaluated in batch; returns the boolean mask of rows that pass all predicates.
|
||||
pub fn eval_batch_inner(&self, batch: &mut Batch) -> Result<BooleanVector, EvalError> {
|
||||
// mark the columns that have been evaluated and appended to the `batch`
|
||||
let mut expression = 0;
|
||||
// preds default to true and will be updated as we evaluate each predicate
|
||||
let mut all_preds = BooleanVector::from(vec![Some(true); batch.row_count()]);
|
||||
|
||||
// to compute the predicates, first evaluate all expressions they depend on
|
||||
for (support, predicate) in self.mfp.predicates.iter() {
|
||||
while self.mfp.input_arity + expression < *support {
|
||||
let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
|
||||
batch.batch_mut().push(expr_eval);
|
||||
expression += 1;
|
||||
}
|
||||
let pred_vec = predicate.eval_batch(batch)?;
|
||||
let pred_arr = pred_vec.to_arrow_array();
|
||||
let pred_arr = pred_arr.as_any().downcast_ref::<BooleanArray>().context({
|
||||
TypeMismatchSnafu {
|
||||
expected: ConcreteDataType::boolean_datatype(),
|
||||
actual: pred_vec.data_type(),
|
||||
}
|
||||
})?;
|
||||
let all_arr = all_preds.as_boolean_array();
|
||||
let res_arr = arrow::compute::and(all_arr, pred_arr).context(ArrowSnafu {
|
||||
context: format!("failed to compute predicate for mfp operator {:?}", self),
|
||||
})?;
|
||||
all_preds = BooleanVector::from(res_arr);
|
||||
}
|
||||
|
||||
// keep evaluating until all remaining expressions have been appended to the batch
|
||||
while expression < self.mfp.expressions.len() {
|
||||
let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
|
||||
batch.batch_mut().push(expr_eval);
|
||||
expression += 1;
|
||||
}
|
||||
|
||||
Ok(all_preds)
|
||||
}
|
||||
|
||||
/// Evaluates the linear operator on a supplied list of datums.
|
||||
///
|
||||
/// The arguments are the initial datums associated with the row,
|
||||
@@ -735,10 +819,15 @@ impl MfpPlan {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::vectors::{Int32Vector, Int64Vector};
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
use crate::expr::{BinaryFunc, UnaryFunc, UnmaterializableFunc};
|
||||
|
||||
#[test]
|
||||
fn test_mfp_with_time() {
|
||||
use crate::expr::func::BinaryFunc;
|
||||
@@ -844,6 +933,21 @@ mod test {
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)]));
|
||||
|
||||
// batch mode
|
||||
let mut batch = Batch::try_from_rows(vec![Row::from(vec![
|
||||
Value::from(4),
|
||||
Value::from(2),
|
||||
Value::from(3),
|
||||
])])
|
||||
.unwrap();
|
||||
let ret = safe_mfp.eval_batch_into(&mut batch).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
ret,
|
||||
Batch::try_from_rows(vec![Row::from(vec![Value::from(false), Value::from(true)])])
|
||||
.unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -865,7 +969,7 @@ mod test {
|
||||
BinaryFunc::Gt,
|
||||
)])
|
||||
.unwrap();
|
||||
let mut input1 = vec![
|
||||
let input1 = vec![
|
||||
Value::from(4),
|
||||
Value::from(2),
|
||||
Value::from(3),
|
||||
@@ -873,19 +977,34 @@ mod test {
|
||||
];
|
||||
let safe_mfp = SafeMfpPlan { mfp };
|
||||
let ret = safe_mfp
|
||||
.evaluate_into(&mut input1, &mut Row::empty())
|
||||
.evaluate_into(&mut input1.clone(), &mut Row::empty())
|
||||
.unwrap();
|
||||
assert_eq!(ret, None);
|
||||
let mut input2 = vec![
|
||||
|
||||
let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
|
||||
let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
|
||||
assert_eq!(
|
||||
ret_batch,
|
||||
Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![]))], 0).unwrap()
|
||||
);
|
||||
|
||||
let input2 = vec![
|
||||
Value::from(5),
|
||||
Value::from(2),
|
||||
Value::from(4),
|
||||
Value::from("abc"),
|
||||
];
|
||||
let ret = safe_mfp
|
||||
.evaluate_into(&mut input2, &mut Row::empty())
|
||||
.evaluate_into(&mut input2.clone(), &mut Row::empty())
|
||||
.unwrap();
|
||||
assert_eq!(ret, Some(Row::pack(vec![Value::from(11)])));
|
||||
|
||||
let mut input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap();
|
||||
let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch).unwrap();
|
||||
assert_eq!(
|
||||
ret_batch,
|
||||
Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![11]))], 1).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -923,27 +1042,50 @@ mod test {
|
||||
.unwrap()
|
||||
.project([0, 1, 2])
|
||||
.unwrap();
|
||||
let mut input1 = vec![
|
||||
let input1 = vec![
|
||||
Value::from(4i64),
|
||||
Value::from(2),
|
||||
Value::from(3),
|
||||
Value::from(53),
|
||||
];
|
||||
let safe_mfp = SafeMfpPlan { mfp };
|
||||
let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty());
|
||||
let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
|
||||
assert!(matches!(ret, Err(EvalError::InvalidArgument { .. })));
|
||||
|
||||
let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
|
||||
let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch);
|
||||
assert!(matches!(ret_batch, Err(EvalError::InvalidArgument { .. })));
|
||||
|
||||
let input2 = vec![Value::from(4i64), Value::from(2), Value::from(3)];
|
||||
let ret = safe_mfp
|
||||
.evaluate_into(&mut input2.clone(), &mut Row::empty())
|
||||
.unwrap();
|
||||
assert_eq!(ret, Some(Row::new(input2)));
|
||||
assert_eq!(ret, Some(Row::new(input2.clone())));
|
||||
|
||||
let mut input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)];
|
||||
let input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap();
|
||||
let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch.clone()).unwrap();
|
||||
assert_eq!(ret_batch, input2_batch);
|
||||
|
||||
let input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)];
|
||||
let ret = safe_mfp
|
||||
.evaluate_into(&mut input3, &mut Row::empty())
|
||||
.evaluate_into(&mut input3.clone(), &mut Row::empty())
|
||||
.unwrap();
|
||||
assert_eq!(ret, None);
|
||||
|
||||
let input3_batch = Batch::try_from_rows(vec![Row::new(input3)]).unwrap();
|
||||
let ret_batch = safe_mfp.eval_batch_into(&mut input3_batch.clone()).unwrap();
|
||||
assert_eq!(
|
||||
ret_batch,
|
||||
Batch::try_new(
|
||||
vec![
|
||||
Arc::new(Int64Vector::from_vec(Default::default())),
|
||||
Arc::new(Int32Vector::from_vec(Default::default())),
|
||||
Arc::new(Int32Vector::from_vec(Default::default()))
|
||||
],
|
||||
0
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -961,10 +1103,18 @@ mod test {
|
||||
.unwrap()
|
||||
.project(vec![3])
|
||||
.unwrap();
|
||||
let mut input1 = vec![Value::from(2), Value::from(3), Value::from(4)];
|
||||
let input1 = vec![Value::from(2), Value::from(3), Value::from(4)];
|
||||
let safe_mfp = SafeMfpPlan { mfp };
|
||||
let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty());
|
||||
let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
|
||||
assert_eq!(ret.unwrap(), Some(Row::new(vec![Value::from(false)])));
|
||||
|
||||
let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
|
||||
let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
ret_batch,
|
||||
Batch::try_new(vec![Arc::new(BooleanVector::from(vec![false]))], 1).unwrap()
|
||||
);
|
||||
}
|
||||
#[test]
|
||||
fn test_mfp_chore() {
|
||||
|
||||
@@ -18,15 +18,17 @@ use std::sync::OnceLock;
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use smallvec::smallvec;
|
||||
use snafu::{IntoError, OptionExt};
|
||||
use snafu::{ensure, IntoError, OptionExt};
|
||||
use strum::{EnumIter, IntoEnumIterator};
|
||||
|
||||
use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
|
||||
use crate::expr::error::EvalError;
|
||||
use crate::expr::relation::accum::{Accum, Accumulator};
|
||||
use crate::expr::signature::{GenericFn, Signature};
|
||||
use crate::expr::InvalidArgumentSnafu;
|
||||
use crate::repr::Diff;
|
||||
|
||||
/// Aggregate functions that can be applied to a group of rows.
|
||||
@@ -131,6 +133,98 @@ impl AggregateFunc {
|
||||
let res = accum.eval(self)?;
|
||||
Ok((res, accum.into_state()))
|
||||
}
|
||||
|
||||
/// Returns the output value and the new accumulator state
|
||||
pub fn eval_batch<A>(
|
||||
&self,
|
||||
accum: A,
|
||||
vector: VectorRef,
|
||||
diff: Option<VectorRef>,
|
||||
) -> Result<(Value, Vec<Value>), EvalError>
|
||||
where
|
||||
A: IntoIterator<Item = Value>,
|
||||
{
|
||||
let mut accum = accum.into_iter().peekable();
|
||||
|
||||
let mut accum = if accum.peek().is_none() {
|
||||
Accum::new_accum(self)?
|
||||
} else {
|
||||
Accum::try_from_iter(self, &mut accum)?
|
||||
};
|
||||
|
||||
let vector_diff = VectorDiff::try_new(vector, diff)?;
|
||||
|
||||
accum.update_batch(self, vector_diff)?;
|
||||
|
||||
let res = accum.eval(self)?;
|
||||
Ok((res, accum.into_state()))
|
||||
}
|
||||
}
|
||||
|
||||
struct VectorDiff {
|
||||
vector: VectorRef,
|
||||
diff: Option<VectorRef>,
|
||||
}
|
||||
|
||||
impl VectorDiff {
|
||||
fn len(&self) -> usize {
|
||||
self.vector.len()
|
||||
}
|
||||
|
||||
fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
|
||||
ensure!(
|
||||
diff.as_ref()
|
||||
.map_or(true, |diff| diff.len() == vector.len()),
|
||||
InvalidArgumentSnafu {
|
||||
reason: "Length of vector and diff should be the same"
|
||||
}
|
||||
);
|
||||
Ok(Self { vector, diff })
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoIterator for VectorDiff {
|
||||
type Item = (Value, Diff);
|
||||
type IntoIter = VectorDiffIter;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
VectorDiffIter {
|
||||
vector: self.vector,
|
||||
diff: self.diff,
|
||||
idx: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct VectorDiffIter {
|
||||
vector: VectorRef,
|
||||
diff: Option<VectorRef>,
|
||||
idx: usize,
|
||||
}
|
||||
|
||||
impl std::iter::Iterator for VectorDiffIter {
|
||||
type Item = (Value, Diff);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.idx >= self.vector.len() {
|
||||
return None;
|
||||
}
|
||||
let value = self.vector.get(self.idx);
|
||||
// +1 means insert, -1 means delete; defaults to +1 (insert) when no diff is provided
|
||||
let diff = if let Some(diff) = self.diff.as_ref() {
|
||||
if let Ok(diff_at) = diff.get(self.idx).try_into() {
|
||||
diff_at
|
||||
} else {
|
||||
common_telemetry::warn!("Invalid diff value at index {}", self.idx);
|
||||
return None;
|
||||
}
|
||||
} else {
|
||||
1
|
||||
};
|
||||
|
||||
self.idx += 1;
|
||||
Some((value, diff))
|
||||
}
|
||||
}
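// Hedged sketch (assumed aggregate and types from this file, not part of the diff): evaluating
// an aggregate over a whole vector with no diff column, so every row is treated as an insert
// (+1) and the accumulator starts from an empty state.
fn eval_aggregate_over_vector(
    func: &AggregateFunc,
    values: VectorRef,
) -> Result<(Value, Vec<Value>), EvalError> {
    func.eval_batch(Vec::<Value>::new(), values, None)
}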
|
||||
|
||||
/// Generate signature for each aggregate function
|
||||
|
||||
@@ -30,7 +30,7 @@ use crate::expr::error::{
|
||||
};
|
||||
use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
|
||||
use crate::expr::{Batch, DfScalarFunction};
|
||||
use crate::repr::{ColumnType, RelationType};
|
||||
use crate::repr::ColumnType;
|
||||
/// A scalar expression with a known type.
|
||||
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct TypedExpr {
|
||||
@@ -46,77 +46,6 @@ impl TypedExpr {
|
||||
}
|
||||
}
|
||||
|
||||
impl TypedExpr {
|
||||
/// Expand a multi-value expression into multiple expressions with new indices.
///
/// Currently this only means expanding `TumbleWindow` into `TumbleWindowFloor` and `TumbleWindowCeiling`.
///
/// TODO(discord9): test whether a nested reduce combined with a df scalar function would cause problems
|
||||
pub fn expand_multi_value(
|
||||
input_typ: &RelationType,
|
||||
exprs: &[TypedExpr],
|
||||
) -> Result<Vec<TypedExpr>, Error> {
|
||||
// old indices in mfp, expanded expr
|
||||
let mut ret = vec![];
|
||||
let input_arity = input_typ.column_types.len();
|
||||
for (old_idx, expr) in exprs.iter().enumerate() {
|
||||
if let ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
|
||||
ts,
|
||||
window_size,
|
||||
start_time,
|
||||
}) = &expr.expr
|
||||
{
|
||||
let floor = UnaryFunc::TumbleWindowFloor {
|
||||
window_size: *window_size,
|
||||
start_time: *start_time,
|
||||
};
|
||||
let ceil = UnaryFunc::TumbleWindowCeiling {
|
||||
window_size: *window_size,
|
||||
start_time: *start_time,
|
||||
};
|
||||
let floor = ScalarExpr::CallUnary {
|
||||
func: floor,
|
||||
expr: Box::new(ts.expr.clone()),
|
||||
}
|
||||
.with_type(ts.typ.clone());
|
||||
ret.push((None, floor));
|
||||
|
||||
let ceil = ScalarExpr::CallUnary {
|
||||
func: ceil,
|
||||
expr: Box::new(ts.expr.clone()),
|
||||
}
|
||||
.with_type(ts.typ.clone());
|
||||
ret.push((None, ceil));
|
||||
} else {
|
||||
ret.push((Some(input_arity + old_idx), expr.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
// get the shuffled index (old_idx -> new_idx)
// note the index is offset by input_arity because mfp is designed to include the input columns first, then the intermediate columns
|
||||
let shuffle = ret
|
||||
.iter()
|
||||
.map(|(old_idx, _)| *old_idx) // [Option<opt_idx>]
|
||||
.enumerate()
|
||||
.map(|(new, old)| (old, new + input_arity))
|
||||
.flat_map(|(old, new)| old.map(|o| (o, new)))
|
||||
.chain((0..input_arity).map(|i| (i, i))) // also remember to chain the input columns as not changed
|
||||
.collect::<BTreeMap<_, _>>();
|
||||
|
||||
// shuffle expr's index
|
||||
let exprs = ret
|
||||
.into_iter()
|
||||
.map(|(_, mut expr)| {
|
||||
// invariant: no expr is expected to refer to the column being expanded
|
||||
expr.expr.permute_map(&shuffle)?;
|
||||
Ok(expr)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
Ok(exprs)
|
||||
}
|
||||
}
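// Hedged worked example (hypothetical column layout): with an input of two columns and
// exprs = [col(0), tumble_window(col(1))], expansion yields
// [col(0), tumble_floor(col(1)), tumble_ceil(col(1))]. The surviving plain expression keeps a
// shuffled index offset by the input arity, while the floor/ceiling expressions take the freshly
// appended positions, so later MFP stages see two window-bound columns instead of one.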
|
||||
|
||||
/// A scalar expression, which can be evaluated to a value.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub enum ScalarExpr {
|
||||
@@ -210,6 +139,13 @@ impl ScalarExpr {
|
||||
}
|
||||
|
||||
impl ScalarExpr {
|
||||
pub fn cast(self, typ: ConcreteDataType) -> Self {
|
||||
ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::Cast(typ),
|
||||
expr: Box::new(self),
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply optimizations to the expression, such as flattening variadic functions
|
||||
pub fn optimize(&mut self) {
|
||||
self.flatten_varidic_fn();
|
||||
@@ -341,7 +277,7 @@ impl ScalarExpr {
|
||||
// put a slice to corresponding batch
|
||||
let slice_offset = prev_cond_idx;
|
||||
let slice_length = idx - prev_cond_idx;
|
||||
let to_be_append = batch.slice(slice_offset, slice_length);
|
||||
let to_be_append = batch.slice(slice_offset, slice_length)?;
|
||||
|
||||
let to_put_back = match prev_cond {
|
||||
Some(true) => (
|
||||
@@ -364,7 +300,7 @@ impl ScalarExpr {
|
||||
if let Some(slice_offset) = prev_start_idx {
|
||||
let prev_cond = prev_cond.unwrap();
|
||||
let slice_length = bool_conds.len() - slice_offset;
|
||||
let to_be_append = batch.slice(slice_offset, slice_length);
|
||||
let to_be_append = batch.slice(slice_offset, slice_length)?;
|
||||
let to_put_back = match prev_cond {
|
||||
Some(true) => (
|
||||
Some(true),
|
||||
@@ -876,7 +812,7 @@ mod test {
|
||||
let raw_len = raw.len();
|
||||
let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
|
||||
|
||||
let batch = Batch::new(vectors, raw_len);
|
||||
let batch = Batch::try_new(vectors, raw_len).unwrap();
|
||||
let expected = Int32Vector::from(vec![
|
||||
None,
|
||||
Some(42),
|
||||
@@ -895,7 +831,7 @@ mod test {
|
||||
let raw_len = raw.len();
|
||||
let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
|
||||
|
||||
let batch = Batch::new(vectors, raw_len);
|
||||
let batch = Batch::try_new(vectors, raw_len).unwrap();
|
||||
let expected = Int32Vector::from(vec![Some(42)]).slice(0, raw_len);
|
||||
assert_eq!(expr.eval_batch(&batch).unwrap(), expected);
|
||||
|
||||
@@ -903,7 +839,7 @@ mod test {
|
||||
let raw_len = raw.len();
|
||||
let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
|
||||
|
||||
let batch = Batch::new(vectors, raw_len);
|
||||
let batch = Batch::try_new(vectors, raw_len).unwrap();
|
||||
let expected = NullVector::new(raw_len).slice(0, raw_len);
|
||||
assert_eq!(expr.eval_batch(&batch).unwrap(), expected);
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@ use serde::{Deserialize, Serialize};
|
||||
use smallvec::SmallVec;
|
||||
|
||||
/// Function signature
|
||||
///
|
||||
/// TODO(discord9): use `common_query::signature::Signature` crate
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
|
||||
pub struct Signature {
|
||||
/// the input types; usually no more than two input arguments
|
||||
|
||||
@@ -20,9 +20,14 @@
|
||||
#![allow(dead_code)]
|
||||
#![warn(clippy::missing_docs_in_private_items)]
|
||||
#![warn(clippy::too_many_lines)]
|
||||
|
||||
// TODO(discord9): enable this lint to handle out of bound access
|
||||
// #![cfg_attr(not(test), warn(clippy::indexing_slicing))]
|
||||
|
||||
// allow unused for now because it should be use later
|
||||
mod adapter;
|
||||
mod compute;
|
||||
mod df_optimizer;
|
||||
pub mod error;
|
||||
mod expr;
|
||||
pub mod heartbeat;
|
||||
|
||||
@@ -115,6 +115,8 @@ impl TypedPlan {
|
||||
|
||||
/// TODO(discord9): support `TableFunc` (by defining a FlatMap that maps 1 to n)
/// Plan describes how to transform data in dataflow
///
/// This can be considered a physical plan in dataflow, describing how to transform data in a streaming manner.
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
pub enum Plan {
|
||||
/// A constant collection of rows.
|
||||
|
||||
@@ -177,6 +177,12 @@ impl Row {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<Value>> for Row {
|
||||
fn from(row: Vec<Value>) -> Self {
|
||||
Row::new(row)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ProtoRow> for Row {
|
||||
fn from(row: ProtoRow) -> Self {
|
||||
Row::pack(
|
||||
|
||||
@@ -374,10 +374,8 @@ impl RelationDesc {
|
||||
.collect();
|
||||
let arrow_schema = arrow_schema::Schema::new(fields);
|
||||
|
||||
DFSchema::try_from(arrow_schema.clone()).context({
|
||||
DatafusionSnafu {
|
||||
context: format!("Error when converting to DFSchema: {:?}", arrow_schema),
|
||||
}
|
||||
DFSchema::try_from(arrow_schema.clone()).with_context(|_e| DatafusionSnafu {
|
||||
context: format!("Error when converting to DFSchema: {:?}", arrow_schema),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -17,24 +17,19 @@ use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
|
||||
use datafusion::optimizer::{OptimizerContext, OptimizerRule};
|
||||
use datatypes::data_type::ConcreteDataType as CDT;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::plan::LogicalPlan;
|
||||
use query::query_engine::DefaultSerializer;
|
||||
use query::QueryEngine;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
/// note here we are using the `substrait_proto_df` crate from the `substrait` module and
|
||||
/// rename it to `substrait_proto`
|
||||
use substrait::{substrait_proto_df as substrait_proto, DFLogicalSubstraitConvertor};
|
||||
use substrait::substrait_proto_df as substrait_proto;
|
||||
use substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
|
||||
use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
|
||||
|
||||
use crate::adapter::FlownodeContext;
|
||||
use crate::error::{DatafusionSnafu, Error, ExternalSnafu, NotImplementedSnafu, UnexpectedSnafu};
|
||||
use crate::plan::TypedPlan;
|
||||
use crate::error::{Error, NotImplementedSnafu, UnexpectedSnafu};
|
||||
use crate::expr::{TUMBLE_END, TUMBLE_START};
|
||||
/// a simple macro to generate a not implemented error
|
||||
macro_rules! not_impl_err {
|
||||
($($arg:tt)*) => {
|
||||
@@ -102,68 +97,39 @@ impl FunctionExtensions {
|
||||
}
|
||||
}
|
||||
|
||||
/// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
|
||||
/// then to a substrait plan, and finally to a flow plan.
|
||||
pub async fn sql_to_flow_plan(
|
||||
ctx: &mut FlownodeContext,
|
||||
engine: &Arc<dyn QueryEngine>,
|
||||
sql: &str,
|
||||
) -> Result<TypedPlan, Error> {
|
||||
let query_ctx = ctx.query_context.clone().ok_or_else(|| {
|
||||
UnexpectedSnafu {
|
||||
reason: "Query context is missing",
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(stmt, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let LogicalPlan::DfPlan(plan) = plan;
|
||||
let plan = SimplifyExpressions::new()
|
||||
.rewrite(plan, &OptimizerContext::default())
|
||||
.context(DatafusionSnafu {
|
||||
context: "Fail to apply `SimplifyExpressions` optimization",
|
||||
})?
|
||||
.data;
|
||||
let sub_plan = DFLogicalSubstraitConvertor {}
|
||||
.to_sub_plan(&plan, DefaultSerializer)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan).await?;
|
||||
|
||||
Ok(flow_plan)
|
||||
}
|
||||
|
||||
/// register flow-specific functions to the query engine
|
||||
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
|
||||
engine.register_function(Arc::new(TumbleFunction {}));
|
||||
engine.register_function(Arc::new(TumbleFunction::new("tumble")));
|
||||
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
|
||||
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TumbleFunction {}
|
||||
pub struct TumbleFunction {
|
||||
name: String,
|
||||
}
|
||||
|
||||
const TUMBLE_NAME: &str = "tumble";
|
||||
impl TumbleFunction {
|
||||
fn new(name: &str) -> Self {
|
||||
Self {
|
||||
name: name.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TumbleFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{}", TUMBLE_NAME.to_ascii_uppercase())
|
||||
write!(f, "{}", self.name.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
impl common_function::function::Function for TumbleFunction {
|
||||
fn name(&self) -> &str {
|
||||
TUMBLE_NAME
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[CDT]) -> common_query::error::Result<CDT> {
|
||||
Ok(CDT::datetime_datatype())
|
||||
Ok(CDT::timestamp_millisecond_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> common_query::prelude::Signature {
|
||||
@@ -198,6 +164,7 @@ mod test {
|
||||
use prost::Message;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::plan::LogicalPlan;
|
||||
use query::query_engine::DefaultSerializer;
|
||||
use query::QueryEngine;
|
||||
use session::context::QueryContext;
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
@@ -207,6 +174,7 @@ mod test {
|
||||
|
||||
use super::*;
|
||||
use crate::adapter::node_context::IdToNameMap;
|
||||
use crate::df_optimizer::apply_df_optimizer;
|
||||
use crate::expr::GlobalId;
|
||||
use crate::repr::{ColumnType, RelationType};
|
||||
|
||||
@@ -292,7 +260,7 @@ mod test {
|
||||
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
|
||||
|
||||
let engine = factory.query_engine();
|
||||
engine.register_function(Arc::new(TumbleFunction {}));
|
||||
register_function_to_query_engine(&engine);
|
||||
|
||||
assert_eq!("datafusion", engine.name());
|
||||
engine
|
||||
@@ -307,6 +275,7 @@ mod test {
|
||||
.await
|
||||
.unwrap();
|
||||
let LogicalPlan::DfPlan(plan) = plan;
|
||||
let plan = apply_df_optimizer(plan).await.unwrap();
|
||||
|
||||
// encode then decode so to rely on the impl of conversion from logical plan to substrait plan
|
||||
let bytes = DFLogicalSubstraitConvertor {}
|
||||
@@ -315,4 +284,22 @@ mod test {
|
||||
|
||||
proto::Plan::decode(bytes).unwrap()
|
||||
}
|
||||
|
||||
/// TODO(discord9): add more illegal sql tests
|
||||
#[tokio::test]
|
||||
async fn test_missing_key_check() {
|
||||
let engine = create_test_query_engine();
|
||||
let sql = "SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
|
||||
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(stmt, QueryContext::arc())
|
||||
.await
|
||||
.unwrap();
|
||||
let LogicalPlan::DfPlan(plan) = plan;
|
||||
let plan = apply_df_optimizer(plan).await;
|
||||
|
||||
assert!(plan.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
@@ -20,7 +20,7 @@ use common_error::ext::BoxedError;
|
||||
use common_telemetry::debug;
|
||||
use datafusion_physical_expr::PhysicalExpr;
|
||||
use datatypes::data_type::ConcreteDataType as CDT;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference;
|
||||
use substrait_proto::proto::expression::reference_segment::ReferenceType::StructField;
|
||||
use substrait_proto::proto::expression::{IfThen, RexType, ScalarFunction};
|
||||
@@ -33,7 +33,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::expr::{
|
||||
BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc,
|
||||
UnmaterializableFunc, VariadicFunc,
|
||||
UnmaterializableFunc, VariadicFunc, TUMBLE_END, TUMBLE_START,
|
||||
};
|
||||
use crate::repr::{ColumnType, RelationDesc, RelationType};
|
||||
use crate::transform::literal::{
|
||||
@@ -167,6 +167,16 @@ fn rewrite_scalar_function(
|
||||
arg_typed_exprs: &[TypedExpr],
|
||||
) -> Result<ScalarFunction, Error> {
|
||||
let mut f_rewrite = f.clone();
|
||||
ensure!(
|
||||
f_rewrite.arguments.len() == arg_typed_exprs.len(),
|
||||
crate::error::InternalSnafu {
|
||||
reason: format!(
|
||||
"Expect `f_rewrite` and `arg_typed_expr` to be same length, found {} and {}",
|
||||
f_rewrite.arguments.len(),
|
||||
arg_typed_exprs.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() {
|
||||
// only replace it with col(idx) if it is not literal
|
||||
// will try best to determine if it is literal, i.e. for function like `cast(<literal>)` will try
|
||||
@@ -351,7 +361,13 @@ impl TypedExpr {
|
||||
Ok(TypedExpr::new(ret_expr, ret_type))
|
||||
}
|
||||
_var => {
|
||||
if VariadicFunc::is_valid_func_name(fn_name) {
|
||||
if fn_name == TUMBLE_START || fn_name == TUMBLE_END {
|
||||
let (func, arg) = UnaryFunc::from_tumble_func(fn_name, &arg_typed_exprs)?;
|
||||
|
||||
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
|
||||
|
||||
Ok(TypedExpr::new(arg.expr.call_unary(func), ret_type))
|
||||
} else if VariadicFunc::is_valid_func_name(fn_name) {
|
||||
let func = VariadicFunc::from_str_and_types(fn_name, &arg_types)?;
|
||||
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
|
||||
let mut expr = ScalarExpr::CallVariadic {
|
||||
@@ -521,7 +537,6 @@ impl TypedExpr {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use common_time::{DateTime, Interval};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -562,7 +577,7 @@ mod test {
|
||||
};
|
||||
let expected = TypedPlan {
|
||||
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
|
||||
.into_named(vec![Some("number".to_string())]),
|
||||
.into_named(vec![Some("numbers.number".to_string())]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
@@ -576,13 +591,7 @@ mod test {
|
||||
.into_named(vec![Some("number".to_string())]),
|
||||
),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
.filter(vec![filter])
|
||||
.unwrap()
|
||||
.project(vec![1])
|
||||
.unwrap(),
|
||||
mfp: MapFilterProject::new(1).filter(vec![filter]).unwrap(),
|
||||
},
|
||||
};
|
||||
assert_eq!(flow_plan.unwrap(), expected);
|
||||
@@ -600,7 +609,7 @@ mod test {
|
||||
|
||||
let expected = TypedPlan {
|
||||
schema: RelationType::new(vec![ColumnType::new(CDT::boolean_datatype(), true)])
|
||||
.into_unnamed(),
|
||||
.into_named(vec![Some("Int64(1) + Int64(1) * Int64(2) - Int64(1) / Int64(1) + Int64(1) % Int64(2) = Int64(3)".to_string())]),
|
||||
plan: Plan::Constant {
|
||||
rows: vec![(
|
||||
repr::Row::new(vec![Value::from(true)]),
|
||||
@@ -624,8 +633,8 @@ mod test {
|
||||
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).await;
|
||||
|
||||
let expected = TypedPlan {
|
||||
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)])
|
||||
.into_unnamed(),
|
||||
schema: RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)])
|
||||
.into_named(vec![Some("numbers.number + Int64(1)".to_string())]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
@@ -640,10 +649,12 @@ mod test {
|
||||
),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0).call_binary(
|
||||
ScalarExpr::Literal(Value::from(1u32), CDT::uint32_datatype()),
|
||||
BinaryFunc::AddUInt32,
|
||||
)])
|
||||
.map(vec![ScalarExpr::Column(0)
|
||||
.call_unary(UnaryFunc::Cast(CDT::int64_datatype()))
|
||||
.call_binary(
|
||||
ScalarExpr::Literal(Value::from(1i64), CDT::int64_datatype()),
|
||||
BinaryFunc::AddInt64,
|
||||
)])
|
||||
.unwrap()
|
||||
.project(vec![1])
|
||||
.unwrap(),
|
||||
@@ -663,7 +674,9 @@ mod test {
|
||||
|
||||
let expected = TypedPlan {
|
||||
schema: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)])
|
||||
.into_unnamed(),
|
||||
.into_named(vec![Some(
|
||||
"arrow_cast(Int64(1),Utf8(\"Int16\"))".to_string(),
|
||||
)]),
|
||||
plan: Plan::Constant {
|
||||
// cast of literal is constant folded
|
||||
rows: vec![(repr::Row::new(vec![Value::from(1i16)]), i64::MIN, 1)],
|
||||
@@ -683,7 +696,7 @@ mod test {
|
||||
|
||||
let expected = TypedPlan {
|
||||
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)])
|
||||
.into_unnamed(),
|
||||
.into_named(vec![Some("numbers.number + numbers.number".to_string())]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
@@ -780,65 +793,5 @@ mod test {
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
let f = substrait_proto::proto::expression::ScalarFunction {
|
||||
function_reference: 0,
|
||||
arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
|
||||
options: vec![],
|
||||
output_type: None,
|
||||
..Default::default()
|
||||
};
|
||||
let input_schema = RelationType::new(vec![
|
||||
ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
|
||||
ColumnType::new(CDT::string_datatype(), false),
|
||||
])
|
||||
.into_unnamed();
|
||||
let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]);
|
||||
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
res,
|
||||
ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
|
||||
ts: Box::new(
|
||||
ScalarExpr::Column(0)
|
||||
.with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
|
||||
),
|
||||
window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
|
||||
start_time: Some(DateTime::new(1625097600000))
|
||||
})
|
||||
.with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
|
||||
);
|
||||
|
||||
let f = substrait_proto::proto::expression::ScalarFunction {
|
||||
function_reference: 0,
|
||||
arguments: vec![proto_col(0), lit("1 second")],
|
||||
options: vec![],
|
||||
output_type: None,
|
||||
..Default::default()
|
||||
};
|
||||
let input_schema = RelationType::new(vec![
|
||||
ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
|
||||
ColumnType::new(CDT::string_datatype(), false),
|
||||
])
|
||||
.into_unnamed();
|
||||
let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]);
|
||||
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
res,
|
||||
ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
|
||||
ts: Box::new(
|
||||
ScalarExpr::Column(0)
|
||||
.with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
|
||||
),
|
||||
window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
|
||||
start_time: None
|
||||
})
|
||||
.with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -340,6 +340,8 @@ pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<C
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
use crate::plan::{Plan, TypedPlan};
|
||||
use crate::repr::{self, ColumnType, RelationType};
|
||||
@@ -356,7 +358,7 @@ mod test {
|
||||
|
||||
let expected = TypedPlan {
|
||||
schema: RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)])
|
||||
.into_unnamed(),
|
||||
.into_named(vec![Some("Int64(1)".to_string())]),
|
||||
plan: Plan::Constant {
|
||||
rows: vec![(
|
||||
repr::Row::new(vec![Value::Int64(1)]),
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet, HashSet};
|
||||
use std::collections::HashSet;
|
||||
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
@@ -23,9 +23,9 @@ use substrait_proto::proto::rel::RelType;
|
||||
use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel};
|
||||
|
||||
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
|
||||
use crate::expr::{MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc};
|
||||
use crate::plan::{KeyValPlan, Plan, TypedPlan};
|
||||
use crate::repr::{self, RelationDesc, RelationType};
|
||||
use crate::expr::{MapFilterProject, TypedExpr};
|
||||
use crate::plan::{Plan, TypedPlan};
|
||||
use crate::repr::{self, RelationType};
|
||||
use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
|
||||
|
||||
impl TypedPlan {
|
||||
@@ -49,7 +49,14 @@ impl TypedPlan {
|
||||
let input = root.input.as_ref().with_context(|| InvalidQuerySnafu {
|
||||
reason: "Root relation without input",
|
||||
})?;
|
||||
Ok(TypedPlan::from_substrait_rel(ctx, input, &function_extension).await?)
|
||||
|
||||
let mut ret = TypedPlan::from_substrait_rel(ctx, input, &function_extension).await?;
|
||||
|
||||
if !root.names.is_empty() {
|
||||
ret.schema = ret.schema.clone().try_with_names(root.names.clone())?;
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
},
|
||||
None => plan_err!("Cannot parse plan relation: None")
|
||||
@@ -115,17 +122,6 @@ impl TypedPlan {
|
||||
plan,
|
||||
})
|
||||
} else {
|
||||
match input.plan.clone() {
|
||||
Plan::Reduce { key_val_plan, .. } => {
|
||||
rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?;
|
||||
}
|
||||
Plan::Mfp { input, mfp: _ } => {
|
||||
if let Plan::Reduce { key_val_plan, .. } = input.plan {
|
||||
rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?;
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
input.projection(exprs)
|
||||
}
|
||||
}
|
||||
@@ -233,120 +229,13 @@ impl TypedPlan {
|
||||
}
|
||||
}
|
||||
|
||||
/// If the reduce plan contains special functions like tumble floor/ceiling, add them to `proj_exprs`,
/// so that the window_start and window_end columns are automatically added to the output rows.
///
/// This fixes a problem where certain functions return two values: since substrait doesn't know that, it assumes they return one value.
/// This function rewrites `proj_exprs` into the correct form.
|
||||
fn rewrite_projection_after_reduce(
|
||||
key_val_plan: KeyValPlan,
|
||||
reduce_output_type: &RelationDesc,
|
||||
proj_exprs: &mut Vec<TypedExpr>,
|
||||
) -> Result<(), Error> {
|
||||
// TODO(discord9): get keys correctly
|
||||
let key_exprs = key_val_plan
|
||||
.key_plan
|
||||
.projection
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|i| {
|
||||
if i < key_val_plan.key_plan.input_arity {
|
||||
ScalarExpr::Column(i)
|
||||
} else {
|
||||
key_val_plan.key_plan.expressions[i - key_val_plan.key_plan.input_arity].clone()
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
let mut shift_offset = 0;
|
||||
let mut shuffle: BTreeMap<usize, usize> = BTreeMap::new();
|
||||
let special_keys = key_exprs
|
||||
.clone()
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.filter(|(idx, p)| {
|
||||
shuffle.insert(*idx, *idx + shift_offset);
|
||||
if matches!(
|
||||
p,
|
||||
ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::TumbleWindowFloor { .. },
|
||||
..
|
||||
} | ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::TumbleWindowCeiling { .. },
|
||||
..
|
||||
}
|
||||
) {
|
||||
if matches!(
|
||||
p,
|
||||
ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::TumbleWindowFloor { .. },
|
||||
..
|
||||
}
|
||||
) {
|
||||
shift_offset += 1;
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
let spec_key_arity = special_keys.len();
|
||||
if spec_key_arity == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// shuffle proj_exprs
|
||||
// because substrait uses offsets while assuming `tumble` returns only one value
|
||||
for proj_expr in proj_exprs.iter_mut() {
|
||||
proj_expr.expr.permute_map(&shuffle)?;
|
||||
} // add key to the end
|
||||
for (key_idx, _key_expr) in special_keys {
|
||||
// here we assume the output type of the reduce operator (`reduce_output_type`) is the key columns first, followed by the value columns
// so we can use `key_idx` to index into `reduce_output_type` and get the keys we need to append to `proj_exprs`
|
||||
proj_exprs.push(
|
||||
ScalarExpr::Column(key_idx)
|
||||
.with_type(reduce_output_type.typ().column_types[key_idx].clone()),
|
||||
);
|
||||
}
|
||||
|
||||
// check that all normal column exprs among the group exprs also appear in proj_exprs
|
||||
let all_cols_ref_in_proj: BTreeSet<usize> = proj_exprs
|
||||
.iter()
|
||||
.filter_map(|e| {
|
||||
if let ScalarExpr::Column(i) = &e.expr {
|
||||
Some(*i)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
for (key_idx, key_expr) in key_exprs.iter().enumerate() {
|
||||
if let ScalarExpr::Column(_) = key_expr {
|
||||
if !all_cols_ref_in_proj.contains(&key_idx) {
|
||||
let err_msg = format!(
|
||||
"Expect normal column in group by also appear in projection, but column {}(name is {}) is missing",
|
||||
key_idx,
|
||||
reduce_output_type
|
||||
.get_name(key_idx)
|
||||
.clone()
|
||||
.map(|s|format!("'{}'",s))
|
||||
.unwrap_or("unknown".to_string())
|
||||
);
|
||||
return InvalidQuerySnafu { reason: err_msg }.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
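// Hedged worked example (hypothetical query): for `GROUP BY tumble(ts, '1 hour'), number`,
// the reduce keys become [tumble_floor(ts), tumble_ceil(ts), number] even though substrait
// assumed `tumble` produced a single key column. The shuffle above shifts `number` past the
// extra ceiling column, and both window-bound key columns are appended to `proj_exprs`, so
// window_start / window_end appear in the output rows.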
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
use crate::expr::{GlobalId, ScalarExpr};
|
||||
use crate::expr::GlobalId;
|
||||
use crate::plan::{Plan, TypedPlan};
|
||||
use crate::repr::{ColumnType, RelationType};
|
||||
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
|
||||
@@ -363,7 +252,7 @@ mod test {
|
||||
|
||||
let expected = TypedPlan {
|
||||
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
|
||||
.into_named(vec![Some("number".to_string())]),
|
||||
.into_named(vec![Some("numbers.number".to_string())]),
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
@@ -377,11 +266,7 @@ mod test {
|
||||
.into_named(vec![Some("number".to_string())]),
|
||||
),
|
||||
),
|
||||
mfp: MapFilterProject::new(1)
|
||||
.map(vec![ScalarExpr::Column(0)])
|
||||
.unwrap()
|
||||
.project(vec![1])
|
||||
.unwrap(),
|
||||
mfp: MapFilterProject::new(1),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -513,7 +513,7 @@ pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>;
|
||||
pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>;
|
||||
|
||||
/// A handle to the inner Arrangement; it can be cloned and shared, which is useful for querying its inner state
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ArrangeHandler {
|
||||
inner: Arc<RwLock<Arrangement>>,
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ use tokio::time::{Duration, Instant};
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::frontend::FrontendOptions;
|
||||
use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
|
||||
|
||||
pub mod handler;
|
||||
|
||||
@@ -94,10 +95,16 @@ impl HeartbeatTask {
|
||||
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
|
||||
if let Err(e) = capture_self.handle_response(ctx).await {
|
||||
error!(e; "Error while handling heartbeat response");
|
||||
HEARTBEAT_RECV_COUNT
|
||||
.with_label_values(&["processing_error"])
|
||||
.inc();
|
||||
} else {
|
||||
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
|
||||
}
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(e) => {
|
||||
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
|
||||
error!(e; "Occur error while reading heartbeat response");
|
||||
capture_self
|
||||
.start_with_retry(Duration::from_millis(retry_interval))
|
||||
@@ -180,6 +187,7 @@ impl HeartbeatTask {
|
||||
error!(e; "Failed to send heartbeat to metasrv");
|
||||
break;
|
||||
} else {
|
||||
HEARTBEAT_SENT_COUNT.inc();
|
||||
debug!("Send a heartbeat request to metasrv, content: {:?}", req);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,9 +79,9 @@ pub use standalone::StandaloneDatanodeManager;
|
||||
|
||||
use self::prom_store::ExportMetricHandler;
|
||||
use crate::error::{
|
||||
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
|
||||
PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu,
|
||||
TableOperationSnafu,
|
||||
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
|
||||
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
|
||||
StartServerSnafu, TableOperationSnafu,
|
||||
};
|
||||
use crate::frontend::FrontendOptions;
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
@@ -452,6 +452,9 @@ pub fn check_permission(
|
||||
| Statement::DropDatabase(_)
|
||||
| Statement::DropFlow(_)
|
||||
| Statement::Use(_) => {}
|
||||
Statement::ShowCreateDatabase(stmt) => {
|
||||
validate_database(&stmt.database_name, query_ctx)?;
|
||||
}
|
||||
Statement::ShowCreateTable(stmt) => {
|
||||
validate_param(&stmt.table_name, query_ctx)?;
|
||||
}
|
||||
@@ -527,8 +530,8 @@ pub fn check_permission(
|
||||
},
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
|
||||
match copy_database {
|
||||
CopyDatabase::To(stmt) => validate_param(&stmt.database_name, query_ctx)?,
|
||||
CopyDatabase::From(stmt) => validate_param(&stmt.database_name, query_ctx)?,
|
||||
CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
|
||||
CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
|
||||
}
|
||||
}
|
||||
Statement::TruncateTable(stmt) => {
|
||||
@@ -548,6 +551,26 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()>
|
||||
.context(SqlExecInterceptedSnafu)
|
||||
}
|
||||
|
||||
fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
|
||||
let (catalog, schema) = match &name.0[..] {
|
||||
[schema] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
schema.value.clone(),
|
||||
),
|
||||
[catalog, schema] => (catalog.value.clone(), schema.value.clone()),
|
||||
_ => InvalidSqlSnafu {
|
||||
err_msg: format!(
|
||||
"expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
|
||||
),
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
|
||||
validate_catalog_and_schema(&catalog, &schema, query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(SqlExecInterceptedSnafu)
|
||||
}
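// Hedged examples (hypothetical names): how database names are resolved by `validate_database`:
//   "my_schema"            -> (current catalog from the query context, "my_schema")
//   "my_catalog.my_schema" -> ("my_catalog", "my_schema")
//   "a.b.c"                -> InvalidSql error; only <schema> or <catalog>.<schema> are accepted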
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -51,4 +51,17 @@ lazy_static! {
|
||||
"frontend otlp traces rows"
|
||||
)
|
||||
.unwrap();
|
||||
/// The number of heartbeats sent by the frontend node.
|
||||
pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
|
||||
"greptime_frontend_heartbeat_send_count",
|
||||
"frontend heartbeat sent",
|
||||
)
|
||||
.unwrap();
|
||||
/// The number of heartbeats received by the frontend node, labeled with the result type.
|
||||
pub static ref HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!(
|
||||
"greptime_frontend_heartbeat_recv_count",
|
||||
"frontend heartbeat received",
|
||||
&["result"]
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -45,4 +45,7 @@ lazy_static! {
|
||||
/// Meta kv cache miss counter.
|
||||
pub static ref METRIC_META_KV_CACHE_MISS: IntCounterVec =
|
||||
register_int_counter_vec!("greptime_meta_kv_cache_miss", "meta kv cache miss", &["op"]).unwrap();
|
||||
/// Heartbeats received by metasrv.
|
||||
pub static ref METRIC_META_HEARTBEAT_RECV: IntCounterVec =
|
||||
register_int_counter_vec!("greptime_meta_heartbeat_recv", "heartbeats received by metasrv", &["pusher_key"]).unwrap();
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::handler::{HeartbeatHandlerGroup, Pusher};
|
||||
use crate::metasrv::{Context, Metasrv};
|
||||
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
|
||||
use crate::service::{GrpcResult, GrpcStream};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -65,7 +66,11 @@ impl heartbeat_server::Heartbeat for Metasrv {
|
||||
if pusher_key.is_none() {
|
||||
pusher_key = register_pusher(&handler_group, header, tx.clone()).await;
|
||||
}
|
||||
|
||||
if let Some(k) = &pusher_key {
|
||||
METRIC_META_HEARTBEAT_RECV.with_label_values(&[k]);
|
||||
} else {
|
||||
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
|
||||
}
|
||||
let res = handler_group
|
||||
.handle(req, ctx.clone())
|
||||
.await
|
||||
|
||||
src/mito2/src/cache/file_cache.rs (vendored, 1 change)
@@ -179,7 +179,6 @@ impl FileCache {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
/// Removes a file from the cache explicitly.
|
||||
pub(crate) async fn remove(&self, key: IndexKey) {
|
||||
let file_path = self.cache_file_path(key);
|
||||
|
||||
src/mito2/src/cache/write_cache.rs (vendored, 87 changes)
@@ -27,7 +27,10 @@ use snafu::ResultExt;
|
||||
use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
|
||||
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
|
||||
use crate::error::{self, Result};
|
||||
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
|
||||
use crate::metrics::{
|
||||
FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
|
||||
WRITE_CACHE_DOWNLOAD_ELAPSED,
|
||||
};
|
||||
use crate::sst::index::intermediate::IntermediateManager;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
use crate::sst::index::IndexerBuilder;
|
||||
@@ -166,6 +169,81 @@ impl WriteCache {
|
||||
Ok(Some(sst_info))
|
||||
}
|
||||
|
||||
/// Removes a file from the cache by `index_key`.
|
||||
pub(crate) async fn remove(&self, index_key: IndexKey) {
|
||||
self.file_cache.remove(index_key).await
|
||||
}
|
||||
|
||||
/// Downloads a file in `remote_path` from the remote object store to the local cache
|
||||
/// (specified by `index_key`).
|
||||
pub(crate) async fn download(
|
||||
&self,
|
||||
index_key: IndexKey,
|
||||
remote_path: &str,
|
||||
remote_store: &ObjectStore,
|
||||
file_size: u64,
|
||||
) -> Result<()> {
|
||||
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
|
||||
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
|
||||
|
||||
let file_type = index_key.file_type;
|
||||
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
|
||||
.with_label_values(&[match file_type {
|
||||
FileType::Parquet => "download_parquet",
|
||||
FileType::Puffin => "download_puffin",
|
||||
}])
|
||||
.start_timer();
|
||||
|
||||
let reader = remote_store
|
||||
.reader_with(remote_path)
|
||||
.concurrent(DOWNLOAD_READER_CONCURRENCY)
|
||||
.chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
|
||||
.await
|
||||
.context(error::OpenDalSnafu)?
|
||||
.into_futures_async_read(0..file_size)
|
||||
.await
|
||||
.context(error::OpenDalSnafu)?;
|
||||
|
||||
let cache_path = self.file_cache.cache_file_path(index_key);
|
||||
let mut writer = self
|
||||
.file_cache
|
||||
.local_store()
|
||||
.writer(&cache_path)
|
||||
.await
|
||||
.context(error::OpenDalSnafu)?
|
||||
.into_futures_async_write();
|
||||
|
||||
let region_id = index_key.region_id;
|
||||
let file_id = index_key.file_id;
|
||||
let bytes_written =
|
||||
futures::io::copy(reader, &mut writer)
|
||||
.await
|
||||
.context(error::DownloadSnafu {
|
||||
region_id,
|
||||
file_id,
|
||||
file_type,
|
||||
})?;
|
||||
writer.close().await.context(error::DownloadSnafu {
|
||||
region_id,
|
||||
file_id,
|
||||
file_type,
|
||||
})?;
|
||||
|
||||
WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
|
||||
|
||||
let elapsed = timer.stop_and_record();
|
||||
debug!(
|
||||
"Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
|
||||
remote_path, cache_path, bytes_written, region_id, elapsed,
|
||||
);
|
||||
|
||||
let index_value = IndexValue {
|
||||
file_size: bytes_written as _,
|
||||
};
|
||||
self.file_cache.put(index_key, index_value).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Uploads a Parquet file or a Puffin file to the remote object store.
|
||||
async fn upload(
|
||||
&self,
|
||||
@@ -351,6 +429,13 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());
|
||||
|
||||
// Removes the file from the cache.
|
||||
let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
|
||||
write_cache.remove(sst_index_key).await;
|
||||
assert!(!write_cache.file_cache.contains_key(&sst_index_key));
|
||||
write_cache.remove(index_key).await;
|
||||
assert!(!write_cache.file_cache.contains_key(&index_key));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
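As a usage illustration (not part of this diff), the sketch below shows how in-crate code might warm the write cache for a newly added SST via the `download` method above. It mirrors the `edit_region` path further down in this diff; the helper name and its standalone form are assumptions:

use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::write_cache::WriteCache;
use crate::error::Result;
use crate::sst::file::FileMeta;
use crate::sst::location;

// Warm the local write cache for one newly added SST so later reads hit
// local disk instead of the remote object store.
async fn warm_cache(
    write_cache: &WriteCache,
    layer: &AccessLayerRef,
    file_meta: &FileMeta,
) -> Result<()> {
    let index_key = IndexKey::new(file_meta.region_id, file_meta.file_id, FileType::Parquet);
    let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
    write_cache
        .download(index_key, &remote_path, layer.object_store(), file_meta.file_size)
        .await
}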
@@ -12,21 +12,96 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use object_store::ObjectStore;
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::sync::{oneshot, Barrier};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::engine::listener::EventListener;
|
||||
use crate::engine::MitoEngine;
|
||||
use crate::manifest::action::RegionEdit;
|
||||
use crate::region::MitoRegionRef;
|
||||
use crate::sst::file::{FileId, FileMeta};
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_edit_region_fill_cache() {
|
||||
let mut env = TestEnv::new();
|
||||
|
||||
struct EditRegionListener {
|
||||
tx: Mutex<Option<oneshot::Sender<FileId>>>,
|
||||
}
|
||||
|
||||
impl EventListener for EditRegionListener {
|
||||
fn on_file_cache_filled(&self, file_id: FileId) {
|
||||
let mut tx = self.tx.lock().unwrap();
|
||||
tx.take().unwrap().send(file_id).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let engine = env
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
// Write cache must be enabled to download the ingested SST file.
|
||||
enable_experimental_write_cache: true,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
Some(Arc::new(EditRegionListener {
|
||||
tx: Mutex::new(Some(tx)),
|
||||
})),
|
||||
)
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Create(CreateRequestBuilder::new().build()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
|
||||
let file_id = FileId::random();
|
||||
// Simulating the ingestion of an SST file.
|
||||
env.get_object_store()
|
||||
.unwrap()
|
||||
.write(
|
||||
&format!("{}/{}.parquet", region.region_dir(), file_id),
|
||||
b"x".as_slice(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let edit = RegionEdit {
|
||||
files_to_add: vec![FileMeta {
|
||||
region_id: region.region_id,
|
||||
file_id,
|
||||
level: 0,
|
||||
..Default::default()
|
||||
}],
|
||||
files_to_remove: vec![],
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
};
|
||||
engine.edit_region(region.region_id, edit).await.unwrap();
|
||||
|
||||
// Asserts that the background download of the SST succeeded.
|
||||
let actual = tokio::time::timeout(Duration::from_secs(9), rx)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(file_id, actual);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn test_edit_region_concurrently() {
|
||||
const EDITS_PER_TASK: usize = 10;
|
||||
|
||||
@@ -22,6 +22,8 @@ use common_telemetry::info;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::sst::file::FileId;
|
||||
|
||||
/// Mito engine background event listener.
|
||||
#[async_trait]
|
||||
pub trait EventListener: Send + Sync {
|
||||
@@ -61,6 +63,9 @@ pub trait EventListener: Send + Sync {
|
||||
fn on_recv_requests(&self, request_num: usize) {
|
||||
let _ = request_num;
|
||||
}
|
||||
|
||||
/// Notifies the listener that the file cache has been filled, e.g. when editing a region.
|
||||
fn on_file_cache_filled(&self, _file_id: FileId) {}
|
||||
}
|
||||
|
||||
pub type EventListenerRef = Arc<dyn EventListener>;
|
||||
|
||||
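To illustrate the new hook, a minimal custom listener might look like the sketch below (test-side code, not part of this diff). It assumes the remaining `EventListener` methods keep their default implementations, as the `EditRegionListener` in the test above does:

use std::sync::atomic::{AtomicUsize, Ordering};

/// Counts how many SST files the background download has filled into the file cache.
#[derive(Default)]
struct CountingListener {
    filled: AtomicUsize,
}

impl EventListener for CountingListener {
    fn on_file_cache_filled(&self, _file_id: FileId) {
        self.filled.fetch_add(1, Ordering::Relaxed);
    }
}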
@@ -639,6 +639,22 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to download file, region_id: {}, file_id: {}, file_type: {:?}",
|
||||
region_id,
|
||||
file_id,
|
||||
file_type,
|
||||
))]
|
||||
Download {
|
||||
region_id: RegionId,
|
||||
file_id: FileId,
|
||||
file_type: FileType,
|
||||
#[snafu(source)]
|
||||
error: std::io::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}",
|
||||
region_id,
|
||||
@@ -965,7 +981,7 @@ impl ErrorExt for Error {
|
||||
|
||||
FilterRecordBatch { source, .. } => source.status_code(),
|
||||
|
||||
Upload { .. } => StatusCode::StorageUnavailable,
|
||||
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
|
||||
ChecksumMismatch { .. } => StatusCode::Unexpected,
|
||||
RegionStopped { .. } => StatusCode::RegionNotReady,
|
||||
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
@@ -189,6 +189,17 @@ lazy_static! {
|
||||
&[TYPE_LABEL]
|
||||
)
|
||||
.unwrap();
|
||||
/// Download bytes counter in the write cache.
|
||||
pub static ref WRITE_CACHE_DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
|
||||
"mito_write_cache_download_bytes_total",
|
||||
"mito write cache download bytes total",
|
||||
).unwrap();
|
||||
/// Timer of the downloading task in the write cache.
|
||||
pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED: HistogramVec = register_histogram_vec!(
|
||||
"mito_write_cache_download_elapsed",
|
||||
"mito write cache download elapsed",
|
||||
&[TYPE_LABEL],
|
||||
).unwrap();
|
||||
/// Upload bytes counter.
|
||||
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
|
||||
"mito_upload_bytes_total",
|
||||
|
||||
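For reference, the label-and-timer pattern these two new metrics expect is the same one `WriteCache::download` uses above. The helper below is purely illustrative (`bytes_written` is a placeholder), not part of the diff:

fn record_parquet_download(bytes_written: u64) {
    // Time the download under the "download_parquet" label.
    let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
        .with_label_values(&["download_parquet"])
        .start_timer();
    // ... the actual transfer would happen here ...
    WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
    // Stopping the timer records the elapsed seconds in the histogram.
    let _elapsed = timer.stop_and_record();
}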
@@ -213,6 +213,10 @@ impl RowGroupLastRowReader {
|
||||
/// Updates row group's last row cache if cache manager is present.
|
||||
fn maybe_update_cache(&mut self) {
|
||||
if let Some(cache) = &self.cache_manager {
|
||||
if self.yielded_batches.is_empty() {
|
||||
// We always expect that row groups yield batches.
|
||||
return;
|
||||
}
|
||||
let value = Arc::new(SelectorResultValue {
|
||||
result: std::mem::take(&mut self.yielded_batches),
|
||||
projection: self
|
||||
|
||||
@@ -25,7 +25,7 @@ use common_time::range::TimestampRange;
|
||||
use common_time::Timestamp;
|
||||
use datafusion::physical_plan::DisplayFormatType;
|
||||
use smallvec::SmallVec;
|
||||
use store_api::region_engine::RegionScannerRef;
|
||||
use store_api::region_engine::{PartitionRange, RegionScannerRef};
|
||||
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
|
||||
use table::predicate::{build_time_range_predicate, Predicate};
|
||||
use tokio::sync::{mpsc, Mutex, Semaphore};
|
||||
@@ -705,6 +705,37 @@ impl ScanInput {
|
||||
let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
|
||||
rows_in_files + rows_in_memtables
|
||||
}
|
||||
|
||||
/// Retrieves [`PartitionRange`]s from memtables and files.
|
||||
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
|
||||
let mut id = 0;
|
||||
let mut container = Vec::with_capacity(self.memtables.len() + self.files.len());
|
||||
|
||||
for memtable in &self.memtables {
|
||||
let range = PartitionRange {
|
||||
// TODO(ruihang): filter out empty memtables in the future.
|
||||
start: memtable.stats().time_range().unwrap().0,
|
||||
end: memtable.stats().time_range().unwrap().1,
|
||||
num_rows: memtable.stats().num_rows(),
|
||||
identifier: id,
|
||||
};
|
||||
id += 1;
|
||||
container.push(range);
|
||||
}
|
||||
|
||||
for file in &self.files {
|
||||
let range = PartitionRange {
|
||||
start: file.meta_ref().time_range.0,
|
||||
end: file.meta_ref().time_range.1,
|
||||
num_rows: file.meta_ref().num_rows as usize,
|
||||
identifier: id,
|
||||
};
|
||||
id += 1;
|
||||
container.push(range);
|
||||
}
|
||||
|
||||
container
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
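A rough sketch of what consumes these ranges: distributing them round-robin over `partition_num` partitions, which is the behaviour the `ParallelizeScan` tests later in this diff expect. The engine's real assignment logic may differ; this is only an illustration:

use store_api::region_engine::PartitionRange;

fn distribute(ranges: Vec<PartitionRange>, partition_num: usize) -> Vec<Vec<PartitionRange>> {
    let n = partition_num.max(1);
    let mut partitions: Vec<Vec<PartitionRange>> = (0..n).map(|_| Vec::new()).collect();
    for (i, range) in ranges.into_iter().enumerate() {
        // Range i lands in partition i % n, e.g. 4 ranges over 2 partitions -> [[1, 3], [2, 4]].
        partitions[i % n].push(range);
    }
    partitions
}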
@@ -67,10 +67,11 @@ impl SeqScan {
|
||||
/// Creates a new [SeqScan].
|
||||
pub(crate) fn new(input: ScanInput) -> Self {
|
||||
let parallelism = input.parallelism.parallelism.max(1);
|
||||
let properties = ScannerProperties::default()
|
||||
let mut properties = ScannerProperties::default()
|
||||
.with_parallelism(parallelism)
|
||||
.with_append_mode(input.append_mode)
|
||||
.with_total_rows(input.total_rows());
|
||||
properties.partitions = vec![input.partition_ranges()];
|
||||
let stream_ctx = Arc::new(StreamContext::new(input));
|
||||
|
||||
Self {
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use common_telemetry::{error, info};
|
||||
|
||||
use crate::access_layer::AccessLayerRef;
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::schedule::scheduler::SchedulerRef;
|
||||
use crate::sst::file::FileMeta;
|
||||
@@ -77,9 +78,10 @@ impl FilePurger for LocalFilePurger {
|
||||
cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
|
||||
}
|
||||
|
||||
let cache_manager = self.cache_manager.clone();
|
||||
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
|
||||
if let Err(e) = sst_layer.delete_sst(&file_meta).await {
|
||||
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
|
||||
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
|
||||
file_meta.file_id, file_meta.region_id);
|
||||
} else {
|
||||
info!(
|
||||
@@ -87,6 +89,28 @@ impl FilePurger for LocalFilePurger {
|
||||
file_meta.file_id, file_meta.region_id
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
|
||||
{
|
||||
// Removes the inverted index from the cache.
|
||||
if file_meta.inverted_index_available() {
|
||||
write_cache
|
||||
.remove(IndexKey::new(
|
||||
file_meta.region_id,
|
||||
file_meta.file_id,
|
||||
FileType::Puffin,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
// Removes the SST file from the cache.
|
||||
write_cache
|
||||
.remove(IndexKey::new(
|
||||
file_meta.region_id,
|
||||
file_meta.file_id,
|
||||
FileType::Parquet,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
})) {
|
||||
error!(e; "Failed to schedule the file purge request");
|
||||
}
|
||||
|
||||
@@ -364,7 +364,7 @@ impl TestEnv {
|
||||
.as_path()
|
||||
.display()
|
||||
.to_string();
|
||||
let mut builder = Fs::default();
|
||||
let builder = Fs::default();
|
||||
let object_store = ObjectStore::new(builder.root(&data_path)).unwrap().finish();
|
||||
object_store_manager.add(storage_name, object_store);
|
||||
}
|
||||
|
||||
@@ -57,6 +57,7 @@ use crate::request::{
|
||||
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
|
||||
};
|
||||
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
|
||||
use crate::sst::file::FileId;
|
||||
use crate::sst::index::intermediate::IntermediateManager;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
|
||||
@@ -950,6 +951,13 @@ impl WorkerListener {
|
||||
// Avoid compiler warning.
|
||||
let _ = request_num;
|
||||
}
|
||||
|
||||
pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
if let Some(listener) = &self.listener {
|
||||
listener.on_file_cache_filled(_file_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -21,6 +21,8 @@ use std::collections::{HashMap, VecDeque};
|
||||
use common_telemetry::{info, warn};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
|
||||
use crate::manifest::action::{
|
||||
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
|
||||
@@ -30,7 +32,8 @@ use crate::request::{
|
||||
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
|
||||
TruncateResult, WorkerRequest,
|
||||
};
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
use crate::sst::location;
|
||||
use crate::worker::{RegionWorkerLoop, WorkerListener};
|
||||
|
||||
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
|
||||
|
||||
@@ -105,10 +108,12 @@ impl<S> RegionWorkerLoop<S> {
|
||||
}
|
||||
|
||||
let request_sender = self.sender.clone();
|
||||
let cache_manager = self.cache_manager.clone();
|
||||
let listener = self.listener.clone();
|
||||
// Now the region is in editing state.
|
||||
// Updates manifest in background.
|
||||
common_runtime::spawn_global(async move {
|
||||
let result = edit_region(®ion, edit.clone()).await;
|
||||
let result = edit_region(®ion, edit.clone(), cache_manager, listener).await;
|
||||
let notify = WorkerRequest::Background {
|
||||
region_id,
|
||||
notify: BackgroundNotify::RegionEdit(RegionEditResult {
|
||||
@@ -286,8 +291,41 @@ impl<S> RegionWorkerLoop<S> {
|
||||
}
|
||||
|
||||
/// Checks the edit, writes and applies it.
|
||||
async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> {
|
||||
async fn edit_region(
|
||||
region: &MitoRegionRef,
|
||||
edit: RegionEdit,
|
||||
cache_manager: CacheManagerRef,
|
||||
listener: WorkerListener,
|
||||
) -> Result<()> {
|
||||
let region_id = region.region_id;
|
||||
if let Some(write_cache) = cache_manager.write_cache() {
|
||||
for file_meta in &edit.files_to_add {
|
||||
let write_cache = write_cache.clone();
|
||||
let layer = region.access_layer.clone();
|
||||
let listener = listener.clone();
|
||||
|
||||
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
|
||||
let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
|
||||
let file_size = file_meta.file_size;
|
||||
common_runtime::spawn_global(async move {
|
||||
if write_cache
|
||||
.download(index_key, &remote_path, layer.object_store(), file_size)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
// Triggers the filling of the parquet metadata cache.
|
||||
// The parquet file is already downloaded.
|
||||
let _ = write_cache
|
||||
.file_cache()
|
||||
.get_parquet_meta_data(index_key)
|
||||
.await;
|
||||
|
||||
listener.on_file_cache_filled(index_key.file_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
info!("Applying {edit:?} to region {}", region_id);
|
||||
|
||||
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
|
||||
|
||||
@@ -676,18 +676,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Invalid partition columns when creating table '{}', reason: {}",
|
||||
table,
|
||||
reason
|
||||
))]
|
||||
InvalidPartitionColumns {
|
||||
table: String,
|
||||
reason: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to prepare file table"))]
|
||||
PrepareFileTable {
|
||||
#[snafu(implicit)]
|
||||
@@ -784,6 +772,12 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to upgrade catalog manager reference"))]
|
||||
UpgradeCatalogManagerRef {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -806,7 +800,6 @@ impl ErrorExt for Error {
|
||||
| Error::ProjectSchema { .. }
|
||||
| Error::UnsupportedFormat { .. }
|
||||
| Error::ColumnNoneDefaultValue { .. }
|
||||
| Error::InvalidPartitionColumns { .. }
|
||||
| Error::PrepareFileTable { .. }
|
||||
| Error::InferFileTableSchema { .. }
|
||||
| Error::SchemaIncompatible { .. }
|
||||
@@ -931,6 +924,8 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::ExecuteAdminFunction { source, .. } => source.status_code(),
|
||||
Error::BuildRecordBatch { source, .. } => source.status_code(),
|
||||
|
||||
Error::UpgradeCatalogManagerRef { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,8 +23,10 @@ mod set;
|
||||
mod show;
|
||||
mod tql;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::kvbackend::KvBackendCatalogManager;
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::RecordBatches;
|
||||
use common_error::ext::BoxedError;
|
||||
@@ -32,6 +34,7 @@ use common_meta::cache::TableRouteCacheRef;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::ddl::ProcedureExecutorRef;
|
||||
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
|
||||
use common_meta::key::schema_name::SchemaNameKey;
|
||||
use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
@@ -60,7 +63,8 @@ use table::TableRef;
|
||||
use self::set::{set_bytea_output, set_datestyle, set_timezone, validate_client_encoding};
|
||||
use crate::error::{
|
||||
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
|
||||
PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
|
||||
PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
|
||||
UpgradeCatalogManagerRefSnafu,
|
||||
};
|
||||
use crate::insert::InserterRef;
|
||||
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
|
||||
@@ -251,6 +255,29 @@ impl StatementExecutor {
|
||||
)
|
||||
.await
|
||||
}
|
||||
Statement::ShowCreateDatabase(show) => {
|
||||
let (catalog, database) =
|
||||
idents_to_full_database_name(&show.database_name, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let table_metadata_manager = self
|
||||
.catalog_manager
|
||||
.as_any()
|
||||
.downcast_ref::<KvBackendCatalogManager>()
|
||||
.map(|manager| manager.table_metadata_manager_ref().clone())
|
||||
.context(UpgradeCatalogManagerRefSnafu)?;
|
||||
let opts: HashMap<String, String> = table_metadata_manager
|
||||
.schema_manager()
|
||||
.get(SchemaNameKey::new(&catalog, &database))
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.context(SchemaNotFoundSnafu {
|
||||
schema_info: &database,
|
||||
})?
|
||||
.into();
|
||||
|
||||
self.show_create_database(&database, opts.into()).await
|
||||
}
|
||||
Statement::ShowCreateTable(show) => {
|
||||
let (catalog, schema, table) =
|
||||
table_idents_to_full_name(&show.table_name, &query_ctx)
|
||||
|
||||
@@ -67,11 +67,6 @@ impl StatementExecutor {
|
||||
|
||||
let mut exported_rows = 0;
|
||||
for table_name in table_names {
|
||||
// TODO(hl): remove this hardcode once we've removed numbers table.
|
||||
if table_name == "numbers" {
|
||||
continue;
|
||||
}
|
||||
|
||||
let table = self
|
||||
.get_table(&TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
@@ -79,6 +74,10 @@ impl StatementExecutor {
|
||||
table: &table_name,
|
||||
})
|
||||
.await?;
|
||||
// Only copy base tables; ignore views and temporary tables.
|
||||
if table.table_type() != table::metadata::TableType::Base {
|
||||
continue;
|
||||
}
|
||||
// Ignores physical tables of metric engine.
|
||||
if table.table_info().meta.engine == METRIC_ENGINE_NAME
|
||||
&& !table
|
||||
|
||||
@@ -70,11 +70,11 @@ use super::StatementExecutor;
|
||||
use crate::error::{
|
||||
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
|
||||
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
|
||||
ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionColumnsSnafu,
|
||||
InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu,
|
||||
InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu,
|
||||
SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
|
||||
TableNotFoundSnafu, UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
|
||||
ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu,
|
||||
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
|
||||
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
|
||||
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
|
||||
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
|
||||
};
|
||||
use crate::expr_factory;
|
||||
use crate::statement::show::create_partitions_stmt;
|
||||
@@ -239,7 +239,6 @@ impl StatementExecutor {
|
||||
);
|
||||
|
||||
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
|
||||
validate_partition_columns(create_table, &partition_cols)?;
|
||||
let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;
|
||||
|
||||
let resp = self
|
||||
@@ -1209,22 +1208,6 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_partition_columns(
|
||||
create_table: &CreateTableExpr,
|
||||
partition_cols: &[String],
|
||||
) -> Result<()> {
|
||||
ensure!(
|
||||
partition_cols
|
||||
.iter()
|
||||
.all(|col| &create_table.time_index == col || create_table.primary_keys.contains(col)),
|
||||
InvalidPartitionColumnsSnafu {
|
||||
table: &create_table.table_name,
|
||||
reason: "partition column must belongs to primary keys or equals to time index"
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
|
||||
fn parse_partitions(
|
||||
create_table: &CreateTableExpr,
|
||||
@@ -1519,31 +1502,6 @@ mod test {
|
||||
assert!(NAME_PATTERN_REG.is_match("hello"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_partition_columns() {
|
||||
let create_table = CreateTableExpr {
|
||||
table_name: "my_table".to_string(),
|
||||
time_index: "ts".to_string(),
|
||||
primary_keys: vec!["a".to_string(), "b".to_string()],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert!(validate_partition_columns(&create_table, &[]).is_ok());
|
||||
assert!(validate_partition_columns(&create_table, &["ts".to_string()]).is_ok());
|
||||
assert!(validate_partition_columns(&create_table, &["a".to_string()]).is_ok());
|
||||
assert!(
|
||||
validate_partition_columns(&create_table, &["b".to_string(), "a".to_string()]).is_ok()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
validate_partition_columns(&create_table, &["a".to_string(), "c".to_string()])
|
||||
.unwrap_err()
|
||||
.to_string(),
|
||||
"Invalid partition columns when creating table 'my_table', \
|
||||
reason: partition column must belongs to primary keys or equals to time index",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "TODO(ruihang): WIP new partition rule"]
|
||||
async fn test_parse_partitions() {
|
||||
|
||||
@@ -26,6 +26,7 @@ use sql::statements::show::{
|
||||
ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
|
||||
ShowTableStatus, ShowTables, ShowVariables, ShowViews,
|
||||
};
|
||||
use sql::statements::OptionMap;
|
||||
use table::metadata::TableType;
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
@@ -92,6 +93,15 @@ impl StatementExecutor {
|
||||
.context(ExecuteStatementSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn show_create_database(
|
||||
&self,
|
||||
database_name: &str,
|
||||
opts: OptionMap,
|
||||
) -> Result<Output> {
|
||||
query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn show_create_table(
|
||||
&self,
|
||||
@@ -118,8 +128,7 @@ impl StatementExecutor {
|
||||
|
||||
let partitions = create_partitions_stmt(partitions)?;
|
||||
|
||||
query::sql::show_create_table(table, partitions, query_ctx)
|
||||
.context(error::ExecuteStatementSnafu)
|
||||
query::sql::show_create_table(table, partitions, query_ctx).context(ExecuteStatementSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
|
||||
@@ -61,7 +61,6 @@ impl ParallelizeScan {
|
||||
debug!(
|
||||
"Assign {total_range_num} ranges to {expected_partition_num} partitions"
|
||||
);
|
||||
|
||||
// update the partition ranges
|
||||
let new_exec = region_scan_exec
|
||||
.with_new_partitions(partition_ranges)
|
||||
@@ -114,25 +113,25 @@ mod test {
|
||||
PartitionRange {
|
||||
start: Timestamp::new(0, TimeUnit::Second),
|
||||
end: Timestamp::new(10, TimeUnit::Second),
|
||||
estimated_size: 100,
|
||||
num_rows: 100,
|
||||
identifier: 1,
|
||||
},
|
||||
PartitionRange {
|
||||
start: Timestamp::new(10, TimeUnit::Second),
|
||||
end: Timestamp::new(20, TimeUnit::Second),
|
||||
estimated_size: 200,
|
||||
num_rows: 200,
|
||||
identifier: 2,
|
||||
},
|
||||
PartitionRange {
|
||||
start: Timestamp::new(20, TimeUnit::Second),
|
||||
end: Timestamp::new(30, TimeUnit::Second),
|
||||
estimated_size: 150,
|
||||
num_rows: 150,
|
||||
identifier: 3,
|
||||
},
|
||||
PartitionRange {
|
||||
start: Timestamp::new(30, TimeUnit::Second),
|
||||
end: Timestamp::new(40, TimeUnit::Second),
|
||||
estimated_size: 250,
|
||||
num_rows: 250,
|
||||
identifier: 4,
|
||||
},
|
||||
];
|
||||
@@ -146,13 +145,13 @@ mod test {
|
||||
PartitionRange {
|
||||
start: Timestamp::new(0, TimeUnit::Second),
|
||||
end: Timestamp::new(10, TimeUnit::Second),
|
||||
estimated_size: 100,
|
||||
num_rows: 100,
|
||||
identifier: 1,
|
||||
},
|
||||
PartitionRange {
|
||||
start: Timestamp::new(20, TimeUnit::Second),
|
||||
end: Timestamp::new(30, TimeUnit::Second),
|
||||
estimated_size: 150,
|
||||
num_rows: 150,
|
||||
identifier: 3,
|
||||
},
|
||||
],
|
||||
@@ -160,13 +159,13 @@ mod test {
|
||||
PartitionRange {
|
||||
start: Timestamp::new(10, TimeUnit::Second),
|
||||
end: Timestamp::new(20, TimeUnit::Second),
|
||||
estimated_size: 200,
|
||||
num_rows: 200,
|
||||
identifier: 2,
|
||||
},
|
||||
PartitionRange {
|
||||
start: Timestamp::new(30, TimeUnit::Second),
|
||||
end: Timestamp::new(40, TimeUnit::Second),
|
||||
estimated_size: 250,
|
||||
num_rows: 250,
|
||||
identifier: 4,
|
||||
},
|
||||
],
|
||||
@@ -180,25 +179,25 @@ mod test {
|
||||
vec![PartitionRange {
|
||||
start: Timestamp::new(0, TimeUnit::Second),
|
||||
end: Timestamp::new(10, TimeUnit::Second),
|
||||
estimated_size: 100,
|
||||
num_rows: 100,
|
||||
identifier: 1,
|
||||
}],
|
||||
vec![PartitionRange {
|
||||
start: Timestamp::new(10, TimeUnit::Second),
|
||||
end: Timestamp::new(20, TimeUnit::Second),
|
||||
estimated_size: 200,
|
||||
num_rows: 200,
|
||||
identifier: 2,
|
||||
}],
|
||||
vec![PartitionRange {
|
||||
start: Timestamp::new(20, TimeUnit::Second),
|
||||
end: Timestamp::new(30, TimeUnit::Second),
|
||||
estimated_size: 150,
|
||||
num_rows: 150,
|
||||
identifier: 3,
|
||||
}],
|
||||
vec![PartitionRange {
|
||||
start: Timestamp::new(30, TimeUnit::Second),
|
||||
end: Timestamp::new(40, TimeUnit::Second),
|
||||
estimated_size: 250,
|
||||
num_rows: 250,
|
||||
identifier: 4,
|
||||
}],
|
||||
];
|
||||
|
||||
@@ -52,12 +52,13 @@ pub use show_create_table::create_table_stmt;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::ast::Ident;
|
||||
use sql::parser::ParserContext;
|
||||
use sql::statements::create::{CreateFlow, CreateView, Partitions};
|
||||
use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions};
|
||||
use sql::statements::show::{
|
||||
ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowTableStatus, ShowTables,
|
||||
ShowVariables, ShowViews,
|
||||
};
|
||||
use sql::statements::statement::Statement;
|
||||
use sql::statements::OptionMap;
|
||||
use sqlparser::ast::ObjectName;
|
||||
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
|
||||
use table::TableRef;
|
||||
@@ -136,6 +137,17 @@ static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
|
||||
]))
|
||||
});
|
||||
|
||||
static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
|
||||
Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(
|
||||
"Create Database",
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
),
|
||||
]))
|
||||
});
|
||||
|
||||
static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
|
||||
Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
|
||||
@@ -668,6 +680,26 @@ pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
|
||||
Ok(Output::new_with_record_batches(records))
|
||||
}
|
||||
|
||||
pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
|
||||
let stmt = CreateDatabase {
|
||||
name: ObjectName(vec![Ident {
|
||||
value: database_name.to_string(),
|
||||
quote_style: None,
|
||||
}]),
|
||||
if_not_exists: true,
|
||||
options,
|
||||
};
|
||||
let sql = format!("{stmt}");
|
||||
let columns = vec![
|
||||
Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
|
||||
Arc::new(StringVector::from(vec![sql])) as _,
|
||||
];
|
||||
let records =
|
||||
RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
|
||||
.context(error::CreateRecordBatchSnafu)?;
|
||||
Ok(Output::new_with_record_batches(records))
|
||||
}
|
||||
|
||||
pub fn show_create_table(
|
||||
table: TableRef,
|
||||
partitions: Option<Partitions>,
|
||||
|
||||
@@ -380,7 +380,7 @@ impl<'a> ParserContext<'a> {
|
||||
Ok(options.into())
|
||||
}
|
||||
|
||||
/// "PARTITION BY ..." clause
|
||||
/// "PARTITION ON COLUMNS (...)" clause
|
||||
fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
|
||||
if !self.parser.parse_keyword(Keyword::PARTITION) {
|
||||
return Ok(None);
|
||||
|
||||
@@ -21,8 +21,9 @@ use crate::error::{
|
||||
};
|
||||
use crate::parser::ParserContext;
|
||||
use crate::statements::show::{
|
||||
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
|
||||
ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
|
||||
ShowColumns, ShowCreateDatabase, ShowCreateFlow, ShowCreateTable, ShowCreateView,
|
||||
ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables,
|
||||
ShowVariables, ShowViews,
|
||||
};
|
||||
use crate::statements::statement::Statement;
|
||||
|
||||
@@ -74,7 +75,9 @@ impl<'a> ParserContext<'a> {
|
||||
// SHOW {INDEX | INDEXES | KEYS}
|
||||
self.parse_show_index()
|
||||
} else if self.consume_token("CREATE") {
|
||||
if self.consume_token("TABLE") {
|
||||
if self.consume_token("DATABASE") || self.consume_token("SCHEMA") {
|
||||
self.parse_show_create_database()
|
||||
} else if self.consume_token("TABLE") {
|
||||
self.parse_show_create_table()
|
||||
} else if self.consume_token("FLOW") {
|
||||
self.parse_show_create_flow()
|
||||
@@ -109,6 +112,25 @@ impl<'a> ParserContext<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_show_create_database(&mut self) -> Result<Statement> {
|
||||
let raw_database_name =
|
||||
self.parse_object_name()
|
||||
.with_context(|_| error::UnexpectedSnafu {
|
||||
expected: "a database name",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
let database_name = Self::canonicalize_object_name(raw_database_name);
|
||||
ensure!(
|
||||
!database_name.0.is_empty(),
|
||||
InvalidDatabaseNameSnafu {
|
||||
name: database_name.to_string(),
|
||||
}
|
||||
);
|
||||
Ok(Statement::ShowCreateDatabase(ShowCreateDatabase {
|
||||
database_name,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Parse SHOW CREATE TABLE statement
|
||||
fn parse_show_create_table(&mut self) -> Result<Statement> {
|
||||
let raw_table_name = self
|
||||
|
||||
@@ -161,6 +161,19 @@ impl Display for ShowTableStatus {
|
||||
}
|
||||
}
|
||||
|
||||
/// SQL structure for `SHOW CREATE DATABASE`.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub struct ShowCreateDatabase {
|
||||
pub database_name: ObjectName,
|
||||
}
|
||||
|
||||
impl Display for ShowCreateDatabase {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let database_name = &self.database_name;
|
||||
write!(f, r#"SHOW CREATE DATABASE {database_name}"#)
|
||||
}
|
||||
}
|
||||
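A small illustrative test of the `Display` impl above (not part of this diff); `Ident` and `ObjectName` are the sqlparser types this module already uses:

#[test]
fn show_create_database_display() {
    let stmt = ShowCreateDatabase {
        database_name: ObjectName(vec![Ident::new("mydb")]),
    };
    assert_eq!(stmt.to_string(), "SHOW CREATE DATABASE mydb");
}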
|
||||
/// SQL structure for `SHOW CREATE TABLE`.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub struct ShowCreateTable {
|
||||
|
||||
@@ -32,8 +32,9 @@ use crate::statements::insert::Insert;
|
||||
use crate::statements::query::Query;
|
||||
use crate::statements::set_variables::SetVariables;
|
||||
use crate::statements::show::{
|
||||
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
|
||||
ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
|
||||
ShowColumns, ShowCreateDatabase, ShowCreateFlow, ShowCreateTable, ShowCreateView,
|
||||
ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables,
|
||||
ShowVariables, ShowViews,
|
||||
};
|
||||
use crate::statements::tql::Tql;
|
||||
use crate::statements::truncate::TruncateTable;
|
||||
@@ -84,6 +85,8 @@ pub enum Statement {
|
||||
ShowCollation(ShowKind),
|
||||
// SHOW INDEX
|
||||
ShowIndex(ShowIndex),
|
||||
// SHOW CREATE DATABASE
|
||||
ShowCreateDatabase(ShowCreateDatabase),
|
||||
// SHOW CREATE TABLE
|
||||
ShowCreateTable(ShowCreateTable),
|
||||
// SHOW CREATE FLOW
|
||||
@@ -139,6 +142,7 @@ impl Display for Statement {
|
||||
Statement::ShowCreateTable(s) => s.fmt(f),
|
||||
Statement::ShowCreateFlow(s) => s.fmt(f),
|
||||
Statement::ShowFlows(s) => s.fmt(f),
|
||||
Statement::ShowCreateDatabase(s) => s.fmt(f),
|
||||
Statement::ShowCreateView(s) => s.fmt(f),
|
||||
Statement::ShowViews(s) => s.fmt(f),
|
||||
Statement::ShowStatus(s) => s.fmt(f),
|
||||
|
||||
@@ -149,9 +149,8 @@ pub struct PartitionRange {
|
||||
pub start: Timestamp,
|
||||
/// End time of time index column. Inclusive.
|
||||
pub end: Timestamp,
|
||||
/// Estimate size of this range. Is used to balance ranges between partitions.
|
||||
/// No base unit, just a number.
|
||||
pub estimated_size: usize,
|
||||
/// Number of rows in this range. Is used to balance ranges between partitions.
|
||||
pub num_rows: usize,
|
||||
/// Identifier to this range. Assigned by storage engine.
|
||||
pub identifier: usize,
|
||||
}
|
||||
|
||||
@@ -746,6 +746,8 @@ pub async fn test_config_api(store_type: StorageType) {
|
||||
r#"
|
||||
mode = "standalone"
|
||||
enable_telemetry = true
|
||||
init_regions_in_background = false
|
||||
init_regions_parallelism = 16
|
||||
|
||||
[http]
|
||||
addr = "127.0.0.1:4000"
|
||||
|
||||
@@ -26,6 +26,17 @@ SHOW FULL DATABASES;
|
||||
| public | |
|
||||
+--------------------+----------+
|
||||
|
||||
SHOW CREATE DATABASE mydb;
|
||||
|
||||
+----------+------------------------------------+
|
||||
| Database | Create Database |
|
||||
+----------+------------------------------------+
|
||||
| mydb | CREATE DATABASE IF NOT EXISTS mydb |
|
||||
| | WITH( |
|
||||
| | ttl = '1h' |
|
||||
| | ) |
|
||||
+----------+------------------------------------+
|
||||
|
||||
USE mydb;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -4,6 +4,8 @@ SHOW DATABASES;
|
||||
|
||||
SHOW FULL DATABASES;
|
||||
|
||||
SHOW CREATE DATABASE mydb;
|
||||
|
||||
USE mydb;
|
||||
|
||||
CREATE TABLE test(host STRING, cpu DOUBLE, ts TIMESTAMP TIME INDEX);
|
||||
|
||||
@@ -40,13 +40,13 @@ admin flush_flow('test_numbers_basic');
|
||||
| 1 |
|
||||
+----------------------------------------+
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
|
||||
SELECT "SUM(numbers_input_basic.number)", window_start, window_end FROM out_num_cnt_basic;
|
||||
|
||||
+-------+---------------------+---------------------+
|
||||
| col_0 | window_start | window_end |
|
||||
+-------+---------------------+---------------------+
|
||||
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
+-------+---------------------+---------------------+
|
||||
+---------------------------------+---------------------+---------------------+
|
||||
| SUM(numbers_input_basic.number) | window_start | window_end |
|
||||
+---------------------------------+---------------------+---------------------+
|
||||
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
+---------------------------------+---------------------+---------------------+
|
||||
|
||||
admin flush_flow('test_numbers_basic');
|
||||
|
||||
@@ -71,14 +71,15 @@ admin flush_flow('test_numbers_basic');
|
||||
| 1 |
|
||||
+----------------------------------------+
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
|
||||
-- note that this quote-unquote column is a column name, **not** an aggregation expr, generated by datafusion
|
||||
SELECT "SUM(numbers_input_basic.number)", window_start, window_end FROM out_num_cnt_basic;
|
||||
|
||||
+-------+---------------------+---------------------+
|
||||
| col_0 | window_start | window_end |
|
||||
+-------+---------------------+---------------------+
|
||||
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
|
||||
+-------+---------------------+---------------------+
|
||||
+---------------------------------+---------------------+---------------------+
|
||||
| SUM(numbers_input_basic.number) | window_start | window_end |
|
||||
+---------------------------------+---------------------+---------------------+
|
||||
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
|
||||
+---------------------------------+---------------------+---------------------+
|
||||
|
||||
DROP FLOW test_numbers_basic;
|
||||
|
||||
@@ -142,8 +143,9 @@ CREATE TABLE bytes_log (
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- TODO(discord9): remove this after auto-inferring the table's time index is implemented
|
||||
CREATE TABLE approx_rate (
|
||||
rate FLOAT,
|
||||
rate DOUBLE,
|
||||
time_window TIMESTAMP,
|
||||
update_at TIMESTAMP,
|
||||
TIME INDEX(time_window)
|
||||
@@ -154,7 +156,7 @@ Affected Rows: 0
|
||||
CREATE FLOW find_approx_rate
|
||||
SINK TO approx_rate
|
||||
AS
|
||||
SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;
|
||||
SELECT (max(byte) - min(byte))/30.0 as rate, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -174,11 +176,11 @@ admin flush_flow('find_approx_rate');
|
||||
|
||||
SELECT rate, time_window FROM approx_rate;
|
||||
|
||||
+----------+---------------------+
|
||||
| rate | time_window |
|
||||
+----------+---------------------+
|
||||
| 6.633333 | 2025-01-01T00:00:00 |
|
||||
+----------+---------------------+
|
||||
+-------------------+---------------------+
|
||||
| rate | time_window |
|
||||
+-------------------+---------------------+
|
||||
| 6.633333333333334 | 2025-01-01T00:00:00 |
|
||||
+-------------------+---------------------+
|
||||
|
||||
INSERT INTO bytes_log VALUES
|
||||
(450, '2025-01-01 00:00:32'),
|
||||
@@ -196,12 +198,12 @@ admin flush_flow('find_approx_rate');
|
||||
|
||||
SELECT rate, time_window FROM approx_rate;
|
||||
|
||||
+-----------+---------------------+
|
||||
| rate | time_window |
|
||||
+-----------+---------------------+
|
||||
| 6.633333 | 2025-01-01T00:00:00 |
|
||||
| 1.6666666 | 2025-01-01T00:00:30 |
|
||||
+-----------+---------------------+
|
||||
+--------------------+---------------------+
|
||||
| rate | time_window |
|
||||
+--------------------+---------------------+
|
||||
| 6.633333333333334 | 2025-01-01T00:00:00 |
|
||||
| 1.6666666666666667 | 2025-01-01T00:00:30 |
|
||||
+--------------------+---------------------+
|
||||
|
||||
DROP TABLE bytes_log;
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ VALUES
|
||||
|
||||
admin flush_flow('test_numbers_basic');
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
|
||||
SELECT "SUM(numbers_input_basic.number)", window_start, window_end FROM out_num_cnt_basic;
|
||||
|
||||
admin flush_flow('test_numbers_basic');
|
||||
|
||||
@@ -33,7 +33,8 @@ VALUES
|
||||
|
||||
admin flush_flow('test_numbers_basic');
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
|
||||
-- note that this quote-unquote column is a column name, **not** an aggregation expr, generated by datafusion
|
||||
SELECT "SUM(numbers_input_basic.number)", window_start, window_end FROM out_num_cnt_basic;
|
||||
|
||||
DROP FLOW test_numbers_basic;
|
||||
DROP TABLE numbers_input_basic;
|
||||
@@ -67,8 +68,9 @@ CREATE TABLE bytes_log (
|
||||
TIME INDEX(ts)
|
||||
);
|
||||
|
||||
-- TODO(discord9): remove this after auto-inferring the table's time index is implemented
|
||||
CREATE TABLE approx_rate (
|
||||
rate FLOAT,
|
||||
rate DOUBLE,
|
||||
time_window TIMESTAMP,
|
||||
update_at TIMESTAMP,
|
||||
TIME INDEX(time_window)
|
||||
@@ -77,7 +79,7 @@ CREATE TABLE approx_rate (
|
||||
CREATE FLOW find_approx_rate
|
||||
SINK TO approx_rate
|
||||
AS
|
||||
SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;
|
||||
SELECT (max(byte) - min(byte))/30.0 as rate, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;
|
||||
|
||||
INSERT INTO bytes_log VALUES
|
||||
(101, '2025-01-01 00:00:01'),
|
||||
|
||||
@@ -39,13 +39,14 @@ admin flush_flow('test_numbers_df_func');
|
||||
| 1 |
|
||||
+------------------------------------------+
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
|
||||
-- note that this quote-unquote column is a column name, **not** an aggregation expr, generated by datafusion
|
||||
SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
|
||||
|
||||
+-------+---------------------+---------------------+
|
||||
| col_0 | window_start | window_end |
|
||||
+-------+---------------------+---------------------+
|
||||
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
+-------+---------------------+---------------------+
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
| SUM(abs(numbers_input_df_func.number)) | window_start | window_end |
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -70,14 +71,15 @@ admin flush_flow('test_numbers_df_func');
|
||||
| 1 |
|
||||
+------------------------------------------+
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
|
||||
-- note that this quote-unquote column is a column name, **not** an aggregation expr, generated by datafusion
|
||||
SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
|
||||
|
||||
+-------+---------------------+---------------------+
|
||||
| col_0 | window_start | window_end |
|
||||
+-------+---------------------+---------------------+
|
||||
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
|
||||
+-------+---------------------+---------------------+
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
| SUM(abs(numbers_input_df_func.number)) | window_start | window_end |
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
|
||||
DROP FLOW test_numbers_df_func;
|
||||
|
||||
@@ -132,13 +134,13 @@ admin flush_flow('test_numbers_df_func');
|
||||
| 1 |
|
||||
+------------------------------------------+
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
|
||||
SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
|
||||
|
||||
+-------+---------------------+---------------------+
|
||||
| col_0 | window_start | window_end |
|
||||
+-------+---------------------+---------------------+
|
||||
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
+-------+---------------------+---------------------+
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
| abs(SUM(numbers_input_df_func.number)) | window_start | window_end |
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -163,14 +165,14 @@ admin flush_flow('test_numbers_df_func');
|
||||
| 1 |
|
||||
+------------------------------------------+
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
|
||||
SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
|
||||
|
||||
+-------+---------------------+---------------------+
|
||||
| col_0 | window_start | window_end |
|
||||
+-------+---------------------+---------------------+
|
||||
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
|
||||
+-------+---------------------+---------------------+
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
| abs(SUM(numbers_input_df_func.number)) | window_start | window_end |
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
|
||||
| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
|
||||
+----------------------------------------+---------------------+---------------------+
|
||||
|
||||
DROP FLOW test_numbers_df_func;
|
||||
|
||||
@@ -196,8 +198,8 @@ Affected Rows: 0
|
||||
|
||||
CREATE FLOW test_numbers_df_func
|
||||
SINK TO out_num_cnt_df_func
|
||||
AS
|
||||
SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
|
||||
AS
|
||||
SELECT max(number) - min(number) as maxmin, date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::Timestamp) as time_window FROM numbers_input_df_func GROUP BY time_window;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -224,13 +226,13 @@ admin flush_flow('test_numbers_df_func');
|
||||
| 1 |
|
||||
+------------------------------------------+
|
||||
|
||||
SELECT col_0, col_1 FROM out_num_cnt_df_func;
|
||||
SELECT maxmin, time_window FROM out_num_cnt_df_func;
|
||||
|
||||
+-------+---------------------+
|
||||
| col_0 | col_1 |
|
||||
+-------+---------------------+
|
||||
| 2 | 2021-07-01T00:00:00 |
|
||||
+-------+---------------------+
|
||||
+--------+---------------------+
|
||||
| maxmin | time_window |
|
||||
+--------+---------------------+
|
||||
| 2 | 2021-07-01T00:00:00 |
|
||||
+--------+---------------------+
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -255,14 +257,14 @@ admin flush_flow('test_numbers_df_func');
|
||||
| 1 |
|
||||
+------------------------------------------+
|
||||
|
||||
SELECT col_0, col_1 FROM out_num_cnt_df_func;
|
||||
SELECT maxmin, time_window FROM out_num_cnt_df_func;
|
||||
|
||||
+-------+---------------------+
|
||||
| col_0 | col_1 |
|
||||
+-------+---------------------+
|
||||
| 2 | 2021-07-01T00:00:00 |
|
||||
| 1 | 2021-07-01T00:00:01 |
|
||||
+-------+---------------------+
|
||||
+--------+---------------------+
|
||||
| maxmin | time_window |
|
||||
+--------+---------------------+
|
||||
| 2 | 2021-07-01T00:00:00 |
|
||||
| 1 | 2021-07-01T00:00:01 |
|
||||
+--------+---------------------+
|
||||
|
||||
DROP FLOW test_numbers_df_func;
|
||||
|
||||
@@ -288,8 +290,8 @@ Affected Rows: 0
|
||||
|
||||
CREATE FLOW test_numbers_df_func
|
||||
SINK TO out_num_cnt
|
||||
AS
|
||||
SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
|
||||
AS
|
||||
SELECT date_trunc('second', ts) as time_window, sum(number) as sum_num FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -316,13 +318,13 @@ admin flush_flow('test_numbers_df_func');
|
||||
| 1 |
|
||||
+------------------------------------------+
|
||||
|
||||
SELECT col_0, col_1 FROM out_num_cnt;
|
||||
SELECT time_window, sum_num FROM out_num_cnt;
|
||||
|
||||
+---------------------+-------+
|
||||
| col_0 | col_1 |
|
||||
+---------------------+-------+
|
||||
| 2021-07-01T00:00:00 | 42 |
|
||||
+---------------------+-------+
|
||||
+---------------------+---------+
|
||||
| time_window | sum_num |
|
||||
+---------------------+---------+
|
||||
| 2021-07-01T00:00:00 | 42 |
|
||||
+---------------------+---------+
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -347,14 +349,14 @@ admin flush_flow('test_numbers_df_func');
|
||||
| 1 |
|
||||
+------------------------------------------+
|
||||
|
||||
SELECT col_0, col_1 FROM out_num_cnt;
|
||||
SELECT time_window, sum_num FROM out_num_cnt;
|
||||
|
||||
+---------------------+-------+
|
||||
| col_0 | col_1 |
|
||||
+---------------------+-------+
|
||||
| 2021-07-01T00:00:00 | 42 |
|
||||
| 2021-07-01T00:00:01 | 47 |
|
||||
+---------------------+-------+
|
||||
+---------------------+---------+
|
||||
| time_window | sum_num |
|
||||
+---------------------+---------+
|
||||
| 2021-07-01T00:00:00 | 42 |
|
||||
| 2021-07-01T00:00:01 | 47 |
|
||||
+---------------------+---------+
|
||||
|
||||
DROP FLOW test_numbers_df_func;
|
||||
|
||||
|
||||
@@ -21,7 +21,8 @@ VALUES
|
||||
-- flush flow to make sure that table is created and data is inserted
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
|
||||
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
|
||||
SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -32,7 +33,8 @@ VALUES
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
|
||||
-- note that this quote-unquote column is a column name, **not** an aggregation expr, generated by datafusion
|
||||
SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
|
||||
|
||||
DROP FLOW test_numbers_df_func;
|
||||
DROP TABLE numbers_input_df_func;
|
||||
@@ -61,7 +63,7 @@ VALUES
|
||||
-- flush flow to make sure that table is created and data is inserted
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
|
||||
SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -72,7 +74,7 @@ VALUES
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
|
||||
SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
|
||||
|
||||
DROP FLOW test_numbers_df_func;
|
||||
DROP TABLE numbers_input_df_func;
|
||||
@@ -88,8 +90,8 @@ CREATE TABLE numbers_input_df_func (
|
||||
|
||||
CREATE FLOW test_numbers_df_func
|
||||
SINK TO out_num_cnt_df_func
|
||||
AS
|
||||
SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
|
||||
AS
|
||||
SELECT max(number) - min(number) as maxmin, date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::Timestamp) as time_window FROM numbers_input_df_func GROUP BY time_window;
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -100,7 +102,7 @@ VALUES
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
SELECT col_0, col_1 FROM out_num_cnt_df_func;
|
||||
SELECT maxmin, time_window FROM out_num_cnt_df_func;
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -111,7 +113,7 @@ VALUES
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
SELECT col_0, col_1 FROM out_num_cnt_df_func;
|
||||
SELECT maxmin, time_window FROM out_num_cnt_df_func;
|
||||
|
||||
DROP FLOW test_numbers_df_func;
|
||||
DROP TABLE numbers_input_df_func;
|
||||
@@ -128,8 +130,8 @@ CREATE TABLE numbers_input_df_func (
|
||||
|
||||
CREATE FLOW test_numbers_df_func
|
||||
SINK TO out_num_cnt
|
||||
AS
|
||||
SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
|
||||
AS
|
||||
SELECT date_trunc('second', ts) as time_window, sum(number) as sum_num FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -140,7 +142,7 @@ VALUES
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
SELECT col_0, col_1 FROM out_num_cnt;
|
||||
SELECT time_window, sum_num FROM out_num_cnt;
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
@@ -151,7 +153,7 @@ VALUES
|
||||
|
||||
admin flush_flow('test_numbers_df_func');
|
||||
|
||||
SELECT col_0, col_1 FROM out_num_cnt;
|
||||
SELECT time_window, sum_num FROM out_num_cnt;
|
||||
|
||||
DROP FLOW test_numbers_df_func;
|
||||
DROP TABLE numbers_input_df_func;
|
||||
|
||||
@@ -1,48 +0,0 @@
SELECT h3(37.76938, -122.3889, 0);

+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(0)) |
+---------------------------------------------------+
| 8029fffffffffff |
+---------------------------------------------------+

SELECT h3(37.76938, -122.3889, 1);

+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(1)) |
+---------------------------------------------------+
| 81283ffffffffff |
+---------------------------------------------------+

SELECT h3(37.76938, -122.3889, 8);

+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(8)) |
+---------------------------------------------------+
| 88283082e7fffff |
+---------------------------------------------------+

SELECT geohash(37.76938, -122.3889, 9);

+--------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(9)) |
+--------------------------------------------------------+
| 9q8yygxne |
+--------------------------------------------------------+

SELECT geohash(37.76938, -122.3889, 10);

+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(10)) |
+---------------------------------------------------------+
| 9q8yygxnef |
+---------------------------------------------------------+

SELECT geohash(37.76938, -122.3889, 11);

+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(11)) |
+---------------------------------------------------------+
| 9q8yygxneft |
+---------------------------------------------------------+

@@ -1,11 +0,0 @@
SELECT h3(37.76938, -122.3889, 0);

SELECT h3(37.76938, -122.3889, 1);

SELECT h3(37.76938, -122.3889, 8);

SELECT geohash(37.76938, -122.3889, 9);

SELECT geohash(37.76938, -122.3889, 10);

SELECT geohash(37.76938, -122.3889, 11);
69
tests/cases/standalone/common/select/last_value.result
Normal file
@@ -0,0 +1,69 @@
create table t (
ts timestamp time index,
host string primary key,
not_pk string,
val double,
) with (append_mode='true');

Affected Rows: 0

insert into t values
(0, 'a', '🌕', 1.0),
(1, 'b', '🌖', 2.0),
(2, 'a', '🌗', 3.0),
(3, 'c', '🌘', 4.0),
(4, 'a', '🌑', 5.0),
(5, 'b', '🌒', 6.0),
(6, 'a', '🌓', 7.0),
(7, 'c', '🌔', 8.0),
(8, 'd', '🌕', 9.0);

Affected Rows: 9

admin flush_table('t');

+------------------------+
| ADMIN flush_table('t') |
+------------------------+
| 0 |
+------------------------+

select
last_value(host order by ts),
last_value(not_pk order by ts),
last_value(val order by ts)
from t
group by host
order by host;

+---------------------------------------------------+-----------------------------------------------------+--------------------------------------------------+
| last_value(t.host) ORDER BY [t.ts ASC NULLS LAST] | last_value(t.not_pk) ORDER BY [t.ts ASC NULLS LAST] | last_value(t.val) ORDER BY [t.ts ASC NULLS LAST] |
+---------------------------------------------------+-----------------------------------------------------+--------------------------------------------------+
| a | 🌓 | 7.0 |
| b | 🌒 | 6.0 |
| c | 🌔 | 8.0 |
| d | 🌕 | 9.0 |
+---------------------------------------------------+-----------------------------------------------------+--------------------------------------------------+

-- repeat the query again, ref: https://github.com/GreptimeTeam/greptimedb/issues/4650
select
last_value(host order by ts),
last_value(not_pk order by ts),
last_value(val order by ts)
from t
group by host
order by host;

+---------------------------------------------------+-----------------------------------------------------+--------------------------------------------------+
| last_value(t.host) ORDER BY [t.ts ASC NULLS LAST] | last_value(t.not_pk) ORDER BY [t.ts ASC NULLS LAST] | last_value(t.val) ORDER BY [t.ts ASC NULLS LAST] |
+---------------------------------------------------+-----------------------------------------------------+--------------------------------------------------+
| a | 🌓 | 7.0 |
| b | 🌒 | 6.0 |
| c | 🌔 | 8.0 |
| d | 🌕 | 9.0 |
+---------------------------------------------------+-----------------------------------------------------+--------------------------------------------------+

drop table t;

Affected Rows: 0

38
tests/cases/standalone/common/select/last_value.sql
Normal file
@@ -0,0 +1,38 @@
create table t (
ts timestamp time index,
host string primary key,
not_pk string,
val double,
) with (append_mode='true');

insert into t values
(0, 'a', '🌕', 1.0),
(1, 'b', '🌖', 2.0),
(2, 'a', '🌗', 3.0),
(3, 'c', '🌘', 4.0),
(4, 'a', '🌑', 5.0),
(5, 'b', '🌒', 6.0),
(6, 'a', '🌓', 7.0),
(7, 'c', '🌔', 8.0),
(8, 'd', '🌕', 9.0);

admin flush_table('t');

select
last_value(host order by ts),
last_value(not_pk order by ts),
last_value(val order by ts)
from t
group by host
order by host;

-- repeat the query again, ref: https://github.com/GreptimeTeam/greptimedb/issues/4650
select
last_value(host order by ts),
last_value(not_pk order by ts),
last_value(val order by ts)
from t
group by host
order by host;

drop table t;