Compare commits


11 Commits

Author SHA1 Message Date
discord9
365421c452 Revert "feat: stream drop record metrics"
This reverts commit 3eda4a2257928d95cf9c1328ae44fae84cfbb017.

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
b361c5f50a chore: more dbg
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
5d78bc1efa test: update sqlness
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
99c78b2f97 fix: expand differently
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
bdecdb869e feat: stream drop record metrics
Signed-off-by: discord9 <discord9@163.com>

refactor: move logging to drop too

Signed-off-by: discord9 <discord9@163.com>

fix: drop input stream before collect metrics

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
df4cd157e1 Revert "feat: stream drop record metrics"
This reverts commit 6a16946a5b8ea37557bbb1b600847d24274d6500.

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
60fbe54f90 feat: stream drop record metrics
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
7c15e71407 revert
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
13f20b5a40 add logging to figure test failure
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
Ruihang Xia
2b802e45f5 update sqlness result
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
Ruihang Xia
def9b7c01d fix: expand on conditional commutative as well
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
63 changed files with 1013 additions and 1569 deletions

View File

@@ -12,6 +12,3 @@ fetch = true
checkout = true
list_files = true
internal_use_git2 = false
[env]
CARGO_WORKSPACE_DIR = { value = "", relative = true }

Cargo.lock (generated)
View File

@@ -1602,17 +1602,6 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbc26382d871df4b7442e3df10a9402bf3cf5e55cbd66f12be38861425f0564"
[[package]]
name = "cargo-manifest"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d8af896b707212cd0e99c112a78c9497dd32994192a463ed2f7419d29bd8c6"
dependencies = [
"serde",
"thiserror 2.0.12",
"toml 0.8.19",
]
[[package]]
name = "cast"
version = "0.3.0"
@@ -2703,7 +2692,6 @@ version = "0.16.0"
dependencies = [
"backtrace",
"common-error",
"common-version",
"console-subscriber",
"greptime-proto",
"humantime-serde",
@@ -2761,7 +2749,6 @@ name = "common-version"
version = "0.16.0"
dependencies = [
"build-data",
"cargo-manifest",
"const_format",
"serde",
"shadow-rs",
@@ -9554,9 +9541,8 @@ dependencies = [
[[package]]
name = "promql-parser"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "328fe69c2443ec4f8e6c33ea925dde04a1026e6c95928e89ed02343944cac9bf"
version = "0.5.1"
source = "git+https://github.com/GreptimeTeam/promql-parser.git?rev=0410e8b459dda7cb222ce9596f8bf3971bd07bd2#0410e8b459dda7cb222ce9596f8bf3971bd07bd2"
dependencies = [
"cfgrammar",
"chrono",
@@ -13190,7 +13176,6 @@ version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e"
dependencies = [
"indexmap 2.9.0",
"serde",
"serde_spanned",
"toml_datetime",

View File

@@ -172,7 +172,9 @@ parquet = { version = "54.2", default-features = false, features = ["arrow", "as
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.6", features = ["ser"] }
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "0410e8b459dda7cb222ce9596f8bf3971bd07bd2", features = [
"ser",
] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"

View File

@@ -241,6 +241,7 @@ impl RepairTool {
let alter_table_request = alter_table::make_alter_region_request_for_peer(
logical_table_id,
&alter_table_expr,
full_table_metadata.table_info.ident.version,
peer,
physical_region_routes,
)?;

View File

@@ -66,6 +66,7 @@ pub fn generate_alter_table_expr_for_all_columns(
pub fn make_alter_region_request_for_peer(
logical_table_id: TableId,
alter_table_expr: &AlterTableExpr,
schema_version: u64,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
@@ -73,7 +74,7 @@ pub fn make_alter_region_request_for_peer(
let mut requests = Vec::with_capacity(regions_on_this_peer.len());
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(logical_table_id, *region_number);
let request = make_alter_region_request(region_id, alter_table_expr);
let request = make_alter_region_request(region_id, alter_table_expr, schema_version);
requests.push(request);
}

View File

@@ -20,11 +20,11 @@ use cmd::error::{InitTlsProviderSnafu, Result};
use cmd::options::GlobalOptions;
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_base::Plugins;
use common_version::{verbose_version, version};
use common_version::version;
use servers::install_ring_crypto_provider;
#[derive(Parser)]
#[command(name = "greptime", author, version, long_version = verbose_version(), about)]
#[command(name = "greptime", author, version, long_version = version(), about)]
#[command(propagate_version = true)]
pub(crate) struct Command {
#[clap(subcommand)]
@@ -143,8 +143,10 @@ async fn start(cli: Command) -> Result<()> {
}
fn setup_human_panic() {
human_panic::setup_panic!(human_panic::Metadata::new("GreptimeDB", version())
.homepage("https://github.com/GreptimeTeam/greptimedb/discussions"));
human_panic::setup_panic!(
human_panic::Metadata::new("GreptimeDB", env!("CARGO_PKG_VERSION"))
.homepage("https://github.com/GreptimeTeam/greptimedb/discussions")
);
common_telemetry::set_panic_hook();
}
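
A minimal, self-contained sketch of the compile-time version pattern the hunks above switch to; the printed prefix is illustrative, the rest is the standard Cargo/`env!` mechanism:

    // CARGO_PKG_VERSION is set by Cargo for every crate at build time, so the
    // `env!` macro needs no runtime dependency on common_version.
    fn main() {
        println!("greptime {}", env!("CARGO_PKG_VERSION"));
    }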

View File

@@ -19,7 +19,7 @@ use catalog::kvbackend::MetaKvBackend;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::info;
use common_version::{short_version, verbose_version};
use common_version::{short_version, version};
use datanode::datanode::DatanodeBuilder;
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientType;
@@ -67,7 +67,7 @@ impl InstanceBuilder {
None,
);
log_versions(verbose_version(), short_version(), APP_NAME);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)

View File

@@ -32,7 +32,7 @@ use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};
use common_version::{short_version, version};
use flow::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
FrontendClient, FrontendInvoker,
@@ -279,7 +279,7 @@ impl StartCommand {
None,
);
log_versions(verbose_version(), short_version(), APP_NAME);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Flownode start command: {:#?}", self);

View File

@@ -33,7 +33,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use common_version::{short_version, version};
use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
@@ -282,7 +282,7 @@ impl StartCommand {
opts.component.slow_query.as_ref(),
);
log_versions(verbose_version(), short_version(), APP_NAME);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Frontend start command: {:#?}", self);

View File

@@ -112,7 +112,7 @@ pub trait App: Send {
pub fn log_versions(version: &str, short_version: &str, app: &str) {
// Report app version as gauge.
APP_VERSION
.with_label_values(&[common_version::version(), short_version, app])
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version, app])
.inc();
// Log version and argument flags.
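
A hedged, self-contained sketch of the same register-and-increment pattern as the APP_VERSION gauge above, assuming the `prometheus` crate already pinned in the workspace Cargo.toml; the metric name and label values here are illustrative, not the repo's:

    use prometheus::register_int_gauge_vec;

    fn main() {
        // A labelled gauge bumped once at startup so dashboards can tell which
        // build is running, mirroring the APP_VERSION usage in the hunk above.
        let app_version = register_int_gauge_vec!(
            "app_version",
            "Running application version",
            &["version", "short_version", "app"]
        )
        .unwrap();

        app_version
            .with_label_values(&[env!("CARGO_PKG_VERSION"), "abc1234", "frontend"])
            .inc();
    }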

View File

@@ -22,7 +22,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};
use common_version::{short_version, version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::BackendImpl;
use snafu::ResultExt;
@@ -324,7 +324,7 @@ impl StartCommand {
None,
);
log_versions(verbose_version(), short_version(), APP_NAME);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Metasrv start command: {:#?}", self);
@@ -340,12 +340,12 @@ impl StartCommand {
.await
.context(StartMetaServerSnafu)?;
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins, None)
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
.await
.context(error::BuildMetaServerSnafu)?;
let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
let instance = MetasrvInstance::new(metasrv)
let instance = MetasrvInstance::new(opts, plugins, metasrv)
.await
.context(error::BuildMetaServerSnafu)?;

View File

@@ -51,7 +51,7 @@ use common_telemetry::logging::{
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
@@ -485,7 +485,7 @@ impl StartCommand {
opts.component.slow_query.as_ref(),
);
log_versions(verbose_version(), short_version(), APP_NAME);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Standalone start command: {:#?}", self);

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use std::{env, fmt};
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
@@ -47,7 +47,7 @@ impl Function for PGVersionFunction {
fn eval(&self, _func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let result = StringVector::from(vec![format!(
"PostgreSQL 16.3 GreptimeDB {}",
common_version::version()
env!("CARGO_PKG_VERSION")
)]);
Ok(Arc::new(result))
}

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use std::{env, fmt};
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
@@ -52,13 +52,13 @@ impl Function for VersionFunction {
"{}-greptimedb-{}",
std::env::var("GREPTIMEDB_MYSQL_SERVER_VERSION")
.unwrap_or_else(|_| "8.4.2".to_string()),
common_version::version()
env!("CARGO_PKG_VERSION")
)
}
Channel::Postgres => {
format!("16.3-greptimedb-{}", common_version::version())
format!("16.3-greptimedb-{}", env!("CARGO_PKG_VERSION"))
}
_ => common_version::version().to_string(),
_ => env!("CARGO_PKG_VERSION").to_string(),
};
let result = StringVector::from(vec![version]);
Ok(Arc::new(result))

View File

@@ -50,6 +50,7 @@ pub mod drop_flow;
pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
mod physical_table_metadata;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -12,17 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod executor;
mod check;
mod metadata;
mod region_request;
mod table_cache_keys;
mod update_metadata;
mod validator;
use api::region::RegionResponse;
use async_trait::async_trait;
use common_catalog::format_full_table_name;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, Status};
use common_telemetry::{debug, error, info, warn};
pub use executor::make_alter_region_request;
use common_telemetry::{error, info, warn};
use futures_util::future;
pub use region_request::make_alter_region_request;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
@@ -30,12 +33,10 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use strum::AsRefStr;
use table::metadata::TableId;
use crate::cache_invalidator::Context as CacheContext;
use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
use crate::ddl::alter_logical_tables::validator::{
retain_unskipped, AlterLogicalTableValidator, ValidatorResult,
use crate::ddl::utils::{
add_peer_context_if_needed, extract_column_metadatas, map_to_procedure_error,
sync_follower_regions,
};
use crate::ddl::utils::{extract_column_metadatas, map_to_procedure_error, sync_follower_regions};
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::instruction::CacheIdent;
@@ -45,38 +46,13 @@ use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::RegionRoute;
use crate::rpc::router::{find_leaders, RegionRoute};
pub struct AlterLogicalTablesProcedure {
pub context: DdlContext,
pub data: AlterTablesData,
}
/// Builds the validator from the [`AlterTablesData`].
fn build_validator_from_alter_table_data<'a>(
data: &'a AlterTablesData,
) -> AlterLogicalTableValidator<'a> {
let phsycial_table_id = data.physical_table_id;
let alters = data
.tasks
.iter()
.map(|task| &task.alter_table)
.collect::<Vec<_>>();
AlterLogicalTableValidator::new(phsycial_table_id, alters)
}
/// Builds the executor from the [`AlterTablesData`].
fn build_executor_from_alter_expr<'a>(data: &'a AlterTablesData) -> AlterLogicalTablesExecutor<'a> {
debug_assert_eq!(data.tasks.len(), data.table_info_values.len());
let alters = data
.tasks
.iter()
.zip(data.table_info_values.iter())
.map(|(task, table_info)| (table_info.table_info.ident.table_id, &task.alter_table))
.collect::<Vec<_>>();
AlterLogicalTablesExecutor::new(alters)
}
impl AlterLogicalTablesProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
@@ -106,44 +82,35 @@ impl AlterLogicalTablesProcedure {
}
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let validator = build_validator_from_alter_table_data(&self.data);
let ValidatorResult {
num_skipped,
skip_alter,
table_info_values,
physical_table_info,
physical_table_route,
} = validator
.validate(&self.context.table_metadata_manager)
.await?;
let num_tasks = self.data.tasks.len();
if num_skipped == num_tasks {
// Checks all the tasks
self.check_input_tasks()?;
// Fills the table info values
self.fill_table_info_values().await?;
// Checks the physical table, must after [fill_table_info_values]
self.check_physical_table().await?;
// Fills the physical table info
self.fill_physical_table_info().await?;
// Filter the finished tasks
let finished_tasks = self.check_finished_tasks()?;
let already_finished_count = finished_tasks
.iter()
.map(|x| if *x { 1 } else { 0 })
.sum::<usize>();
let apply_tasks_count = self.data.tasks.len();
if already_finished_count == apply_tasks_count {
info!("All the alter tasks are finished, will skip the procedure.");
let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
&physical_table_info,
&table_info_values
.iter()
.map(|v| v.get_inner_ref())
.collect::<Vec<_>>(),
);
self.data.table_cache_keys_to_invalidate = cache_ident_keys;
// Re-invalidate the table cache
self.data.state = AlterTablesState::InvalidateTableCache;
return Ok(Status::executing(true));
} else if num_skipped > 0 {
} else if already_finished_count > 0 {
info!(
"There are {} alter tasks, {} of them were already finished.",
num_tasks, num_skipped
apply_tasks_count, already_finished_count
);
}
self.filter_task(&finished_tasks)?;
// Updates the procedure state.
retain_unskipped(&mut self.data.tasks, &skip_alter);
self.data.physical_table_info = Some(physical_table_info);
self.data.physical_table_route = Some(physical_table_route);
self.data.table_info_values = table_info_values;
debug_assert_eq!(self.data.tasks.len(), self.data.table_info_values.len());
// Next state
self.data.state = AlterTablesState::SubmitAlterRegionRequests;
Ok(Status::executing(true))
}
@@ -151,13 +118,25 @@ impl AlterLogicalTablesProcedure {
pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
// Safety: we have checked the state in on_prepare
let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
let executor = build_executor_from_alter_expr(&self.data);
let mut results = executor
.on_alter_regions(
&self.context.node_manager,
&physical_table_route.region_routes,
)
.await?;
let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
for peer in leaders {
let requester = self.context.node_manager.datanode(&peer).await;
let request = self.make_request(&peer, &physical_table_route.region_routes)?;
alter_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer))
});
}
let mut results = future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
if let Some(column_metadatas) =
extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
@@ -198,18 +177,7 @@ impl AlterLogicalTablesProcedure {
self.update_physical_table_metadata().await?;
self.update_logical_tables_metadata().await?;
let logical_table_info_values = self
.data
.table_info_values
.iter()
.map(|v| v.get_inner_ref())
.collect::<Vec<_>>();
let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
self.data.physical_table_info.as_ref().unwrap(),
&logical_table_info_values,
);
self.data.table_cache_keys_to_invalidate = cache_ident_keys;
self.data.build_cache_keys_to_invalidate();
self.data.clear_metadata_fields();
self.data.state = AlterTablesState::InvalidateTableCache;
@@ -219,16 +187,9 @@ impl AlterLogicalTablesProcedure {
pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
let to_invalidate = &self.data.table_cache_keys_to_invalidate;
let ctx = CacheContext {
subject: Some(format!(
"Invalidate table cache by altering logical tables, physical_table_id: {}",
self.data.physical_table_id,
)),
};
self.context
.cache_invalidator
.invalidate(&ctx, to_invalidate)
.invalidate(&Default::default(), to_invalidate)
.await?;
Ok(Status::done())
}
@@ -248,10 +209,6 @@ impl Procedure for AlterLogicalTablesProcedure {
let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
.with_label_values(&[step])
.start_timer();
debug!(
"Executing alter logical tables procedure, state: {:?}",
state
);
match state {
AlterTablesState::Prepare => self.on_prepare().await,
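
A minimal, self-contained sketch of the fan-out pattern used in `on_submit_alter_region_requests` above: one future per leader peer, joined with `future::join_all`, and the whole step fails if any single call fails. Peer ids and the error type are illustrative; the real code awaits datanode RPCs instead:

    use futures::future;

    fn main() {
        let tasks = (1u64..=3).map(|peer_id| async move {
            if peer_id == 0 {
                Err(format!("peer {peer_id} unreachable"))
            } else {
                Ok(peer_id * 10)
            }
        });

        let results: Result<Vec<u64>, String> =
            futures::executor::block_on(future::join_all(tasks))
                .into_iter()
                .collect();

        assert_eq!(results, Ok(vec![10, 20, 30]));
    }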

View File

@@ -0,0 +1,136 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use api::v1::alter_table_expr::Kind;
use snafu::{ensure, OptionExt};
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::{AlterLogicalTablesInvalidArgumentsSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::AlterTableTask;
impl AlterLogicalTablesProcedure {
pub(crate) fn check_input_tasks(&self) -> Result<()> {
self.check_schema()?;
self.check_alter_kind()?;
Ok(())
}
pub(crate) async fn check_physical_table(&self) -> Result<()> {
let table_route_manager = self.context.table_metadata_manager.table_route_manager();
let table_ids = self
.data
.table_info_values
.iter()
.map(|v| v.table_info.ident.table_id)
.collect::<Vec<_>>();
let table_routes = table_route_manager
.table_route_storage()
.batch_get(&table_ids)
.await?;
let physical_table_id = self.data.physical_table_id;
let is_same_physical_table = table_routes.iter().all(|r| {
if let Some(TableRouteValue::Logical(r)) = r {
r.physical_table_id() == physical_table_id
} else {
false
}
});
ensure!(
is_same_physical_table,
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "All the tasks should have the same physical table id"
}
);
Ok(())
}
pub(crate) fn check_finished_tasks(&self) -> Result<Vec<bool>> {
let task = &self.data.tasks;
let table_info_values = &self.data.table_info_values;
Ok(task
.iter()
.zip(table_info_values.iter())
.map(|(task, table)| Self::check_finished_task(task, table))
.collect())
}
// Checks if the schemas of the tasks are the same
fn check_schema(&self) -> Result<()> {
let is_same_schema = self.data.tasks.windows(2).all(|pair| {
pair[0].alter_table.catalog_name == pair[1].alter_table.catalog_name
&& pair[0].alter_table.schema_name == pair[1].alter_table.schema_name
});
ensure!(
is_same_schema,
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Schemas of the tasks are not the same"
}
);
Ok(())
}
fn check_alter_kind(&self) -> Result<()> {
for task in &self.data.tasks {
let kind = task.alter_table.kind.as_ref().context(
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Alter kind is missing",
},
)?;
let Kind::AddColumns(_) = kind else {
return AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Only support add columns operation",
}
.fail();
};
}
Ok(())
}
fn check_finished_task(task: &AlterTableTask, table: &TableInfoValue) -> bool {
let columns = table
.table_info
.meta
.schema
.column_schemas
.iter()
.map(|c| &c.name)
.collect::<HashSet<_>>();
let Some(kind) = task.alter_table.kind.as_ref() else {
return true; // Never get here since we have checked it in `check_alter_kind`
};
let Kind::AddColumns(add_columns) = kind else {
return true; // Never get here since we have checked it in `check_alter_kind`
};
// A task counts as finished only when all of its columns already exist;
// if some columns exist but others do not, the task is still considered
// unfinished.
add_columns
.add_columns
.iter()
.map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
.all(|column| column.map(|c| columns.contains(c)).unwrap_or(false))
}
}
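
A self-contained sketch of the `check_finished_task` logic above, with plain strings standing in for the column and column-def types from the diff: a task counts as finished only when every column it wants to add already exists in the table schema.

    use std::collections::HashSet;

    fn task_finished(existing: &[&str], to_add: &[&str]) -> bool {
        // Mirrors the HashSet membership check in check_finished_task.
        let existing: HashSet<&str> = existing.iter().copied().collect();
        to_add.iter().all(|c| existing.contains(c))
    }

    fn main() {
        assert!(task_finished(&["host", "ts", "cpu"], &["cpu"]));
        assert!(!task_finished(&["host", "ts"], &["cpu", "mem"]));
    }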

View File

@@ -1,216 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::region::RegionResponse;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{self, AlterTableExpr};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, warn};
use futures::future;
use store_api::metadata::ColumnMetadata;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info};
use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef};
use crate::node_manager::NodeManagerRef;
use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
/// [AlterLogicalTablesExecutor] performs:
/// - Alters logical regions on the datanodes.
/// - Updates table metadata for alter table operation.
pub struct AlterLogicalTablesExecutor<'a> {
/// The alter table expressions.
///
/// The first element is the logical table id, the second element is the alter table expression.
alters: Vec<(TableId, &'a AlterTableExpr)>,
}
impl<'a> AlterLogicalTablesExecutor<'a> {
pub fn new(alters: Vec<(TableId, &'a AlterTableExpr)>) -> Self {
Self { alters }
}
/// Alters logical regions on the datanodes.
pub(crate) async fn on_alter_regions(
&self,
node_manager: &NodeManagerRef,
region_routes: &[RegionRoute],
) -> Result<Vec<RegionResponse>> {
let region_distribution = region_distribution(region_routes);
let leaders = find_leaders(region_routes)
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
for (datanode_id, region_role_set) in region_distribution {
if region_role_set.leader_regions.is_empty() {
continue;
}
// Safety: must exist.
let peer = leaders.get(&datanode_id).unwrap();
let requester = node_manager.datanode(peer).await;
let requests = self.make_alter_region_request(&region_role_set.leader_regions);
let requester = requester.clone();
let peer = peer.clone();
debug!("Sending alter region requests to datanode {}", peer);
alter_region_tasks.push(async move {
requester
.handle(make_request(requests))
.await
.map_err(add_peer_context_if_needed(peer))
});
}
future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()
}
fn make_alter_region_request(&self, region_numbers: &[RegionNumber]) -> AlterRequests {
let mut requests = Vec::with_capacity(region_numbers.len() * self.alters.len());
for (table_id, alter) in self.alters.iter() {
for region_number in region_numbers {
let region_id = RegionId::new(*table_id, *region_number);
let request = make_alter_region_request(region_id, alter);
requests.push(request);
}
}
AlterRequests { requests }
}
/// Updates table metadata for alter table operation.
///
/// ## Panic:
/// - If the region distribution is not set when updating table metadata.
pub(crate) async fn on_alter_metadata(
physical_table_id: TableId,
table_metadata_manager: &TableMetadataManagerRef,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
region_distribution: RegionDistribution,
physical_columns: &[ColumnMetadata],
) -> Result<()> {
if physical_columns.is_empty() {
warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables");
return Ok(());
}
let table_ref = current_table_info_value.table_ref();
let table_id = physical_table_id;
// Generates new table info
let old_raw_table_info = current_table_info_value.table_info.clone();
let new_raw_table_info =
raw_table_info::build_new_physical_table_info(old_raw_table_info, physical_columns);
debug!(
"Starting update table: {} metadata, table_id: {}, new table info: {:?}",
table_ref, table_id, new_raw_table_info
);
table_metadata_manager
.update_table_info(
current_table_info_value,
Some(region_distribution),
new_raw_table_info,
)
.await?;
Ok(())
}
/// Builds the cache ident keys for the alter logical tables.
///
/// The cache ident keys are:
/// - The table id of the logical tables.
/// - The table name of the logical tables.
/// - The table id of the physical table.
pub(crate) fn build_cache_ident_keys(
physical_table_info: &TableInfoValue,
logical_table_info_values: &[&TableInfoValue],
) -> Vec<CacheIdent> {
let mut cache_keys = Vec::with_capacity(logical_table_info_values.len() * 2 + 2);
cache_keys.extend(logical_table_info_values.iter().flat_map(|table| {
vec![
CacheIdent::TableId(table.table_info.ident.table_id),
CacheIdent::TableName(table.table_name()),
]
}));
cache_keys.push(CacheIdent::TableId(
physical_table_info.table_info.ident.table_id,
));
cache_keys.push(CacheIdent::TableName(physical_table_info.table_name()));
cache_keys
}
}
fn make_request(alter_requests: AlterRequests) -> RegionRequest {
RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Alters(alter_requests)),
}
}
/// Makes an alter region request.
pub fn make_alter_region_request(
region_id: RegionId,
alter_table_expr: &AlterTableExpr,
) -> AlterRequest {
let region_id = region_id.as_u64();
let kind = match &alter_table_expr.kind {
Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
to_region_add_columns(add_columns),
)),
_ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
};
AlterRequest {
region_id,
schema_version: 0,
kind,
}
}
fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
let add_columns = add_columns
.add_columns
.iter()
.map(|add_column| {
let region_column_def = RegionColumnDef {
column_def: add_column.column_def.clone(),
..Default::default() // other fields are not used in alter logical table
};
AddColumn {
column_def: Some(region_column_def),
..Default::default() // other fields are not used in alter logical table
}
})
.collect();
AddColumns { add_columns }
}

View File

@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::format_full_table_name;
use snafu::OptionExt;
use table::metadata::TableId;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::{
AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableNotFoundSnafu,
TableRouteNotFoundSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::rpc::ddl::AlterTableTask;
impl AlterLogicalTablesProcedure {
pub(crate) fn filter_task(&mut self, finished_tasks: &[bool]) -> Result<()> {
debug_assert_eq!(finished_tasks.len(), self.data.tasks.len());
debug_assert_eq!(finished_tasks.len(), self.data.table_info_values.len());
self.data.tasks = self
.data
.tasks
.drain(..)
.zip(finished_tasks.iter())
.filter_map(|(task, finished)| if *finished { None } else { Some(task) })
.collect();
self.data.table_info_values = self
.data
.table_info_values
.drain(..)
.zip(finished_tasks.iter())
.filter_map(|(table_info_value, finished)| {
if *finished {
None
} else {
Some(table_info_value)
}
})
.collect();
Ok(())
}
pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
let (physical_table_info, physical_table_route) = self
.context
.table_metadata_manager
.get_full_table_info(self.data.physical_table_id)
.await?;
let physical_table_info = physical_table_info.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id - {}", self.data.physical_table_id),
})?;
let physical_table_route = physical_table_route
.context(TableRouteNotFoundSnafu {
table_id: self.data.physical_table_id,
})?
.into_inner();
self.data.physical_table_info = Some(physical_table_info);
let TableRouteValue::Physical(physical_table_route) = physical_table_route else {
return AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: format!(
"expected a physical table but got a logical table: {:?}",
self.data.physical_table_id
),
}
.fail();
};
self.data.physical_table_route = Some(physical_table_route);
Ok(())
}
pub(crate) async fn fill_table_info_values(&mut self) -> Result<()> {
let table_ids = self.get_all_table_ids().await?;
let table_info_values = self.get_all_table_info_values(&table_ids).await?;
debug_assert_eq!(table_info_values.len(), self.data.tasks.len());
self.data.table_info_values = table_info_values;
Ok(())
}
async fn get_all_table_info_values(
&self,
table_ids: &[TableId],
) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
let table_info_manager = self.context.table_metadata_manager.table_info_manager();
let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
let mut table_info_values = Vec::with_capacity(table_ids.len());
for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) {
let table_info_value =
table_info_map
.remove(table_id)
.with_context(|| TableInfoNotFoundSnafu {
table: extract_table_name(task),
})?;
table_info_values.push(table_info_value);
}
Ok(table_info_values)
}
async fn get_all_table_ids(&self) -> Result<Vec<TableId>> {
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
let table_name_keys = self
.data
.tasks
.iter()
.map(|task| extract_table_name_key(task))
.collect();
let table_name_values = table_name_manager.batch_get(table_name_keys).await?;
let mut table_ids = Vec::with_capacity(table_name_values.len());
for (value, task) in table_name_values.into_iter().zip(self.data.tasks.iter()) {
let table_id = value
.with_context(|| TableNotFoundSnafu {
table_name: extract_table_name(task),
})?
.table_id();
table_ids.push(table_id);
}
Ok(table_ids)
}
}
#[inline]
fn extract_table_name(task: &AlterTableTask) -> String {
format_full_table_name(
&task.alter_table.catalog_name,
&task.alter_table.schema_name,
&task.alter_table.table_name,
)
}
#[inline]
fn extract_table_name_key(task: &AlterTableTask) -> TableNameKey {
TableNameKey::new(
&task.alter_table.catalog_name,
&task.alter_table.schema_name,
&task.alter_table.table_name,
)
}

View File

@@ -0,0 +1,113 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_table_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{self, AlterTableExpr};
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::RegionId;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::Result;
use crate::peer::Peer;
use crate::rpc::router::{find_leader_regions, RegionRoute};
impl AlterLogicalTablesProcedure {
pub(crate) fn make_request(
&self,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
let alter_requests = self.make_alter_region_requests(peer, region_routes)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Alters(alter_requests)),
};
Ok(request)
}
fn make_alter_region_requests(
&self,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<AlterRequests> {
let tasks = &self.data.tasks;
let regions_on_this_peer = find_leader_regions(region_routes, peer);
let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
for (task, table) in self
.data
.tasks
.iter()
.zip(self.data.table_info_values.iter())
{
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
let request = make_alter_region_request(
region_id,
&task.alter_table,
table.table_info.ident.version,
);
requests.push(request);
}
}
Ok(AlterRequests { requests })
}
}
/// Makes an alter region request.
pub fn make_alter_region_request(
region_id: RegionId,
alter_table_expr: &AlterTableExpr,
schema_version: u64,
) -> AlterRequest {
let region_id = region_id.as_u64();
let kind = match &alter_table_expr.kind {
Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
to_region_add_columns(add_columns),
)),
_ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
};
AlterRequest {
region_id,
schema_version,
kind,
}
}
fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
let add_columns = add_columns
.add_columns
.iter()
.map(|add_column| {
let region_column_def = RegionColumnDef {
column_def: add_column.column_def.clone(),
..Default::default() // other fields are not used in alter logical table
};
AddColumn {
column_def: Some(region_column_def),
..Default::default() // other fields are not used in alter logical table
}
})
.collect();
AddColumns { add_columns }
}
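
A tiny stand-in sketch of the behavioural change between the removed executor.rs and this file: the alter region request now carries the logical table's current schema version instead of a hard-coded 0. The struct below is a stand-in for `api::v1::region::AlterRequest`, not the real type.

    #[derive(Debug, PartialEq)]
    struct AlterRequest {
        region_id: u64,
        schema_version: u64,
    }

    fn make_alter_region_request(region_id: u64, schema_version: u64) -> AlterRequest {
        // The old code always sent schema_version: 0; the new code threads the
        // value from table_info.ident.version into every request.
        AlterRequest { region_id, schema_version }
    }

    fn main() {
        let req = make_alter_region_request(42, 7);
        assert_eq!(req, AlterRequest { region_id: 42, schema_version: 7 });
    }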

View File

@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use table::metadata::RawTableInfo;
use table::table_name::TableName;
use crate::ddl::alter_logical_tables::AlterTablesData;
use crate::instruction::CacheIdent;
impl AlterTablesData {
pub(crate) fn build_cache_keys_to_invalidate(&mut self) {
let mut cache_keys = self
.table_info_values
.iter()
.flat_map(|table| {
vec![
CacheIdent::TableId(table.table_info.ident.table_id),
CacheIdent::TableName(extract_table_name(&table.table_info)),
]
})
.collect::<Vec<_>>();
cache_keys.push(CacheIdent::TableId(self.physical_table_id));
// Safety: physical_table_info already filled in previous steps
let physical_table_info = &self.physical_table_info.as_ref().unwrap().table_info;
cache_keys.push(CacheIdent::TableName(extract_table_name(
physical_table_info,
)));
self.table_cache_keys_to_invalidate = cache_keys;
}
}
fn extract_table_name(table_info: &RawTableInfo) -> TableName {
TableName::new(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name,
)
}

View File

@@ -13,35 +13,41 @@
// limitations under the License.
use common_grpc_expr::alter_expr_to_request;
use common_telemetry::warn;
use itertools::Itertools;
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableInfo};
use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::physical_table_metadata;
use crate::error;
use crate::error::{ConvertAlterTableRequestSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::DeserializedValueWithBytes;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::region_distribution;
impl AlterLogicalTablesProcedure {
pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
if self.data.physical_columns.is_empty() {
warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables");
return Ok(());
}
// Safety: must exist.
let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
let physical_table_route = self.data.physical_table_route.as_ref().unwrap();
let region_distribution = region_distribution(&physical_table_route.region_routes);
// Updates physical table's metadata.
AlterLogicalTablesExecutor::on_alter_metadata(
self.data.physical_table_id,
&self.context.table_metadata_manager,
physical_table_info,
region_distribution,
// Generates new table info
let old_raw_table_info = physical_table_info.table_info.clone();
let new_raw_table_info = physical_table_metadata::build_new_physical_table_info(
old_raw_table_info,
&self.data.physical_columns,
)
.await?;
);
// Updates physical table's metadata, and we don't need to touch per-region settings.
self.context
.table_metadata_manager
.update_table_info(physical_table_info, None, new_raw_table_info)
.await?;
Ok(())
}

View File

@@ -1,284 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use api::v1::alter_table_expr::Kind;
use api::v1::AlterTableExpr;
use snafu::{ensure, OptionExt};
use store_api::storage::TableId;
use table::table_reference::TableReference;
use crate::ddl::utils::table_id::get_all_table_ids_by_names;
use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
use crate::error::{
AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu,
TableRouteNotFoundSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteManager, TableRouteValue};
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
/// [AlterLogicalTableValidator] validates the alter logical expressions.
pub struct AlterLogicalTableValidator<'a> {
physical_table_id: TableId,
alters: Vec<&'a AlterTableExpr>,
}
impl<'a> AlterLogicalTableValidator<'a> {
pub fn new(physical_table_id: TableId, alters: Vec<&'a AlterTableExpr>) -> Self {
Self {
physical_table_id,
alters,
}
}
/// Validates all alter table expressions have the same schema and catalog.
fn validate_schema(&self) -> Result<()> {
let is_same_schema = self.alters.windows(2).all(|pair| {
pair[0].catalog_name == pair[1].catalog_name
&& pair[0].schema_name == pair[1].schema_name
});
ensure!(
is_same_schema,
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Schemas of the alter table expressions are not the same"
}
);
Ok(())
}
/// Validates that all alter table expressions are of the supported kind.
/// Currently only supports `AddColumns` operations.
fn validate_alter_kind(&self) -> Result<()> {
for alter in &self.alters {
let kind = alter
.kind
.as_ref()
.context(AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Alter kind is missing",
})?;
let Kind::AddColumns(_) = kind else {
return AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Only support add columns operation",
}
.fail();
};
}
Ok(())
}
fn table_names(&self) -> Vec<TableReference> {
self.alters
.iter()
.map(|alter| {
TableReference::full(&alter.catalog_name, &alter.schema_name, &alter.table_name)
})
.collect()
}
/// Validates that the physical table info and route exist.
///
/// This method performs the following validations:
/// 1. Retrieves the full table info and route for the given physical table id
/// 2. Ensures the table info and table route exists
/// 3. Verifies that the table route is actually a physical table route, not a logical one
///
/// Returns a tuple containing the validated table info and physical table route.
async fn validate_physical_table(
&self,
table_metadata_manager: &TableMetadataManagerRef,
) -> Result<(
DeserializedValueWithBytes<TableInfoValue>,
PhysicalTableRouteValue,
)> {
let (table_info, table_route) = table_metadata_manager
.get_full_table_info(self.physical_table_id)
.await?;
let table_info = table_info.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id - {}", self.physical_table_id),
})?;
let physical_table_route = table_route
.context(TableRouteNotFoundSnafu {
table_id: self.physical_table_id,
})?
.into_inner();
let TableRouteValue::Physical(table_route) = physical_table_route else {
return AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: format!(
"expected a physical table but got a logical table: {:?}",
self.physical_table_id
),
}
.fail();
};
Ok((table_info, table_route))
}
/// Validates that all logical table routes have the same physical table id.
///
/// This method performs the following validations:
/// 1. Retrieves table routes for all the given table ids.
/// 2. Ensures that all retrieved routes are logical table routes (not physical)
/// 3. Verifies that all logical table routes reference the same physical table id.
/// 4. Returns an error if any route is not logical or references a different physical table.
async fn validate_logical_table_routes(
&self,
table_route_manager: &TableRouteManager,
table_ids: &[TableId],
) -> Result<()> {
let table_routes = table_route_manager
.table_route_storage()
.batch_get(table_ids)
.await?;
let physical_table_id = self.physical_table_id;
let is_same_physical_table = table_routes.iter().all(|r| {
if let Some(TableRouteValue::Logical(r)) = r {
r.physical_table_id() == physical_table_id
} else {
false
}
});
ensure!(
is_same_physical_table,
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "All the tasks should have the same physical table id"
}
);
Ok(())
}
/// Validates the alter logical expressions.
///
/// This method performs the following validations:
/// 1. Validates that all alter table expressions have the same schema and catalog.
/// 2. Validates that all alter table expressions are of the supported kind.
/// 3. Validates that the physical table info and route exist.
/// 4. Validates that all logical table routes have the same physical table id.
///
/// Returns a [ValidatorResult] containing the validation results.
pub async fn validate(
&self,
table_metadata_manager: &TableMetadataManagerRef,
) -> Result<ValidatorResult> {
self.validate_schema()?;
self.validate_alter_kind()?;
let (physical_table_info, physical_table_route) =
self.validate_physical_table(table_metadata_manager).await?;
let table_names = self.table_names();
let table_ids =
get_all_table_ids_by_names(table_metadata_manager.table_name_manager(), &table_names)
.await?;
let mut table_info_values = get_all_table_info_values_by_table_ids(
table_metadata_manager.table_info_manager(),
&table_ids,
&table_names,
)
.await?;
self.validate_logical_table_routes(
table_metadata_manager.table_route_manager(),
&table_ids,
)
.await?;
let skip_alter = self
.alters
.iter()
.zip(table_info_values.iter())
.map(|(task, table)| skip_alter_logical_region(task, table))
.collect::<Vec<_>>();
retain_unskipped(&mut table_info_values, &skip_alter);
let num_skipped = skip_alter.iter().filter(|&&x| x).count();
Ok(ValidatorResult {
num_skipped,
skip_alter,
table_info_values,
physical_table_info,
physical_table_route,
})
}
}
/// The result of the validator.
pub(crate) struct ValidatorResult {
pub(crate) num_skipped: usize,
pub(crate) skip_alter: Vec<bool>,
pub(crate) table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
pub(crate) physical_table_info: DeserializedValueWithBytes<TableInfoValue>,
pub(crate) physical_table_route: PhysicalTableRouteValue,
}
/// Retains the elements that are not skipped.
pub(crate) fn retain_unskipped<T>(target: &mut Vec<T>, skipped: &[bool]) {
debug_assert_eq!(target.len(), skipped.len());
let mut iter = skipped.iter();
target.retain(|_| !iter.next().unwrap());
}
/// Returns true if the logical region does not need to be altered.
fn skip_alter_logical_region(alter: &AlterTableExpr, table: &TableInfoValue) -> bool {
let existing_columns = table
.table_info
.meta
.schema
.column_schemas
.iter()
.map(|c| &c.name)
.collect::<HashSet<_>>();
let Some(kind) = alter.kind.as_ref() else {
return true; // Never get here since we have checked it in `validate_alter_kind`
};
let Kind::AddColumns(add_columns) = kind else {
return true; // Never get here since we have checked it in `validate_alter_kind`
};
// A task counts as finished only when all of its columns already exist;
// if some columns exist but others do not, the task is still considered
// unfinished.
add_columns
.add_columns
.iter()
.map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
.all(|column| {
column
.map(|c| existing_columns.contains(c))
.unwrap_or(false)
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_retain_unskipped() {
let mut target = vec![1, 2, 3, 4, 5];
let skipped = vec![false, true, false, true, false];
retain_unskipped(&mut target, &skipped);
assert_eq!(target, vec![1, 3, 5]);
}
}

View File

@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod executor;
mod check;
mod metadata;
mod region_request;
mod update_metadata;
use std::vec;
@@ -28,29 +29,33 @@ use common_procedure::{
Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
PoisonKeys, Procedure, ProcedureId, Status, StringKey,
};
use common_telemetry::{error, info, warn};
use common_telemetry::{debug, error, info, warn};
use futures::future::{self};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_reference::TableReference;
use crate::ddl::alter_table::executor::AlterTableExecutor;
use crate::cache_invalidator::Context;
use crate::ddl::physical_table_metadata::update_table_info_column_ids;
use crate::ddl::utils::{
extract_column_metadatas, handle_multiple_results, map_to_procedure_error,
sync_follower_regions, MultipleResults,
add_peer_context_if_needed, extract_column_metadatas, handle_multiple_results,
map_to_procedure_error, sync_follower_regions, MultipleResults,
};
use crate::ddl::DdlContext;
use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::poison_key::table_poison_key;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute};
/// The alter table procedure
pub struct AlterTableProcedure {
@@ -62,24 +67,6 @@ pub struct AlterTableProcedure {
/// If we recover the procedure from json, then the table info value is not cached.
/// But we already validated it in the prepare step.
new_table_info: Option<TableInfo>,
/// The alter table executor.
executor: AlterTableExecutor,
}
/// Builds the executor from the [`AlterTableData`].
///
/// # Panics
/// - If the alter kind is not set.
fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor {
let table_name = alter_data.table_ref().into();
let table_id = alter_data.table_id;
let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
Some(new_table_name.to_string())
} else {
None
};
AlterTableExecutor::new(table_name, table_id, new_table_name)
}
impl AlterTableProcedure {
@@ -87,42 +74,33 @@ impl AlterTableProcedure {
pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
task.validate()?;
let data = AlterTableData::new(task, table_id);
let executor = build_executor_from_alter_expr(&data);
Ok(Self {
context,
data,
data: AlterTableData::new(task, table_id),
new_table_info: None,
executor,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let executor = build_executor_from_alter_expr(&data);
Ok(AlterTableProcedure {
context,
data,
new_table_info: None,
executor,
})
}
// Checks whether the table exists.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
self.executor
.on_prepare(&self.context.table_metadata_manager)
.await?;
self.check_alter().await?;
self.fill_table_info().await?;
// Safety: filled in `fill_table_info`.
// Validates the request and builds the new table info.
// We need to build the new table info here because we should ensure the alteration
// is valid in `UpdateMeta` state as we already altered the region.
// Safety: `fill_table_info()` already set it.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
let new_table_info = AlterTableExecutor::validate_alter_table_expr(
&table_info_value.table_info,
self.data.task.alter_table.clone(),
)?;
self.new_table_info = Some(new_table_info);
self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
@@ -165,7 +143,9 @@ impl AlterTableProcedure {
self.data.region_distribution =
Some(region_distribution(&physical_table_route.region_routes));
let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
let alter_kind = self.make_region_alter_kind()?;
info!(
@@ -178,14 +158,31 @@ impl AlterTableProcedure {
ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
// Puts the poison before submitting alter region requests to datanodes.
self.put_poison(ctx_provider, procedure_id).await?;
let results = self
.executor
.on_alter_regions(
&self.context.node_manager,
&physical_table_route.region_routes,
alter_kind,
)
.await;
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
debug!("Submitting {request:?} to {datanode}");
let datanode = datanode.clone();
let requester = requester.clone();
alter_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
let results = future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Vec<_>>();
match handle_multiple_results(results) {
MultipleResults::PartialRetryable(error) => {
@@ -263,34 +260,43 @@ impl AlterTableProcedure {
pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();
// Safety: filled in `fill_table_info`.
// Safety: checked before.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
// Gets the table info from the cache or builds it.
let new_info = match &self.new_table_info {
let new_info = match &self.new_table_info {
Some(cached) => cached.clone(),
None => AlterTableExecutor::validate_alter_table_expr(
&table_info_value.table_info,
self.data.task.alter_table.clone(),
)
None => self.build_new_table_info(&table_info_value.table_info)
.inspect_err(|e| {
// We already check the table info in the prepare step so this should not happen.
error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
})?,
};
// Safety: region distribution is set in `submit_alter_region_requests`.
self.executor
.on_alter_metadata(
&self.context.table_metadata_manager,
debug!(
"Starting update table: {} metadata, new table info {:?}",
table_ref.to_string(),
new_info
);
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value)
.await?;
} else {
let mut raw_table_info = new_info.into();
if !self.data.column_metadatas.is_empty() {
update_table_info_column_ids(&mut raw_table_info, &self.data.column_metadatas);
}
// Region distribution is set in `submit_alter_region_requests`.
let region_distribution = self.data.region_distribution.as_ref().unwrap().clone();
self.on_update_metadata_for_alter(
raw_table_info,
region_distribution,
table_info_value,
self.data.region_distribution.as_ref(),
new_info.into(),
&self.data.column_metadatas,
)
.await?;
}
info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
self.data.state = AlterTableState::InvalidateTableCache;
@@ -299,9 +305,18 @@ impl AlterTableProcedure {
/// Broadcasts the invalidating table cache instructions.
async fn on_broadcast(&mut self) -> Result<Status> {
self.executor
.invalidate_table_cache(&self.context.cache_invalidator)
let cache_invalidator = &self.context.cache_invalidator;
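// Evict both the id-keyed and the name-keyed cache entries for the altered table.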
cache_invalidator
.invalidate(
&Context::default(),
&[
CacheIdent::TableId(self.data.table_id()),
CacheIdent::TableName(self.data.table_ref().into()),
],
)
.await?;
Ok(Status::done())
}

View File

@@ -0,0 +1,62 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use common_catalog::format_full_table_name;
use snafu::ensure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
impl AlterTableProcedure {
/// Checks:
/// - The new table name doesn't exist (rename).
/// - Table exists.
pub(crate) async fn check_alter(&self) -> Result<()> {
let alter_expr = &self.data.task.alter_table;
let catalog = &alter_expr.catalog_name;
let schema = &alter_expr.schema_name;
let table_name = &alter_expr.table_name;
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
let manager = &self.context.table_metadata_manager;
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);
let exists = manager
.table_name_manager()
.exists(new_table_name_key)
.await?;
ensure!(
!exists,
error::TableAlreadyExistsSnafu {
table_name: format_full_table_name(catalog, schema, new_table_name),
}
)
}
let table_name_key = TableNameKey::new(catalog, schema, table_name);
let exists = manager.table_name_manager().exists(table_name_key).await?;
ensure!(
exists,
error::TableNotFoundSnafu {
table_name: format_full_table_name(catalog, schema, &alter_expr.table_name),
}
);
Ok(())
}
}

View File

@@ -1,308 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::region::RegionResponse;
use api::v1::region::region_request::Body;
use api::v1::region::{alter_request, AlterRequest, RegionRequest, RegionRequestHeader};
use api::v1::AlterTableExpr;
use common_catalog::format_full_table_name;
use common_grpc_expr::alter_expr_to_request;
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use futures::future;
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::storage::{RegionId, TableId};
use table::metadata::{RawTableInfo, TableInfo};
use table::requests::AlterKind;
use table::table_name::TableName;
use crate::cache_invalidator::{CacheInvalidatorRef, Context};
use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef};
use crate::node_manager::NodeManagerRef;
use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
/// [AlterTableExecutor] performs:
/// - Alters the metadata of the table.
/// - Alters regions on the datanode nodes.
pub struct AlterTableExecutor {
table: TableName,
table_id: TableId,
/// The new table name if the alter kind is rename table.
new_table_name: Option<String>,
}
impl AlterTableExecutor {
/// Creates a new [`AlterTableExecutor`].
pub fn new(table: TableName, table_id: TableId, new_table_name: Option<String>) -> Self {
Self {
table,
table_id,
new_table_name,
}
}
/// Prepares to alter the table.
///
/// ## Checks:
/// - The new table name doesn't exist (rename).
/// - Table exists.
pub(crate) async fn on_prepare(
&self,
table_metadata_manager: &TableMetadataManagerRef,
) -> Result<()> {
let catalog = &self.table.catalog_name;
let schema = &self.table.schema_name;
let table_name = &self.table.table_name;
let manager = table_metadata_manager;
if let Some(new_table_name) = &self.new_table_name {
let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);
let exists = manager
.table_name_manager()
.exists(new_table_name_key)
.await?;
ensure!(
!exists,
error::TableAlreadyExistsSnafu {
table_name: format_full_table_name(catalog, schema, new_table_name),
}
)
}
let table_name_key = TableNameKey::new(catalog, schema, table_name);
let exists = manager.table_name_manager().exists(table_name_key).await?;
ensure!(
exists,
error::TableNotFoundSnafu {
table_name: format_full_table_name(catalog, schema, table_name),
}
);
Ok(())
}
/// Validates the alter table expression and builds the new table info.
///
/// This validation is performed early to ensure the alteration is valid before
/// proceeding to the `on_alter_metadata` state, where regions have already been altered.
/// Building the new table info here allows us to catch any issues with the
/// alteration before committing metadata changes.
pub(crate) fn validate_alter_table_expr(
table_info: &RawTableInfo,
alter_table_expr: AlterTableExpr,
) -> Result<TableInfo> {
build_new_table_info(table_info, alter_table_expr)
}
/// Updates table metadata for alter table operation.
pub(crate) async fn on_alter_metadata(
&self,
table_metadata_manager: &TableMetadataManagerRef,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
region_distribution: Option<&RegionDistribution>,
mut raw_table_info: RawTableInfo,
column_metadatas: &[ColumnMetadata],
) -> Result<()> {
let table_ref = self.table.table_ref();
let table_id = self.table_id;
if let Some(new_table_name) = &self.new_table_name {
debug!(
"Starting update table: {} metadata, table_id: {}, new table info: {:?}, new table name: {}",
table_ref, table_id, raw_table_info, new_table_name
);
table_metadata_manager
.rename_table(current_table_info_value, new_table_name.to_string())
.await?;
} else {
debug!(
"Starting update table: {} metadata, table_id: {}, new table info: {:?}",
table_ref, table_id, raw_table_info
);
ensure!(
region_distribution.is_some(),
UnexpectedSnafu {
err_msg: "region distribution is not set when updating table metadata",
}
);
if !column_metadatas.is_empty() {
raw_table_info::update_table_info_column_ids(&mut raw_table_info, column_metadatas);
}
table_metadata_manager
.update_table_info(
current_table_info_value,
region_distribution.cloned(),
raw_table_info,
)
.await?;
}
Ok(())
}
/// Alters regions on the datanode nodes.
pub(crate) async fn on_alter_regions(
&self,
node_manager: &NodeManagerRef,
region_routes: &[RegionRoute],
kind: Option<alter_request::Kind>,
) -> Vec<Result<RegionResponse>> {
let region_distribution = region_distribution(region_routes);
let leaders = find_leaders(region_routes)
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
let total_num_region = region_distribution
.values()
.map(|r| r.leader_regions.len())
.sum::<usize>();
let mut alter_region_tasks = Vec::with_capacity(total_num_region);
for (datanode_id, region_role_set) in region_distribution {
if region_role_set.leader_regions.is_empty() {
continue;
}
// Safety: must exist.
let peer = leaders.get(&datanode_id).unwrap();
let requester = node_manager.datanode(peer).await;
for region_id in region_role_set.leader_regions {
let region_id = RegionId::new(self.table_id, region_id);
let request = make_alter_region_request(region_id, kind.clone());
let requester = requester.clone();
let peer = peer.clone();
alter_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer))
});
}
}
future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Vec<_>>()
}
/// Invalidates cache for the table.
pub(crate) async fn invalidate_table_cache(
&self,
cache_invalidator: &CacheInvalidatorRef,
) -> Result<()> {
let ctx = Context {
subject: Some(format!(
"Invalidate table cache by altering table {}, table_id: {}",
self.table.table_ref(),
self.table_id,
)),
};
cache_invalidator
.invalidate(
&ctx,
&[
CacheIdent::TableName(self.table.clone()),
CacheIdent::TableId(self.table_id),
],
)
.await?;
Ok(())
}
}
/// Makes alter region request.
pub(crate) fn make_alter_region_request(
region_id: RegionId,
kind: Option<alter_request::Kind>,
) -> RegionRequest {
RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(Body::Alter(AlterRequest {
region_id: region_id.as_u64(),
kind,
..Default::default()
})),
}
}
/// Builds new table info after alteration.
///
/// This function creates a new table info by applying the alter table expression
/// to the existing table info. For add column operations, it increments the
/// `next_column_id` by the number of columns being added, which may result in gaps
/// in the column id sequence.
fn build_new_table_info(
table_info: &RawTableInfo,
alter_table_expr: AlterTableExpr,
) -> Result<TableInfo> {
let table_info =
TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
let schema_name = &table_info.schema_name;
let catalog_name = &table_info.catalog_name;
let table_name = &table_info.name;
let table_id = table_info.ident.table_id;
let request = alter_expr_to_request(table_id, alter_table_expr)
.context(error::ConvertAlterTableRequestSnafu)?;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?;
let mut new_info = table_info.clone();
new_info.meta = new_meta;
new_info.ident.version = table_info.ident.version + 1;
match request.alter_kind {
AlterKind::AddColumns { columns } => {
// Bumps the column id for the new columns.
// It may bump by more than the number of columns actually added if some of the
// requested columns already exist, but that's fine.
new_info.meta.next_column_id += columns.len() as u32;
}
AlterKind::RenameTable { new_table_name } => {
new_info.name = new_table_name.to_string();
}
AlterKind::DropColumns { .. }
| AlterKind::ModifyColumnTypes { .. }
| AlterKind::SetTableOptions { .. }
| AlterKind::UnsetTableOptions { .. }
| AlterKind::SetIndex { .. }
| AlterKind::UnsetIndex { .. }
| AlterKind::DropDefaults { .. } => {}
}
Ok(new_info)
}

View File

@@ -15,16 +15,43 @@
use std::collections::HashSet;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{
alter_request, AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef,
alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
RegionRequest, RegionRequestHeader,
};
use common_telemetry::tracing_context::TracingContext;
use snafu::OptionExt;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{InvalidProtoMsgSnafu, Result};
impl AlterTableProcedure {
/// Makes an alter region request from an existing alter kind.
/// The region alter request always adds columns with if-not-exists semantics.
pub(crate) fn make_alter_region_request(
&self,
region_id: RegionId,
kind: Option<alter_request::Kind>,
) -> Result<RegionRequest> {
// Safety: the table info is filled before this step.
let table_info = self.data.table_info().unwrap();
Ok(RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(Body::Alter(AlterRequest {
region_id: region_id.as_u64(),
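// Carry the table's current schema version; datanodes can compare it against their
// region metadata to spot requests built from a stale schema (assumed purpose).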
schema_version: table_info.ident.version,
kind,
})),
})
}
/// Makes alter kind proto that all regions can reuse.
/// The region alter request always adds columns with if-not-exists semantics.
pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
@@ -128,7 +155,6 @@ mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use store_api::storage::{RegionId, TableId};
use crate::ddl::alter_table::executor::make_alter_region_request;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
@@ -235,13 +261,15 @@ mod tests {
let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let alter_kind = procedure.make_region_alter_kind().unwrap();
let Some(Body::Alter(alter_region_request)) =
make_alter_region_request(region_id, alter_kind).body
let Some(Body::Alter(alter_region_request)) = procedure
.make_alter_region_request(region_id, alter_kind)
.unwrap()
.body
else {
unreachable!()
};
assert_eq!(alter_region_request.region_id, region_id.as_u64());
assert_eq!(alter_region_request.schema_version, 0);
assert_eq!(alter_region_request.schema_version, 1);
assert_eq!(
alter_region_request.kind,
Some(region::alter_request::Kind::AddColumns(
@@ -291,13 +319,15 @@ mod tests {
let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let alter_kind = procedure.make_region_alter_kind().unwrap();
let Some(Body::Alter(alter_region_request)) =
make_alter_region_request(region_id, alter_kind).body
let Some(Body::Alter(alter_region_request)) = procedure
.make_alter_region_request(region_id, alter_kind)
.unwrap()
.body
else {
unreachable!()
};
assert_eq!(alter_region_request.region_id, region_id.as_u64());
assert_eq!(alter_region_request.schema_version, 0);
assert_eq!(alter_region_request.schema_version, 1);
assert_eq!(
alter_region_request.kind,
Some(region::alter_request::Kind::ModifyColumnTypes(

View File

@@ -0,0 +1,103 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_grpc_expr::alter_expr_to_request;
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableInfo};
use table::requests::AlterKind;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{self, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
impl AlterTableProcedure {
/// Builds new table info after alteration.
/// It bumps the table's `next_column_id` by the number of columns in the add column requests,
/// so there may be holes in the column id sequence.
pub(crate) fn build_new_table_info(&self, table_info: &RawTableInfo) -> Result<TableInfo> {
let table_info =
TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
let table_ref = self.data.table_ref();
let alter_expr = self.data.task.alter_table.clone();
let request = alter_expr_to_request(self.data.table_id(), alter_expr)
.context(error::ConvertAlterTableRequestSnafu)?;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
table_name: table_ref.table,
})?;
let mut new_info = table_info.clone();
new_info.meta = new_meta;
new_info.ident.version = table_info.ident.version + 1;
match request.alter_kind {
AlterKind::AddColumns { columns } => {
// Bumps the column id for the new columns.
// It may bump by more than the number of columns actually added if some of the
// requested columns already exist, but that's fine.
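// e.g. (illustrative numbers) with next_column_id = 5, adding two columns of which
// one already exists still bumps it to 7, so column id 6 is never used; callers must
// not assume column ids are contiguous.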
new_info.meta.next_column_id += columns.len() as u32;
}
AlterKind::RenameTable { new_table_name } => {
new_info.name = new_table_name.to_string();
}
AlterKind::DropColumns { .. }
| AlterKind::ModifyColumnTypes { .. }
| AlterKind::SetTableOptions { .. }
| AlterKind::UnsetTableOptions { .. }
| AlterKind::SetIndex { .. }
| AlterKind::UnsetIndex { .. }
| AlterKind::DropDefaults { .. } => {}
}
Ok(new_info)
}
/// Updates table metadata for rename table operation.
pub(crate) async fn on_update_metadata_for_rename(
&self,
new_table_name: String,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
) -> Result<()> {
let table_metadata_manager = &self.context.table_metadata_manager;
table_metadata_manager
.rename_table(current_table_info_value, new_table_name)
.await?;
Ok(())
}
/// Updates table metadata for alter table operation.
pub(crate) async fn on_update_metadata_for_alter(
&self,
new_table_info: RawTableInfo,
region_distribution: RegionDistribution,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
) -> Result<()> {
let table_metadata_manager = &self.context.table_metadata_manager;
table_metadata_manager
.update_table_info(
current_table_info_value,
Some(region_distribution),
new_table_info,
)
.await?;
Ok(())
}
}

View File

@@ -22,7 +22,7 @@ use table::table_name::TableName;
use crate::cache_invalidator::Context;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::utils::raw_table_info;
use crate::ddl::physical_table_metadata;
use crate::error::{Result, TableInfoNotFoundSnafu};
use crate::instruction::CacheIdent;
@@ -47,7 +47,7 @@ impl CreateLogicalTablesProcedure {
// Generates new table info
let raw_table_info = physical_table_info.deref().table_info.clone();
let new_table_info = raw_table_info::build_new_physical_table_info(
let new_table_info = physical_table_metadata::build_new_physical_table_info(
raw_table_info,
&self.data.physical_columns,
);

View File

@@ -35,7 +35,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
use crate::ddl::physical_table_metadata::update_table_info_column_ids;
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
extract_column_metadatas, map_to_procedure_error, region_storage_path,

View File

@@ -185,15 +185,11 @@ impl DropTableExecutor {
.await
}
/// Invalidates caches for the table.
/// Invalidates frontend caches
pub async fn invalidate_table_cache(&self, ctx: &DdlContext) -> Result<()> {
let cache_invalidator = &ctx.cache_invalidator;
let ctx = Context {
subject: Some(format!(
"Invalidate table cache by dropping table {}, table_id: {}",
self.table.table_ref(),
self.table_id,
)),
subject: Some("Invalidate table cache by dropping table".to_string()),
};
cache_invalidator

View File

@@ -12,10 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod raw_table_info;
pub(crate) mod table_id;
pub(crate) mod table_info;
use std::collections::HashMap;
use std::fmt::Debug;

View File

@@ -1,46 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::OptionExt;
use store_api::storage::TableId;
use table::table_reference::TableReference;
use crate::error::{Result, TableNotFoundSnafu};
use crate::key::table_name::{TableNameKey, TableNameManager};
/// Get all the table ids from the table names.
///
/// Returns an error if any table does not exist.
pub(crate) async fn get_all_table_ids_by_names<'a>(
table_name_manager: &TableNameManager,
table_names: &[TableReference<'a>],
) -> Result<Vec<TableId>> {
let table_name_keys = table_names
.iter()
.map(TableNameKey::from)
.collect::<Vec<_>>();
let table_name_values = table_name_manager.batch_get(table_name_keys).await?;
let mut table_ids = Vec::with_capacity(table_name_values.len());
for (value, table_name) in table_name_values.into_iter().zip(table_names) {
let value = value
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?
.table_id();
table_ids.push(value);
}
Ok(table_ids)
}

View File

@@ -1,44 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::OptionExt;
use store_api::storage::TableId;
use table::table_reference::TableReference;
use crate::error::{Result, TableInfoNotFoundSnafu};
use crate::key::table_info::{TableInfoManager, TableInfoValue};
use crate::key::DeserializedValueWithBytes;
/// Get all table info values by table ids.
///
/// Returns an error if any table does not exist.
pub(crate) async fn get_all_table_info_values_by_table_ids<'a>(
table_info_manager: &TableInfoManager,
table_ids: &[TableId],
table_names: &[TableReference<'a>],
) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
let mut table_info_values = Vec::with_capacity(table_ids.len());
for (table_id, table_name) in table_ids.iter().zip(table_names) {
let table_info_value =
table_info_map
.remove(table_id)
.with_context(|| TableInfoNotFoundSnafu {
table: table_name.to_string(),
})?;
table_info_values.push(table_info_value);
}
Ok(table_info_values)
}

View File

@@ -103,26 +103,6 @@ pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> {
Ok((table_name_key.table.to_string(), table_name_value))
}
impl<'a> From<&TableReference<'a>> for TableNameKey<'a> {
fn from(value: &TableReference<'a>) -> Self {
Self {
catalog: value.catalog,
schema: value.schema,
table: value.table,
}
}
}
impl<'a> From<TableReference<'a>> for TableNameKey<'a> {
fn from(value: TableReference<'a>) -> Self {
Self {
catalog: value.catalog,
schema: value.schema,
table: value.table,
}
}
}
impl<'a> From<&'a TableName> for TableNameKey<'a> {
fn from(value: &'a TableName) -> Self {
Self {

View File

@@ -14,7 +14,6 @@ workspace = true
[dependencies]
backtrace = "0.3"
common-error.workspace = true
common-version.workspace = true
console-subscriber = { version = "0.1", optional = true }
greptime-proto.workspace = true
humantime-serde.workspace = true

View File

@@ -403,7 +403,7 @@ pub fn init_global_logging(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, common_version::version()),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
]));

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::fmt::{Debug, Display, Formatter};
use std::ops::{Bound, RangeBounds};
use serde::{Deserialize, Serialize};
@@ -304,41 +303,6 @@ impl TimestampRange {
}
}
/// Create [TimestampRange] from a timestamp tuple.
/// The tuple's two elements form the "start" and "end" (both inclusive) of the timestamp range.
impl From<(Timestamp, Timestamp)> for TimestampRange {
fn from((start, end): (Timestamp, Timestamp)) -> Self {
if start > end {
Self::empty()
} else {
Self::new_inclusive(Some(start), Some(end))
}
}
}
/// Create [TimestampRange] from Rust's "range".
impl<R: RangeBounds<Timestamp>> From<R> for TimestampRange {
fn from(r: R) -> Self {
let start = match r.start_bound() {
Bound::Included(x) => Some(*x),
Bound::Excluded(x) => x
.value()
.checked_sub(1)
.map(|v| Timestamp::new(v, x.unit())),
Bound::Unbounded => None,
};
let end = match r.end_bound() {
Bound::Included(x) => x
.value()
.checked_add(1)
.map(|v| Timestamp::new(v, x.unit())),
Bound::Excluded(x) => Some(*x),
Bound::Unbounded => None,
};
Self::from_optional(start, end)
}
}
/// Time range in milliseconds.
pub type RangeMillis = GenericRange<TimestampMillis>;
@@ -581,75 +545,4 @@ mod tests {
TimeUnit::Nanosecond
);
}
#[test]
fn test_from_timestamp_tuple() {
let timestamp_range: TimestampRange =
(Timestamp::new_millisecond(1), Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(
Some(Timestamp::new_millisecond(1)),
Some(Timestamp::new_millisecond(4))
)
);
let timestamp_range: TimestampRange =
(Timestamp::new_millisecond(1), Timestamp::new_millisecond(1)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(
Some(Timestamp::new_millisecond(1)),
Some(Timestamp::new_millisecond(2))
)
);
let timestamp_range: TimestampRange =
(Timestamp::new_second(1), Timestamp::new_millisecond(3)).into();
assert_eq!(timestamp_range, TimestampRange::empty());
}
#[test]
fn test_from_timestamp_range() {
let timestamp_range: TimestampRange =
(Timestamp::new_millisecond(1)..Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(
Some(Timestamp::new_millisecond(1)),
Some(Timestamp::new_millisecond(3))
)
);
let timestamp_range: TimestampRange =
(Timestamp::new_millisecond(1)..=Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(
Some(Timestamp::new_millisecond(1)),
Some(Timestamp::new_millisecond(4))
)
);
let timestamp_range: TimestampRange = (Timestamp::new_millisecond(1)..).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(Some(Timestamp::new_millisecond(1)), None)
);
let timestamp_range: TimestampRange = (..Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(3)))
);
let timestamp_range: TimestampRange = (..=Timestamp::new_millisecond(3)).into();
assert_eq!(
timestamp_range,
TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(4)))
);
let timestamp_range: TimestampRange = (..).into();
assert_eq!(timestamp_range, TimestampRange::min_to_max(),);
}
}

View File

@@ -17,5 +17,4 @@ shadow-rs.workspace = true
[build-dependencies]
build-data = "0.2"
cargo-manifest = "0.19"
shadow-rs.workspace = true

View File

@@ -14,10 +14,8 @@
use std::collections::BTreeSet;
use std::env;
use std::path::PathBuf;
use build_data::{format_timestamp, get_source_time};
use cargo_manifest::Manifest;
use shadow_rs::{BuildPattern, ShadowBuilder, CARGO_METADATA, CARGO_TREE};
fn main() -> shadow_rs::SdResult<()> {
@@ -35,24 +33,6 @@ fn main() -> shadow_rs::SdResult<()> {
// solve the problem where the "CARGO_MANIFEST_DIR" is not what we want when this repo is
// made as a submodule in another repo.
let src_path = env::var("CARGO_WORKSPACE_DIR").or_else(|_| env::var("CARGO_MANIFEST_DIR"))?;
let manifest = Manifest::from_path(PathBuf::from(&src_path).join("Cargo.toml"))
.expect("Failed to parse Cargo.toml");
if let Some(product_version) = manifest.workspace.as_ref().and_then(|w| {
w.metadata.as_ref().and_then(|m| {
m.get("greptime")
.and_then(|g| g.get("product_version").and_then(|v| v.as_str()))
})
}) {
println!(
"cargo:rustc-env=GREPTIME_PRODUCT_VERSION={}",
product_version
);
} else {
let version = env::var("CARGO_PKG_VERSION").unwrap();
println!("cargo:rustc-env=GREPTIME_PRODUCT_VERSION={}", version,);
}
let out_path = env::var("OUT_DIR")?;
let _ = ShadowBuilder::builder()

View File

@@ -105,17 +105,13 @@ pub const fn build_info() -> BuildInfo {
build_time: env!("BUILD_TIMESTAMP"),
rustc: build::RUST_VERSION,
target: build::BUILD_TARGET,
version: env!("GREPTIME_PRODUCT_VERSION"),
version: build::PKG_VERSION,
}
}
const BUILD_INFO: BuildInfo = build_info();
pub const fn version() -> &'static str {
BUILD_INFO.version
}
pub const fn verbose_version() -> &'static str {
const_format::formatcp!(
"\nbranch: {}\ncommit: {}\nclean: {}\nversion: {}",
BUILD_INFO.branch,

View File

@@ -402,9 +402,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Not yet implemented: {what}"))]
NotYetImplemented { what: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -459,7 +456,7 @@ impl ErrorExt for Error {
OpenLogStore { source, .. } => source.status_code(),
MetaClientInit { source, .. } => source.status_code(),
UnsupportedOutput { .. } | NotYetImplemented { .. } => StatusCode::Unsupported,
UnsupportedOutput { .. } => StatusCode::Unsupported,
HandleRegionRequest { source, .. }
| GetRegionMetadata { source, .. }
| HandleBatchOpenRequest { source, .. }

View File

@@ -40,7 +40,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use table::TableRef;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::error::{Error, NotYetImplementedSnafu};
use crate::error::Error;
use crate::event_listener::NoopRegionServerEventListener;
use crate::region_server::RegionServer;
@@ -232,9 +232,7 @@ impl RegionEngine for MockRegionEngine {
_region_id: RegionId,
_request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
Err(BoxedError::new(
NotYetImplementedSnafu { what: "blah" }.build(),
))
unimplemented!()
}
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {

View File

@@ -14,7 +14,7 @@
//! Batching mode engine
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
@@ -142,7 +142,7 @@ impl BatchingEngine {
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = HashSet::new();
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
@@ -155,7 +155,7 @@ impl BatchingEngine {
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
all_dirty_windows.insert(align_start);
all_dirty_windows.push(align_start);
}
}
}

View File

@@ -50,8 +50,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::adapter::util::from_proto_to_data_type;
use crate::error::{
ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, TimeSnafu,
UnexpectedSnafu,
ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu,
};
use crate::expr::error::DataTypeSnafu;
use crate::Error;
@@ -75,7 +74,6 @@ pub struct TimeWindowExpr {
logical_expr: Expr,
df_schema: DFSchema,
eval_time_window_size: Option<std::time::Duration>,
eval_time_original: Option<Timestamp>,
}
impl std::fmt::Display for TimeWindowExpr {
@@ -108,11 +106,10 @@ impl TimeWindowExpr {
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
eval_time_window_size: None,
eval_time_original: None,
};
let test_ts = DEFAULT_TEST_TIMESTAMP;
let (lower, upper) = zelf.eval(test_ts)?;
let time_window_size = match (lower, upper) {
let (l, u) = zelf.eval(test_ts)?;
let time_window_size = match (l, u) {
(Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
UnexpectedSnafu {
reason: format!(
@@ -124,59 +121,13 @@ impl TimeWindowExpr {
_ => None,
};
zelf.eval_time_window_size = time_window_size;
zelf.eval_time_original = lower;
Ok(zelf)
}
/// TODO(discord9): add `eval_batch` too
pub fn eval(
&self,
current: Timestamp,
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
fn compute_distance(time_diff_ns: i64, stride_ns: i64) -> i64 {
if stride_ns == 0 {
return time_diff_ns;
}
// a - (a % n) truncates the diff toward zero to a multiple of the stride; negative diffs are adjusted below
let time_delta = time_diff_ns - (time_diff_ns % stride_ns);
if time_diff_ns < 0 && time_delta != time_diff_ns {
// The origin is later than the source timestamp, round down to the previous bin
time_delta - stride_ns
} else {
time_delta
}
}
// FAST PATH: if we have eval_time_original and eval_time_window_size,
// we can compute the bounds directly
if let (Some(original), Some(window_size)) =
(self.eval_time_original, self.eval_time_window_size)
{
// date_bin align current to lower bound
let time_diff_ns = current.sub(&original).and_then(|s|s.num_nanoseconds()).with_context(||UnexpectedSnafu {
reason: format!(
"Failed to compute time difference between current {current:?} and original {original:?}"
),
})?;
let window_size_ns = window_size.as_nanos() as i64;
let distance_ns = compute_distance(time_diff_ns, window_size_ns);
let lower_bound = if distance_ns >= 0 {
original.add_duration(std::time::Duration::from_nanos(distance_ns as u64))
} else {
original.sub_duration(std::time::Duration::from_nanos((-distance_ns) as u64))
}
.context(TimeSnafu)?;
let upper_bound = lower_bound.add_duration(window_size).context(TimeSnafu)?;
return Ok((Some(lower_bound), Some(upper_bound)));
}
let lower_bound =
calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
let upper_bound =

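The removed fast path above aligns `current` to its time window via `compute_distance`. A minimal, self-contained sketch of that arithmetic, assuming plain `i64` nanosecond offsets (`align_to_window` is an illustrative helper, not part of the crate):

```rust
/// Aligns `current_ns` to the window containing it, mirroring `compute_distance`:
/// truncate the diff toward zero, then step back one stride when the diff is
/// negative and not already on a window boundary.
fn align_to_window(current_ns: i64, origin_ns: i64, stride_ns: i64) -> (i64, i64) {
    let diff = current_ns - origin_ns;
    let mut delta = diff - diff % stride_ns;
    if diff < 0 && delta != diff {
        delta -= stride_ns;
    }
    let lower = origin_ns + delta;
    (lower, lower + stride_ns)
}

fn main() {
    // origin = 5s, stride = 10s: 27s falls into [25s, 35s)
    assert_eq!(
        align_to_window(27_000_000_000, 5_000_000_000, 10_000_000_000),
        (25_000_000_000, 35_000_000_000)
    );
    // a timestamp earlier than the origin rounds down into the previous bin: [-5s, 5s)
    assert_eq!(
        align_to_window(3_000_000_000, 5_000_000_000, 10_000_000_000),
        (-5_000_000_000, 5_000_000_000)
    );
}
```

The second assertion shows the negative-diff adjustment: a timestamp before the origin is rounded down into the previous bin rather than toward zero.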
View File

@@ -383,13 +383,6 @@ impl SqlQueryHandler for Instance {
.and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
{
Ok(stmts) => {
if stmts.is_empty() {
return vec![InvalidSqlSnafu {
err_msg: "empty statements",
}
.fail()];
}
let mut results = Vec::with_capacity(stmts.len());
for stmt in stmts {
if let Err(e) = checker

View File

@@ -95,10 +95,11 @@ pub struct MetasrvInstance {
}
impl MetasrvInstance {
pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
let opts = metasrv.options().clone();
let plugins = metasrv.plugins().clone();
pub async fn new(
opts: MetasrvOptions,
plugins: Plugins,
metasrv: Metasrv,
) -> Result<MetasrvInstance> {
let builder = HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);

View File

@@ -1,4 +1,3 @@
use std::fmt::Display;
use std::sync::Arc;
use async_trait::async_trait;
@@ -20,7 +19,7 @@ pub type InclusiveTimeRange = (Timestamp, Timestamp);
/// memtable range and sst file range, but resides on the outside.
/// It can be scanned side by side with other ranges to produce the final result, so it's very useful
/// for extending the sources of data in GreptimeDB.
pub trait ExtensionRange: Display + Send + Sync {
pub trait ExtensionRange: Send + Sync {
/// The number of rows in this range.
fn num_rows(&self) -> u64;

View File

@@ -98,12 +98,6 @@ impl MemtableStats {
self
}
#[cfg(feature = "test")]
pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
self.max_sequence = max_sequence;
self
}
/// Returns the estimated bytes allocated by this memtable.
pub fn bytes_allocated(&self) -> usize {
self.estimated_bytes

View File

@@ -1040,15 +1040,13 @@ impl StreamContext {
/// Format the context for explain.
pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
for range_meta in &self.ranges {
for idx in &range_meta.row_group_indices {
if self.is_mem_range_index(*idx) {
num_mem_ranges += 1;
} else if self.is_file_range_index(*idx) {
num_file_ranges += 1;
} else {
num_other_ranges += 1;
num_file_ranges += 1;
}
}
}
@@ -1057,17 +1055,12 @@ impl StreamContext {
}
write!(
f,
r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
"\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
self.ranges.len(),
num_mem_ranges,
self.input.num_files(),
num_file_ranges,
)?;
if num_other_ranges > 0 {
write!(f, r#"", other_ranges":{}"#, num_other_ranges)?;
}
write!(f, "}}")?;
if let Some(selector) = &self.input.series_row_selector {
write!(f, ", \"selector\":\"{}\"", selector)?;
}
@@ -1109,24 +1102,6 @@ impl StreamContext {
input: &'a ScanInput,
}
#[cfg(feature = "enterprise")]
impl InputWrapper<'_> {
fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.input.extension_ranges.is_empty() {
return Ok(());
}
let mut delimiter = "";
write!(f, ", extension_ranges: [")?;
for range in self.input.extension_ranges() {
write!(f, "{}{}", delimiter, range)?;
delimiter = ", ";
}
write!(f, "]")?;
Ok(())
}
}
impl fmt::Debug for InputWrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let output_schema = self.input.mapper.output_schema();
@@ -1152,9 +1127,6 @@ impl StreamContext {
.finish()?;
}
#[cfg(feature = "enterprise")]
self.format_extension_ranges(f)?;
Ok(())
}
}

View File

@@ -598,17 +598,16 @@ pub fn build_file_range_scan_stream(
pub(crate) async fn scan_extension_range(
context: Arc<StreamContext>,
index: RowGroupIndex,
partition_metrics: PartitionMetrics,
metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
use snafu::ResultExt;
let range = context.input.extension_range(index.index);
let reader = range.reader(context.as_ref());
let stream = reader
.read(context, partition_metrics, index)
reader
.read(context, metrics, index)
.await
.context(crate::error::ScanExternalRangeSnafu)?;
Ok(stream)
.context(crate::error::ScanExternalRangeSnafu)
}
pub(crate) async fn maybe_scan_other_ranges(

View File

@@ -155,7 +155,23 @@ struct PlanRewriter {
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
column_requirements: HashSet<Column>,
/// Whether to expand on the next call.
/// This handles the case where a plan is transformed but needs to be expanded from its
/// parent node. For example, an Aggregate plan is split into a frontend part and a datanode part,
/// and needs to be expanded from the parent node of the Aggregate plan.
expand_on_next_call: bool,
/// Whether to expand on the next partial/conditional/transformed commutative plan.
/// This handles the case where a plan is transformed but we still
/// need to push down as many nodes as possible before the next partial/conditional/transformed
/// commutative plan, e.g.:
/// ```
/// Limit:
/// Sort:
/// ```
/// where `Limit` is partial commutative, and `Sort` is conditional commutative.
/// In this case, we need to expand the `Limit` plan,
/// so that we can push down the `Sort` plan as much as possible.
expand_on_next_part_cond_trans_commutative: bool,
new_child_plan: Option<LogicalPlan>,
}
@@ -177,15 +193,38 @@ impl PlanRewriter {
{
return true;
}
if self.expand_on_next_call {
self.expand_on_next_call = false;
return true;
}
if self.expand_on_next_part_cond_trans_commutative {
let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
match comm {
Commutativity::PartialCommutative => {
// A small difference: for partial commutative plans we still need to
// expand on the next call (so `Limit` can be pushed down).
self.expand_on_next_part_cond_trans_commutative = false;
self.expand_on_next_call = true;
}
Commutativity::ConditionalCommutative(_)
| Commutativity::TransformedCommutative { .. } => {
// for conditional commutative and transformed commutative, we can
// expand now
self.expand_on_next_part_cond_trans_commutative = false;
return true;
}
_ => (),
}
}
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -194,6 +233,7 @@ impl PlanRewriter {
&& let Some(plan) = transformer(plan)
{
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -202,7 +242,7 @@ impl PlanRewriter {
&& let Some(transformer_actions) = transformer(plan)
{
debug!(
"PlanRewriter: transformed plan: {:#?}\n from {plan}",
"PlanRewriter: transformed plan: {:?}\n from {plan}",
transformer_actions.extra_parent_plans
);
if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
@@ -226,6 +266,10 @@ impl PlanRewriter {
}
fn update_column_requirements(&mut self, plan: &LogicalPlan) {
debug!(
"PlanRewriter: update column requirements for plan: {plan}\n withcolumn_requirements: {:?}",
self.column_requirements
);
let mut container = HashSet::new();
for expr in plan.expressions() {
// this method won't fail
@@ -235,6 +279,10 @@ impl PlanRewriter {
for col in container {
self.column_requirements.insert(col);
}
debug!(
"PlanRewriter: updated column requirements: {:?}",
self.column_requirements
);
}
fn is_expanded(&self) -> bool {

View File

@@ -13,7 +13,7 @@
// limitations under the License.
//! prom supplies Prometheus HTTP API server compliance
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use axum::extract::{Path, Query, State};
@@ -62,7 +62,7 @@ use crate::prometheus_handler::PrometheusHandlerRef;
/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
pub metric: BTreeMap<String, String>,
pub metric: HashMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<(f64, String)>,
}
@@ -70,7 +70,7 @@ pub struct PromSeriesVector {
/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
pub metric: BTreeMap<String, String>,
pub metric: HashMap<String, String>,
pub values: Vec<(f64, String)>,
}

View File

@@ -13,8 +13,7 @@
// limitations under the License.
//! prom supplies Prometheus HTTP API server compliance
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
@@ -312,7 +311,7 @@ impl PrometheusJsonResponse {
let metric = tags
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect::<BTreeMap<_, _>>();
.collect::<HashMap<_, _>>();
match result {
PromQueryResult::Vector(ref mut v) => {
v.push(PromSeriesVector {
@@ -321,11 +320,6 @@ impl PrometheusJsonResponse {
});
}
PromQueryResult::Matrix(ref mut v) => {
// sort values by timestamp
if !values.is_sorted_by(|a, b| a.0 <= b.0) {
values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
}
v.push(PromSeriesMatrix { metric, values });
}
PromQueryResult::Scalar(ref mut v) => {
@@ -337,12 +331,6 @@ impl PrometheusJsonResponse {
}
});
// sort matrix by metric
// see: https://prometheus.io/docs/prometheus/3.5/querying/api/#range-vectors
if let PromQueryResult::Matrix(ref mut v) = result {
v.sort_by(|a, b| a.metric.cmp(&b.metric));
}
let result_type_string = result_type.to_string();
let data = PrometheusResponse::PromData(PromData {
result_type: result_type_string,

View File

@@ -49,7 +49,7 @@ pub(crate) struct GreptimeDBStartupParameters {
impl GreptimeDBStartupParameters {
fn new() -> GreptimeDBStartupParameters {
GreptimeDBStartupParameters {
version: format!("16.3-greptimedb-{}", common_version::version()),
version: format!("16.3-greptimedb-{}", env!("CARGO_PKG_VERSION")),
}
}
}

View File

@@ -390,10 +390,6 @@ impl PromSeriesProcessor {
let one_sample = series.samples.len() == 1;
for s in series.samples.iter() {
// skip NaN value
if s.value.is_nan() {
continue;
}
let timestamp = s.timestamp;
pipeline_map.insert(GREPTIME_TIMESTAMP.to_string(), Value::Int64(timestamp));
pipeline_map.insert(GREPTIME_VALUE.to_string(), Value::Float64(s.value));

View File

@@ -616,17 +616,6 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/v1/prometheus/api/v1/query_range?query=up&start=1&end=100&step=0.5")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/v1/prometheus/api/v1/query_range?query=up&start=1&end=100&step=0.5")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// labels
let res = client
@@ -2416,19 +2405,14 @@ processors:
ignore_missing: true
- vrl:
source: |
.from_source = "channel_2"
cond, err = .id1 > .id2
if (cond) {
.from_source = "channel_1"
}
del(.id1)
del(.id2)
.log_id = .id
del(.id)
.
transform:
- fields:
- from_source
type: string
- log_id
type: int32
- field: time
type: time
index: timestamp
@@ -2448,8 +2432,7 @@ transform:
let data_body = r#"
[
{
"id1": 2436,
"id2": 123,
"id": "2436",
"time": "2024-05-25 20:16:37.217"
}
]
@@ -2466,7 +2449,7 @@ transform:
"test_pipeline_with_vrl",
&client,
"select * from d_table",
"[[\"channel_1\",1716668197217000000]]",
"[[2436,1716668197217000000]]",
)
.await;

View File

@@ -152,16 +152,6 @@ pub async fn test_mysql_stmts(store_type: StorageType) {
conn.execute("SET TRANSACTION READ ONLY").await.unwrap();
// empty statements
let err = conn.execute(" ------- ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute("----------\n;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute(" ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute(" \n ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}

View File

@@ -112,102 +112,6 @@ TQL EVAL (0, 10, '1s', '2s') test{k="a"};
| 2.0 | 1970-01-01T00:00:02 | a |
+-----+---------------------+---+
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '0.5') test;
+-----+-------------------------+---+
| i | j | k |
+-----+-------------------------+---+
| 1.0 | 1970-01-01T00:00:00.500 | b |
| 1.0 | 1970-01-01T00:00:01 | b |
| 1.0 | 1970-01-01T00:00:01.500 | b |
| 1.0 | 1970-01-01T00:00:02 | b |
| 1.0 | 1970-01-01T00:00:02.500 | b |
| 1.0 | 1970-01-01T00:00:03 | b |
| 1.0 | 1970-01-01T00:00:03.500 | b |
| 1.0 | 1970-01-01T00:00:04 | b |
| 1.0 | 1970-01-01T00:00:04.500 | b |
| 1.0 | 1970-01-01T00:00:05 | b |
| 1.0 | 1970-01-01T00:00:05.500 | b |
| 1.0 | 1970-01-01T00:00:06 | b |
| 1.0 | 1970-01-01T00:00:06.500 | b |
| 1.0 | 1970-01-01T00:00:07 | b |
| 1.0 | 1970-01-01T00:00:07.500 | b |
| 1.0 | 1970-01-01T00:00:08 | b |
| 1.0 | 1970-01-01T00:00:08.500 | b |
| 1.0 | 1970-01-01T00:00:09 | b |
| 1.0 | 1970-01-01T00:00:09.500 | b |
| 1.0 | 1970-01-01T00:00:10 | b |
| 2.0 | 1970-01-01T00:00:00.500 | a |
| 2.0 | 1970-01-01T00:00:01 | a |
| 2.0 | 1970-01-01T00:00:01.500 | a |
| 2.0 | 1970-01-01T00:00:02 | a |
| 2.0 | 1970-01-01T00:00:02.500 | a |
| 2.0 | 1970-01-01T00:00:03 | a |
| 2.0 | 1970-01-01T00:00:03.500 | a |
| 2.0 | 1970-01-01T00:00:04 | a |
| 2.0 | 1970-01-01T00:00:04.500 | a |
| 2.0 | 1970-01-01T00:00:05 | a |
| 2.0 | 1970-01-01T00:00:05.500 | a |
| 2.0 | 1970-01-01T00:00:06 | a |
| 2.0 | 1970-01-01T00:00:06.500 | a |
| 2.0 | 1970-01-01T00:00:07 | a |
| 2.0 | 1970-01-01T00:00:07.500 | a |
| 2.0 | 1970-01-01T00:00:08 | a |
| 2.0 | 1970-01-01T00:00:08.500 | a |
| 2.0 | 1970-01-01T00:00:09 | a |
| 2.0 | 1970-01-01T00:00:09.500 | a |
| 2.0 | 1970-01-01T00:00:10 | a |
+-----+-------------------------+---+
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, 0.5) test;
+-----+-------------------------+---+
| i | j | k |
+-----+-------------------------+---+
| 1.0 | 1970-01-01T00:00:00.500 | b |
| 1.0 | 1970-01-01T00:00:01 | b |
| 1.0 | 1970-01-01T00:00:01.500 | b |
| 1.0 | 1970-01-01T00:00:02 | b |
| 1.0 | 1970-01-01T00:00:02.500 | b |
| 1.0 | 1970-01-01T00:00:03 | b |
| 1.0 | 1970-01-01T00:00:03.500 | b |
| 1.0 | 1970-01-01T00:00:04 | b |
| 1.0 | 1970-01-01T00:00:04.500 | b |
| 1.0 | 1970-01-01T00:00:05 | b |
| 1.0 | 1970-01-01T00:00:05.500 | b |
| 1.0 | 1970-01-01T00:00:06 | b |
| 1.0 | 1970-01-01T00:00:06.500 | b |
| 1.0 | 1970-01-01T00:00:07 | b |
| 1.0 | 1970-01-01T00:00:07.500 | b |
| 1.0 | 1970-01-01T00:00:08 | b |
| 1.0 | 1970-01-01T00:00:08.500 | b |
| 1.0 | 1970-01-01T00:00:09 | b |
| 1.0 | 1970-01-01T00:00:09.500 | b |
| 1.0 | 1970-01-01T00:00:10 | b |
| 2.0 | 1970-01-01T00:00:00.500 | a |
| 2.0 | 1970-01-01T00:00:01 | a |
| 2.0 | 1970-01-01T00:00:01.500 | a |
| 2.0 | 1970-01-01T00:00:02 | a |
| 2.0 | 1970-01-01T00:00:02.500 | a |
| 2.0 | 1970-01-01T00:00:03 | a |
| 2.0 | 1970-01-01T00:00:03.500 | a |
| 2.0 | 1970-01-01T00:00:04 | a |
| 2.0 | 1970-01-01T00:00:04.500 | a |
| 2.0 | 1970-01-01T00:00:05 | a |
| 2.0 | 1970-01-01T00:00:05.500 | a |
| 2.0 | 1970-01-01T00:00:06 | a |
| 2.0 | 1970-01-01T00:00:06.500 | a |
| 2.0 | 1970-01-01T00:00:07 | a |
| 2.0 | 1970-01-01T00:00:07.500 | a |
| 2.0 | 1970-01-01T00:00:08 | a |
| 2.0 | 1970-01-01T00:00:08.500 | a |
| 2.0 | 1970-01-01T00:00:09 | a |
| 2.0 | 1970-01-01T00:00:09.500 | a |
| 2.0 | 1970-01-01T00:00:10 | a |
+-----+-------------------------+---+
TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"};
+-----+---------------------+---+

View File

@@ -35,13 +35,6 @@ TQL EVAL (0, 10, '5s') test{k="a"};
TQL EVAL (0, 10, '1s', '2s') test{k="a"};
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '0.5') test;
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, 0.5) test;
TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"};
TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') test{k="a"};

View File

@@ -234,3 +234,57 @@ drop table test;
Affected Rows: 0
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
Affected Rows: 0
TQL EVAL sum(test2);
++
++
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([greptime_timestamp@0], 20), input_partitions=20 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
DROP TABLE test2;
Affected Rows: 0

View File

@@ -95,3 +95,27 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;
TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;
drop table test;
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
TQL EVAL sum(test2);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
DROP TABLE test2;