chore: merge table scan metrics

This commit is contained in:
shuiyisong
2023-06-23 14:44:47 +08:00
48 changed files with 864 additions and 148 deletions

2
Cargo.lock generated
View File

@@ -3223,6 +3223,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-test-util",
"common-time",
"datafusion",
"datafusion-common",
"datafusion-expr",
@@ -8520,6 +8521,7 @@ dependencies = [
"axum-macros",
"axum-test-helper",
"base64 0.13.1",
"build-data",
"bytes",
"catalog",
"chrono",

7
Cross.toml Normal file
View File

@@ -0,0 +1,7 @@
[build]
pre-build = [
"dpkg --add-architecture $CROSS_DEB_ARCH",
"apt update && apt install -y unzip zlib1g-dev:$CROSS_DEB_ARCH",
"curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v3.15.8/protoc-3.15.8-linux-x86_64.zip && unzip protoc-3.15.8-linux-x86_64.zip -d /usr/",
"chmod a+x /usr/bin/protoc && chmod -R a+rx /usr/include/google",
]

View File

@@ -26,8 +26,8 @@ tcp_nodelay = true
[wal]
# WAL data directory
# dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
file_size = "256MB"
purge_threshold = "4GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false

View File

@@ -81,9 +81,9 @@ addr = "127.0.0.1:4004"
# WAL data directory
# dir = "/tmp/greptimedb/wal"
# WAL file size.
file_size = "1GB"
# WAL purge threshold in bytes.
purge_threshold = "50GB"
file_size = "256MB"
# WAL purge threshold.
purge_threshold = "4GB"
# WAL purge interval, given as a duration string such as "10m".
purge_interval = "10m"
# WAL read batch size.

View File

@@ -467,10 +467,7 @@ impl CatalogManager for LocalCatalogManager {
.ident
.table_id;
if !self.system.deregister_table(&request, table_id).await? {
return Ok(false);
}
self.system.deregister_table(&request, table_id).await?;
self.catalogs.deregister_table(request).await
}
}

View File

@@ -662,7 +662,7 @@ impl CatalogManager for RemoteCatalogManager {
.await;
}
Ok(result.is_none())
Ok(true)
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
@@ -954,15 +954,26 @@ impl SchemaProvider for RemoteSchemaProvider {
async fn table_names(&self) -> Result<Vec<String>> {
let key_prefix = build_table_regional_prefix(&self.catalog_name, &self.schema_name);
let iter = self.backend.range(key_prefix.as_bytes());
let table_names = iter
let regional_keys = iter
.map(|kv| {
let Kv(key, _) = kv?;
let regional_key = TableRegionalKey::parse(String::from_utf8_lossy(&key))
.context(InvalidCatalogValueSnafu)?;
Ok(regional_key.table_name)
Ok(regional_key)
})
.try_collect()
.try_collect::<Vec<_>>()
.await?;
let table_names = regional_keys
.into_iter()
.filter_map(|x| {
if x.node_id == self.node_id {
Some(x.table_name)
} else {
None
}
})
.collect();
Ok(table_names)
}

View File

@@ -104,10 +104,14 @@ impl RegionAliveKeepers {
Ok(())
}
pub async fn deregister_table(&self, table_ident: &TableIdent) {
if self.keepers.lock().await.remove(table_ident).is_some() {
pub async fn deregister_table(
&self,
table_ident: &TableIdent,
) -> Option<Arc<RegionAliveKeeper>> {
self.keepers.lock().await.remove(table_ident).map(|x| {
info!("Deregister RegionAliveKeeper for table {table_ident}");
}
x
})
}
pub async fn register_region(&self, region_ident: &RegionIdent) {
@@ -127,7 +131,7 @@ impl RegionAliveKeepers {
warn!("Alive keeper for region {region_ident} is not found!");
return;
};
keeper.deregister_region(region_ident.region_number).await
let _ = keeper.deregister_region(region_ident.region_number).await;
}
pub async fn start(&self) {
@@ -230,9 +234,11 @@ impl RegionAliveKeeper {
return;
}
let countdown_task_handles = self.countdown_task_handles.clone();
let countdown_task_handles = Arc::downgrade(&self.countdown_task_handles);
let on_task_finished = async move {
let _ = countdown_task_handles.lock().await.remove(&region);
if let Some(x) = countdown_task_handles.upgrade() {
x.lock().await.remove(&region);
} // Else the countdown task handles map could be dropped because the keeper is dropped.
};
let handle = Arc::new(CountdownTaskHandle::new(
self.table_engine.clone(),
@@ -259,19 +265,18 @@ impl RegionAliveKeeper {
}
}
async fn deregister_region(&self, region: RegionNumber) {
if self
.countdown_task_handles
async fn deregister_region(&self, region: RegionNumber) -> Option<Arc<CountdownTaskHandle>> {
self.countdown_task_handles
.lock()
.await
.remove(&region)
.is_some()
{
info!(
"Deregister alive countdown for region {region} in table {}",
self.table_ident
)
}
.map(|x| {
info!(
"Deregister alive countdown for region {region} in table {}",
self.table_ident
);
x
})
}
async fn start(&self) {
@@ -319,6 +324,8 @@ enum CountdownCommand {
struct CountdownTaskHandle {
tx: mpsc::Sender<CountdownCommand>,
handler: JoinHandle<()>,
table_ident: TableIdent,
region: RegionNumber,
}
impl CountdownTaskHandle {
@@ -341,7 +348,7 @@ impl CountdownTaskHandle {
let mut countdown_task = CountdownTask {
table_engine,
table_ident,
table_ident: table_ident.clone(),
region,
rx,
};
@@ -350,7 +357,12 @@ impl CountdownTaskHandle {
on_task_finished().await;
});
Self { tx, handler }
Self {
tx,
handler,
table_ident,
region,
}
}
async fn start(&self, heartbeat_interval_millis: u64) {
@@ -378,7 +390,11 @@ impl CountdownTaskHandle {
impl Drop for CountdownTaskHandle {
fn drop(&mut self) {
self.handler.abort()
debug!(
"Aborting region alive countdown task for region {} in table {}",
self.region, self.table_ident,
);
self.handler.abort();
}
}
@@ -640,7 +656,8 @@ mod test {
regions.sort();
assert_eq!(regions, vec![2, 3, 4]);
keepers.deregister_table(&table_ident).await;
let keeper = keepers.deregister_table(&table_ident).await.unwrap();
assert!(Arc::try_unwrap(keeper).is_ok(), "keeper is not dropped");
assert!(keepers.keepers.lock().await.is_empty());
}
@@ -676,7 +693,8 @@ mod test {
// assert keep_lived works if keeper is started
assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later());
keeper.deregister_region(region).await;
let handle = keeper.deregister_region(region).await.unwrap();
assert!(Arc::try_unwrap(handle).is_ok(), "handle is not dropped");
assert!(keeper.find_handle(&region).await.is_none());
}

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME};
use common_telemetry::logging;
use snafu::ResultExt;
use table::metadata::TableId;
use table::{Table, TableRef};
@@ -91,12 +92,21 @@ impl SystemCatalog {
&self,
request: &DeregisterTableRequest,
table_id: TableId,
) -> CatalogResult<bool> {
) -> CatalogResult<()> {
self.information_schema
.system
.delete(build_table_deletion_request(request, table_id))
.await
.map(|x| x == 1)
.map(|x| {
if x != 1 {
let table = common_catalog::format_full_table_name(
&request.catalog,
&request.schema,
&request.table_name
);
logging::warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}");
}
})
.with_context(|_| error::DeregisterTableSnafu {
request: request.clone(),
})

View File

@@ -52,4 +52,4 @@ serde.workspace = true
toml = "0.5"
[build-dependencies]
build-data = "0.1.3"
build-data = "0.1.4"

View File

@@ -58,6 +58,16 @@ pub enum Format {
Parquet(ParquetFormat),
}
impl Format {
pub fn suffix(&self) -> &'static str {
match self {
Format::Csv(_) => ".csv",
Format::Json(_) => ".json",
Format::Parquet(_) => ".parquet",
}
}
}
impl TryFrom<&HashMap<String, String>> for Format {
type Error = error::Error;

View File

@@ -20,6 +20,7 @@ mod udf;
use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
pub use expr::build_filter_from_timestamp;
pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
pub use self::expr::{DfExpr, Expr};
@@ -28,7 +29,6 @@ pub use self::udf::ScalarUdf;
use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation};
use crate::logical_plan::accumulator::*;
use crate::signature::{Signature, Volatility};
/// Creates a new UDF with a specific signature and specific return type.
/// This is a helper function to create a new UDF.
/// The function `create_udf` returns a subset of all possible `ScalarFunction`:

View File

@@ -12,7 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
pub use datafusion_expr::expr::Expr as DfExpr;
use datafusion_expr::{and, binary_expr, Operator};
/// Central struct of query API.
/// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
@@ -33,6 +38,54 @@ impl From<DfExpr> for Expr {
}
}
/// Builds a filter `Expr` on the timestamp column `ts_col_name` from `time_range`.
///
/// The lower bound is translated to `column >= start` and the upper bound to
/// `column < end`; when both are present the two comparisons are AND-ed together.
///
/// Returns [None] when `time_range` is [None] or has neither a start nor an end.
pub fn build_filter_from_timestamp(
    ts_col_name: &str,
    time_range: Option<&TimestampRange>,
) -> Option<Expr> {
    let range = time_range?;
    let column = DfExpr::Column(Column {
        relation: None,
        name: ts_col_name.to_string(),
    });

    let predicate = match (range.start(), range.end()) {
        // Unbounded on both sides: nothing to filter.
        (None, None) => return None,
        (Some(start), None) => binary_expr(column, Operator::GtEq, timestamp_to_literal(start)),
        (None, Some(end)) => binary_expr(column, Operator::Lt, timestamp_to_literal(end)),
        (Some(start), Some(end)) => {
            let lower = binary_expr(column.clone(), Operator::GtEq, timestamp_to_literal(start));
            let upper = binary_expr(column, Operator::Lt, timestamp_to_literal(end));
            and(lower, upper)
        }
    };

    Some(Expr::from(predicate))
}
/// Converts a [Timestamp] to a datafusion literal expression.
///
/// The scalar variant is chosen from the timestamp's unit and carries the raw
/// value; the second field of each timestamp scalar is always `None`.
fn timestamp_to_literal(timestamp: &Timestamp) -> DfExpr {
    let value = Some(timestamp.value());
    DfExpr::Literal(match timestamp.unit() {
        TimeUnit::Second => ScalarValue::TimestampSecond(value, None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(value, None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(value, None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(value, None),
    })
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -22,7 +22,7 @@ use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
pub use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
pub use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::Statistics;
use datatypes::schema::SchemaRef;
@@ -71,7 +71,6 @@ pub trait PhysicalPlan: Debug + Send + Sync {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
/// Returns metrics of this plan during execution
fn metrics(&self) -> Option<MetricsSet> {
None
}
@@ -82,11 +81,16 @@ pub trait PhysicalPlan: Debug + Send + Sync {
pub struct PhysicalPlanAdapter {
schema: SchemaRef,
df_plan: Arc<dyn DfPhysicalPlan>,
metric: ExecutionPlanMetricsSet,
}
impl PhysicalPlanAdapter {
pub fn new(schema: SchemaRef, df_plan: Arc<dyn DfPhysicalPlan>) -> Self {
Self { schema, df_plan }
Self {
schema,
df_plan,
metric: ExecutionPlanMetricsSet::new(),
}
}
pub fn df_plan(&self) -> Arc<dyn DfPhysicalPlan> {
@@ -133,18 +137,20 @@ impl PhysicalPlan for PhysicalPlanAdapter {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let df_plan = self.df_plan.clone();
let stream = df_plan
.execute(partition, context)
.context(error::GeneralDataFusionSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new(stream)
let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
Ok(Box::pin(adapter))
}
fn metrics(&self) -> Option<MetricsSet> {
self.df_plan.metrics()
Some(self.metric.clone_inner())
}
}
@@ -203,13 +209,13 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
}
fn metrics(&self) -> Option<MetricsSet> {
self.0.metrics()
}
fn statistics(&self) -> Statistics {
Statistics::default()
}
fn metrics(&self) -> Option<MetricsSet> {
self.0.metrics()
}
}
#[cfg(test)]

View File

@@ -20,6 +20,7 @@ use std::task::{Context, Poll};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use datafusion::physical_plan::metrics::BaselineMetrics;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion_common::DataFusionError;
use datatypes::schema::{Schema, SchemaRef};
@@ -115,13 +116,31 @@ impl Stream for DfRecordBatchStreamAdapter {
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
}
impl RecordBatchStreamAdapter {
pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self { schema, stream })
Ok(Self {
schema,
stream,
metrics: None,
})
}
pub fn try_new_with_metrics(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self {
schema,
stream,
metrics: Some(metrics),
})
}
}
@@ -135,6 +154,12 @@ impl Stream for RecordBatchStreamAdapter {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = self
.metrics
.as_ref()
.map(|m| m.elapsed_compute().clone())
.unwrap_or_default();
let _guard = timer.timer();
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {

View File

@@ -14,6 +14,8 @@
use std::fmt::{Debug, Display, Formatter};
use serde::{Deserialize, Serialize};
use crate::timestamp::TimeUnit;
use crate::timestamp_millis::TimestampMillis;
use crate::Timestamp;
@@ -23,7 +25,7 @@ use crate::Timestamp;
/// The range contains values that `value >= start` and `val < end`.
///
/// The range is empty iff `start == end == "the default value of T"`
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct GenericRange<T> {
start: Option<T>,
end: Option<T>,
@@ -522,4 +524,25 @@ mod tests {
);
assert!(range.is_empty());
}
#[test]
fn test_serialize_timestamp_range() {
    // A range must survive a JSON round trip unchanged, whatever its time unit.
    let units = [
        TimeUnit::Second,
        TimeUnit::Millisecond,
        TimeUnit::Microsecond,
        TimeUnit::Nanosecond,
    ];
    for unit in units {
        let original_range = TimestampRange::with_unit(0, 10, unit).unwrap();
        let json = serde_json::to_string(&original_range).unwrap();
        let restored: TimestampRange = serde_json::from_str(&json).unwrap();
        assert_eq!(original_range, restored);
    }
}
}

View File

@@ -193,8 +193,8 @@ impl Default for WalConfig {
fn default() -> Self {
Self {
dir: None,
file_size: ReadableSize::gb(1), // log file size 1G
purge_threshold: ReadableSize::gb(50), // purge threshold 50G
file_size: ReadableSize::mb(256), // log file size 256MB
purge_threshold: ReadableSize::gb(4), // purge threshold 4GB
purge_interval: Duration::from_secs(600),
read_batch_size: 128,
sync_write: false,

View File

@@ -228,6 +228,22 @@ pub fn table_idents_to_full_name(
}
}
pub fn idents_to_full_database_name(
obj_name: &ObjectName,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
match &obj_name.0[..] {
[database] => Ok((query_ctx.current_catalog(), database.value.clone())),
[catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
_ => error::InvalidSqlSnafu {
msg: format!(
"expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
),
}
.fail(),
}
}
#[async_trait]
impl SqlStatementExecutor for Instance {
async fn execute_sql(

View File

@@ -30,6 +30,7 @@ common-meta = { path = "../common/meta" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true

View File

@@ -548,6 +548,13 @@ pub enum Error {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Invalid COPY parameter, key: {}, value: {}", key, value))]
InvalidCopyParameter {
key: String,
value: String,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -667,6 +674,7 @@ impl ErrorExt for Error {
| Error::BuildBackend { source } => source.status_code(),
Error::WriteParquet { source, .. } => source.status_code(),
Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -627,12 +627,15 @@ pub fn check_permission(
Statement::DescribeTable(stmt) => {
validate_param(stmt.name(), query_ctx)?;
}
Statement::Copy(stmd) => match stmd {
Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
CopyTable::From(copy_table_from) => {
validate_param(&copy_table_from.table_name, query_ctx)?
}
},
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(stmt)) => {
validate_param(&stmt.database_name, query_ctx)?
}
}
Ok(())
}

View File

@@ -12,32 +12,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod backup;
mod copy_table_from;
mod copy_table_to;
mod describe;
mod show;
mod tql;
use std::collections::HashMap;
use std::str::FromStr;
use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_query::Output;
use common_recordbatch::RecordBatches;
use datanode::instance::sql::table_idents_to_full_name;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datanode::instance::sql::{idents_to_full_database_name, table_idents_to_full_name};
use query::parser::QueryStatement;
use query::query_engine::SqlStatementExecutorRef;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::copy::{CopyTable, CopyTableArgument};
use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::requests::{CopyDirection, CopyTableRequest};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::TableRef;
use crate::error;
use crate::error::{
CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, PlanStatementSnafu,
Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
};
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
#[derive(Clone)]
pub struct StatementExecutor {
@@ -92,14 +100,23 @@ impl StatementExecutor {
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
Statement::Copy(stmt) => {
Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
let req = to_copy_table_request(stmt, query_ctx)?;
match req.direction {
CopyDirection::Export => self.copy_table_to(req).await,
CopyDirection::Import => self.copy_table_from(req).await,
CopyDirection::Export => {
self.copy_table_to(req).await.map(Output::AffectedRows)
}
CopyDirection::Import => {
self.copy_table_from(req).await.map(Output::AffectedRows)
}
}
}
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(arg)) => {
self.copy_database(to_copy_database_request(arg, &query_ctx)?)
.await
}
Statement::CreateDatabase(_)
| Statement::CreateTable(_)
| Statement::CreateExternalTable(_)
@@ -191,5 +208,47 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
connection,
pattern,
direction,
// we copy the whole table by default.
timestamp_range: None,
})
}
/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
/// This function extracts the necessary info including catalog/database name, time range, etc.
fn to_copy_database_request(
arg: CopyDatabaseArgument,
query_ctx: &QueryContextRef,
) -> Result<CopyDatabaseRequest> {
let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let start_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_START_KEY)?;
let end_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_END_KEY)?;
let time_range = match (start_timestamp, end_timestamp) {
(Some(start), Some(end)) => TimestampRange::new(start, end),
(Some(start), None) => Some(TimestampRange::from_start(start)),
(None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
(None, None) => None,
};
Ok(CopyDatabaseRequest {
catalog_name,
schema_name: database_name,
location: arg.location,
with: arg.with,
connection: arg.connection,
time_range,
})
}
/// Extracts timestamp from a [HashMap<String, String>] with given key.
fn extract_timestamp(map: &HashMap<String, String>, key: &str) -> Result<Option<Timestamp>> {
map.get(key)
.map(|v| {
Timestamp::from_str(v)
.map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
})
.transpose()
}

View File

@@ -0,0 +1,97 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_datasource::file_format::Format;
use common_query::Output;
use common_telemetry::info;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use crate::error;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, InvalidCopyParameterSnafu, SchemaNotFoundSnafu,
};
use crate::statement::StatementExecutor;
pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
impl StatementExecutor {
    /// Exports every table of a database into `req.location`, one file per table.
    ///
    /// Each table is written to `<location><table_name><suffix>`, where the suffix comes
    /// from the file format described by `req.with`. The request's time range, connection
    /// and `WITH` options are forwarded unchanged to every per-table export.
    ///
    /// Returns [Output::AffectedRows] holding the sum of the rows exported for all tables.
    ///
    /// # Errors
    ///
    /// - `req.location` does not end with `/`.
    /// - The catalog or schema cannot be found, or the catalog manager fails.
    /// - The file format in `req.with` cannot be parsed.
    /// - Exporting a table fails; the first failure aborts the loop and is returned.
    pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result<Output> {
        // location must end with / so that every table is exported to a file.
        ensure!(
            req.location.ends_with('/'),
            InvalidCopyParameterSnafu {
                key: "location",
                value: req.location,
            }
        );

        info!(
            "Copy database {}.{}, dir: {}, time: {:?}",
            req.catalog_name, req.schema_name, req.location, req.time_range
        );

        let schema = self
            .catalog_manager
            .catalog(&req.catalog_name)
            .await
            .context(CatalogSnafu)?
            .context(CatalogNotFoundSnafu {
                catalog_name: &req.catalog_name,
            })?
            .schema(&req.schema_name)
            .await
            .context(CatalogSnafu)?
            .context(SchemaNotFoundSnafu {
                schema_info: &req.schema_name,
            })?;

        // The suffix depends only on the request, so resolve it once before the loop.
        let suffix = Format::try_from(&req.with)
            .context(error::ParseFileFormatSnafu)?
            .suffix();

        let table_names = schema.table_names().await.context(CatalogSnafu)?;

        let mut exported_rows = 0;
        for table_name in table_names {
            // TODO(hl): remove this hardcode once we've removed numbers table.
            if table_name == "numbers" {
                continue;
            }

            // Target file: <location><table_name><suffix>.
            let mut table_file = req.location.clone();
            table_file.push_str(&table_name);
            table_file.push_str(suffix);

            info!(
                "Copy table: {}.{}.{} to {}",
                req.catalog_name, req.schema_name, table_name, table_file
            );

            let exported = self
                .copy_table_to(CopyTableRequest {
                    catalog_name: req.catalog_name.clone(),
                    schema_name: req.schema_name.clone(),
                    table_name,
                    location: table_file,
                    with: req.with.clone(),
                    connection: req.connection.clone(),
                    pattern: None,
                    direction: CopyDirection::Export,
                    timestamp_range: req.time_range,
                })
                .await?;
            exported_rows += exported;
        }

        Ok(Output::AffectedRows(exported_rows))
    }
}

View File

@@ -24,7 +24,6 @@ use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use datafusion::datasource::listing::PartitionedFile;
@@ -205,7 +204,7 @@ impl StatementExecutor {
}
}
pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result<Output> {
pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result<usize> {
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
@@ -313,7 +312,7 @@ impl StatementExecutor {
}
}
Ok(Output::AffectedRows(rows_inserted))
Ok(rows_inserted)
}
}

View File

@@ -18,7 +18,6 @@ use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::Format;
use common_datasource::object_store::{build_backend, parse_url};
use common_query::physical_plan::SessionContext;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use object_store::ObjectStore;
@@ -72,7 +71,7 @@ impl StatementExecutor {
}
}
pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<Output> {
pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<usize> {
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
@@ -82,12 +81,25 @@ impl StatementExecutor {
let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
let stream = table
.scan(None, &[], None)
.await
.with_context(|_| error::CopyTableSnafu {
table_name: table_ref.to_string(),
})?;
let filters = table
.schema()
.timestamp_column()
.and_then(|c| {
common_query::logical_plan::build_filter_from_timestamp(
&c.name,
req.timestamp_range.as_ref(),
)
})
.into_iter()
.collect::<Vec<_>>();
let stream =
table
.scan(None, &filters, None)
.await
.with_context(|_| error::CopyTableSnafu {
table_name: table_ref.to_string(),
})?;
let stream = stream
.execute(0, SessionContext::default().task_ctx())
@@ -101,6 +113,6 @@ impl StatementExecutor {
.stream_to_file(stream, &format, object_store, &path)
.await?;
Ok(Output::AffectedRows(rows_copied))
Ok(rows_copied)
}
}

View File

@@ -756,7 +756,7 @@ mod tests {
let tc = new_client("test_batch_put").await;
let mut req = BatchPutRequest::new();
for i in 0..256 {
for i in 0..275 {
req = req.add_kv(
tc.key(&format!("key-{}", i)),
format!("value-{}", i).into_bytes(),
@@ -769,7 +769,7 @@ mod tests {
let req = RangeRequest::new().with_prefix(tc.key("key-"));
let res = tc.client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(256, kvs.len());
assert_eq!(275, kvs.len());
}
#[tokio::test]

View File

@@ -32,6 +32,10 @@ use crate::error::Result;
use crate::metrics::METRIC_META_KV_REQUEST;
use crate::service::store::kv::{KvStore, KvStoreRef};
// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
//
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
const MAX_TXN_SIZE: usize = 128;
pub struct EtcdStore {
@@ -55,7 +59,7 @@ impl EtcdStore {
Ok(Arc::new(Self { client }))
}
async fn do_multi_txn(&self, mut txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
if txn_ops.len() < MAX_TXN_SIZE {
// fast path
let txn = Txn::new().and_then(txn_ops);
@@ -68,36 +72,17 @@ impl EtcdStore {
return Ok(vec![txn_res]);
}
let mut txns = vec![];
loop {
if txn_ops.is_empty() {
break;
}
let txns = txn_ops
.chunks(MAX_TXN_SIZE)
.map(|part| async move {
let txn = Txn::new().and_then(part);
self.client.kv_client().txn(txn).await
})
.collect::<Vec<_>>();
if txn_ops.len() < MAX_TXN_SIZE {
let txn = Txn::new().and_then(txn_ops);
txns.push(txn);
break;
}
let part = txn_ops.drain(..MAX_TXN_SIZE).collect::<Vec<_>>();
let txn = Txn::new().and_then(part);
txns.push(txn);
}
let mut txn_responses = Vec::with_capacity(txns.len());
// Considering the pressure on etcd, it would be more appropriate to execute txn in
// a serial manner.
for txn in txns {
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
txn_responses.push(txn_res);
}
Ok(txn_responses)
futures::future::try_join_all(txns)
.await
.context(error::EtcdFailedSnafu)
}
}
@@ -241,7 +226,7 @@ impl KvStore for EtcdStore {
prev_kvs.push(KvPair::from_etcd_kv(prev_kv));
}
}
_ => unreachable!(), // never get here
_ => unreachable!(),
}
}
}
@@ -283,7 +268,7 @@ impl KvStore for EtcdStore {
prev_kvs.push(KvPair::from_etcd_kv(kv));
});
}
_ => unreachable!(), // never get here
_ => unreachable!(),
}
}
}
@@ -343,7 +328,7 @@ impl KvStore for EtcdStore {
let prev_kv = match op_res {
TxnOpResponse::Put(res) => res.prev_key().map(KvPair::from_etcd_kv),
TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::from_etcd_kv),
_ => unreachable!(), // never get here
_ => unreachable!(),
};
let header = Some(ResponseHeader::success(cluster_id));

View File

@@ -541,7 +541,7 @@ impl PromPlanner {
result_set.insert(matcher.value.clone());
} else {
return Err(ColumnNotFoundSnafu {
col: self.ctx.table_name.clone().unwrap(),
col: matcher.value.clone(),
}
.build());
}
@@ -550,8 +550,8 @@ impl PromPlanner {
if col_set.contains(&matcher.value) {
reverse_set.insert(matcher.value.clone());
} else {
return Err(ValueNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap(),
return Err(ColumnNotFoundSnafu {
col: matcher.value.clone(),
}
.build());
}

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
@@ -37,7 +37,8 @@ impl Categorizer {
pub fn check_plan(plan: &LogicalPlan) -> Commutativity {
match plan {
LogicalPlan::Projection(_) => Commutativity::Unimplemented,
LogicalPlan::Filter(_) => Commutativity::Commutative,
// TODO(ruihang): Change this to Commutative once Like is supported in substrait
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
LogicalPlan::Window(_) => Commutativity::Unimplemented,
LogicalPlan::Aggregate(_) => {
// check all children exprs and uses the strictest level
@@ -85,6 +86,50 @@ impl Categorizer {
_ => Commutativity::Unsupported,
}
}
pub fn check_expr(expr: &Expr) -> Commutativity {
match expr {
Expr::Alias(_, _)
| Expr::Column(_)
| Expr::ScalarVariable(_, _)
| Expr::Literal(_)
| Expr::BinaryExpr(_)
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
| Expr::IsTrue(_)
| Expr::IsFalse(_)
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::Negative(_)
| Expr::Between(_)
| Expr::Sort(_)
| Expr::Exists(_) => Commutativity::Commutative,
Expr::Like(_)
| Expr::ILike(_)
| Expr::SimilarTo(_)
| Expr::IsUnknown(_)
| Expr::IsNotUnknown(_)
| Expr::GetIndexedField(_)
| Expr::Case(_)
| Expr::Cast(_)
| Expr::TryCast(_)
| Expr::ScalarFunction(_)
| Expr::ScalarUDF(_)
| Expr::AggregateFunction(_)
| Expr::WindowFunction(_)
| Expr::AggregateUDF(_)
| Expr::InList(_)
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::Wildcard => Commutativity::Unimplemented,
Expr::QualifiedWildcard { .. }
| Expr::GroupingSet(_)
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,
}
}
}
pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;

View File

@@ -103,3 +103,6 @@ table = { path = "../table" }
tokio-postgres = "0.7"
tokio-postgres-rustls = "0.10"
tokio-test = "0.4"
[build-dependencies]
build-data = "0.1.4"

View File

@@ -13,6 +13,12 @@
// limitations under the License.
/// Build script entry point.
///
/// Exports build metadata as compile-time environment variables
/// (`RUSTC_VERSION`, `BUILD_HOSTNAME`, `GIT_BRANCH`, `GIT_COMMIT`,
/// `SOURCE_TIMESTAMP`) so the crate can read them with `env!`, and fetches the
/// dashboard assets when the `dashboard` feature is enabled.
///
/// Git-derived values fall back to `"unknown"` instead of panicking, so the
/// crate still builds from a source tarball or any other tree without a
/// `.git` directory.
fn main() {
    // Placeholder exported when a value cannot be derived from git.
    const UNKNOWN: &str = "unknown";

    build_data::set_RUSTC_VERSION();
    build_data::set_BUILD_HOSTNAME();

    // `set_GIT_BRANCH` / `set_GIT_COMMIT` abort the build when git metadata is
    // missing; emit the same variables by hand with a fallback instead.
    println!(
        "cargo:rustc-env=GIT_BRANCH={}",
        build_data::get_git_branch().unwrap_or_else(|_| UNKNOWN.to_string())
    );
    println!(
        "cargo:rustc-env=GIT_COMMIT={}",
        build_data::get_git_commit().unwrap_or_else(|_| UNKNOWN.to_string())
    );

    // Keep the crate's own timestamp formatting whenever the source time is
    // available; only fall back when it cannot be determined.
    if build_data::get_source_time().is_ok() {
        build_data::set_SOURCE_TIMESTAMP();
    } else {
        println!("cargo:rustc-env=SOURCE_TIMESTAMP={}", UNKNOWN);
    }

    #[cfg(feature = "dashboard")]
    fetch_dashboard_assets();
}

View File

@@ -512,6 +512,8 @@ impl HttpServer {
routing::get(handler::health).post(handler::health),
);
router = router.route("/status", routing::get(handler::status));
#[cfg(feature = "dashboard")]
{
if !self.options.disable_dashboard {

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::HashMap;
use std::env;
use std::time::Instant;
use aide::transform::TransformOperation;
@@ -158,3 +159,26 @@ pub struct HealthResponse {}
pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
Json(HealthResponse {})
}
/// Build and version metadata returned by the `status` handler.
///
/// Every field borrows a string; the handler fills them with values embedded
/// at compile time via `env!`.
#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
pub struct StatusResponse<'a> {
/// Value of the `SOURCE_TIMESTAMP` build-time variable.
pub source_time: &'a str,
/// Value of the `GIT_COMMIT` build-time variable.
pub commit: &'a str,
/// Value of the `GIT_BRANCH` build-time variable.
pub branch: &'a str,
/// Value of the `RUSTC_VERSION` build-time variable.
pub rustc_version: &'a str,
/// Value of the `BUILD_HOSTNAME` build-time variable.
pub hostname: &'a str,
/// Crate version (`CARGO_PKG_VERSION`).
pub version: &'a str,
}
/// Handler that reports build-time information about this binary: source
/// timestamp, git commit and branch, rustc version, build host name and
/// crate version.
///
/// All values are embedded at compile time, so the response is constant for
/// a given build.
#[axum_macros::debug_handler]
pub async fn status() -> Json<StatusResponse<'static>> {
    let build_info = StatusResponse {
        version: env!("CARGO_PKG_VERSION"),
        rustc_version: env!("RUSTC_VERSION"),
        hostname: env!("BUILD_HOSTNAME"),
        branch: env!("GIT_BRANCH"),
        commit: env!("GIT_COMMIT"),
        source_time: env!("SOURCE_TIMESTAMP"),
    };
    Json(build_info)
}

View File

@@ -365,3 +365,18 @@ async fn test_health() {
expected_json_str
);
}
/// The `status` handler must echo exactly the build-time variables.
#[tokio::test]
async fn test_status() {
    let Json(actual) = http_handler::status().await;

    let expected = http_handler::StatusResponse {
        version: env!("CARGO_PKG_VERSION"),
        rustc_version: env!("RUSTC_VERSION"),
        hostname: env!("BUILD_HOSTNAME"),
        branch: env!("GIT_BRANCH"),
        commit: env!("GIT_COMMIT"),
        source_time: env!("SOURCE_TIMESTAMP"),
    };

    assert_eq!(actual, expected);
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(box_patterns)]
#![feature(assert_matches)]
#![feature(let_chains)]
pub mod ast;
pub mod dialect;

View File

@@ -12,22 +12,60 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use snafu::ResultExt;
use sqlparser::ast::ObjectName;
use sqlparser::keywords::Keyword;
use sqlparser::tokenizer::Token::Word;
use crate::error::{self, Result};
use crate::parser::ParserContext;
use crate::statements::copy::{CopyTable, CopyTableArgument};
use crate::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use crate::statements::statement::Statement;
use crate::util::parse_option_string;
pub type With = HashMap<String, String>;
pub type Connection = HashMap<String, String>;
// COPY tbl TO 'output.parquet';
impl<'a> ParserContext<'a> {
pub(crate) fn parse_copy(&mut self) -> Result<Statement> {
self.parser.next_token();
let copy_table = self.parse_copy_table()?;
Ok(Statement::Copy(copy_table))
let next = self.parser.peek_token();
let copy = if let Word(word) = next.token && word.keyword == Keyword::DATABASE {
self.parser.next_token();
let copy_database = self.parser_copy_database()?;
crate::statements::copy::Copy::CopyDatabase(copy_database)
} else {
let copy_table = self.parse_copy_table()?;
crate::statements::copy::Copy::CopyTable(copy_table)
};
Ok(Statement::Copy(copy))
}
fn parser_copy_database(&mut self) -> Result<CopyDatabaseArgument> {
let database_name =
self.parser
.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "a database name",
actual: self.peek_token_as_string(),
})?;
self.parser
.expect_keyword(Keyword::TO)
.context(error::SyntaxSnafu { sql: self.sql })?;
let (with, connection, location) = self.parse_copy_to()?;
Ok(CopyDatabaseArgument {
database_name,
with,
connection,
location,
})
}
fn parse_copy_table(&mut self) -> Result<CopyTable> {
@@ -41,7 +79,13 @@ impl<'a> ParserContext<'a> {
})?;
if self.parser.parse_keyword(Keyword::TO) {
Ok(CopyTable::To(self.parse_copy_table_to(table_name)?))
let (with, connection, location) = self.parse_copy_to()?;
Ok(CopyTable::To(CopyTableArgument {
table_name,
with,
connection,
location,
}))
} else {
self.parser
.expect_keyword(Keyword::FROM)
@@ -91,7 +135,7 @@ impl<'a> ParserContext<'a> {
})
}
fn parse_copy_table_to(&mut self, table_name: ObjectName) -> Result<CopyTableArgument> {
fn parse_copy_to(&mut self) -> Result<(With, Connection, String)> {
let location =
self.parser
.parse_literal_string()
@@ -125,12 +169,7 @@ impl<'a> ParserContext<'a> {
})
.collect();
Ok(CopyTableArgument {
table_name,
with,
connection,
location,
})
Ok((with, connection, location))
}
}
@@ -139,8 +178,11 @@ mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use sqlparser::ast::Ident;
use super::*;
use crate::dialect::GreptimeDbDialect;
use crate::statements::statement::Statement::Copy;
#[test]
fn test_parse_copy_table() {
@@ -155,7 +197,8 @@ mod tests {
let statement = result.remove(0);
assert_matches!(statement, Statement::Copy { .. });
match statement {
Statement::Copy(CopyTable::To(copy_table)) => {
Copy(copy) => {
let crate::statements::copy::Copy::CopyTable(CopyTable::To(copy_table)) = copy else { unreachable!() };
let (catalog, schema, table) =
if let [catalog, schema, table] = &copy_table.table_name.0[..] {
(
@@ -198,7 +241,9 @@ mod tests {
let statement = result.remove(0);
assert_matches!(statement, Statement::Copy { .. });
match statement {
Statement::Copy(CopyTable::From(copy_table)) => {
Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::From(
copy_table,
))) => {
let (catalog, schema, table) =
if let [catalog, schema, table] = &copy_table.table_name.0[..] {
(
@@ -254,7 +299,9 @@ mod tests {
let statement = result.remove(0);
assert_matches!(statement, Statement::Copy { .. });
match statement {
Statement::Copy(CopyTable::From(copy_table)) => {
Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::From(
copy_table,
))) => {
if let Some(expected_pattern) = test.expected_pattern {
assert_eq!(copy_table.pattern().unwrap(), expected_pattern);
}
@@ -295,11 +342,44 @@ mod tests {
let statement = result.remove(0);
assert_matches!(statement, Statement::Copy { .. });
match statement {
Statement::Copy(CopyTable::To(copy_table)) => {
Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::To(
copy_table,
))) => {
assert_eq!(copy_table.connection.clone(), test.expected_connection);
}
_ => unreachable!(),
}
}
}
/// `COPY DATABASE ... TO ...` must yield the database name plus the parsed
/// `WITH` and `CONNECTION` option maps.
#[test]
fn test_copy_database_to() {
    let sql = "COPY DATABASE catalog0.schema0 TO 'tbl_file.parquet' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')";
    let mut statements = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
    let last = statements.pop().unwrap();

    let Statement::Copy(crate::statements::copy::Copy::CopyDatabase(copy_database)) = last else {
        unreachable!()
    };

    let expected_name = ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]);
    assert_eq!(expected_name, copy_database.database_name);

    let expected_with = HashMap::from([("format".to_string(), "parquet".to_string())]);
    assert_eq!(expected_with, copy_database.with);

    let expected_connection = HashMap::from([
        ("foo".to_string(), "Bar".to_string()),
        ("one".to_string(), "two".to_string()),
    ]);
    assert_eq!(expected_connection, copy_database.connection);
}
}

View File

@@ -15,12 +15,27 @@
use std::collections::HashMap;
use sqlparser::ast::ObjectName;
/// A parsed `COPY` statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Copy {
/// Copy to or from a single table.
CopyTable(CopyTable),
/// `COPY DATABASE <name> TO <location>`.
CopyDatabase(CopyDatabaseArgument),
}
/// Direction of a table-level `COPY` statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CopyTable {
/// `COPY <table> TO <location>`.
To(CopyTableArgument),
/// `COPY <table> FROM <location>`.
From(CopyTableArgument),
}
/// Arguments of a `COPY DATABASE <name> TO <location>` statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CopyDatabaseArgument {
/// Object name given after the `DATABASE` keyword (e.g. `catalog0.schema0`).
pub database_name: ObjectName,
/// Options from the `WITH (...)` clause.
pub with: HashMap<String, String>,
/// Options from the `CONNECTION (...)` clause.
pub connection: HashMap<String, String>,
/// String literal given after `TO`.
pub location: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CopyTableArgument {
pub table_name: ObjectName,

View File

@@ -17,7 +17,6 @@ use sqlparser::ast::Statement as SpStatement;
use crate::error::{ConvertToDfStatementSnafu, Error};
use crate::statements::alter::AlterTable;
use crate::statements::copy::CopyTable;
use crate::statements::create::{CreateDatabase, CreateExternalTable, CreateTable};
use crate::statements::delete::Delete;
use crate::statements::describe::DescribeTable;
@@ -60,7 +59,7 @@ pub enum Statement {
Explain(Explain),
Use(String),
// COPY
Copy(CopyTable),
Copy(crate::statements::copy::Copy),
Tql(Tql),
}

View File

@@ -312,7 +312,7 @@ where
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicI32;
use std::sync::atomic::{AtomicBool, AtomicI32};
use std::time::Duration;
use store_api::storage::RegionId;
@@ -564,7 +564,9 @@ mod tests {
let task_scheduled = Arc::new(AtomicI32::new(0));
let task_scheduled_cloned = task_scheduled.clone();
common_runtime::spawn_write(async move {
let scheduling = Arc::new(AtomicBool::new(true));
let scheduling_clone = scheduling.clone();
let handle = common_runtime::spawn_write(async move {
for i in 0..10000 {
if let Ok(res) = scheduler_cloned.schedule(MockRequest {
region_id: i as RegionId,
@@ -573,12 +575,19 @@ mod tests {
task_scheduled_cloned.fetch_add(1, Ordering::Relaxed);
}
}
if !scheduling_clone.load(Ordering::Relaxed) {
break;
}
}
});
tokio::time::sleep(Duration::from_millis(1)).await;
scheduler.stop(true).await.unwrap();
scheduling.store(false, Ordering::Relaxed);
let finished = finished.load(Ordering::Relaxed);
handle.await.unwrap();
assert_eq!(finished, task_scheduled.load(Ordering::Relaxed));
}
}

View File

@@ -28,7 +28,8 @@ use table::engine::{EngineContext, TableEngineProcedureRef, TableReference};
use table::requests::DropTableRequest;
use crate::error::{
AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu,
AccessCatalogSnafu, DeregisterTableSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu,
TableNotFoundSnafu,
};
/// Procedure to drop a table.
@@ -158,10 +159,17 @@ impl DropTableProcedure {
schema: self.data.request.schema_name.clone(),
table_name: self.data.request.table_name.clone(),
};
self.catalog_manager
if !self
.catalog_manager
.deregister_table(deregister_table_req)
.await
.context(AccessCatalogSnafu)?;
.context(AccessCatalogSnafu)?
{
return DeregisterTableSnafu {
name: request.table_ref().to_string(),
}
.fail()?;
}
}
self.data.state = DropTableState::EngineDropTable;

View File

@@ -55,6 +55,9 @@ pub enum Error {
#[snafu(display("Table already exists: {}", name))]
TableExists { name: String },
#[snafu(display("Failed to deregister table: {}", name))]
DeregisterTable { name: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -64,7 +67,9 @@ impl ErrorExt for Error {
use Error::*;
match self {
SerializeProcedure { .. } | DeserializeProcedure { .. } => StatusCode::Internal,
DeregisterTable { .. } | SerializeProcedure { .. } | DeserializeProcedure { .. } => {
StatusCode::Internal
}
InvalidRawSchema { source, .. } => source.status_code(),
AccessCatalog { source, .. } => source.status_code(),
CatalogNotFound { .. } | SchemaNotFound { .. } | TableExists { .. } => {

View File

@@ -162,15 +162,14 @@ impl TableMeta {
}
pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
let columns_schemas = &self.schema.column_schemas();
self.value_indices.iter().filter_map(|idx| {
let column = &columns_schemas[*idx];
if column.is_time_index() {
None
} else {
Some(&column.name)
}
})
// `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
let columns_schemas = self.schema.column_schemas();
let primary_key_indices = &self.primary_key_indices;
columns_schemas
.iter()
.enumerate()
.filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
.map(|(_, cs)| &cs.name)
}
/// Returns the new [TableMetaBuilder] after applying given `alter_kind`.

View File

@@ -20,6 +20,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_query::AddColumnLocation;
use common_time::range::TimestampRange;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, RawSchema};
use serde::{Deserialize, Serialize};
@@ -268,6 +269,7 @@ pub struct CopyTableRequest {
pub connection: HashMap<String, String>,
pub pattern: Option<String>,
pub direction: CopyDirection,
pub timestamp_range: Option<TimestampRange>,
}
#[derive(Debug, Clone, Default)]
@@ -293,6 +295,16 @@ macro_rules! meter_insert_request {
};
}
/// Request describing a database-level copy.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct CopyDatabaseRequest {
/// Catalog that contains the schema to copy.
pub catalog_name: String,
/// Schema (database) to copy.
pub schema_name: String,
/// Location string of the copy target.
pub location: String,
/// `WITH` options as key/value pairs.
pub with: HashMap<String, String>,
/// `CONNECTION` options as key/value pairs.
pub connection: HashMap<String, String>,
// NOTE(review): presumably restricts the copied rows to this timestamp
// range when set — confirm against the code that consumes this request.
pub time_range: Option<TimestampRange>,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -14,15 +14,20 @@
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::OptionExt;
/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan].
@@ -30,6 +35,7 @@ pub struct StreamScanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
output_ordering: Option<Vec<PhysicalSortExpr>>,
metric: ExecutionPlanMetricsSet,
}
impl Debug for StreamScanAdapter {
@@ -49,6 +55,7 @@ impl StreamScanAdapter {
stream: Mutex::new(Some(stream)),
schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
}
}
@@ -85,11 +92,46 @@ impl PhysicalPlan for StreamScanAdapter {
fn execute(
&self,
_partition: usize,
partition: usize,
_context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let mut stream = self.stream.lock().unwrap();
stream.take().context(query_error::ExecuteRepeatedlySnafu)
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
metric: baseline_metric,
}))
}
/// Returns a snapshot of the metrics recorded for this plan so far.
fn metrics(&self) -> Option<MetricsSet> {
    let snapshot = self.metric.clone_inner();
    Some(snapshot)
}
}
/// Record batch stream that forwards an inner stream while recording
/// baseline metrics (elapsed compute time and output row count) on each poll.
pub struct StreamWithMetricWrapper {
// Inner stream whose items are passed through unchanged.
stream: SendableRecordBatchStream,
// Metrics updated as the inner stream is polled.
metric: BaselineMetrics,
}
impl Stream for StreamWithMetricWrapper {
    type Item = RecordBatchResult<RecordBatch>;

    /// Polls the inner stream, timing the poll and counting the rows of every
    /// successfully produced batch. The poll result itself is returned as-is.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Self { stream, metric } = self.get_mut();

        // The guard adds the time spent until it is dropped (end of this
        // function) to the elapsed-compute metric.
        let _compute_timer = metric.elapsed_compute().timer();

        let polled = stream.poll_next_unpin(cx);
        if let Poll::Ready(Some(Ok(batch))) = &polled {
            metric.record_output(batch.num_rows());
        }
        polled
    }
}
impl RecordBatchStream for StreamWithMetricWrapper {
    /// Schema of the wrapped stream; the wrapper does not alter batches.
    fn schema(&self) -> SchemaRef {
        let inner = &self.stream;
        inner.schema()
    }
}

View File

@@ -90,11 +90,23 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i=
SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i;
Error: 3001(EngineExecuteQuery), No field named __correlated_sq_1.i. Valid fields are integers.i, integers.j.
+---+---+
| i | j |
+---+---+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
+---+---+
SELECT * FROM integers WHERE i NOT IN ((SELECT i FROM integers WHERE i=1)) ORDER BY i;
Error: 3001(EngineExecuteQuery), No field named __correlated_sq_2.i. Valid fields are integers.i, integers.j.
+---+---+
| i | j |
+---+---+
| 2 | 2 |
| 3 | 3 |
| | 4 |
+---+---+
SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i;

View File

@@ -31,3 +31,41 @@ DROP TABLE test;
Affected Rows: 1
CREATE TABLE host_load1 (
ts TIMESTAMP(3) NOT NULL,
collector STRING NULL,
host STRING NULL,
val DOUBLE NULL,
TIME INDEX (ts),
PRIMARY KEY (collector, host)
);
Affected Rows: 0
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
TQL EXPLAIN host_load1{__field__="val"};
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivide: tags=["collector", "host"] |
| | Sort: host_load1.collector DESC NULLS LAST, host_load1.host DESC NULLS LAST, host_load1.ts DESC NULLS LAST |
| | Projection: host_load1.val, host_load1.collector, host_load1.host, host_load1.ts |
| | MergeScan [is_placeholder=false] |
| | TableScan: host_load1 projection=[ts, collector, host, val], partial_filters=[ts >= TimestampMillisecond(-300000, None), ts <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | RepartitionExec: partitioning=REDACTED
| | ProjectionExec: expr=[val@3 as val, collector@1 as collector, host@2 as host, ts@0 as ts] |
| | MergeScanExec: peers=[REDACTED
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
DROP TABLE host_load1;
Affected Rows: 1

View File

@@ -9,3 +9,18 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
TQL EXPLAIN (0, 10, '5s') test;
DROP TABLE test;
CREATE TABLE host_load1 (
ts TIMESTAMP(3) NOT NULL,
collector STRING NULL,
host STRING NULL,
val DOUBLE NULL,
TIME INDEX (ts),
PRIMARY KEY (collector, host)
);
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
TQL EXPLAIN host_load1{__field__="val"};
DROP TABLE host_load1;

View File

@@ -0,0 +1,29 @@
CREATE TABLE host (
ts TIMESTAMP(3) TIME INDEX,
host STRING PRIMARY KEY,
val DOUBLE,
);
Affected Rows: 0
INSERT INTO TABLE host VALUES
(0, 'a+b', 1.0),
(1, 'b+c', 2.0),
(2, 'a', 3.0),
(3, 'c', 4.0);
Affected Rows: 4
SELECT * FROM host WHERE host LIKE '%+%';
+-------------------------+------+-----+
| ts | host | val |
+-------------------------+------+-----+
| 1970-01-01T00:00:00 | a+b | 1.0 |
| 1970-01-01T00:00:00.001 | b+c | 2.0 |
+-------------------------+------+-----+
DROP TABLE host;
Affected Rows: 1

View File

@@ -0,0 +1,15 @@
CREATE TABLE host (
ts TIMESTAMP(3) TIME INDEX,
host STRING PRIMARY KEY,
val DOUBLE,
);
INSERT INTO TABLE host VALUES
(0, 'a+b', 1.0),
(1, 'b+c', 2.0),
(2, 'a', 3.0),
(3, 'c', 4.0);
SELECT * FROM host WHERE host LIKE '%+%';
DROP TABLE host;