diff --git a/Cargo.lock b/Cargo.lock index bad087953f..ca03b88277 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5310,7 +5310,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=91c3d7b97a2850c014aa5ce4ffa4caeb6b918446#91c3d7b97a2850c014aa5ce4ffa4caeb6b918446" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ccfd4da48bc0254ed865e479cd981a3581b02d84#ccfd4da48bc0254ed865e479cd981a3581b02d84" dependencies = [ "prost 0.13.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index ade070d082..4c8ae155ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,7 +140,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "91c3d7b97a2850c014aa5ce4ffa4caeb6b918446" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ccfd4da48bc0254ed865e479cd981a3581b02d84" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/api/src/error.rs b/src/api/src/error.rs index 01a3197bc3..bc37363060 100644 --- a/src/api/src/error.rs +++ b/src/api/src/error.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use common_time::timestamp::TimeUnit; use datatypes::prelude::ConcreteDataType; use snafu::prelude::*; use snafu::Location; @@ -66,12 +67,28 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Invalid time unit: {time_unit}"))] + InvalidTimeUnit { + time_unit: i32, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Inconsistent time unit: {:?}", units))] + InconsistentTimeUnit { + units: Vec, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::UnknownColumnDataType { .. } => StatusCode::InvalidArguments, + Error::UnknownColumnDataType { .. } + | Error::InvalidTimeUnit { .. } + | Error::InconsistentTimeUnit { .. } => StatusCode::InvalidArguments, Error::IntoColumnDataType { .. } | Error::SerializeJson { .. } => { StatusCode::Unexpected } diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 37a535902c..ae51802de9 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use common_base::BitVec; @@ -46,7 +47,7 @@ use greptime_proto::v1::{ use paste::paste; use snafu::prelude::*; -use crate::error::{self, Result}; +use crate::error::{self, InconsistentTimeUnitSnafu, InvalidTimeUnitSnafu, Result}; use crate::v1::column::Values; use crate::v1::{Column, ColumnDataType, Value as GrpcValue}; @@ -1079,6 +1080,89 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { } } +pub fn from_pb_time_unit(unit: v1::TimeUnit) -> TimeUnit { + match unit { + v1::TimeUnit::Second => TimeUnit::Second, + v1::TimeUnit::Millisecond => TimeUnit::Millisecond, + v1::TimeUnit::Microsecond => TimeUnit::Microsecond, + v1::TimeUnit::Nanosecond => TimeUnit::Nanosecond, + } +} + +pub fn to_pb_time_unit(unit: TimeUnit) -> v1::TimeUnit { + match unit { + TimeUnit::Second => v1::TimeUnit::Second, + TimeUnit::Millisecond => v1::TimeUnit::Millisecond, + TimeUnit::Microsecond => v1::TimeUnit::Microsecond, + TimeUnit::Nanosecond => v1::TimeUnit::Nanosecond, + } +} + +pub fn from_pb_time_ranges(time_ranges: v1::TimeRanges) -> Result> { + if time_ranges.time_ranges.is_empty() { + return Ok(vec![]); + } + let proto_time_unit = v1::TimeUnit::try_from(time_ranges.time_unit).map_err(|_| { + InvalidTimeUnitSnafu { + time_unit: time_ranges.time_unit, + } + .build() + })?; + let time_unit = from_pb_time_unit(proto_time_unit); + Ok(time_ranges + .time_ranges + .into_iter() + .map(|r| { + ( + Timestamp::new(r.start, time_unit), + Timestamp::new(r.end, time_unit), + ) + }) + .collect()) +} + +/// All time_ranges must be of the same time unit. +/// +/// if input `time_ranges` is empty, it will return a default `TimeRanges` with `Millisecond` as the time unit. +pub fn to_pb_time_ranges(time_ranges: &[(Timestamp, Timestamp)]) -> Result { + let is_same_time_unit = time_ranges.windows(2).all(|x| { + x[0].0.unit() == x[1].0.unit() + && x[0].1.unit() == x[1].1.unit() + && x[0].0.unit() == x[0].1.unit() + }); + + if !is_same_time_unit { + let all_time_units: Vec<_> = time_ranges + .iter() + .map(|(s, e)| [s.unit(), e.unit()]) + .clone() + .flatten() + .collect::>() + .into_iter() + .collect(); + InconsistentTimeUnitSnafu { + units: all_time_units, + } + .fail()? + } + + let mut pb_time_ranges = v1::TimeRanges { + // default time unit is Millisecond + time_unit: v1::TimeUnit::Millisecond as i32, + time_ranges: Vec::with_capacity(time_ranges.len()), + }; + if let Some((start, _end)) = time_ranges.first() { + pb_time_ranges.time_unit = to_pb_time_unit(start.unit()) as i32; + } + for (start, end) in time_ranges { + pb_time_ranges.time_ranges.push(v1::TimeRange { + start: start.value(), + end: end.value(), + }); + } + Ok(pb_time_ranges) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 1739cdb153..82434ed184 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::helper::to_pb_time_ranges; use api::v1::region::{ - region_request, RegionRequest, RegionRequestHeader, TruncateRequest as PbTruncateRegionRequest, + region_request, truncate_request, RegionRequest, RegionRequestHeader, + TruncateRequest as PbTruncateRegionRequest, }; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; @@ -33,7 +35,7 @@ use table::table_reference::TableReference; use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error}; use crate::ddl::DdlContext; -use crate::error::{Result, TableNotFoundSnafu}; +use crate::error::{ConvertTimeRangesSnafu, Result, TableNotFoundSnafu}; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::DeserializedValueWithBytes; @@ -153,6 +155,15 @@ impl TruncateTableProcedure { datanode ); + let time_ranges = &self.data.task.time_ranges; + let kind = if time_ranges.is_empty() { + truncate_request::Kind::All(api::v1::region::All {}) + } else { + let pb_time_ranges = + to_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?; + truncate_request::Kind::TimeRanges(pb_time_ranges) + }; + let request = RegionRequest { header: Some(RegionRequestHeader { tracing_context: TracingContext::from_current_span().to_w3c(), @@ -160,6 +171,7 @@ impl TruncateTableProcedure { }), body: Some(region_request::Body::Truncate(PbTruncateRegionRequest { region_id: region_id.as_u64(), + kind: Some(kind), })), }; diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 0f97207ef3..be0f681d88 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -971,6 +971,13 @@ pub enum Error { source: api::error::Error, }, + #[snafu(display("Failed to convert time ranges"))] + ConvertTimeRanges { + #[snafu(implicit)] + location: Location, + source: api::error::Error, + }, + #[snafu(display( "Column metadata inconsistencies found in table: {}, table_id: {}", table_name, @@ -1045,7 +1052,8 @@ impl ErrorExt for Error { | KafkaGetOffset { .. } | ReadFlexbuffers { .. } | SerializeFlexbuffers { .. } - | DeserializeFlexbuffers { .. } => StatusCode::Unexpected, + | DeserializeFlexbuffers { .. } + | ConvertTimeRanges { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal, diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 13d6736390..6c1c2168f0 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -18,6 +18,7 @@ pub mod trigger; use std::collections::{HashMap, HashSet}; use std::result; +use api::helper::{from_pb_time_ranges, to_pb_time_ranges}; use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind; use api::v1::meta::ddl_task_request::Task; use api::v1::meta::{ @@ -38,7 +39,8 @@ use api::v1::{ }; use base64::engine::general_purpose; use base64::Engine as _; -use common_time::{DatabaseTimeToLive, Timezone}; +use common_error::ext::BoxedError; +use common_time::{DatabaseTimeToLive, Timestamp, Timezone}; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DefaultOnNull}; @@ -49,8 +51,8 @@ use table::table_name::TableName; use table::table_reference::TableReference; use crate::error::{ - self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, - Result, + self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu, + InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result, }; use crate::key::FlowId; @@ -179,12 +181,14 @@ impl DdlTask { schema: String, table: String, table_id: TableId, + time_ranges: Vec<(Timestamp, Timestamp)>, ) -> Self { DdlTask::TruncateTable(TruncateTableTask { catalog, schema, table, table_id, + time_ranges, }) } @@ -826,6 +830,7 @@ pub struct TruncateTableTask { pub schema: String, pub table: String, pub table_id: TableId, + pub time_ranges: Vec<(Timestamp, Timestamp)>, } impl TruncateTableTask { @@ -864,6 +869,13 @@ impl TryFrom for TruncateTableTask { err_msg: "expected table_id", })? .id, + time_ranges: truncate_table + .time_ranges + .map(from_pb_time_ranges) + .transpose() + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .unwrap_or_default(), }) } } @@ -878,6 +890,9 @@ impl TryFrom for PbTruncateTableTask { schema_name: task.schema, table_name: task.table, table_id: Some(api::v1::TableId { id: task.table_id }), + time_ranges: Some( + to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?, + ), }), }) } diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 4656b24d76..cad239a94f 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -558,7 +558,7 @@ impl fmt::Debug for Timestamp { } } -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum TimeUnit { Second, #[default] diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 13354f5bff..8c2edac648 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -1462,7 +1462,10 @@ mod tests { ); let err = mock_region_server - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request( + region_id, + RegionRequest::Truncate(RegionTruncateRequest::All), + ) .await .unwrap_err(); diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 5383bd931a..eaf97a2e2b 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::helper::from_pb_time_ranges; use api::v1::ddl_request::{Expr as DdlExpr, Expr}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; @@ -24,6 +25,7 @@ use api::v1::{ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_base::AffectedRows; +use common_error::ext::BoxedError; use common_grpc::flight::FlightDecoder; use common_grpc::FlightData; use common_query::logical_plan::add_insert_to_logical_plan; @@ -39,7 +41,7 @@ use table::table_name::TableName; use table::TableRef; use crate::error::{ - CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu, + CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result, SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu, }; @@ -195,8 +197,11 @@ impl GrpcQueryHandler for Instance { DdlExpr::TruncateTable(expr) => { let table_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); + let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; self.statement_executor - .truncate_table(table_name, ctx.clone()) + .truncate_table(table_name, time_ranges, ctx.clone()) .await? } DdlExpr::CreateFlow(expr) => { diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index b4de857c06..4da3978ba6 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -69,7 +69,10 @@ async fn test_engine_truncate_region_basic() { // Truncate the region. engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request( + region_id, + RegionRequest::Truncate(RegionTruncateRequest::All), + ) .await .unwrap(); @@ -118,7 +121,10 @@ async fn test_engine_put_data_after_truncate() { // Truncate the region. engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request( + region_id, + RegionRequest::Truncate(RegionTruncateRequest::All), + ) .await .unwrap(); @@ -194,7 +200,10 @@ async fn test_engine_truncate_after_flush() { // Truncate the region. engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request( + region_id, + RegionRequest::Truncate(RegionTruncateRequest::All), + ) .await .unwrap(); @@ -249,7 +258,10 @@ async fn test_engine_truncate_reopen() { // Truncate the region engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request( + region_id, + RegionRequest::Truncate(RegionTruncateRequest::All), + ) .await .unwrap(); @@ -337,7 +349,10 @@ async fn test_engine_truncate_during_flush() { // Truncate the region. engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request( + region_id, + RegionRequest::Truncate(RegionTruncateRequest::All), + ) .await .unwrap(); diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 1a01f8adbf..46e1cf1a5b 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -66,10 +66,21 @@ pub struct RegionRemove { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionTruncate { pub region_id: RegionId, - /// Last WAL entry id of truncated data. - pub truncated_entry_id: EntryId, - // Last sequence number of truncated data. - pub truncated_sequence: SequenceNumber, + pub kind: TruncateKind, +} + +/// The kind of truncate operation. +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum TruncateKind { + /// Truncate all data in the region, marked by all data before the given entry id&sequence. + All { + /// Last WAL entry id of truncated data. + truncated_entry_id: EntryId, + // Last sequence number of truncated data. + truncated_sequence: SequenceNumber, + }, + /// Only remove certain files in the region. + Partial { files_to_remove: Vec }, } /// The region manifest data. @@ -147,10 +158,22 @@ impl RegionManifestBuilder { pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) { self.manifest_version = manifest_version; - self.flushed_entry_id = truncate.truncated_entry_id; - self.flushed_sequence = truncate.truncated_sequence; - self.truncated_entry_id = Some(truncate.truncated_entry_id); - self.files.clear(); + match truncate.kind { + TruncateKind::All { + truncated_entry_id, + truncated_sequence, + } => { + self.flushed_entry_id = truncated_entry_id; + self.flushed_sequence = truncated_sequence; + self.truncated_entry_id = Some(truncated_entry_id); + self.files.clear(); + } + TruncateKind::Partial { files_to_remove } => { + for file in files_to_remove { + self.files.remove(&file.file_id); + } + } + } } /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata). diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 935b2bdc12..5c740c3071 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -31,7 +31,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; use crate::error::Result; -use crate::manifest::action::RegionEdit; +use crate::manifest::action::{RegionEdit, TruncateKind}; use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef}; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; use crate::memtable::{MemtableBuilderRef, MemtableId}; @@ -196,8 +196,7 @@ impl VersionControl { /// Truncate current version. pub(crate) fn truncate( &self, - truncated_entry_id: EntryId, - truncated_sequence: SequenceNumber, + truncate_kind: TruncateKind, memtable_builder: &MemtableBuilderRef, ) { let version = self.current().version; @@ -210,13 +209,23 @@ impl VersionControl { next_memtable_id, Some(part_duration), )); - let new_version = Arc::new( - VersionBuilder::new(version.metadata.clone(), new_mutable) - .flushed_entry_id(truncated_entry_id) - .flushed_sequence(truncated_sequence) - .truncated_entry_id(Some(truncated_entry_id)) - .build(), - ); + let new_version = match truncate_kind { + TruncateKind::All { + truncated_entry_id, + truncated_sequence, + } => Arc::new( + VersionBuilder::new(version.metadata.clone(), new_mutable) + .flushed_entry_id(truncated_entry_id) + .flushed_sequence(truncated_sequence) + .truncated_entry_id(Some(truncated_entry_id)) + .build(), + ), + TruncateKind::Partial { files_to_remove } => Arc::new( + VersionBuilder::from_version(version) + .remove_files(files_to_remove.into_iter()) + .build(), + ), + }; let mut version_data = self.data.write().unwrap(); version_data.version.ssts.mark_all_deleted(); @@ -416,6 +425,14 @@ impl VersionBuilder { self } + pub(crate) fn remove_files(mut self, files: impl Iterator) -> Self { + let mut ssts = (*self.ssts).clone(); + ssts.remove_files(files); + self.ssts = Arc::new(ssts); + + self + } + /// Builds a new [Version] from the builder. /// It overwrites the window size by compaction option. pub(crate) fn build(self) -> Version { diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index be010e71f6..94e66a97ac 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -38,7 +38,7 @@ use store_api::region_request::{ RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; -use store_api::storage::{RegionId, SequenceNumber}; +use store_api::storage::RegionId; use store_api::ManifestVersion; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -46,7 +46,7 @@ use crate::error::{ CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu, }; -use crate::manifest::action::RegionEdit; +use crate::manifest::action::{RegionEdit, TruncateKind}; use crate::memtable::bulk::part::BulkPart; use crate::memtable::MemtableId; use crate::metrics::COMPACTION_ELAPSED_TOTAL; @@ -886,10 +886,7 @@ pub(crate) struct TruncateResult { pub(crate) sender: OptionOutputTx, /// Truncate result. pub(crate) result: Result<()>, - /// Truncated entry id. - pub(crate) truncated_entry_id: EntryId, - /// Truncated sequence. - pub(crate) truncated_sequence: SequenceNumber, + pub(crate) kind: TruncateKind, } /// Notifies the region the result of writing region change action. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 06fd2a86fe..0a09de96c5 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -944,8 +944,8 @@ impl RegionWorkerLoop { .await; continue; } - DdlRequest::Truncate(_) => { - self.handle_truncate_request(ddl.region_id, ddl.sender) + DdlRequest::Truncate(req) => { + self.handle_truncate_request(ddl.region_id, req, ddl.sender) .await; continue; } diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 4e4c67af54..9cf94f1021 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -317,8 +317,7 @@ impl RegionWorkerLoop { region_id: truncate.region_id, sender, result, - truncated_entry_id: truncate.truncated_entry_id, - truncated_sequence: truncate.truncated_sequence, + kind: truncate.kind, }; let _ = request_sender .send(WorkerRequestWithTime::new(WorkerRequest::Background { diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 863b1961a3..d571b99abe 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -14,12 +14,13 @@ //! Handling truncate related requests. -use common_telemetry::info; +use common_telemetry::{debug, info}; use store_api::logstore::LogStore; +use store_api::region_request::RegionTruncateRequest; use store_api::storage::RegionId; use crate::error::RegionNotFoundSnafu; -use crate::manifest::action::RegionTruncate; +use crate::manifest::action::{RegionTruncate, TruncateKind}; use crate::region::RegionLeaderState; use crate::request::{OptionOutputTx, TruncateResult}; use crate::worker::RegionWorkerLoop; @@ -28,24 +29,65 @@ impl RegionWorkerLoop { pub(crate) async fn handle_truncate_request( &mut self, region_id: RegionId, + req: RegionTruncateRequest, mut sender: OptionOutputTx, ) { let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { return; }; - - info!("Try to truncate region {}", region_id); - let version_data = region.version_control.current(); - let truncated_entry_id = version_data.last_entry_id; - let truncated_sequence = version_data.committed_sequence; - // Write region truncated to manifest. - let truncate = RegionTruncate { - region_id, - truncated_entry_id, - truncated_sequence, + let truncate = match req { + RegionTruncateRequest::All => { + info!("Try to fully truncate region {}", region_id); + + let truncated_entry_id = version_data.last_entry_id; + let truncated_sequence = version_data.committed_sequence; + + // Write region truncated to manifest. + RegionTruncate { + region_id, + kind: TruncateKind::All { + truncated_entry_id, + truncated_sequence, + }, + } + } + RegionTruncateRequest::ByTimeRanges { time_ranges } => { + info!( + "Try to partially truncate region {} by time ranges: {:?}", + region_id, time_ranges + ); + // find all files that are fully contained in the time ranges + let mut files_to_truncate = Vec::new(); + for level in version_data.version.ssts.levels() { + for file in level.files() { + let file_time_range = file.time_range(); + // TODO(discord9): This is a naive way to check if is contained, we should + // optimize it later. + let is_subset = time_ranges.iter().any(|(start, end)| { + file_time_range.0 >= *start && file_time_range.1 <= *end + }); + + if is_subset { + files_to_truncate.push(file.meta_ref().clone()); + } + } + } + debug!( + "Found {} files to partially truncate in region {}", + files_to_truncate.len(), + region_id + ); + RegionTruncate { + region_id, + kind: TruncateKind::Partial { + files_to_remove: files_to_truncate, + }, + } + } }; + self.handle_manifest_truncate_action(region, truncate, sender); } @@ -68,11 +110,9 @@ impl RegionWorkerLoop { match truncate_result.result { Ok(()) => { // Applies the truncate action to the region. - region.version_control.truncate( - truncate_result.truncated_entry_id, - truncate_result.truncated_sequence, - ®ion.memtable_builder, - ); + region + .version_control + .truncate(truncate_result.kind.clone(), ®ion.memtable_builder); } Err(e) => { // Unable to truncate the region. @@ -86,23 +126,25 @@ impl RegionWorkerLoop { // Notifies compaction scheduler. self.compaction_scheduler.on_region_truncated(region_id); - // Make all data obsolete. - if let Err(e) = self - .wal - .obsolete( - region_id, - truncate_result.truncated_entry_id, - ®ion.provider, - ) - .await + if let TruncateKind::All { + truncated_entry_id, + truncated_sequence: _, + } = &truncate_result.kind { - truncate_result.sender.send(Err(e)); - return; + // Make all data obsolete. + if let Err(e) = self + .wal + .obsolete(region_id, *truncated_entry_id, ®ion.provider) + .await + { + truncate_result.sender.send(Err(e)); + return; + } } info!( - "Complete truncating region: {}, entry id: {} and sequence: {}.", - region_id, truncate_result.truncated_entry_id, truncate_result.truncated_sequence + "Complete truncating region: {}, kind: {:?}.", + region_id, truncate_result.kind ); truncate_result.sender.send(Ok(0)); diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 9d53732227..4975d87154 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -47,6 +47,7 @@ use common_telemetry::tracing; use common_time::range::TimestampRange; use common_time::Timestamp; use datafusion_expr::LogicalPlan; +use datatypes::prelude::ConcreteDataType; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use query::parser::QueryStatement; use query::QueryEngineRef; @@ -73,8 +74,8 @@ use self::set::{ }; use crate::error::{ self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu, - PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, - UpgradeCatalogManagerRefSnafu, + PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu, + TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu, }; use crate::insert::InserterRef; use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; @@ -306,7 +307,11 @@ impl StatementExecutor { .map_err(BoxedError::new) .context(ExternalSnafu)?; let table_name = TableName::new(catalog, schema, table); - self.truncate_table(table_name, query_ctx).await + let time_ranges = self + .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx) + .await?; + self.truncate_table(table_name, time_ranges, query_ctx) + .await } Statement::CreateDatabase(stmt) => { self.create_database( @@ -530,6 +535,84 @@ impl StatementExecutor { pub fn cache_invalidator(&self) -> &CacheInvalidatorRef { &self.cache_invalidator } + + /// Convert truncate time ranges for the given table from sql values to timestamps + /// + pub async fn convert_truncate_time_ranges( + &self, + table_name: &TableName, + sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)], + query_ctx: &QueryContextRef, + ) -> Result> { + if sql_values_time_range.is_empty() { + return Ok(vec![]); + } + let table = self.get_table(&table_name.table_ref()).await?; + let info = table.table_info(); + let time_index_dt = info + .meta + .schema + .timestamp_column() + .context(UnexpectedSnafu { + violated: "Table must have a timestamp column", + })?; + + let time_unit = time_index_dt + .data_type + .as_timestamp() + .with_context(|| UnexpectedSnafu { + violated: format!( + "Table {}'s time index column must be a timestamp type, found: {:?}", + table_name, time_index_dt + ), + })? + .unit(); + + let mut time_ranges = Vec::with_capacity(sql_values_time_range.len()); + for (start, end) in sql_values_time_range { + let start = common_sql::convert::sql_value_to_value( + "range_start", + &ConcreteDataType::timestamp_datatype(time_unit), + start, + Some(&query_ctx.timezone()), + None, + false, + ) + .context(SqlCommonSnafu) + .and_then(|v| { + if let datatypes::value::Value::Timestamp(t) = v { + Ok(t) + } else { + error::InvalidSqlSnafu { + err_msg: format!("Expected a timestamp value, found {v:?}"), + } + .fail() + } + })?; + + let end = common_sql::convert::sql_value_to_value( + "range_end", + &ConcreteDataType::timestamp_datatype(time_unit), + end, + Some(&query_ctx.timezone()), + None, + false, + ) + .context(SqlCommonSnafu) + .and_then(|v| { + if let datatypes::value::Value::Timestamp(t) = v { + Ok(t) + } else { + error::InvalidSqlSnafu { + err_msg: format!("Expected a timestamp value, found {v:?}"), + } + .fail() + } + })?; + time_ranges.push((start, end)); + } + Ok(time_ranges) + } } fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result { diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index b0c2e26b9a..83b3878e60 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -46,7 +46,7 @@ use common_meta::rpc::ddl::{ use common_query::Output; use common_sql::convert::sql_value_to_value; use common_telemetry::{debug, info, tracing, warn}; -use common_time::Timezone; +use common_time::{Timestamp, Timezone}; use datafusion_common::tree_node::TreeNodeVisitor; use datafusion_expr::LogicalPlan; use datatypes::prelude::ConcreteDataType; @@ -1151,6 +1151,7 @@ impl StatementExecutor { pub async fn truncate_table( &self, table_name: TableName, + time_ranges: Vec<(Timestamp, Timestamp)>, query_context: QueryContextRef, ) -> Result { ensure!( @@ -1174,7 +1175,7 @@ impl StatementExecutor { table_name: table_name.to_string(), })?; let table_id = table.table_info().table_id(); - self.truncate_table_procedure(&table_name, table_id, query_context) + self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context) .await?; Ok(Output::new_with_affected_rows(0)) @@ -1490,6 +1491,7 @@ impl StatementExecutor { &self, table_name: &TableName, table_id: TableId, + time_ranges: Vec<(Timestamp, Timestamp)>, query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest { @@ -1499,6 +1501,7 @@ impl StatementExecutor { table_name.schema_name.to_string(), table_name.table_name.to_string(), table_id, + time_ranges, ), }; diff --git a/src/sql/src/parsers/truncate_parser.rs b/src/sql/src/parsers/truncate_parser.rs index 3fa136baff..9d020b644e 100644 --- a/src/sql/src/parsers/truncate_parser.rs +++ b/src/sql/src/parsers/truncate_parser.rs @@ -14,8 +14,9 @@ use snafu::{ensure, ResultExt}; use sqlparser::keywords::Keyword; +use sqlparser::tokenizer::Token; -use crate::error::{self, InvalidTableNameSnafu, Result}; +use crate::error::{self, InvalidSqlSnafu, InvalidTableNameSnafu, Result, UnexpectedTokenSnafu}; use crate::parser::ParserContext; use crate::statements::statement::Statement; use crate::statements::truncate::TruncateTable; @@ -41,7 +42,92 @@ impl ParserContext<'_> { } ); - Ok(Statement::TruncateTable(TruncateTable::new(table_ident))) + let have_range = self.parser.parse_keywords(&[Keyword::FILE, Keyword::RANGE]); + + // if no range is specified, we just truncate the table + if !have_range { + return Ok(Statement::TruncateTable(TruncateTable::new(table_ident))); + } + + // parse a list of time ranges consist of (Timestamp, Timestamp),? + let mut time_ranges = vec![]; + loop { + let _ = self + .parser + .expect_token(&sqlparser::tokenizer::Token::LParen) + .with_context(|_| error::UnexpectedSnafu { + expected: "a left parenthesis", + actual: self.peek_token_as_string(), + })?; + + // parse to values here, no need to valid in parser + let start = self + .parser + .parse_value() + .with_context(|e| error::UnexpectedSnafu { + expected: "a timestamp value", + actual: e.to_string(), + })?; + + let _ = self + .parser + .expect_token(&sqlparser::tokenizer::Token::Comma) + .with_context(|_| error::UnexpectedSnafu { + expected: "a comma", + actual: self.peek_token_as_string(), + })?; + + let end = self + .parser + .parse_value() + .with_context(|_| error::UnexpectedSnafu { + expected: "a timestamp", + actual: self.peek_token_as_string(), + })?; + + let _ = self + .parser + .expect_token(&sqlparser::tokenizer::Token::RParen) + .with_context(|_| error::UnexpectedSnafu { + expected: "a right parenthesis", + actual: self.peek_token_as_string(), + })?; + + time_ranges.push((start, end)); + + let peek = self.parser.peek_token().token; + + match peek { + sqlparser::tokenizer::Token::EOF | Token::SemiColon => { + if time_ranges.is_empty() { + return Err(InvalidSqlSnafu { + msg: "TRUNCATE TABLE RANGE must have at least one range".to_string(), + } + .build()); + } + break; + } + Token::Comma => { + self.parser.next_token(); // Consume the comma + let next_peek = self.parser.peek_token().token; // Peek the token after the comma + if matches!(next_peek, Token::EOF | Token::SemiColon) { + break; // Trailing comma, end of statement + } + // Otherwise, continue to parse next range + continue; + } + _ => UnexpectedTokenSnafu { + expected: "a comma or end of statement", + actual: self.peek_token_as_string(), + } + .fail()?, + } + } + + Ok(Statement::TruncateTable(TruncateTable::new_with_ranges( + table_ident, + time_ranges, + ))) } } @@ -53,6 +139,145 @@ mod tests { use crate::dialect::GreptimeDbDialect; use crate::parser::ParseOptions; + #[test] + pub fn test_parse_truncate_with_ranges() { + let sql = r#"TRUNCATE foo FILE RANGE (0, 20)"#; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::TruncateTable(TruncateTable::new_with_ranges( + ObjectName(vec![Ident::new("foo")]), + vec![( + sqlparser::ast::Value::Number("0".to_string(), false), + sqlparser::ast::Value::Number("20".to_string(), false) + )] + )) + ); + + let sql = r#"TRUNCATE TABLE foo FILE RANGE ("2000-01-01 00:00:00+00:00", "2000-01-01 00:00:00+00:00"), (2,33)"#; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::TruncateTable(TruncateTable::new_with_ranges( + ObjectName(vec![Ident::new("foo")]), + vec![ + ( + sqlparser::ast::Value::DoubleQuotedString( + "2000-01-01 00:00:00+00:00".to_string() + ), + sqlparser::ast::Value::DoubleQuotedString( + "2000-01-01 00:00:00+00:00".to_string() + ) + ), + ( + sqlparser::ast::Value::Number("2".to_string(), false), + sqlparser::ast::Value::Number("33".to_string(), false) + ) + ] + )) + ); + + let sql = "TRUNCATE TABLE my_schema.foo FILE RANGE (1, 2), (3, 4),"; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::TruncateTable(TruncateTable::new_with_ranges( + ObjectName(vec![Ident::new("my_schema"), Ident::new("foo")]), + vec![ + ( + sqlparser::ast::Value::Number("1".to_string(), false), + sqlparser::ast::Value::Number("2".to_string(), false) + ), + ( + sqlparser::ast::Value::Number("3".to_string(), false), + sqlparser::ast::Value::Number("4".to_string(), false) + ) + ] + )) + ); + + let sql = "TRUNCATE my_schema.foo FILE RANGE (1,2),"; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::TruncateTable(TruncateTable::new_with_ranges( + ObjectName(vec![Ident::new("my_schema"), Ident::new("foo")]), + vec![( + sqlparser::ast::Value::Number("1".to_string(), false), + sqlparser::ast::Value::Number("2".to_string(), false) + )] + )) + ); + + let sql = "TRUNCATE TABLE my_catalog.my_schema.foo FILE RANGE (1,2);"; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::TruncateTable(TruncateTable::new_with_ranges( + ObjectName(vec![ + Ident::new("my_catalog"), + Ident::new("my_schema"), + Ident::new("foo") + ]), + vec![( + sqlparser::ast::Value::Number("1".to_string(), false), + sqlparser::ast::Value::Number("2".to_string(), false) + )] + )) + ); + + let sql = "TRUNCATE drop FILE RANGE (1,2)"; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::TruncateTable(TruncateTable::new_with_ranges( + ObjectName(vec![Ident::new("drop")]), + vec![( + sqlparser::ast::Value::Number("1".to_string(), false), + sqlparser::ast::Value::Number("2".to_string(), false) + )] + )) + ); + + let sql = "TRUNCATE `drop`"; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::TruncateTable(TruncateTable::new(ObjectName(vec![Ident::with_quote( + '`', "drop" + ),]))) + ); + + let sql = "TRUNCATE \"drop\" FILE RANGE (\"1\", \"2\")"; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::TruncateTable(TruncateTable::new_with_ranges( + ObjectName(vec![Ident::with_quote('"', "drop")]), + vec![( + sqlparser::ast::Value::DoubleQuotedString("1".to_string()), + sqlparser::ast::Value::DoubleQuotedString("2".to_string()) + )] + )) + ); + } + #[test] pub fn test_parse_truncate() { let sql = "TRUNCATE foo"; @@ -153,5 +378,82 @@ mod tests { let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); assert!(result.is_err(), "result is: {result:?}"); + + let sql = "TRUNCATE TABLE foo RANGE"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() + && format!("{result:?}").contains("SQL statement is not supported, keyword: RANGE"), + "result is: {result:?}" + ); + + let sql = "TRUNCATE TABLE foo FILE"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() + && format!("{result:?}").contains("SQL statement is not supported, keyword: FILE"), + "result is: {result:?}" + ); + + let sql = "TRUNCATE TABLE foo FILE RANGE"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() && format!("{result:?}").contains("expected: 'a left parenthesis'"), + "result is: {result:?}" + ); + + let sql = "TRUNCATE TABLE foo FILE RANGE ("; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() && format!("{result:?}").contains("expected: 'a timestamp value'"), + "result is: {result:?}" + ); + + let sql = "TRUNCATE TABLE foo FILE RANGE ()"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() && format!("{result:?}").contains("expected: 'a timestamp value'"), + "result is: {result:?}" + ); + + let sql = "TRUNCATE TABLE foo FILE RANGE (1 2) (3 4)"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() && format!("{result:?}").contains("expected: 'a comma'"), + "result is: {result:?}" + ); + + let sql = "TRUNCATE TABLE foo FILE RANGE (,),(3,4)"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() && format!("{result:?}").contains("Expected: a value, found: ,"), + "result is: {result:?}" + ); + + let sql = "TRUNCATE TABLE foo FILE RANGE (1,2) (3,4)"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() + && format!("{result:?}") + .contains("expected: 'a comma or end of statement', found: ("), + "result is: {result:?}" + ); + + let sql = "TRUNCATE TABLE foo FILE RANGE (1,2),,,,,,,,,(3,4)"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!( + result.is_err() + && format!("{result:?}").contains("expected: 'a left parenthesis', found: ,"), + "result is: {result:?}" + ); } } diff --git a/src/sql/src/statements/truncate.rs b/src/sql/src/statements/truncate.rs index 710b5f72df..a4e4c49d5b 100644 --- a/src/sql/src/statements/truncate.rs +++ b/src/sql/src/statements/truncate.rs @@ -14,31 +14,85 @@ use std::fmt::Display; +use itertools::Itertools; use serde::Serialize; -use sqlparser::ast::ObjectName; -use sqlparser_derive::{Visit, VisitMut}; +use sqlparser::ast::{ObjectName, Visit, VisitMut, Visitor, VisitorMut}; /// TRUNCATE TABLE statement. -#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct TruncateTable { table_name: ObjectName, + time_ranges: Vec<(sqlparser::ast::Value, sqlparser::ast::Value)>, +} + +impl Visit for TruncateTable { + fn visit(&self, visitor: &mut V) -> ::std::ops::ControlFlow { + self.table_name.visit(visitor)?; + for (start, end) in &self.time_ranges { + start.visit(visitor)?; + end.visit(visitor)?; + } + ::std::ops::ControlFlow::Continue(()) + } +} + +impl VisitMut for TruncateTable { + fn visit(&mut self, visitor: &mut V) -> ::std::ops::ControlFlow { + sqlparser::ast::VisitMut::visit(&mut self.table_name, visitor)?; + for (start, end) in &mut self.time_ranges { + start.visit(visitor)?; + end.visit(visitor)?; + } + ::std::ops::ControlFlow::Continue(()) + } } impl TruncateTable { /// Creates a statement for `TRUNCATE TABLE` pub fn new(table_name: ObjectName) -> Self { - Self { table_name } + Self { + table_name, + time_ranges: Vec::new(), + } + } + + /// Creates a statement for `TRUNCATE TABLE RANGE` + pub fn new_with_ranges( + table_name: ObjectName, + time_ranges: Vec<(sqlparser::ast::Value, sqlparser::ast::Value)>, + ) -> Self { + Self { + table_name, + time_ranges, + } } pub fn table_name(&self) -> &ObjectName { &self.table_name } + + pub fn time_ranges(&self) -> &[(sqlparser::ast::Value, sqlparser::ast::Value)] { + &self.time_ranges + } } impl Display for TruncateTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let table_name = self.table_name(); - write!(f, r#"TRUNCATE TABLE {table_name}"#) + write!(f, r#"TRUNCATE TABLE {table_name}"#)?; + if self.time_ranges.is_empty() { + return Ok(()); + } + + write!(f, " FILE RANGE ")?; + write!( + f, + "{}", + self.time_ranges + .iter() + .map(|(start, end)| format!("({}, {})", start, end)) + .join(", ") + ) } } @@ -71,5 +125,45 @@ TRUNCATE TABLE t1"#, unreachable!(); } } + + let sql = r"truncate table t1 file range (1,2);"; + let stmts: Vec = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + assert_matches!(&stmts[0], Statement::TruncateTable { .. }); + match &stmts[0] { + Statement::TruncateTable(trunc) => { + let new_sql = format!("\n{}", trunc); + assert_eq!( + r#" +TRUNCATE TABLE t1 FILE RANGE (1, 2)"#, + &new_sql + ); + } + _ => { + unreachable!(); + } + } + + let sql = r"truncate table t1 file range (1,2), (3,4);"; + let stmts: Vec = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + assert_matches!(&stmts[0], Statement::TruncateTable { .. }); + match &stmts[0] { + Statement::TruncateTable(trunc) => { + let new_sql = format!("\n{}", trunc); + assert_eq!( + r#" +TRUNCATE TABLE t1 FILE RANGE (1, 2), (3, 4)"#, + &new_sql + ); + } + _ => { + unreachable!(); + } + } } } diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 229f50ed26..0e24706f38 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -1035,6 +1035,13 @@ pub enum MetadataError { location: Location, }, + #[snafu(display("Failed to convert TimeRanges"))] + ConvertTimeRanges { + source: api::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))] InvalidSetRegionOptionRequest { key: String, diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 1e5e3c3d14..779e8ccefa 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -15,16 +15,16 @@ use std::collections::HashMap; use std::fmt::{self, Display}; -use api::helper::ColumnDataTypeWrapper; +use api::helper::{from_pb_time_ranges, ColumnDataTypeWrapper}; use api::v1::add_column_location::LocationType; use api::v1::column_def::{ as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type, }; use api::v1::region::bulk_insert_request::Body; use api::v1::region::{ - alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest, - CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, - DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest, + alter_request, compact_request, region_request, truncate_request, AlterRequest, AlterRequests, + BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, + DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest, }; use api::v1::{ self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows, @@ -33,7 +33,7 @@ use api::v1::{ pub use common_base::AffectedRows; use common_grpc::flight::FlightDecoder; use common_recordbatch::DfRecordBatch; -use common_time::TimeToLive; +use common_time::{TimeToLive, Timestamp}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{FulltextOptions, SkippingIndexOptions}; use serde::{Deserialize, Serialize}; @@ -42,9 +42,10 @@ use strum::{AsRefStr, IntoStaticStr}; use crate::logstore::entry; use crate::metadata::{ - ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidIndexOptionSnafu, - InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu, - InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu, + ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu, + InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, + InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError, + RegionMetadata, Result, UnexpectedSnafu, }; use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use crate::metrics; @@ -340,10 +341,24 @@ fn make_region_compact(compact: CompactRequest) -> Result Result> { let region_id = truncate.region_id.into(); - Ok(vec![( - region_id, - RegionRequest::Truncate(RegionTruncateRequest {}), - )]) + match truncate.kind { + None => InvalidRawRegionRequestSnafu { + err: "missing kind in TruncateRequest".to_string(), + } + .fail(), + Some(truncate_request::Kind::All(_)) => Ok(vec![( + region_id, + RegionRequest::Truncate(RegionTruncateRequest::All), + )]), + Some(truncate_request::Kind::TimeRanges(time_ranges)) => { + let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?; + + Ok(vec![( + region_id, + RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }), + )]) + } + } } /// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId]. @@ -1312,7 +1327,16 @@ impl Default for RegionCompactRequest { /// Truncate region request. #[derive(Debug)] -pub struct RegionTruncateRequest {} +pub enum RegionTruncateRequest { + /// Truncate all data in the region. + All, + ByTimeRanges { + /// Time ranges to truncate. Both bound are inclusive. + /// only files that are fully contained in the time range will be truncated. + /// so no guarantee that all data in the time range will be truncated. + time_ranges: Vec<(Timestamp, Timestamp)>, + }, +} /// Catchup region request. /// diff --git a/tests/cases/standalone/common/truncate/truncate.result b/tests/cases/standalone/common/truncate/truncate.result index c0e7560f81..6cd490e26d 100644 --- a/tests/cases/standalone/common/truncate/truncate.result +++ b/tests/cases/standalone/common/truncate/truncate.result @@ -44,6 +44,67 @@ SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; ++ ++ +-- truncate with time range +INSERT INTO monitor(ts, host, cpu, memory) VALUES +(1695217652000, 'host1', 66.6, 1024), +(1695217652000, 'host2', 66.6, 1024), +(1695217652000, 'host3', 66.6, 1024), +(1695217654000, 'host1', 77.7, 2048), +(1695217654000, 'host2', 77.7, 2048), +(1695217654000, 'host3', 77.7, 2048), +(1695217656000, 'host1', 88.8, 4096), +(1695217656000, 'host2', 88.8, 4096), +(1695217656000, 'host3', 88.8, 4096); + +Affected Rows: 9 + +ADMIN FLUSH_TABLE('monitor'); + ++------------------------------+ +| ADMIN FLUSH_TABLE('monitor') | ++------------------------------+ +| 0 | ++------------------------------+ + +INSERT INTO monitor(ts, host, cpu, memory) VALUES +(1700000000111, 'host42', 66.6, 1024); + +Affected Rows: 1 + +ADMIN FLUSH_TABLE('monitor'); + ++------------------------------+ +| ADMIN FLUSH_TABLE('monitor') | ++------------------------------+ +| 0 | ++------------------------------+ + +SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; + ++-------------------------+--------+------+--------+ +| ts | host | cpu | memory | ++-------------------------+--------+------+--------+ +| 2023-09-20T13:47:32 | host1 | 66.6 | 1024.0 | +| 2023-09-20T13:47:32 | host2 | 66.6 | 1024.0 | +| 2023-09-20T13:47:32 | host3 | 66.6 | 1024.0 | +| 2023-09-20T13:47:34 | host1 | 77.7 | 2048.0 | +| 2023-09-20T13:47:34 | host2 | 77.7 | 2048.0 | +| 2023-09-20T13:47:34 | host3 | 77.7 | 2048.0 | +| 2023-09-20T13:47:36 | host1 | 88.8 | 4096.0 | +| 2023-09-20T13:47:36 | host2 | 88.8 | 4096.0 | +| 2023-09-20T13:47:36 | host3 | 88.8 | 4096.0 | +| 2023-11-14T22:13:20.111 | host42 | 66.6 | 1024.0 | ++-------------------------+--------+------+--------+ + +TRUNCATE monitor FILE RANGE (0, 1700000000000), (1700000000111, 1700000000200); + +Affected Rows: 0 + +SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; + +++ +++ + INSERT INTO monitor(ts, host, cpu, memory) VALUES (1695217660000, 'host1', 88.8, 4096), (1695217662000, 'host2', 88.8, 4096), diff --git a/tests/cases/standalone/common/truncate/truncate.sql b/tests/cases/standalone/common/truncate/truncate.sql index 66cfd813cc..a116664f40 100644 --- a/tests/cases/standalone/common/truncate/truncate.sql +++ b/tests/cases/standalone/common/truncate/truncate.sql @@ -19,6 +19,31 @@ TRUNCATE monitor; SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; +-- truncate with time range +INSERT INTO monitor(ts, host, cpu, memory) VALUES +(1695217652000, 'host1', 66.6, 1024), +(1695217652000, 'host2', 66.6, 1024), +(1695217652000, 'host3', 66.6, 1024), +(1695217654000, 'host1', 77.7, 2048), +(1695217654000, 'host2', 77.7, 2048), +(1695217654000, 'host3', 77.7, 2048), +(1695217656000, 'host1', 88.8, 4096), +(1695217656000, 'host2', 88.8, 4096), +(1695217656000, 'host3', 88.8, 4096); + +ADMIN FLUSH_TABLE('monitor'); + +INSERT INTO monitor(ts, host, cpu, memory) VALUES +(1700000000111, 'host42', 66.6, 1024); + +ADMIN FLUSH_TABLE('monitor'); + +SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; + +TRUNCATE monitor FILE RANGE (0, 1700000000000), (1700000000111, 1700000000200); + +SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; + INSERT INTO monitor(ts, host, cpu, memory) VALUES (1695217660000, 'host1', 88.8, 4096), (1695217662000, 'host2', 88.8, 4096),