feat: add partial truncate (#6602)

* feat: add partial truncate

Signed-off-by: discord9 <discord9@163.com>

* fix: per review

Signed-off-by: discord9 <discord9@163.com>

* feat: add proto partial truncate kind

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* chore: update branched proto

Signed-off-by: discord9 <discord9@163.com>

* feat: grpc support truncate WIP sql support

Signed-off-by: discord9 <discord9@163.com>

* wip: parse truncate range

Signed-off-by: discord9 <discord9@163.com>

* feat: truncate by range

Signed-off-by: discord9 <discord9@163.com>

* fix: truncate range display

Signed-off-by: discord9 <discord9@163.com>

* chore: resolve todo

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* test: more invalid parse

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: unused

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: update branch

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-04 18:50:27 +08:00
committed by Zhenchi
parent b31d307eb6
commit dc37382946
25 changed files with 938 additions and 102 deletions

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use snafu::prelude::*;
use snafu::Location;
@@ -66,12 +67,28 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid time unit: {time_unit}"))]
InvalidTimeUnit {
time_unit: i32,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Inconsistent time unit: {:?}", units))]
InconsistentTimeUnit {
units: Vec<TimeUnit>,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::UnknownColumnDataType { .. } => StatusCode::InvalidArguments,
Error::UnknownColumnDataType { .. }
| Error::InvalidTimeUnit { .. }
| Error::InconsistentTimeUnit { .. } => StatusCode::InvalidArguments,
Error::IntoColumnDataType { .. } | Error::SerializeJson { .. } => {
StatusCode::Unexpected
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use common_base::BitVec;
@@ -46,7 +47,7 @@ use greptime_proto::v1::{
use paste::paste;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::error::{self, InconsistentTimeUnitSnafu, InvalidTimeUnitSnafu, Result};
use crate::v1::column::Values;
use crate::v1::{Column, ColumnDataType, Value as GrpcValue};
@@ -1079,6 +1080,89 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
}
}
pub fn from_pb_time_unit(unit: v1::TimeUnit) -> TimeUnit {
match unit {
v1::TimeUnit::Second => TimeUnit::Second,
v1::TimeUnit::Millisecond => TimeUnit::Millisecond,
v1::TimeUnit::Microsecond => TimeUnit::Microsecond,
v1::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
}
}
pub fn to_pb_time_unit(unit: TimeUnit) -> v1::TimeUnit {
match unit {
TimeUnit::Second => v1::TimeUnit::Second,
TimeUnit::Millisecond => v1::TimeUnit::Millisecond,
TimeUnit::Microsecond => v1::TimeUnit::Microsecond,
TimeUnit::Nanosecond => v1::TimeUnit::Nanosecond,
}
}
pub fn from_pb_time_ranges(time_ranges: v1::TimeRanges) -> Result<Vec<(Timestamp, Timestamp)>> {
if time_ranges.time_ranges.is_empty() {
return Ok(vec![]);
}
let proto_time_unit = v1::TimeUnit::try_from(time_ranges.time_unit).map_err(|_| {
InvalidTimeUnitSnafu {
time_unit: time_ranges.time_unit,
}
.build()
})?;
let time_unit = from_pb_time_unit(proto_time_unit);
Ok(time_ranges
.time_ranges
.into_iter()
.map(|r| {
(
Timestamp::new(r.start, time_unit),
Timestamp::new(r.end, time_unit),
)
})
.collect())
}
/// All time_ranges must be of the same time unit.
///
/// if input `time_ranges` is empty, it will return a default `TimeRanges` with `Millisecond` as the time unit.
pub fn to_pb_time_ranges(time_ranges: &[(Timestamp, Timestamp)]) -> Result<v1::TimeRanges> {
let is_same_time_unit = time_ranges.windows(2).all(|x| {
x[0].0.unit() == x[1].0.unit()
&& x[0].1.unit() == x[1].1.unit()
&& x[0].0.unit() == x[0].1.unit()
});
if !is_same_time_unit {
let all_time_units: Vec<_> = time_ranges
.iter()
.map(|(s, e)| [s.unit(), e.unit()])
.clone()
.flatten()
.collect::<HashSet<_>>()
.into_iter()
.collect();
InconsistentTimeUnitSnafu {
units: all_time_units,
}
.fail()?
}
let mut pb_time_ranges = v1::TimeRanges {
// default time unit is Millisecond
time_unit: v1::TimeUnit::Millisecond as i32,
time_ranges: Vec::with_capacity(time_ranges.len()),
};
if let Some((start, _end)) = time_ranges.first() {
pb_time_ranges.time_unit = to_pb_time_unit(start.unit()) as i32;
}
for (start, end) in time_ranges {
pb_time_ranges.time_ranges.push(v1::TimeRange {
start: start.value(),
end: end.value(),
});
}
Ok(pb_time_ranges)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;