feat: add partial truncate (#6602)

* feat: add partial truncate

Signed-off-by: discord9 <discord9@163.com>

* fix: per review

Signed-off-by: discord9 <discord9@163.com>

* feat: add proto partial truncate kind

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* chore: update branched proto

Signed-off-by: discord9 <discord9@163.com>

* feat: grpc support truncate WIP sql support

Signed-off-by: discord9 <discord9@163.com>

* wip: parse truncate range

Signed-off-by: discord9 <discord9@163.com>

* feat: truncate by range

Signed-off-by: discord9 <discord9@163.com>

* fix: truncate range display

Signed-off-by: discord9 <discord9@163.com>

* chore: resolve todo

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* test: more invalid parse

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: unused

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: update branch

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-04 18:50:27 +08:00
committed by GitHub
parent 414101fafa
commit 1afa0afc67
25 changed files with 938 additions and 102 deletions

2
Cargo.lock generated
View File

@@ -5310,7 +5310,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=91c3d7b97a2850c014aa5ce4ffa4caeb6b918446#91c3d7b97a2850c014aa5ce4ffa4caeb6b918446"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ccfd4da48bc0254ed865e479cd981a3581b02d84#ccfd4da48bc0254ed865e479cd981a3581b02d84"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -140,7 +140,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "91c3d7b97a2850c014aa5ce4ffa4caeb6b918446" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ccfd4da48bc0254ed865e479cd981a3581b02d84" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use snafu::prelude::*;
use snafu::Location;
@@ -66,12 +67,28 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid time unit: {time_unit}"))]
InvalidTimeUnit {
time_unit: i32,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Inconsistent time unit: {:?}", units))]
InconsistentTimeUnit {
units: Vec<TimeUnit>,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::UnknownColumnDataType { .. } => StatusCode::InvalidArguments,
Error::UnknownColumnDataType { .. }
| Error::InvalidTimeUnit { .. }
| Error::InconsistentTimeUnit { .. } => StatusCode::InvalidArguments,
Error::IntoColumnDataType { .. } | Error::SerializeJson { .. } => {
StatusCode::Unexpected
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use common_base::BitVec;
@@ -46,7 +47,7 @@ use greptime_proto::v1::{
use paste::paste;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::error::{self, InconsistentTimeUnitSnafu, InvalidTimeUnitSnafu, Result};
use crate::v1::column::Values;
use crate::v1::{Column, ColumnDataType, Value as GrpcValue};
@@ -1079,6 +1080,89 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
}
}
pub fn from_pb_time_unit(unit: v1::TimeUnit) -> TimeUnit {
match unit {
v1::TimeUnit::Second => TimeUnit::Second,
v1::TimeUnit::Millisecond => TimeUnit::Millisecond,
v1::TimeUnit::Microsecond => TimeUnit::Microsecond,
v1::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
}
}
pub fn to_pb_time_unit(unit: TimeUnit) -> v1::TimeUnit {
match unit {
TimeUnit::Second => v1::TimeUnit::Second,
TimeUnit::Millisecond => v1::TimeUnit::Millisecond,
TimeUnit::Microsecond => v1::TimeUnit::Microsecond,
TimeUnit::Nanosecond => v1::TimeUnit::Nanosecond,
}
}
/// Converts protobuf [`v1::TimeRanges`] into a list of `(start, end)` timestamp pairs.
///
/// Every returned timestamp carries the single time unit declared by the message.
/// An empty range list yields an empty vector without inspecting the declared unit.
///
/// # Errors
///
/// Returns `InvalidTimeUnit` if the range list is non-empty and `time_unit` is not a
/// known [`v1::TimeUnit`] value.
pub fn from_pb_time_ranges(time_ranges: v1::TimeRanges) -> Result<Vec<(Timestamp, Timestamp)>> {
    let raw_unit = time_ranges.time_unit;
    let pb_ranges = time_ranges.time_ranges;
    if pb_ranges.is_empty() {
        return Ok(vec![]);
    }

    let Ok(pb_unit) = v1::TimeUnit::try_from(raw_unit) else {
        return InvalidTimeUnitSnafu {
            time_unit: raw_unit,
        }
        .fail();
    };
    let unit = from_pb_time_unit(pb_unit);

    let mut converted = Vec::with_capacity(pb_ranges.len());
    for range in pb_ranges {
        converted.push((
            Timestamp::new(range.start, unit),
            Timestamp::new(range.end, unit),
        ));
    }
    Ok(converted)
}
/// Converts `(start, end)` timestamp pairs into protobuf [`v1::TimeRanges`].
///
/// All timestamps (both bounds of every range) must share the same time unit, because the
/// protobuf message carries a single `time_unit` for the whole list.
///
/// If the input `time_ranges` is empty, a default `TimeRanges` with `Millisecond` as the
/// time unit and no ranges is returned.
///
/// # Errors
///
/// Returns `InconsistentTimeUnit` (listing every distinct unit found) if any bound uses a
/// unit different from the others. This includes the case of a single range whose start
/// and end units differ.
pub fn to_pb_time_ranges(time_ranges: &[(Timestamp, Timestamp)]) -> Result<v1::TimeRanges> {
    let Some((first_start, _)) = time_ranges.first() else {
        return Ok(v1::TimeRanges {
            // default time unit is Millisecond
            time_unit: v1::TimeUnit::Millisecond as i32,
            time_ranges: Vec::new(),
        });
    };

    // Compare every bound against the first one. A pairwise `windows(2)` check would
    // silently skip validation when there is only a single range.
    let expected_unit = first_start.unit();
    let is_same_time_unit = time_ranges
        .iter()
        .all(|(start, end)| start.unit() == expected_unit && end.unit() == expected_unit);

    if !is_same_time_unit {
        let all_time_units: Vec<_> = time_ranges
            .iter()
            .flat_map(|(start, end)| [start.unit(), end.unit()])
            .collect::<HashSet<_>>()
            .into_iter()
            .collect();
        return InconsistentTimeUnitSnafu {
            units: all_time_units,
        }
        .fail();
    }

    Ok(v1::TimeRanges {
        time_unit: to_pb_time_unit(expected_unit) as i32,
        time_ranges: time_ranges
            .iter()
            .map(|(start, end)| v1::TimeRange {
                start: start.value(),
                end: end.value(),
            })
            .collect(),
    })
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::helper::to_pb_time_ranges;
use api::v1::region::{
region_request, RegionRequest, RegionRequestHeader, TruncateRequest as PbTruncateRegionRequest,
region_request, truncate_request, RegionRequest, RegionRequestHeader,
TruncateRequest as PbTruncateRegionRequest,
};
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
@@ -33,7 +35,7 @@ use table::table_reference::TableReference;
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
use crate::ddl::DdlContext;
use crate::error::{Result, TableNotFoundSnafu};
use crate::error::{ConvertTimeRangesSnafu, Result, TableNotFoundSnafu};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
@@ -153,6 +155,15 @@ impl TruncateTableProcedure {
datanode
);
let time_ranges = &self.data.task.time_ranges;
let kind = if time_ranges.is_empty() {
truncate_request::Kind::All(api::v1::region::All {})
} else {
let pb_time_ranges =
to_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
truncate_request::Kind::TimeRanges(pb_time_ranges)
};
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
@@ -160,6 +171,7 @@ impl TruncateTableProcedure {
}),
body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {
region_id: region_id.as_u64(),
kind: Some(kind),
})),
};

View File

@@ -971,6 +971,13 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display("Failed to convert time ranges"))]
ConvertTimeRanges {
#[snafu(implicit)]
location: Location,
source: api::error::Error,
},
#[snafu(display(
"Column metadata inconsistencies found in table: {}, table_id: {}",
table_name,
@@ -1045,7 +1052,8 @@ impl ErrorExt for Error {
| KafkaGetOffset { .. }
| ReadFlexbuffers { .. }
| SerializeFlexbuffers { .. }
| DeserializeFlexbuffers { .. } => StatusCode::Unexpected,
| DeserializeFlexbuffers { .. }
| ConvertTimeRanges { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,

View File

@@ -18,6 +18,7 @@ pub mod trigger;
use std::collections::{HashMap, HashSet};
use std::result;
use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
use api::v1::meta::{
@@ -38,7 +39,8 @@ use api::v1::{
};
use base64::engine::general_purpose;
use base64::Engine as _;
use common_time::{DatabaseTimeToLive, Timezone};
use common_error::ext::BoxedError;
use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnNull};
@@ -49,8 +51,8 @@ use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::error::{
self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu,
Result,
self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
};
use crate::key::FlowId;
@@ -179,12 +181,14 @@ impl DdlTask {
schema: String,
table: String,
table_id: TableId,
time_ranges: Vec<(Timestamp, Timestamp)>,
) -> Self {
DdlTask::TruncateTable(TruncateTableTask {
catalog,
schema,
table,
table_id,
time_ranges,
})
}
@@ -826,6 +830,7 @@ pub struct TruncateTableTask {
pub schema: String,
pub table: String,
pub table_id: TableId,
pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
impl TruncateTableTask {
@@ -864,6 +869,13 @@ impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
err_msg: "expected table_id",
})?
.id,
time_ranges: truncate_table
.time_ranges
.map(from_pb_time_ranges)
.transpose()
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.unwrap_or_default(),
})
}
}
@@ -878,6 +890,9 @@ impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
time_ranges: Some(
to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
),
}),
})
}

View File

@@ -558,7 +558,7 @@ impl fmt::Debug for Timestamp {
}
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TimeUnit {
Second,
#[default]

View File

@@ -1462,7 +1462,10 @@ mod tests {
);
let err = mock_region_server
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.handle_request(
region_id,
RegionRequest::Truncate(RegionTruncateRequest::All),
)
.await
.unwrap_err();

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use api::helper::from_pb_time_ranges;
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
@@ -24,6 +25,7 @@ use api::v1::{
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_error::ext::BoxedError;
use common_grpc::flight::FlightDecoder;
use common_grpc::FlightData;
use common_query::logical_plan::add_insert_to_logical_plan;
@@ -39,7 +41,7 @@ use table::table_name::TableName;
use table::TableRef;
use crate::error::{
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, InFlightWriteBytesExceededSnafu,
IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
@@ -195,8 +197,11 @@ impl GrpcQueryHandler for Instance {
DdlExpr::TruncateTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
self.statement_executor
.truncate_table(table_name, ctx.clone())
.truncate_table(table_name, time_ranges, ctx.clone())
.await?
}
DdlExpr::CreateFlow(expr) => {

View File

@@ -69,7 +69,10 @@ async fn test_engine_truncate_region_basic() {
// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.handle_request(
region_id,
RegionRequest::Truncate(RegionTruncateRequest::All),
)
.await
.unwrap();
@@ -118,7 +121,10 @@ async fn test_engine_put_data_after_truncate() {
// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.handle_request(
region_id,
RegionRequest::Truncate(RegionTruncateRequest::All),
)
.await
.unwrap();
@@ -194,7 +200,10 @@ async fn test_engine_truncate_after_flush() {
// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.handle_request(
region_id,
RegionRequest::Truncate(RegionTruncateRequest::All),
)
.await
.unwrap();
@@ -249,7 +258,10 @@ async fn test_engine_truncate_reopen() {
// Truncate the region
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.handle_request(
region_id,
RegionRequest::Truncate(RegionTruncateRequest::All),
)
.await
.unwrap();
@@ -337,7 +349,10 @@ async fn test_engine_truncate_during_flush() {
// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.handle_request(
region_id,
RegionRequest::Truncate(RegionTruncateRequest::All),
)
.await
.unwrap();

View File

@@ -66,10 +66,21 @@ pub struct RegionRemove {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
pub region_id: RegionId,
/// Last WAL entry id of truncated data.
pub truncated_entry_id: EntryId,
// Last sequence number of truncated data.
pub truncated_sequence: SequenceNumber,
pub kind: TruncateKind,
}
/// The kind of truncate operation recorded in a [`RegionTruncate`] action.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum TruncateKind {
    /// Truncate all data in the region, i.e. all data up to the given entry id and sequence.
    All {
        /// Last WAL entry id of truncated data.
        truncated_entry_id: EntryId,
        /// Last sequence number of truncated data.
        truncated_sequence: SequenceNumber,
    },
    /// Only remove certain files in the region.
    Partial { files_to_remove: Vec<FileMeta> },
}
/// The region manifest data.
@@ -147,10 +158,22 @@ impl RegionManifestBuilder {
pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
self.manifest_version = manifest_version;
self.flushed_entry_id = truncate.truncated_entry_id;
self.flushed_sequence = truncate.truncated_sequence;
self.truncated_entry_id = Some(truncate.truncated_entry_id);
self.files.clear();
match truncate.kind {
TruncateKind::All {
truncated_entry_id,
truncated_sequence,
} => {
self.flushed_entry_id = truncated_entry_id;
self.flushed_sequence = truncated_sequence;
self.truncated_entry_id = Some(truncated_entry_id);
self.files.clear();
}
TruncateKind::Partial { files_to_remove } => {
for file in files_to_remove {
self.files.remove(&file.file_id);
}
}
}
}
/// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).

View File

@@ -31,7 +31,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use crate::error::Result;
use crate::manifest::action::RegionEdit;
use crate::manifest::action::{RegionEdit, TruncateKind};
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableId};
@@ -196,8 +196,7 @@ impl VersionControl {
/// Truncate current version.
pub(crate) fn truncate(
&self,
truncated_entry_id: EntryId,
truncated_sequence: SequenceNumber,
truncate_kind: TruncateKind,
memtable_builder: &MemtableBuilderRef,
) {
let version = self.current().version;
@@ -210,13 +209,23 @@ impl VersionControl {
next_memtable_id,
Some(part_duration),
));
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(truncated_entry_id)
.flushed_sequence(truncated_sequence)
.truncated_entry_id(Some(truncated_entry_id))
.build(),
);
let new_version = match truncate_kind {
TruncateKind::All {
truncated_entry_id,
truncated_sequence,
} => Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(truncated_entry_id)
.flushed_sequence(truncated_sequence)
.truncated_entry_id(Some(truncated_entry_id))
.build(),
),
TruncateKind::Partial { files_to_remove } => Arc::new(
VersionBuilder::from_version(version)
.remove_files(files_to_remove.into_iter())
.build(),
),
};
let mut version_data = self.data.write().unwrap();
version_data.version.ssts.mark_all_deleted();
@@ -416,6 +425,14 @@ impl VersionBuilder {
self
}
/// Removes the given files from the SSTs tracked by this builder.
///
/// The current SST set is copied, the files are removed from the copy, and the copy
/// replaces the builder's SSTs.
pub(crate) fn remove_files(mut self, files: impl Iterator<Item = FileMeta>) -> Self {
    self.ssts = {
        let mut remaining = self.ssts.as_ref().clone();
        remaining.remove_files(files);
        Arc::new(remaining)
    };
    self
}
/// Builds a new [Version] from the builder.
/// It overwrites the window size by compaction option.
pub(crate) fn build(self) -> Version {

View File

@@ -38,7 +38,7 @@ use store_api::region_request::{
RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{RegionId, SequenceNumber};
use store_api::storage::RegionId;
use store_api::ManifestVersion;
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -46,7 +46,7 @@ use crate::error::{
CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::manifest::action::{RegionEdit, TruncateKind};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::MemtableId;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
@@ -886,10 +886,7 @@ pub(crate) struct TruncateResult {
pub(crate) sender: OptionOutputTx,
/// Truncate result.
pub(crate) result: Result<()>,
/// Truncated entry id.
pub(crate) truncated_entry_id: EntryId,
/// Truncated sequence.
pub(crate) truncated_sequence: SequenceNumber,
pub(crate) kind: TruncateKind,
}
/// Notifies the region the result of writing region change action.

View File

@@ -944,8 +944,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
DdlRequest::Truncate(_) => {
self.handle_truncate_request(ddl.region_id, ddl.sender)
DdlRequest::Truncate(req) => {
self.handle_truncate_request(ddl.region_id, req, ddl.sender)
.await;
continue;
}

View File

@@ -317,8 +317,7 @@ impl<S> RegionWorkerLoop<S> {
region_id: truncate.region_id,
sender,
result,
truncated_entry_id: truncate.truncated_entry_id,
truncated_sequence: truncate.truncated_sequence,
kind: truncate.kind,
};
let _ = request_sender
.send(WorkerRequestWithTime::new(WorkerRequest::Background {

View File

@@ -14,12 +14,13 @@
//! Handling truncate related requests.
use common_telemetry::info;
use common_telemetry::{debug, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionTruncateRequest;
use store_api::storage::RegionId;
use crate::error::RegionNotFoundSnafu;
use crate::manifest::action::RegionTruncate;
use crate::manifest::action::{RegionTruncate, TruncateKind};
use crate::region::RegionLeaderState;
use crate::request::{OptionOutputTx, TruncateResult};
use crate::worker::RegionWorkerLoop;
@@ -28,24 +29,65 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_truncate_request(
&mut self,
region_id: RegionId,
req: RegionTruncateRequest,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
info!("Try to truncate region {}", region_id);
let version_data = region.version_control.current();
let truncated_entry_id = version_data.last_entry_id;
let truncated_sequence = version_data.committed_sequence;
// Write region truncated to manifest.
let truncate = RegionTruncate {
region_id,
truncated_entry_id,
truncated_sequence,
let truncate = match req {
RegionTruncateRequest::All => {
info!("Try to fully truncate region {}", region_id);
let truncated_entry_id = version_data.last_entry_id;
let truncated_sequence = version_data.committed_sequence;
// Write region truncated to manifest.
RegionTruncate {
region_id,
kind: TruncateKind::All {
truncated_entry_id,
truncated_sequence,
},
}
}
RegionTruncateRequest::ByTimeRanges { time_ranges } => {
info!(
"Try to partially truncate region {} by time ranges: {:?}",
region_id, time_ranges
);
// find all files that are fully contained in the time ranges
let mut files_to_truncate = Vec::new();
for level in version_data.version.ssts.levels() {
for file in level.files() {
let file_time_range = file.time_range();
// TODO(discord9): This is a naive way to check if is contained, we should
// optimize it later.
let is_subset = time_ranges.iter().any(|(start, end)| {
file_time_range.0 >= *start && file_time_range.1 <= *end
});
if is_subset {
files_to_truncate.push(file.meta_ref().clone());
}
}
}
debug!(
"Found {} files to partially truncate in region {}",
files_to_truncate.len(),
region_id
);
RegionTruncate {
region_id,
kind: TruncateKind::Partial {
files_to_remove: files_to_truncate,
},
}
}
};
self.handle_manifest_truncate_action(region, truncate, sender);
}
@@ -68,11 +110,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
match truncate_result.result {
Ok(()) => {
// Applies the truncate action to the region.
region.version_control.truncate(
truncate_result.truncated_entry_id,
truncate_result.truncated_sequence,
&region.memtable_builder,
);
region
.version_control
.truncate(truncate_result.kind.clone(), &region.memtable_builder);
}
Err(e) => {
// Unable to truncate the region.
@@ -86,23 +126,25 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Notifies compaction scheduler.
self.compaction_scheduler.on_region_truncated(region_id);
// Make all data obsolete.
if let Err(e) = self
.wal
.obsolete(
region_id,
truncate_result.truncated_entry_id,
&region.provider,
)
.await
if let TruncateKind::All {
truncated_entry_id,
truncated_sequence: _,
} = &truncate_result.kind
{
truncate_result.sender.send(Err(e));
return;
// Make all data obsolete.
if let Err(e) = self
.wal
.obsolete(region_id, *truncated_entry_id, &region.provider)
.await
{
truncate_result.sender.send(Err(e));
return;
}
}
info!(
"Complete truncating region: {}, entry id: {} and sequence: {}.",
region_id, truncate_result.truncated_entry_id, truncate_result.truncated_sequence
"Complete truncating region: {}, kind: {:?}.",
region_id, truncate_result.kind
);
truncate_result.sender.send(Ok(0));

View File

@@ -47,6 +47,7 @@ use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::QueryEngineRef;
@@ -73,8 +74,8 @@ use self::set::{
};
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UpgradeCatalogManagerRefSnafu,
PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
};
use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
@@ -306,7 +307,11 @@ impl StatementExecutor {
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.truncate_table(table_name, query_ctx).await
let time_ranges = self
.convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
.await?;
self.truncate_table(table_name, time_ranges, query_ctx)
.await
}
Statement::CreateDatabase(stmt) => {
self.create_database(
@@ -530,6 +535,84 @@ impl StatementExecutor {
pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
&self.cache_invalidator
}
/// Convert truncate time ranges for the given table from sql values to timestamps
///
pub async fn convert_truncate_time_ranges(
&self,
table_name: &TableName,
sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
query_ctx: &QueryContextRef,
) -> Result<Vec<(Timestamp, Timestamp)>> {
if sql_values_time_range.is_empty() {
return Ok(vec![]);
}
let table = self.get_table(&table_name.table_ref()).await?;
let info = table.table_info();
let time_index_dt = info
.meta
.schema
.timestamp_column()
.context(UnexpectedSnafu {
violated: "Table must have a timestamp column",
})?;
let time_unit = time_index_dt
.data_type
.as_timestamp()
.with_context(|| UnexpectedSnafu {
violated: format!(
"Table {}'s time index column must be a timestamp type, found: {:?}",
table_name, time_index_dt
),
})?
.unit();
let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
for (start, end) in sql_values_time_range {
let start = common_sql::convert::sql_value_to_value(
"range_start",
&ConcreteDataType::timestamp_datatype(time_unit),
start,
Some(&query_ctx.timezone()),
None,
false,
)
.context(SqlCommonSnafu)
.and_then(|v| {
if let datatypes::value::Value::Timestamp(t) = v {
Ok(t)
} else {
error::InvalidSqlSnafu {
err_msg: format!("Expected a timestamp value, found {v:?}"),
}
.fail()
}
})?;
let end = common_sql::convert::sql_value_to_value(
"range_end",
&ConcreteDataType::timestamp_datatype(time_unit),
end,
Some(&query_ctx.timezone()),
None,
false,
)
.context(SqlCommonSnafu)
.and_then(|v| {
if let datatypes::value::Value::Timestamp(t) = v {
Ok(t)
} else {
error::InvalidSqlSnafu {
err_msg: format!("Expected a timestamp value, found {v:?}"),
}
.fail()
}
})?;
time_ranges.push((start, end));
}
Ok(time_ranges)
}
}
fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {

View File

@@ -46,7 +46,7 @@ use common_meta::rpc::ddl::{
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::Timezone;
use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
@@ -1151,6 +1151,7 @@ impl StatementExecutor {
pub async fn truncate_table(
&self,
table_name: TableName,
time_ranges: Vec<(Timestamp, Timestamp)>,
query_context: QueryContextRef,
) -> Result<Output> {
ensure!(
@@ -1174,7 +1175,7 @@ impl StatementExecutor {
table_name: table_name.to_string(),
})?;
let table_id = table.table_info().table_id();
self.truncate_table_procedure(&table_name, table_id, query_context)
self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
.await?;
Ok(Output::new_with_affected_rows(0))
@@ -1490,6 +1491,7 @@ impl StatementExecutor {
&self,
table_name: &TableName,
table_id: TableId,
time_ranges: Vec<(Timestamp, Timestamp)>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
@@ -1499,6 +1501,7 @@ impl StatementExecutor {
table_name.schema_name.to_string(),
table_name.table_name.to_string(),
table_id,
time_ranges,
),
};

View File

@@ -14,8 +14,9 @@
use snafu::{ensure, ResultExt};
use sqlparser::keywords::Keyword;
use sqlparser::tokenizer::Token;
use crate::error::{self, InvalidTableNameSnafu, Result};
use crate::error::{self, InvalidSqlSnafu, InvalidTableNameSnafu, Result, UnexpectedTokenSnafu};
use crate::parser::ParserContext;
use crate::statements::statement::Statement;
use crate::statements::truncate::TruncateTable;
@@ -41,7 +42,92 @@ impl ParserContext<'_> {
}
);
Ok(Statement::TruncateTable(TruncateTable::new(table_ident)))
let have_range = self.parser.parse_keywords(&[Keyword::FILE, Keyword::RANGE]);
// if no range is specified, we just truncate the table
if !have_range {
return Ok(Statement::TruncateTable(TruncateTable::new(table_ident)));
}
// parse a comma-separated list of time ranges, each of the form (Timestamp, Timestamp); a trailing comma is allowed
let mut time_ranges = vec![];
loop {
let _ = self
.parser
.expect_token(&sqlparser::tokenizer::Token::LParen)
.with_context(|_| error::UnexpectedSnafu {
expected: "a left parenthesis",
actual: self.peek_token_as_string(),
})?;
// parse into SQL values here; there is no need to validate them in the parser
let start = self
.parser
.parse_value()
.with_context(|e| error::UnexpectedSnafu {
expected: "a timestamp value",
actual: e.to_string(),
})?;
let _ = self
.parser
.expect_token(&sqlparser::tokenizer::Token::Comma)
.with_context(|_| error::UnexpectedSnafu {
expected: "a comma",
actual: self.peek_token_as_string(),
})?;
let end = self
.parser
.parse_value()
.with_context(|_| error::UnexpectedSnafu {
expected: "a timestamp",
actual: self.peek_token_as_string(),
})?;
let _ = self
.parser
.expect_token(&sqlparser::tokenizer::Token::RParen)
.with_context(|_| error::UnexpectedSnafu {
expected: "a right parenthesis",
actual: self.peek_token_as_string(),
})?;
time_ranges.push((start, end));
let peek = self.parser.peek_token().token;
match peek {
sqlparser::tokenizer::Token::EOF | Token::SemiColon => {
if time_ranges.is_empty() {
return Err(InvalidSqlSnafu {
msg: "TRUNCATE TABLE RANGE must have at least one range".to_string(),
}
.build());
}
break;
}
Token::Comma => {
self.parser.next_token(); // Consume the comma
let next_peek = self.parser.peek_token().token; // Peek the token after the comma
if matches!(next_peek, Token::EOF | Token::SemiColon) {
break; // Trailing comma, end of statement
}
// Otherwise, continue to parse next range
continue;
}
_ => UnexpectedTokenSnafu {
expected: "a comma or end of statement",
actual: self.peek_token_as_string(),
}
.fail()?,
}
}
Ok(Statement::TruncateTable(TruncateTable::new_with_ranges(
table_ident,
time_ranges,
)))
}
}
@@ -53,6 +139,145 @@ mod tests {
use crate::dialect::GreptimeDbDialect;
use crate::parser::ParseOptions;
/// Checks that `TRUNCATE [TABLE] <name> FILE RANGE (..), ..` parses into the
/// expected [`TruncateTable`] for bare, qualified and quoted table names,
/// numeric and double-quoted bounds, and inputs with a trailing `,` or `;`.
#[test]
pub fn test_parse_truncate_with_ranges() {
    // Parse `sql` and hand back the last statement the parser produced.
    let parse_last = |sql: &str| -> Statement {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
            .pop()
            .unwrap()
    };
    // Builders for the AST values the cases below expect.
    let num = |digits: &str| sqlparser::ast::Value::Number(digits.to_string(), false);
    let quoted = |text: &str| sqlparser::ast::Value::DoubleQuotedString(text.to_string());
    let plain_name =
        |parts: &[&str]| ObjectName(parts.iter().map(|part| Ident::new(*part)).collect());

    // (input SQL, expected statement payload), checked in order.
    let cases = vec![
        (
            r#"TRUNCATE foo FILE RANGE (0, 20)"#,
            TruncateTable::new_with_ranges(plain_name(&["foo"]), vec![(num("0"), num("20"))]),
        ),
        (
            r#"TRUNCATE TABLE foo FILE RANGE ("2000-01-01 00:00:00+00:00", "2000-01-01 00:00:00+00:00"), (2,33)"#,
            TruncateTable::new_with_ranges(
                plain_name(&["foo"]),
                vec![
                    (
                        quoted("2000-01-01 00:00:00+00:00"),
                        quoted("2000-01-01 00:00:00+00:00"),
                    ),
                    (num("2"), num("33")),
                ],
            ),
        ),
        (
            "TRUNCATE TABLE my_schema.foo FILE RANGE (1, 2), (3, 4),",
            TruncateTable::new_with_ranges(
                plain_name(&["my_schema", "foo"]),
                vec![(num("1"), num("2")), (num("3"), num("4"))],
            ),
        ),
        (
            "TRUNCATE my_schema.foo FILE RANGE (1,2),",
            TruncateTable::new_with_ranges(
                plain_name(&["my_schema", "foo"]),
                vec![(num("1"), num("2"))],
            ),
        ),
        (
            "TRUNCATE TABLE my_catalog.my_schema.foo FILE RANGE (1,2);",
            TruncateTable::new_with_ranges(
                plain_name(&["my_catalog", "my_schema", "foo"]),
                vec![(num("1"), num("2"))],
            ),
        ),
        (
            "TRUNCATE drop FILE RANGE (1,2)",
            TruncateTable::new_with_ranges(plain_name(&["drop"]), vec![(num("1"), num("2"))]),
        ),
        (
            "TRUNCATE `drop`",
            TruncateTable::new(ObjectName(vec![Ident::with_quote('`', "drop")])),
        ),
        (
            "TRUNCATE \"drop\" FILE RANGE (\"1\", \"2\")",
            TruncateTable::new_with_ranges(
                ObjectName(vec![Ident::with_quote('"', "drop")]),
                vec![(quoted("1"), quoted("2"))],
            ),
        ),
    ];

    for (sql, expected) in cases {
        assert_eq!(parse_last(sql), Statement::TruncateTable(expected));
    }
}
#[test]
pub fn test_parse_truncate() {
let sql = "TRUNCATE foo";
@@ -153,5 +378,82 @@ mod tests {
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result.is_err(), "result is: {result:?}");
let sql = "TRUNCATE TABLE foo RANGE";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err()
&& format!("{result:?}").contains("SQL statement is not supported, keyword: RANGE"),
"result is: {result:?}"
);
let sql = "TRUNCATE TABLE foo FILE";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err()
&& format!("{result:?}").contains("SQL statement is not supported, keyword: FILE"),
"result is: {result:?}"
);
let sql = "TRUNCATE TABLE foo FILE RANGE";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err() && format!("{result:?}").contains("expected: 'a left parenthesis'"),
"result is: {result:?}"
);
let sql = "TRUNCATE TABLE foo FILE RANGE (";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err() && format!("{result:?}").contains("expected: 'a timestamp value'"),
"result is: {result:?}"
);
let sql = "TRUNCATE TABLE foo FILE RANGE ()";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err() && format!("{result:?}").contains("expected: 'a timestamp value'"),
"result is: {result:?}"
);
let sql = "TRUNCATE TABLE foo FILE RANGE (1 2) (3 4)";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err() && format!("{result:?}").contains("expected: 'a comma'"),
"result is: {result:?}"
);
let sql = "TRUNCATE TABLE foo FILE RANGE (,),(3,4)";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err() && format!("{result:?}").contains("Expected: a value, found: ,"),
"result is: {result:?}"
);
let sql = "TRUNCATE TABLE foo FILE RANGE (1,2) (3,4)";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err()
&& format!("{result:?}")
.contains("expected: 'a comma or end of statement', found: ("),
"result is: {result:?}"
);
let sql = "TRUNCATE TABLE foo FILE RANGE (1,2),,,,,,,,,(3,4)";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(
result.is_err()
&& format!("{result:?}").contains("expected: 'a left parenthesis', found: ,"),
"result is: {result:?}"
);
}
}

View File

@@ -14,31 +14,85 @@
use std::fmt::Display;
use itertools::Itertools;
use serde::Serialize;
use sqlparser::ast::ObjectName;
use sqlparser_derive::{Visit, VisitMut};
use sqlparser::ast::{ObjectName, Visit, VisitMut, Visitor, VisitorMut};
/// TRUNCATE TABLE statement.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct TruncateTable {
table_name: ObjectName,
time_ranges: Vec<(sqlparser::ast::Value, sqlparser::ast::Value)>,
}
impl Visit for TruncateTable {
fn visit<V: Visitor>(&self, visitor: &mut V) -> ::std::ops::ControlFlow<V::Break> {
self.table_name.visit(visitor)?;
for (start, end) in &self.time_ranges {
start.visit(visitor)?;
end.visit(visitor)?;
}
::std::ops::ControlFlow::Continue(())
}
}
impl VisitMut for TruncateTable {
fn visit<V: VisitorMut>(&mut self, visitor: &mut V) -> ::std::ops::ControlFlow<V::Break> {
sqlparser::ast::VisitMut::visit(&mut self.table_name, visitor)?;
for (start, end) in &mut self.time_ranges {
start.visit(visitor)?;
end.visit(visitor)?;
}
::std::ops::ControlFlow::Continue(())
}
}
impl TruncateTable {
/// Creates a statement for `TRUNCATE TABLE` without any time ranges.
pub fn new(table_name: ObjectName) -> Self {
Self { table_name }
Self {
table_name,
time_ranges: Vec::new(),
}
}
/// Creates a statement for `TRUNCATE TABLE <table> FILE RANGE (start, end), ...`.
///
/// Each entry of `time_ranges` is one `(start, end)` pair, stored as given.
pub fn new_with_ranges(
table_name: ObjectName,
time_ranges: Vec<(sqlparser::ast::Value, sqlparser::ast::Value)>,
) -> Self {
Self {
table_name,
time_ranges,
}
}
/// Returns the name of the table to truncate.
pub fn table_name(&self) -> &ObjectName {
&self.table_name
}
/// Returns the `(start, end)` ranges of the statement; empty when the
/// statement was built with [`TruncateTable::new`].
pub fn time_ranges(&self) -> &[(sqlparser::ast::Value, sqlparser::ast::Value)] {
&self.time_ranges
}
}
/// Renders the statement as SQL: `TRUNCATE TABLE <name>`, followed by
/// ` FILE RANGE (start, end), ...` when time ranges are present.
impl Display for TruncateTable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let table_name = self.table_name();
write!(f, r#"TRUNCATE TABLE {table_name}"#)
write!(f, r#"TRUNCATE TABLE {table_name}"#)?;
// Without ranges the output stops at the plain `TRUNCATE TABLE <name>` form.
if self.time_ranges.is_empty() {
return Ok(());
}
write!(f, " FILE RANGE ")?;
// Each range is printed as `(start, end)`; ranges are separated by ", ".
write!(
f,
"{}",
self.time_ranges
.iter()
.map(|(start, end)| format!("({}, {})", start, end))
.join(", ")
)
}
}
@@ -71,5 +125,45 @@ TRUNCATE TABLE t1"#,
unreachable!();
}
}
let sql = r"truncate table t1 file range (1,2);";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::TruncateTable { .. });
match &stmts[0] {
Statement::TruncateTable(trunc) => {
let new_sql = format!("\n{}", trunc);
assert_eq!(
r#"
TRUNCATE TABLE t1 FILE RANGE (1, 2)"#,
&new_sql
);
}
_ => {
unreachable!();
}
}
let sql = r"truncate table t1 file range (1,2), (3,4);";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::TruncateTable { .. });
match &stmts[0] {
Statement::TruncateTable(trunc) => {
let new_sql = format!("\n{}", trunc);
assert_eq!(
r#"
TRUNCATE TABLE t1 FILE RANGE (1, 2), (3, 4)"#,
&new_sql
);
}
_ => {
unreachable!();
}
}
}
}

View File

@@ -1035,6 +1035,13 @@ pub enum MetadataError {
location: Location,
},
#[snafu(display("Failed to convert TimeRanges"))]
ConvertTimeRanges {
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
InvalidSetRegionOptionRequest {
key: String,

View File

@@ -15,16 +15,16 @@
use std::collections::HashMap;
use std::fmt::{self, Display};
use api::helper::ColumnDataTypeWrapper;
use api::helper::{from_pb_time_ranges, ColumnDataTypeWrapper};
use api::v1::add_column_location::LocationType;
use api::v1::column_def::{
as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
};
use api::v1::region::bulk_insert_request::Body;
use api::v1::region::{
alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
alter_request, compact_request, region_request, truncate_request, AlterRequest, AlterRequests,
BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests,
DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{
self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
@@ -33,7 +33,7 @@ use api::v1::{
pub use common_base::AffectedRows;
use common_grpc::flight::FlightDecoder;
use common_recordbatch::DfRecordBatch;
use common_time::TimeToLive;
use common_time::{TimeToLive, Timestamp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use serde::{Deserialize, Serialize};
@@ -42,9 +42,10 @@ use strum::{AsRefStr, IntoStaticStr};
use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidIndexOptionSnafu,
InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
RegionMetadata, Result, UnexpectedSnafu,
};
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::metrics;
@@ -340,10 +341,24 @@ fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionR
/// Convert [TruncateRequest] to a single `(RegionId, RegionRequest)` pair.
///
/// Fails with an invalid-raw-region-request error when `kind` is missing, and
/// with a `ConvertTimeRanges` error when the protobuf time ranges cannot be
/// converted.
fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = truncate.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Truncate(RegionTruncateRequest {}),
)])
match truncate.kind {
// A request that carries no kind is rejected.
None => InvalidRawRegionRequestSnafu {
err: "missing kind in TruncateRequest".to_string(),
}
.fail(),
// Full truncate of the region.
Some(truncate_request::Kind::All(_)) => Ok(vec![(
region_id,
RegionRequest::Truncate(RegionTruncateRequest::All),
)]),
// Partial truncate: convert the protobuf ranges before building the request.
Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
Ok(vec![(
region_id,
RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
)])
}
}
}
/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
@@ -1312,7 +1327,16 @@ impl Default for RegionCompactRequest {
/// Truncate region request.
#[derive(Debug)]
pub struct RegionTruncateRequest {}
pub enum RegionTruncateRequest {
/// Truncate all data in the region.
All,
/// Truncate only the files covered by the given time ranges.
ByTimeRanges {
/// Time ranges to truncate. Both bounds are inclusive.
/// Only files that are fully contained in a time range will be truncated,
/// so there is no guarantee that all data in the time range will be truncated.
time_ranges: Vec<(Timestamp, Timestamp)>,
},
}
/// Catchup region request.
///

View File

@@ -44,6 +44,67 @@ SELECT ts, host, cpu, memory FROM monitor ORDER BY ts;
++
++
-- truncate with time range
INSERT INTO monitor(ts, host, cpu, memory) VALUES
(1695217652000, 'host1', 66.6, 1024),
(1695217652000, 'host2', 66.6, 1024),
(1695217652000, 'host3', 66.6, 1024),
(1695217654000, 'host1', 77.7, 2048),
(1695217654000, 'host2', 77.7, 2048),
(1695217654000, 'host3', 77.7, 2048),
(1695217656000, 'host1', 88.8, 4096),
(1695217656000, 'host2', 88.8, 4096),
(1695217656000, 'host3', 88.8, 4096);
Affected Rows: 9
ADMIN FLUSH_TABLE('monitor');
+------------------------------+
| ADMIN FLUSH_TABLE('monitor') |
+------------------------------+
| 0 |
+------------------------------+
INSERT INTO monitor(ts, host, cpu, memory) VALUES
(1700000000111, 'host42', 66.6, 1024);
Affected Rows: 1
ADMIN FLUSH_TABLE('monitor');
+------------------------------+
| ADMIN FLUSH_TABLE('monitor') |
+------------------------------+
| 0 |
+------------------------------+
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts;
+-------------------------+--------+------+--------+
| ts | host | cpu | memory |
+-------------------------+--------+------+--------+
| 2023-09-20T13:47:32 | host1 | 66.6 | 1024.0 |
| 2023-09-20T13:47:32 | host2 | 66.6 | 1024.0 |
| 2023-09-20T13:47:32 | host3 | 66.6 | 1024.0 |
| 2023-09-20T13:47:34 | host1 | 77.7 | 2048.0 |
| 2023-09-20T13:47:34 | host2 | 77.7 | 2048.0 |
| 2023-09-20T13:47:34 | host3 | 77.7 | 2048.0 |
| 2023-09-20T13:47:36 | host1 | 88.8 | 4096.0 |
| 2023-09-20T13:47:36 | host2 | 88.8 | 4096.0 |
| 2023-09-20T13:47:36 | host3 | 88.8 | 4096.0 |
| 2023-11-14T22:13:20.111 | host42 | 66.6 | 1024.0 |
+-------------------------+--------+------+--------+
TRUNCATE monitor FILE RANGE (0, 1700000000000), (1700000000111, 1700000000200);
Affected Rows: 0
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts;
++
++
INSERT INTO monitor(ts, host, cpu, memory) VALUES
(1695217660000, 'host1', 88.8, 4096),
(1695217662000, 'host2', 88.8, 4096),

View File

@@ -19,6 +19,31 @@ TRUNCATE monitor;
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts;
-- truncate with time range
INSERT INTO monitor(ts, host, cpu, memory) VALUES
(1695217652000, 'host1', 66.6, 1024),
(1695217652000, 'host2', 66.6, 1024),
(1695217652000, 'host3', 66.6, 1024),
(1695217654000, 'host1', 77.7, 2048),
(1695217654000, 'host2', 77.7, 2048),
(1695217654000, 'host3', 77.7, 2048),
(1695217656000, 'host1', 88.8, 4096),
(1695217656000, 'host2', 88.8, 4096),
(1695217656000, 'host3', 88.8, 4096);
ADMIN FLUSH_TABLE('monitor');
INSERT INTO monitor(ts, host, cpu, memory) VALUES
(1700000000111, 'host42', 66.6, 1024);
ADMIN FLUSH_TABLE('monitor');
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts;
TRUNCATE monitor FILE RANGE (0, 1700000000000), (1700000000111, 1700000000200);
SELECT ts, host, cpu, memory FROM monitor ORDER BY ts;
INSERT INTO monitor(ts, host, cpu, memory) VALUES
(1695217660000, 'host1', 88.8, 4096),
(1695217662000, 'host2', 88.8, 4096),