Compare commits

...

3 Commits

Author SHA1 Message Date
shuiyisong
66a784b58a fix: fix dest_keys chunks bug in TombstoneManager (#6432) (#6448)
* fix(meta): fix dest_keys_chunks bug in TombstoneManager

* chore: fix typo

* fix: fix sqlness tests

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
2025-07-03 04:21:57 +00:00
Yingwen
ce95e051ff fix: do not add projection to cast timestamp in label_values (#6040)
* fix: do not add projection for cast

Use a cast to build the time filter directly instead of adding a projection,
which would cause a column-not-found error

* feat: cast before creating plan
2025-06-17 11:52:45 -07:00
shuiyisong
de08ddafc8 fix: logical table missing column
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-06-17 11:43:48 -07:00
8 changed files with 243 additions and 84 deletions
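
The first commit's bug is visible in the TombstoneManager hunk below: when the key set exceeded the transaction limit, both chunk lists were built from `keys`, so each source chunk was zipped with itself rather than with the matching slice of `dest_keys`. An editorial, standalone sketch (hypothetical values, not part of the diff) of the wrong and corrected pairings:

```rust
fn main() {
    let keys: Vec<Vec<u8>> = vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()];
    let dest_keys: Vec<Vec<u8>> = vec![
        b"__tombstone/a".to_vec(),
        b"__tombstone/b".to_vec(),
        b"__tombstone/c".to_vec(),
    ];
    let chunk_size = 2;

    // Buggy pairing: both iterators are chunks of `keys`, so every source
    // chunk is paired with itself and `dest_keys` is never consulted.
    for (src, dst) in keys.chunks(chunk_size).zip(keys.chunks(chunk_size)) {
        assert_eq!(src, dst);
    }

    // Corrected pairing: chunk `dest_keys` as well, so each source chunk is
    // matched with the tombstone keys at the same offset.
    for (src, dst) in keys.chunks(chunk_size).zip(dest_keys.chunks(chunk_size)) {
        assert_eq!(src.len(), dst.len());
        assert_ne!(src, dst);
    }
}
```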

View File

@@ -875,7 +875,10 @@ impl TableMetadataManager {
) -> Result<()> {
let table_metadata_keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.delete(table_metadata_keys).await
self.tombstone_manager
.delete(table_metadata_keys)
.await
.map(|_| ())
}
/// Restores metadata for table.

View File

@@ -14,19 +14,23 @@
use std::collections::HashMap;
use common_telemetry::debug;
use snafu::ensure;
use crate::error::{self, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest};
/// [TombstoneManager] provides the ability to:
/// - logically delete values
/// - restore the deleted values
pub(crate) struct TombstoneManager {
kv_backend: KvBackendRef,
// Only used for testing.
#[cfg(test)]
max_txn_ops: Option<usize>,
}
const TOMBSTONE_PREFIX: &str = "__tombstone/";
@@ -38,7 +42,16 @@ fn to_tombstone(key: &[u8]) -> Vec<u8> {
impl TombstoneManager {
/// Returns [TombstoneManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
Self {
kv_backend,
#[cfg(test)]
max_txn_ops: None,
}
}
#[cfg(test)]
pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
self.max_txn_ops = Some(max_txn_ops);
}
/// Moves value to `dest_key`.
@@ -67,11 +80,15 @@ impl TombstoneManager {
(txn, TxnOpGetResponseSet::filter(src_key))
}
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
err_msg: "The length of keys does not match the length of dest_keys."
err_msg: format!(
"The length of keys({}) does not match the length of dest_keys({}).",
keys.len(),
dest_keys.len()
),
}
);
// The key -> dest key mapping.
@@ -102,7 +119,7 @@ impl TombstoneManager {
.unzip();
let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
if resp.succeeded {
return Ok(());
return Ok(keys.len());
}
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Updates results.
@@ -124,17 +141,45 @@ impl TombstoneManager {
.fail()
}
/// Moves values to `dest_key`.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
let chunk_size = self.kv_backend.max_txn_ops() / 2;
if keys.len() > chunk_size {
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
self.move_values_inner(keys, dest_keys).await?;
}
fn max_txn_ops(&self) -> usize {
#[cfg(test)]
if let Some(max_txn_ops) = self.max_txn_ops {
return max_txn_ops;
}
self.kv_backend.max_txn_ops()
}
Ok(())
/// Moves values to `dest_key`.
///
/// Returns the number of keys that were moved.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
err_msg: format!(
"The length of keys({}) does not match the length of dest_keys({}).",
keys.len(),
dest_keys.len()
),
}
);
if keys.is_empty() {
return Ok(0);
}
let chunk_size = self.max_txn_ops() / 2;
if keys.len() > chunk_size {
debug!(
"Moving values with multiple chunks, keys len: {}, chunk_size: {}",
keys.len(),
chunk_size
);
let mut moved_keys = 0;
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
moved_keys += self.move_values_inner(keys, dest_keys).await?;
}
Ok(moved_keys)
} else {
self.move_values_inner(&keys, &dest_keys).await
}
@@ -154,7 +199,7 @@ impl TombstoneManager {
})
.unzip();
self.move_values(keys, dest_keys).await
self.move_values(keys, dest_keys).await.map(|_| ())
}
/// Restores tombstones for keys.
@@ -171,20 +216,22 @@ impl TombstoneManager {
})
.unzip();
self.move_values(keys, dest_keys).await
self.move_values(keys, dest_keys).await.map(|_| ())
}
/// Deletes tombstones values for the specified `keys`.
pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
let operations = keys
.iter()
.map(|key| TxnOp::Delete(to_tombstone(key)))
.collect::<Vec<_>>();
///
/// Returns the number of keys that were deleted.
pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let keys = keys.iter().map(|key| to_tombstone(key)).collect::<Vec<_>>();
let txn = Txn::new().and_then(operations);
// Always success.
let _ = self.kv_backend.txn(txn).await?;
Ok(())
let num_keys = keys.len();
let _ = self
.kv_backend
.batch_delete(BatchDeleteRequest::new().with_keys(keys))
.await?;
Ok(num_keys)
}
}
@@ -373,16 +420,73 @@ mod tests {
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
// Moves again
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(0, moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_move_values_with_max_txn_ops() {
common_telemetry::init_default_ut_logging();
let kv_backend = Arc::new(MemoryKvBackend::default());
let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
tombstone_manager.set_max_txn_ops(4);
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
(b"qux".to_vec(), b"world".to_vec()),
(b"quux".to_vec(), b"world".to_vec()),
(b"quuux".to_vec(), b"world".to_vec()),
(b"quuuux".to_vec(), b"world".to_vec()),
(b"quuuuux".to_vec(), b"world".to_vec()),
(b"quuuuuux".to_vec(), b"world".to_vec()),
]);
for (key, value) in &kvs {
kv_backend
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
let move_values = kvs
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
.clone()
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
// Moves again
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(0, moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
}
@@ -420,17 +524,19 @@ mod tests {
.unzip();
keys.push(b"non-exists".to_vec());
dest_keys.push(b"hi/non-exists".to_vec());
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
assert_eq!(3, moved_keys);
// Moves again
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
assert_eq!(0, moved_keys);
}
#[tokio::test]
@@ -471,10 +577,11 @@ mod tests {
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
}
#[tokio::test]
@@ -552,4 +659,24 @@ mod tests {
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_move_values_with_different_lengths() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
let err = tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap_err();
assert!(err
.to_string()
.contains("The length of keys(2) does not match the length of dest_keys(3)."),);
let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
assert_eq!(0, moved_keys);
}
}
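
Two behavioural notes on the rewritten TombstoneManager above, as an editorial reading of the diff: `move_values` now validates lengths, short-circuits on empty input, and reports how many keys it moved, while `delete` switched from a transaction to a plain `batch_delete`. The chunk size of `max_txn_ops() / 2` presumably reflects that each moved key contributes two mutating operations to the merged transaction (write the tombstone key, delete the source key). A small arithmetic sketch of how the new `test_move_values_with_max_txn_ops` exercises this:

```rust
fn main() {
    // Values taken from the test: set_max_txn_ops(4) and nine key-value pairs.
    let max_txn_ops = 4;
    let chunk_size = max_txn_ops / 2; // 2 keys per merged transaction
    let total_keys = 9;

    // Ceiling division: 9 keys split into chunks of 2 -> 5 transactions.
    let txn_count = (total_keys + chunk_size - 1) / chunk_size;
    assert_eq!(txn_count, 5); // chunks of 2, 2, 2, 2 and 1
}
```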

View File

@@ -897,7 +897,7 @@ impl StreamingEngine {
let rows_send = self.run_available(true).await?;
let row = self.send_writeback_requests().await?;
debug!(
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
flow_id, flushed_input_rows, rows_send, row
);
Ok(row)

View File

@@ -34,7 +34,8 @@ use store_api::metric_engine_consts::{
METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
};
use store_api::mito_engine_options::{
APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, SKIP_WAL_KEY, TTL_KEY,
APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, MEMTABLE_TYPE, SKIP_WAL_KEY,
TTL_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
@@ -564,6 +565,7 @@ pub(crate) fn region_options_for_metadata_region(
original.remove(APPEND_MODE_KEY);
// Don't allow to set primary key encoding for metadata region.
original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
original.remove(MEMTABLE_TYPE);
original.insert(TTL_KEY.to_string(), FOREVER.to_string());
original.remove(SKIP_WAL_KEY);
original
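
The metric-engine hunk above adds MEMTABLE_TYPE to the options stripped from the metadata region, so a custom memtable type configured on the physical table is not inherited. A minimal sketch of the filtering pattern; only the constant names come from the diff, the literal option strings below are assumptions for illustration:

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical option map passed when creating the physical table.
    let mut original: HashMap<String, String> = HashMap::from([
        ("memtable.type".to_string(), "partition_tree".to_string()),
        ("append_mode".to_string(), "true".to_string()),
    ]);

    // Mirrors region_options_for_metadata_region: drop options that must not
    // apply to the metadata region and pin its TTL to forever.
    original.remove("append_mode"); // APPEND_MODE_KEY (assumed literal)
    original.remove("memtable.type"); // MEMTABLE_TYPE (assumed literal)
    original.insert("ttl".to_string(), "forever".to_string()); // TTL_KEY / FOREVER

    assert!(!original.contains_key("memtable.type"));
    assert_eq!(original.get("ttl").map(String::as_str), Some("forever"));
}
```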

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use datafusion::error::DataFusionError;
use promql::error::Error as PromqlError;
use promql_parser::parser::token::TokenType;
@@ -192,6 +193,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Timestamp out of range: {} of {:?}", timestamp, unit))]
TimestampOutOfRange {
timestamp: i64,
unit: TimeUnit,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -211,7 +220,8 @@ impl ErrorExt for Error {
| UnsupportedVectorMatch { .. }
| CombineTableColumnMismatch { .. }
| UnexpectedPlanExpr { .. }
| UnsupportedMatcherOp { .. } => StatusCode::InvalidArguments,
| UnsupportedMatcherOp { .. }
| TimestampOutOfRange { .. } => StatusCode::InvalidArguments,
UnknownTable { .. } => StatusCode::Internal,
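
The new TimestampOutOfRange variant is returned when the query's start/end cannot be represented in the table's time-index unit, and mapping it to InvalidArguments treats it as a client error. A hedged sketch of the trigger condition, assuming Timestamp::convert_to returns None on overflow (as its use with OptionExt::context in the planner suggests):

```rust
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;

fn main() {
    // A millisecond value too large to express in nanoseconds: the planner
    // would surface this as Error::TimestampOutOfRange.
    let ms = Timestamp::new_millisecond(i64::MAX);
    assert!(ms.convert_to(TimeUnit::Nanosecond).is_none());
}
```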

View File

@@ -14,80 +14,73 @@
use std::time::{SystemTime, UNIX_EPOCH};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::TableReference;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::prelude::ConcreteDataType;
use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder};
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu};
use crate::promql::error::{
DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu,
};
fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr {
fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr {
time_index_expr
.clone()
.gt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
Some(start),
None,
)))
.and(
time_index_expr.lt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
Some(end),
None,
))),
)
.gt_eq(Expr::Literal(timestamp_to_scalar_value(start)))
.and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end))))
}
fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
let value = timestamp.value();
match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
}
}
/// Rewrite label values query to DataFusion logical plan.
pub fn rewrite_label_values_query(
table: TableRef,
mut scan_plan: LogicalPlan,
scan_plan: LogicalPlan,
mut conditions: Vec<Expr>,
label_name: String,
start: SystemTime,
end: SystemTime,
) -> Result<LogicalPlan> {
let table_ref = TableReference::partial(
table.table_info().schema_name.as_str(),
table.table_info().name.as_str(),
);
let schema = table.schema();
let ts_column = schema
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table.table_info().full_table_name(),
})?;
let unit = ts_column
.data_type
.as_timestamp()
.map(|data_type| data_type.unit())
.with_context(|| TimeIndexNotFoundSnafu {
table: table.table_info().full_table_name(),
})?;
let is_time_index_ms =
ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype();
// We only support millisecond precision at most.
let start =
Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu {
timestamp: start.value(),
unit,
})?;
let end =
Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu {
timestamp: end.value(),
unit,
})?;
let time_index_expr = col(Column::from_name(ts_column.name.clone()));
if !is_time_index_ms {
// cast to ms if time_index not in Millisecond precision
let expr = vec![
col(Column::from_name(label_name.clone())),
Expr::Alias(Alias {
expr: Box::new(Expr::Cast(Cast {
expr: Box::new(time_index_expr.clone()),
data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
})),
relation: Some(table_ref),
name: ts_column.name.clone(),
}),
];
scan_plan = LogicalPlanBuilder::from(scan_plan)
.project(expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
};
let start = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
let end = end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
conditions.push(build_time_filter(time_index_expr, start, end));
// Safety: `conditions` is not empty.
let filter = conjunction(conditions).unwrap();
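
The rewritten rewrite_label_values_query no longer wraps the time index in a cast projection (which broke column resolution); instead the millisecond start/end are converted into the index's own unit before building the filter. A small sketch of that conversion using the same APIs the diff imports, with values matching the new multi_labels test (start=0, end=600 on a timestamp(0), i.e. second-precision, index):

```rust
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;

fn main() {
    // Query range arrives in milliseconds (0s .. 600s).
    let start = Timestamp::new_millisecond(0);
    let end = Timestamp::new_millisecond(600_000);

    // For a second-precision time index such as phy_ns, the bounds are
    // converted to seconds and used to build the filter literals directly.
    let start_s = start.convert_to(TimeUnit::Second).unwrap();
    let end_s = end.convert_to(TimeUnit::Second).unwrap();
    assert_eq!((start_s.value(), end_s.value()), (0, 600));
}
```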

View File

@@ -492,11 +492,15 @@ pub async fn setup_test_prom_app_with_frontend(
// build physical table
let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE phy_ns (ts timestamp(0) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;
// build metric tables
let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE multi_labels (ts timestamp(0) time index, val double, idc string, env string, host string, primary key (idc, env, host)) engine=metric with ('on_physical_table' = 'phy_ns')";
run_sql(sql, &instance).await;
// insert rows
let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
@@ -508,6 +512,10 @@ pub async fn setup_test_prom_app_with_frontend(
let sql = "INSERT INTO demo_metrics(val, ts) VALUES (1.1, 0)";
run_sql(sql, &instance).await;
// insert rows to multi_labels
let sql = "INSERT INTO multi_labels(idc, env, host, val, ts) VALUES ('idc1', 'dev', 'host1', 1.1, 0), ('idc1', 'dev', 'host2', 2.1, 0), ('idc2', 'dev', 'host1', 1.1, 0), ('idc2', 'test', 'host3', 2.1, 0)";
run_sql(sql, &instance).await;
// build physical table
let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;

View File

@@ -604,8 +604,10 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host", "idc", "number",]))
.unwrap()
serde_json::from_value::<PrometheusResponse>(json!([
"__name__", "env", "host", "idc", "number",
]))
.unwrap()
);
// labels query with multiple match[] params
@@ -726,6 +728,19 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["idc1"])).unwrap()
);
// match labels.
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// search field name
let res = client
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
@@ -815,6 +830,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
"demo_metrics_with_nanos".to_string(),
"logic_table".to_string(),
"mito".to_string(),
"multi_labels".to_string(),
"numbers".to_string()
])
);