feat!: impl admin command (#4600)

* feat: impl admin statement parser

* feat: introduce AsyncFunction and implement it for admin functions

* feat: execute admin functions

* fix: license header

* fix: panic in test

* chore: fixes from code review
Author: dennis zhuang
Date: 2024-08-26 15:53:40 +08:00
Committed by: GitHub
Parent: da337a9635
Commit: 63f2463273

39 changed files with 777 additions and 322 deletions
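
After this change, administration functions are invoked through a dedicated ADMIN statement instead of SELECT. For example, all drawn from the test updates in this diff (the keyword is accepted in either case):

ADMIN flush_table('test');
ADMIN compact_table('test');
ADMIN migrate_region(1, 1, 1);
ADMIN procedure_state('pid');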

Cargo.lock

@@ -1954,6 +1954,7 @@ dependencies = [
"statrs",
"store-api",
"table",
"tokio",
]
[[package]]


@@ -39,3 +39,4 @@ table.workspace = true
[dev-dependencies]
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
tokio.workspace = true


@@ -110,7 +110,7 @@ mod test {
use session::context::QueryContext;
use super::*;
-use crate::function::{Function, FunctionContext};
+use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_flush_flow_metadata() {
@@ -130,8 +130,8 @@ mod test {
);
}
-#[test]
-fn test_missing_flow_service() {
+#[tokio::test]
+async fn test_missing_flow_service() {
let f = FlushFlowFunction;
let args = vec!["flow_name"];
@@ -140,7 +140,7 @@ mod test {
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing FlowServiceHandler, not expected",
result.to_string()


@@ -56,8 +56,10 @@ pub trait Function: fmt::Display + Sync + Send {
/// Returns the name of the function, should be unique.
fn name(&self) -> &str;
/// The returned data type of function execution.
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType>;
/// The signature of function.
fn signature(&self) -> Signature;
/// Evaluate the function, e.g. run/execute the function.
@@ -65,3 +67,22 @@ pub trait Function: fmt::Display + Sync + Send {
}
pub type FunctionRef = Arc<dyn Function>;
/// Async Scalar function trait
#[async_trait::async_trait]
pub trait AsyncFunction: fmt::Display + Sync + Send {
/// Returns the name of the function, should be unique.
fn name(&self) -> &str;
/// The returned data type of function execution.
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType>;
/// The signature of function.
fn signature(&self) -> Signature;
/// Evaluate the function, e.g. run/execute the function.
/// TODO(dennis): simplify the signature and refactor all the admin functions.
async fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef>;
}
pub type AsyncFunctionRef = Arc<dyn AsyncFunction>;
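
For illustration, a minimal implementer of the new trait could look like the sketch below. HelloFunction is hypothetical, not part of this commit; it assumes std::{fmt, sync::Arc}, common_query::error::Result, common_query::prelude::{Signature, Volatility}, datatypes::prelude::ConcreteDataType, datatypes::vectors::{StringVector, VectorRef}, and crate::function::FunctionContext are in scope, and that Signature::exact is the usual constructor in this crate:

// Hypothetical example: a trivial async scalar function.
#[derive(Debug, Default)]
struct HelloFunction;

impl fmt::Display for HelloFunction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "HELLO")
    }
}

#[async_trait::async_trait]
impl AsyncFunction for HelloFunction {
    fn name(&self) -> &str {
        "hello"
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::string_datatype())
    }

    fn signature(&self) -> Signature {
        // Assumed constructor: this function accepts no arguments.
        Signature::exact(vec![], Volatility::Immutable)
    }

    async fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
        // The body can .await service handlers directly; no thread spawning needed.
        Ok(Arc::new(StringVector::from(vec!["hello"])) as VectorRef)
    }
}

Such a function would be registered via registry.register_async(Arc::new(HelloFunction)) and resolved with get_async_function, per the registry changes below.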


@@ -18,7 +18,7 @@ use std::sync::{Arc, RwLock};
use once_cell::sync::Lazy;
-use crate::function::FunctionRef;
+use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
@@ -32,6 +32,7 @@ use crate::table::TableFunction;
#[derive(Default)]
pub struct FunctionRegistry {
functions: RwLock<HashMap<String, FunctionRef>>,
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
}
@@ -44,6 +45,27 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func);
}
pub fn register_async(&self, func: AsyncFunctionRef) {
let _ = self
.async_functions
.write()
.unwrap()
.insert(func.name().to_string(), func);
}
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
self.async_functions.read().unwrap().get(name).cloned()
}
pub fn async_functions(&self) -> Vec<AsyncFunctionRef> {
self.async_functions
.read()
.unwrap()
.values()
.cloned()
.collect()
}
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
let _ = self
.aggregate_functions


@@ -38,7 +38,7 @@ impl SystemFunction {
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(TimezoneFunction));
-registry.register(Arc::new(ProcedureStateFunction));
+registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);
}
}


@@ -96,7 +96,7 @@ mod tests {
use datatypes::vectors::StringVector;
use super::*;
-use crate::function::{Function, FunctionContext};
+use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_procedure_state_misc() {
@@ -114,8 +114,8 @@ mod tests {
));
}
-#[test]
-fn test_missing_procedure_service() {
+#[tokio::test]
+async fn test_missing_procedure_service() {
let f = ProcedureStateFunction;
let args = vec!["pid"];
@@ -125,15 +125,15 @@ mod tests {
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}
-#[test]
-fn test_procedure_state() {
+#[tokio::test]
+async fn test_procedure_state() {
let f = ProcedureStateFunction;
let args = vec!["pid"];
@@ -143,7 +143,7 @@ mod tests {
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::mock(), &args).unwrap();
+let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec![
"{\"status\":\"Done\",\"error\":\"OK\"}",


@@ -31,11 +31,11 @@ pub(crate) struct TableFunction;
impl TableFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
-registry.register(Arc::new(MigrateRegionFunction));
-registry.register(Arc::new(FlushRegionFunction));
-registry.register(Arc::new(CompactRegionFunction));
-registry.register(Arc::new(FlushTableFunction));
-registry.register(Arc::new(CompactTableFunction));
-registry.register(Arc::new(FlushFlowFunction));
+registry.register_async(Arc::new(MigrateRegionFunction));
+registry.register_async(Arc::new(FlushRegionFunction));
+registry.register_async(Arc::new(CompactRegionFunction));
+registry.register_async(Arc::new(FlushTableFunction));
+registry.register_async(Arc::new(CompactTableFunction));
+registry.register_async(Arc::new(FlushFlowFunction));
}
}


@@ -77,7 +77,7 @@ mod tests {
use datatypes::vectors::UInt64Vector;
use super::*;
-use crate::function::{Function, FunctionContext};
+use crate::function::{AsyncFunction, FunctionContext};
macro_rules! define_region_function_test {
($name: ident, $func: ident) => {
@@ -97,8 +97,8 @@ mod tests {
} if valid_types == ConcreteDataType::numerics()));
}
-#[test]
-fn [<test_ $name _missing_table_mutation>]() {
+#[tokio::test]
+async fn [<test_ $name _missing_table_mutation>]() {
let f = $func;
let args = vec![99];
@@ -108,15 +108,15 @@ mod tests {
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}
-#[test]
-fn [<test_ $name>]() {
+#[tokio::test]
+async fn [<test_ $name>]() {
let f = $func;
@@ -127,7 +127,7 @@ mod tests {
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::mock(), &args).unwrap();
+let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);


@@ -210,7 +210,7 @@ mod tests {
use session::context::QueryContext;
use super::*;
-use crate::function::{Function, FunctionContext};
+use crate::function::{AsyncFunction, FunctionContext};
macro_rules! define_table_function_test {
($name: ident, $func: ident) => {
@@ -230,8 +230,8 @@ mod tests {
} if valid_types == vec![ConcreteDataType::string_datatype()]));
}
-#[test]
-fn [<test_ $name _missing_table_mutation>]() {
+#[tokio::test]
+async fn [<test_ $name _missing_table_mutation>]() {
let f = $func;
let args = vec!["test"];
@@ -241,15 +241,15 @@ mod tests {
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}
-#[test]
-fn [<test_ $name>]() {
+#[tokio::test]
+async fn [<test_ $name>]() {
let f = $func;
@@ -260,7 +260,7 @@ mod tests {
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::mock(), &args).unwrap();
+let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);


@@ -123,7 +123,7 @@ mod tests {
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
use super::*;
-use crate::function::{Function, FunctionContext};
+use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_migrate_region_misc() {
@@ -140,8 +140,8 @@ mod tests {
} if sigs.len() == 2));
}
-#[test]
-fn test_missing_procedure_service() {
+#[tokio::test]
+async fn test_missing_procedure_service() {
let f = MigrateRegionFunction;
let args = vec![1, 1, 1];
@@ -151,15 +151,15 @@ mod tests {
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}
-#[test]
-fn test_migrate_region() {
+#[tokio::test]
+async fn test_migrate_region() {
let f = MigrateRegionFunction;
let args = vec![1, 1, 1];
@@ -169,7 +169,7 @@ mod tests {
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
-let result = f.eval(FunctionContext::mock(), &args).unwrap();
+let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);


@@ -187,7 +187,8 @@ fn build_struct(
}
-impl crate::function::Function for #name {
+#[async_trait::async_trait]
+impl crate::function::AsyncFunction for #name {
fn name(&self) -> &'static str {
#display_name
}
@@ -200,7 +201,7 @@ fn build_struct(
#sig_fn()
}
-fn eval(&self, func_ctx: crate::function::FunctionContext, columns: &[datatypes::vectors::VectorRef]) -> common_query::error::Result<datatypes::vectors::VectorRef> {
+async fn eval(&self, func_ctx: crate::function::FunctionContext, columns: &[datatypes::vectors::VectorRef]) -> common_query::error::Result<datatypes::vectors::VectorRef> {
// Ensure under the `greptime` catalog for security
crate::ensure_greptime!(func_ctx);
@@ -212,51 +213,36 @@ fn build_struct(
};
let columns = Vec::from(columns);
-// TODO(dennis): DataFusion doesn't support async UDF currently
-std::thread::spawn(move || {
-use snafu::OptionExt;
-use datatypes::data_type::DataType;
+use snafu::OptionExt;
+use datatypes::data_type::DataType;
-let query_ctx = &func_ctx.query_ctx;
-let handler = func_ctx
-.state
-.#handler
-.as_ref()
-.context(#snafu_type)?;
+let query_ctx = &func_ctx.query_ctx;
+let handler = func_ctx
+.state
+.#handler
+.as_ref()
+.context(#snafu_type)?;
-let mut builder = store_api::storage::ConcreteDataType::#ret()
-.create_mutable_vector(rows_num);
+let mut builder = store_api::storage::ConcreteDataType::#ret()
+.create_mutable_vector(rows_num);
-if columns_num == 0 {
-let result = common_runtime::block_on_global(async move {
-#fn_name(handler, query_ctx, &[]).await
-})?;
+if columns_num == 0 {
+let result = #fn_name(handler, query_ctx, &[]).await?;
builder.push_value_ref(result.as_value_ref());
+} else {
+for i in 0..rows_num {
+let args: Vec<_> = columns.iter()
+.map(|vector| vector.get_ref(i))
+.collect();
+let result = #fn_name(handler, query_ctx, &args).await?;
+builder.push_value_ref(result.as_value_ref());
-} else {
-for i in 0..rows_num {
-let args: Vec<_> = columns.iter()
-.map(|vector| vector.get_ref(i))
-.collect();
-let result = common_runtime::block_on_global(async move {
-#fn_name(handler, query_ctx, &args).await
-})?;
-builder.push_value_ref(result.as_value_ref());
}
}
-}
-Ok(builder.to_vector())
-})
-.join()
-.map_err(|e| {
-common_telemetry::error!(e; "Join thread error");
-common_query::error::Error::ThreadJoin {
-location: snafu::Location::default(),
-}
-})?
+Ok(builder.to_vector())
}
}


@@ -27,7 +27,7 @@ use common_runtime::Runtime;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use query::planner::LogicalPlanner;
-use query::query_engine::DescribeResult;
+use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext};
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
@@ -86,6 +86,9 @@ impl QueryEngine for MockQueryEngine {
fn engine_context(&self, _query_ctx: QueryContextRef) -> QueryEngineContext {
unimplemented!()
}
fn engine_state(&self) -> &QueryEngineState {
unimplemented!()
}
}
/// Create a region server without any engine


@@ -441,6 +441,9 @@ pub fn check_permission(
}
match stmt {
// Will be checked in execution.
// TODO(dennis): add a hook for admin commands.
Statement::Admin(_) => {}
// These are executed by query engine, and will be checked there.
Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Delete(_) => {}
// database ops won't be checked


@@ -42,6 +42,19 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to execute admin function"))]
ExecuteAdminFunction {
#[snafu(implicit)]
location: Location,
source: common_query::error::Error,
},
#[snafu(display("Failed to build admin function args: {msg}"))]
BuildAdminFunctionArgs { msg: String },
#[snafu(display("Expected {expected} args, but actual {actual}"))]
FunctionArityMismatch { expected: usize, actual: usize },
#[snafu(display("Failed to invalidate table cache"))]
InvalidateTableCache {
#[snafu(implicit)]
@@ -209,6 +222,9 @@ pub enum Error {
#[snafu(display("Table not found: {}", table_name))]
TableNotFound { table_name: String },
#[snafu(display("Admin function not found: {}", name))]
AdminFunctionNotFound { name: String },
#[snafu(display("Flow not found: {}", flow_name))]
FlowNotFound { flow_name: String },
@@ -546,6 +562,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build record batch"))]
BuildRecordBatch {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to read orc schema"))]
ReadOrc {
source: common_datasource::error::Error,
@@ -792,9 +815,12 @@ impl ErrorExt for Error {
| Error::InvalidViewName { .. }
| Error::InvalidView { .. }
| Error::InvalidExpr { .. }
| Error::AdminFunctionNotFound { .. }
| Error::ViewColumnsMismatch { .. }
| Error::InvalidViewStmt { .. }
| Error::ConvertIdentifier { .. }
| Error::BuildAdminFunctionArgs { .. }
| Error::FunctionArityMismatch { .. }
| Error::InvalidPartition { .. }
| Error::PhysicalExpr { .. } => StatusCode::InvalidArguments,
@@ -902,6 +928,9 @@ impl ErrorExt for Error {
| Error::InvalidTimestampRange { .. } => StatusCode::InvalidArguments,
Error::CreateLogicalTables { .. } => StatusCode::Unexpected,
Error::ExecuteAdminFunction { source, .. } => source.status_code(),
Error::BuildRecordBatch { source, .. } => source.status_code(),
}
}


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod admin;
mod copy_database;
mod copy_table_from;
mod copy_table_to;
@@ -277,6 +278,7 @@ impl StatementExecutor {
Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
Statement::ShowStatus(_) => self.show_status(query_ctx).await,
Statement::Use(db) => self.use_database(db, query_ctx).await,
Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
}
}


@@ -0,0 +1,233 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_function::function::FunctionContext;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_query::prelude::TypeSignature;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
use common_time::Timezone;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{Expr, FunctionArg, FunctionArgExpr, Value as SqlValue};
use sql::statements::admin::Admin;
use sql::statements::sql_value_to_value;
use crate::error::{self, Result};
use crate::statement::StatementExecutor;
const DUMMY_COLUMN: &str = "<dummy>";
impl StatementExecutor {
/// Execute the [`Admin`] statement and returns the output.
#[tracing::instrument(skip_all)]
pub(super) async fn execute_admin_command(
&self,
stmt: Admin,
query_ctx: QueryContextRef,
) -> Result<Output> {
let Admin::Func(func) = &stmt;
// the function name should be in lower case.
let func_name = func.name.to_string().to_lowercase();
let admin_func = FUNCTION_REGISTRY
.get_async_function(&func_name)
.context(error::AdminFunctionNotFoundSnafu { name: func_name })?;
let signature = admin_func.signature();
let arg_values = func
.args
.iter()
.map(|arg| {
let FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(value))) = arg else {
return error::BuildAdminFunctionArgsSnafu {
msg: "unsupported function arg {arg}",
}
.fail();
};
Ok(value)
})
.collect::<Result<Vec<_>>>()?;
let args = args_to_vector(&signature.type_signature, &arg_values, &query_ctx)?;
let arg_types = args.iter().map(|arg| arg.data_type()).collect::<Vec<_>>();
let func_ctx = FunctionContext {
query_ctx,
state: self.query_engine.engine_state().function_state(),
};
let result = admin_func
.eval(func_ctx, &args)
.await
.context(error::ExecuteAdminFunctionSnafu)?;
let column_schemas = vec![ColumnSchema::new(
// Use statement as the result column name
stmt.to_string(),
admin_func
.return_type(&arg_types)
.context(error::ExecuteAdminFunctionSnafu)?,
false,
)];
let schema = Arc::new(Schema::new(column_schemas));
let batch =
RecordBatch::new(schema.clone(), vec![result]).context(error::BuildRecordBatchSnafu)?;
let batches =
RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(batches))
}
}
/// Try to cast the arguments to vectors by function's signature.
fn args_to_vector(
type_signature: &TypeSignature,
args: &Vec<&SqlValue>,
query_ctx: &QueryContextRef,
) -> Result<Vec<VectorRef>> {
let tz = query_ctx.timezone();
match type_signature {
TypeSignature::Variadic(valid_types) => {
values_to_vectors_by_valid_types(valid_types, args, Some(&tz))
}
TypeSignature::Uniform(arity, valid_types) => {
ensure!(
*arity == args.len(),
error::FunctionArityMismatchSnafu {
actual: args.len(),
expected: *arity,
}
);
values_to_vectors_by_valid_types(valid_types, args, Some(&tz))
}
TypeSignature::Exact(data_types) => {
values_to_vectors_by_exact_types(data_types, args, Some(&tz))
}
TypeSignature::VariadicAny => {
let data_types = args
.iter()
.map(|value| try_get_data_type_for_sql_value(value))
.collect::<Result<Vec<_>>>()?;
values_to_vectors_by_exact_types(&data_types, args, Some(&tz))
}
TypeSignature::Any(arity) => {
ensure!(
*arity == args.len(),
error::FunctionArityMismatchSnafu {
actual: args.len(),
expected: *arity,
}
);
let data_types = args
.iter()
.map(|value| try_get_data_type_for_sql_value(value))
.collect::<Result<Vec<_>>>()?;
values_to_vectors_by_exact_types(&data_types, args, Some(&tz))
}
TypeSignature::OneOf(type_sigs) => {
for type_sig in type_sigs {
if let Ok(vectors) = args_to_vector(type_sig, args, query_ctx) {
return Ok(vectors);
}
}
error::BuildAdminFunctionArgsSnafu {
msg: "function signature not match",
}
.fail()
}
}
}
/// Try to cast sql values to vectors by exact data types.
fn values_to_vectors_by_exact_types(
exact_types: &[ConcreteDataType],
args: &[&SqlValue],
tz: Option<&Timezone>,
) -> Result<Vec<VectorRef>> {
args.iter()
.zip(exact_types.iter())
.map(|(value, data_type)| {
let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None)
.context(error::ParseSqlValueSnafu)?;
Ok(value_to_vector(value))
})
.collect()
}
/// Try to cast sql values to vectors by valid data types.
fn values_to_vectors_by_valid_types(
valid_types: &[ConcreteDataType],
args: &[&SqlValue],
tz: Option<&Timezone>,
) -> Result<Vec<VectorRef>> {
args.iter()
.map(|value| {
for data_type in valid_types {
if let Ok(value) = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None) {
return Ok(value_to_vector(value));
}
}
error::BuildAdminFunctionArgsSnafu {
msg: "failed to cast {value}",
}
.fail()
})
.collect::<Result<Vec<_>>>()
}
/// Build a [`VectorRef`] from [`Value`]
fn value_to_vector(value: Value) -> VectorRef {
let data_type = value.data_type();
let mut mutable_vector = data_type.create_mutable_vector(1);
mutable_vector.push_value_ref(value.as_value_ref());
mutable_vector.to_vector()
}
/// Try to infer the data type from sql value.
fn try_get_data_type_for_sql_value(value: &SqlValue) -> Result<ConcreteDataType> {
match value {
SqlValue::Number(_, _) => Ok(ConcreteDataType::float64_datatype()),
SqlValue::Null => Ok(ConcreteDataType::null_datatype()),
SqlValue::Boolean(_) => Ok(ConcreteDataType::boolean_datatype()),
SqlValue::HexStringLiteral(_)
| SqlValue::DoubleQuotedString(_)
| SqlValue::SingleQuotedString(_) => Ok(ConcreteDataType::string_datatype()),
_ => error::BuildAdminFunctionArgsSnafu {
msg: format!("unsupported sql value: {value}"),
}
.fail(),
}
}
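
As a concrete trace of the coercion above (a hypothetical check, not code from this commit): for ADMIN migrate_region(1, 1, 1), each SqlValue::Number literal is cast according to the signature and wrapped by value_to_vector into a one-row vector, so eval receives one single-element VectorRef per argument:

// Hypothetical assertion using the helpers defined in this file.
let v = value_to_vector(Value::UInt64(1));
assert_eq!(1, v.len());
assert_eq!(ConcreteDataType::uint64_datatype(), v.data_type());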


@@ -447,6 +447,10 @@ impl QueryEngine for DatafusionQueryEngine {
state.config_mut().set_extension(query_ctx.clone());
QueryEngineContext::new(state, query_ctx)
}
fn engine_state(&self) -> &QueryEngineState {
&self.state
}
}
impl QueryExecutor for DatafusionQueryEngine {


@@ -93,6 +93,9 @@ pub trait QueryEngine: Send + Sync {
/// Create a [`QueryEngineContext`].
fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;
/// Retrieve the query engine state [`QueryEngineState`]
fn engine_state(&self) -> &QueryEngineState;
}
pub struct QueryEngineFactory {


@@ -159,8 +159,10 @@ impl<'a> ParserContext<'a> {
Keyword::SET => self.parse_set_variables(),
Keyword::ADMIN => self.parse_admin_command(),
Keyword::NoKeyword
-if w.value.to_uppercase() == tql_parser::TQL && w.quote_style.is_none() =>
+if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
{
self.parse_tql()
}


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod admin_parser;
mod alter_parser;
pub(crate) mod copy_parser;
pub(crate) mod create_parser;


@@ -0,0 +1,124 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::ResultExt;
use crate::ast::Expr;
use crate::error::{Result, SyntaxSnafu};
use crate::parser::ParserContext;
use crate::statements::admin::Admin;
use crate::statements::statement::Statement;
/// `admin` extension parser: `admin function(arg1, arg2, ...)`
/// or `admin function`
impl<'a> ParserContext<'a> {
/// Parse `admin function(arg1, arg2, ...)` or `admin function` statement
pub(crate) fn parse_admin_command(&mut self) -> Result<Statement> {
let _token = self.parser.next_token();
let object_name = self.parser.parse_object_name(false).context(SyntaxSnafu)?;
let func = match self
.parser
.parse_function(object_name)
.context(SyntaxSnafu)?
{
Expr::Function(f) => f,
_ => {
return self.unsupported(self.peek_token_as_string());
}
};
Ok(Statement::Admin(Admin::Func(func)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ast::{Expr, Function, FunctionArg, FunctionArgExpr, Value};
use crate::dialect::GreptimeDbDialect;
use crate::parser::ParseOptions;
#[test]
fn test_parse_admin_function() {
let sql = "ADMIN flush_table('test')";
let mut result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, result.len());
let stmt = result.remove(0);
match &stmt {
Statement::Admin(Admin::Func(Function { name, args, .. })) => {
assert_eq!("flush_table", name.to_string());
assert_eq!(args.len(), 1);
assert!(matches!(&args[0],
FunctionArg::Unnamed(FunctionArgExpr::Expr(
Expr::Value(Value::SingleQuotedString(s))
)) if s == "test"));
}
_ => unreachable!(),
}
assert_eq!(sql, stmt.to_string());
}
#[test]
fn test_parse_admin_function_without_args() {
let sql = "ADMIN test()";
let mut result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, result.len());
let stmt = result.remove(0);
match &stmt {
Statement::Admin(Admin::Func(Function { name, args, .. })) => {
assert_eq!("test", name.to_string());
assert_eq!(args.len(), 0);
}
_ => unreachable!(),
}
assert_eq!("ADMIN test()", stmt.to_string());
}
#[test]
fn test_invalid_admin_statement() {
let sql = "ADMIN";
assert!(ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default()
)
.is_err());
let sql = "ADMIN test";
assert!(ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default()
)
.is_err());
let sql = "ADMIN test test";
assert!(ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default()
)
.is_err());
}
}


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod admin;
pub mod alter;
pub mod copy;
pub mod create;


@@ -0,0 +1,34 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::Function;
/// `ADMIN` statement to execute some administration commands.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub enum Admin {
/// Run an admin function.
Func(Function),
}
impl Display for Admin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Admin::Func(func) => write!(f, "ADMIN {func}"),
}
}
}


@@ -19,6 +19,7 @@ use sqlparser::ast::Statement as SpStatement;
use sqlparser_derive::{Visit, VisitMut};
use crate::error::{ConvertToDfStatementSnafu, Error};
use crate::statements::admin::Admin;
use crate::statements::alter::AlterTable;
use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView,
@@ -110,6 +111,8 @@ pub enum Statement {
ShowVariables(ShowVariables),
// USE
Use(String),
// Admin statement(extension)
Admin(Admin),
}
impl Display for Statement {
@@ -154,6 +157,7 @@ impl Display for Statement {
}
Statement::CreateView(s) => s.fmt(f),
Statement::Use(s) => s.fmt(f),
Statement::Admin(admin) => admin.fmt(f),
}
}
}


@@ -99,7 +99,7 @@ pub const GT_FUZZ_CLUSTER_NAME: &str = "GT_FUZZ_CLUSTER_NAME";
/// Flushes memtable to SST file.
pub async fn flush_memtable(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
let sql = format!("SELECT flush_table(\"{}\")", table_name);
let sql = format!("admin flush_table(\"{}\")", table_name);
let result = sqlx::query(&sql)
.execute(e)
.await
@@ -111,7 +111,7 @@ pub async fn flush_memtable(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
/// Triggers a compaction for table
pub async fn compact_table(e: &Pool<MySql>, table_name: &Ident) -> Result<()> {
let sql = format!("SELECT compact_table(\"{}\")", table_name);
let sql = format!("admin compact_table(\"{}\")", table_name);
let result = sqlx::query(&sql)
.execute(e)
.await


@@ -35,9 +35,8 @@ pub async fn migrate_region(
to_peer_id: u64,
timeout_secs: u64,
) -> String {
-let sql = format!(
-"select migrate_region({region_id}, {from_peer_id}, {to_peer_id}, {timeout_secs}) as output;"
-);
+let sql =
+format!("admin migrate_region({region_id}, {from_peer_id}, {to_peer_id}, {timeout_secs});");
let result = sqlx::query(&sql)
.fetch_one(e)
.await


@@ -23,7 +23,7 @@ use crate::error;
/// Fetches the state of a procedure.
pub async fn procedure_state(e: &Pool<MySql>, procedure_id: &str) -> String {
let sql = format!("select procedure_state(\"{procedure_id}\");");
let sql = format!("admin procedure_state(\"{procedure_id}\");");
let result = sqlx::query(&sql)
.fetch_one(e)
.await


@@ -1118,9 +1118,9 @@ async fn trigger_migration_by_sql(
from_peer_id: u64,
to_peer_id: u64,
) -> String {
-let OutputData::Stream(stream) = run_sql(
+let OutputData::RecordBatches(recordbatches) = run_sql(
&cluster.frontend,
-&format!("select migrate_region({region_id}, {from_peer_id}, {to_peer_id})"),
+&format!("admin migrate_region({region_id}, {from_peer_id}, {to_peer_id})"),
QueryContext::arc(),
)
.await
@@ -1130,8 +1130,6 @@ async fn trigger_migration_by_sql(
unreachable!();
};
-let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
info!("SQL result:\n {}", recordbatches.pretty_print().unwrap());
let Value::String(procedure_id) = recordbatches.take()[0].column(0).get(0) else {
@@ -1143,9 +1141,9 @@ async fn trigger_migration_by_sql(
/// Query procedure state by SQL.
async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String {
-let OutputData::Stream(stream) = run_sql(
+let OutputData::RecordBatches(recordbatches) = run_sql(
instance,
-&format!("select procedure_state('{pid}')"),
+&format!("admin procedure_state('{pid}')"),
QueryContext::arc(),
)
.await
@@ -1155,8 +1153,6 @@ async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String {
unreachable!();
};
-let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
info!("SQL result:\n {}", recordbatches.pretty_print().unwrap());
let Value::String(state) = recordbatches.take()[0].column(0).get(0) else {


@@ -31,22 +31,8 @@ SELECT * FROM my_table;
| 22 | f | 1970-01-01T00:00:00.005 |
+----+---+-------------------------+
-SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table';
-+-------------------------------------------------------------------+
-| flush_region(information_schema.partitions.greptime_partition_id) |
-+-------------------------------------------------------------------+
-| 0 |
-+-------------------------------------------------------------------+
-SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table';
-+---------------------------------------------------------------------+
-| compact_region(information_schema.partitions.greptime_partition_id) |
-+---------------------------------------------------------------------+
-| 0 |
-+---------------------------------------------------------------------+
+-- SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; --
+-- SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; --
SELECT * FROM my_table;
+----+---+-------------------------+


@@ -17,9 +17,9 @@ INSERT INTO my_table VALUES
SELECT * FROM my_table;
-SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table';
+-- SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; --
-SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table';
+-- SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; --
SELECT * FROM my_table;


@@ -15,10 +15,10 @@ SELECT * FROM test order by i;
| 1970-01-01T00:00:00.002 | 12 | 5 |
+-------------------------+----+---+
-SELECT FLUSH_TABLE('test');
+ADMIN FLUSH_TABLE('test');
+---------------------------+
-| flush_table(Utf8("test")) |
+| ADMIN FLUSH_TABLE('test') |
+---------------------------+
| 0 |
+---------------------------+


@@ -4,7 +4,7 @@ INSERT INTO test(i, j, k) VALUES (1, 11, 5), (2, 12, 5);
SELECT * FROM test order by i;
-SELECT FLUSH_TABLE('test');
+ADMIN FLUSH_TABLE('test');
ALTER TABLE test DROP COLUMN j;


@@ -16,13 +16,13 @@ Affected Rows: 0
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-select flush_flow('test_numbers_basic')<=1;
+admin flush_flow('test_numbers_basic');
-+----------------------------------------------------+
-| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
-+----------------------------------------------------+
-| true |
-+----------------------------------------------------+
++----------------------------------------+
+| ADMIN flush_flow('test_numbers_basic') |
++----------------------------------------+
+| 0 |
++----------------------------------------+
-- SQLNESS ARG restart=true
INSERT INTO numbers_input_basic
@@ -32,13 +32,13 @@ VALUES
Affected Rows: 2
-select flush_flow('test_numbers_basic')<=1;
+admin flush_flow('test_numbers_basic');
-+----------------------------------------------------+
-| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
-+----------------------------------------------------+
-| true |
-+----------------------------------------------------+
++----------------------------------------+
+| ADMIN flush_flow('test_numbers_basic') |
++----------------------------------------+
+| 1 |
++----------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
@@ -48,13 +48,13 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+
-select flush_flow('test_numbers_basic')<=1;
+admin flush_flow('test_numbers_basic');
-+----------------------------------------------------+
-| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
-+----------------------------------------------------+
-| true |
-+----------------------------------------------------+
++----------------------------------------+
+| ADMIN flush_flow('test_numbers_basic') |
++----------------------------------------+
+| 0 |
++----------------------------------------+
INSERT INTO numbers_input_basic
VALUES
@@ -63,13 +63,13 @@ VALUES
Affected Rows: 2
-select flush_flow('test_numbers_basic')<=1;
+admin flush_flow('test_numbers_basic');
-+----------------------------------------------------+
-| flush_flow(Utf8("test_numbers_basic")) <= Int64(1) |
-+----------------------------------------------------+
-| true |
-+----------------------------------------------------+
++----------------------------------------+
+| ADMIN flush_flow('test_numbers_basic') |
++----------------------------------------+
+| 1 |
++----------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
@@ -164,13 +164,13 @@ INSERT INTO bytes_log VALUES
Affected Rows: 2
-SELECT flush_flow('find_approx_rate')<=1;
+admin flush_flow('find_approx_rate');
-+--------------------------------------------------+
-| flush_flow(Utf8("find_approx_rate")) <= Int64(1) |
-+--------------------------------------------------+
-| true |
-+--------------------------------------------------+
++--------------------------------------+
+| ADMIN flush_flow('find_approx_rate') |
++--------------------------------------+
+| 1 |
++--------------------------------------+
SELECT rate, time_window FROM approx_rate;
@@ -186,13 +186,13 @@ INSERT INTO bytes_log VALUES
Affected Rows: 2
-SELECT flush_flow('find_approx_rate')<=1;
+admin flush_flow('find_approx_rate');
-+--------------------------------------------------+
-| flush_flow(Utf8("find_approx_rate")) <= Int64(1) |
-+--------------------------------------------------+
-| true |
-+--------------------------------------------------+
++--------------------------------------+
+| ADMIN flush_flow('find_approx_rate') |
++--------------------------------------+
+| 1 |
++--------------------------------------+
SELECT rate, time_window FROM approx_rate;


@@ -12,7 +12,7 @@ SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '202
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-select flush_flow('test_numbers_basic')<=1;
+admin flush_flow('test_numbers_basic');
-- SQLNESS ARG restart=true
INSERT INTO numbers_input_basic
@@ -20,18 +20,18 @@ VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-select flush_flow('test_numbers_basic')<=1;
+admin flush_flow('test_numbers_basic');
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
-select flush_flow('test_numbers_basic')<=1;
+admin flush_flow('test_numbers_basic');
INSERT INTO numbers_input_basic
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
-select flush_flow('test_numbers_basic')<=1;
+admin flush_flow('test_numbers_basic');
SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
@@ -83,7 +83,7 @@ INSERT INTO bytes_log VALUES
(101, '2025-01-01 00:00:01'),
(300, '2025-01-01 00:00:29');
-SELECT flush_flow('find_approx_rate')<=1;
+admin flush_flow('find_approx_rate');
SELECT rate, time_window FROM approx_rate;
@@ -91,7 +91,7 @@ INSERT INTO bytes_log VALUES
(450, '2025-01-01 00:00:32'),
(500, '2025-01-01 00:00:37');
-SELECT flush_flow('find_approx_rate')<=1;
+admin flush_flow('find_approx_rate');
SELECT rate, time_window FROM approx_rate;


@@ -15,13 +15,13 @@ SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second
Affected Rows: 0
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 0 |
++------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
@@ -31,13 +31,13 @@ VALUES
Affected Rows: 2
-- flush flow to make sure that table is created and data is inserted
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 1 |
++------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
@@ -47,13 +47,13 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 0 |
++------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
@@ -62,13 +62,13 @@ VALUES
Affected Rows: 2
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 1 |
++------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
@@ -108,13 +108,13 @@ SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second
Affected Rows: 0
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 0 |
++------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
@@ -124,13 +124,13 @@ VALUES
Affected Rows: 2
-- flush flow to make sure that table is created and data is inserted
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 1 |
++------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
@@ -140,13 +140,13 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+-------+---------------------+---------------------+
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 0 |
++------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
@@ -155,13 +155,13 @@ VALUES
Affected Rows: 2
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 1 |
++------------------------------------------+
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
@@ -201,13 +201,13 @@ SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01
Affected Rows: 0
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 0 |
++------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
@@ -216,13 +216,13 @@ VALUES
Affected Rows: 2
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 1 |
++------------------------------------------+
SELECT col_0, col_1 FROM out_num_cnt_df_func;
@@ -232,13 +232,13 @@ SELECT col_0, col_1 FROM out_num_cnt_df_func;
| 2 | 2021-07-01T00:00:00 |
+-------+---------------------+
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 0 |
++------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
@@ -247,13 +247,13 @@ VALUES
Affected Rows: 2
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 1 |
++------------------------------------------+
SELECT col_0, col_1 FROM out_num_cnt_df_func;
@@ -293,13 +293,13 @@ SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY
Affected Rows: 0
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 0 |
++------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
@@ -308,13 +308,13 @@ VALUES
Affected Rows: 2
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 1 |
++------------------------------------------+
SELECT col_0, col_1 FROM out_num_cnt;
@@ -324,13 +324,13 @@ SELECT col_0, col_1 FROM out_num_cnt;
| 2021-07-01T00:00:00 | 42 |
+---------------------+-------+
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 0 |
++------------------------------------------+
INSERT INTO numbers_input_df_func
VALUES
@@ -339,13 +339,13 @@ VALUES
Affected Rows: 2
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
-+------------------------------------------------------+
-| flush_flow(Utf8("test_numbers_df_func")) <= Int64(1) |
-+------------------------------------------------------+
-| true |
-+------------------------------------------------------+
++------------------------------------------+
+| ADMIN flush_flow('test_numbers_df_func') |
++------------------------------------------+
+| 1 |
++------------------------------------------+
SELECT col_0, col_1 FROM out_num_cnt;


@@ -11,7 +11,7 @@ SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
@@ -19,18 +19,18 @@ VALUES
(22, "2021-07-01 00:00:00.600");
-- flush flow to make sure that table is created and data is inserted
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
@@ -51,7 +51,7 @@ SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
@@ -59,18 +59,18 @@ VALUES
(22, "2021-07-01 00:00:00.600");
-- flush flow to make sure that table is created and data is inserted
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
SELECT col_0, window_start, window_end FROM out_num_cnt_df_func;
@@ -91,25 +91,25 @@ SINK TO out_num_cnt_df_func
AS
SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond);
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
SELECT col_0, col_1 FROM out_num_cnt_df_func;
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
SELECT col_0, col_1 FROM out_num_cnt_df_func;
@@ -131,25 +131,25 @@ SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
SELECT col_0, col_1 FROM out_num_cnt;
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
-select flush_flow('test_numbers_df_func')<=1;
+admin flush_flow('test_numbers_df_func');
SELECT col_0, col_1 FROM out_num_cnt;


@@ -19,18 +19,18 @@ SELECT * FROM test;
| 1970-01-01T00:00:00.005 |
+-------------------------+
-SELECT FLUSH_TABLE('test');
+ADMIN FLUSH_TABLE('test');
+---------------------------+
-| flush_table(Utf8("test")) |
+| ADMIN FLUSH_TABLE('test') |
+---------------------------+
| 0 |
+---------------------------+
-SELECT COMPACT_TABLE('test');
+ADMIN COMPACT_TABLE('test');
+-----------------------------+
-| compact_table(Utf8("test")) |
+| ADMIN COMPACT_TABLE('test') |
+-----------------------------+
| 0 |
+-----------------------------+


@@ -6,9 +6,9 @@ INSERT INTO test VALUES (1), (2), (3), (4), (5);
SELECT * FROM test;
-SELECT FLUSH_TABLE('test');
+ADMIN FLUSH_TABLE('test');
-SELECT COMPACT_TABLE('test');
+ADMIN COMPACT_TABLE('test');
--- doesn't change anything ---
SELECT * FROM test;