Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-24 23:19:57 +00:00

Compare commits: release/v0...release/v0 (3 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 66a784b58a | |
| | ce95e051ff | |
| | de08ddafc8 | |
```diff
@@ -875,7 +875,10 @@ impl TableMetadataManager {
     ) -> Result<()> {
         let table_metadata_keys =
             self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
-        self.tombstone_manager.delete(table_metadata_keys).await
+        self.tombstone_manager
+            .delete(table_metadata_keys)
+            .await
+            .map(|_| ())
     }

     /// Restores metadata for table.
```
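Note: the `.map(|_| ())` above adapts the return type. `TombstoneManager::delete` now reports how many keys it deleted (`Result<usize>`), while `TableMetadataManager` keeps its `Result<()>` signature. A minimal sketch of the pattern, with a synchronous stand-in in place of the real async manager:

```rust
// Stand-in for `TombstoneManager::delete`, which now reports how many keys
// it deleted. The real method is async and talks to a kv backend.
fn delete(keys: Vec<Vec<u8>>) -> Result<usize, String> {
    Ok(keys.len())
}

// A caller that only cares about success discards the count with
// `.map(|_| ())`, keeping its own `Result<()>` signature unchanged.
fn delete_metadata(keys: Vec<Vec<u8>>) -> Result<(), String> {
    delete(keys).map(|_| ())
}

fn main() {
    assert_eq!(Ok(()), delete_metadata(vec![b"k".to_vec()]));
}
```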
```diff
@@ -14,19 +14,23 @@
 use std::collections::HashMap;

+use common_telemetry::debug;
 use snafu::ensure;

 use crate::error::{self, Result};
 use crate::key::txn_helper::TxnOpGetResponseSet;
 use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
-use crate::rpc::store::BatchGetRequest;
+use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest};

 /// [TombstoneManager] provides the ability to:
 /// - logically delete values
 /// - restore the deleted values
 pub(crate) struct TombstoneManager {
     kv_backend: KvBackendRef,
+    // Only used for testing.
+    #[cfg(test)]
+    max_txn_ops: Option<usize>,
 }

 const TOMBSTONE_PREFIX: &str = "__tombstone/";
```
```diff
@@ -38,7 +42,16 @@ fn to_tombstone(key: &[u8]) -> Vec<u8> {
 impl TombstoneManager {
     /// Returns [TombstoneManager].
     pub fn new(kv_backend: KvBackendRef) -> Self {
-        Self { kv_backend }
+        Self {
+            kv_backend,
+            #[cfg(test)]
+            max_txn_ops: None,
+        }
+    }
+
+    #[cfg(test)]
+    pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
+        self.max_txn_ops = Some(max_txn_ops);
     }

     /// Moves value to `dest_key`.
```
```diff
@@ -67,11 +80,15 @@ impl TombstoneManager {
         (txn, TxnOpGetResponseSet::filter(src_key))
     }

-    async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
+    async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
         ensure!(
             keys.len() == dest_keys.len(),
             error::UnexpectedSnafu {
-                err_msg: "The length of keys does not match the length of dest_keys."
+                err_msg: format!(
+                    "The length of keys({}) does not match the length of dest_keys({}).",
+                    keys.len(),
+                    dest_keys.len()
+                ),
             }
         );
         // The key -> dest key mapping.
```
```diff
@@ -102,7 +119,7 @@ impl TombstoneManager {
             .unzip();
         let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
         if resp.succeeded {
-            return Ok(());
+            return Ok(keys.len());
         }
         let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
         // Updates results.
```
```diff
@@ -124,17 +141,45 @@ impl TombstoneManager {
         .fail()
     }

-    /// Moves values to `dest_key`.
-    async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
-        let chunk_size = self.kv_backend.max_txn_ops() / 2;
-        if keys.len() > chunk_size {
-            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
-            let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
-            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
-                self.move_values_inner(keys, dest_keys).await?;
-            }
+    fn max_txn_ops(&self) -> usize {
+        #[cfg(test)]
+        if let Some(max_txn_ops) = self.max_txn_ops {
+            return max_txn_ops;
+        }
+        self.kv_backend.max_txn_ops()
+    }

-            Ok(())
+    /// Moves values to `dest_key`.
+    ///
+    /// Returns the number of keys that were moved.
+    async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
+        ensure!(
+            keys.len() == dest_keys.len(),
+            error::UnexpectedSnafu {
+                err_msg: format!(
+                    "The length of keys({}) does not match the length of dest_keys({}).",
+                    keys.len(),
+                    dest_keys.len()
+                ),
+            }
+        );
+        if keys.is_empty() {
+            return Ok(0);
+        }
+        let chunk_size = self.max_txn_ops() / 2;
+        if keys.len() > chunk_size {
+            debug!(
+                "Moving values with multiple chunks, keys len: {}, chunk_size: {}",
+                keys.len(),
+                chunk_size
+            );
+            let mut moved_keys = 0;
+            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
+            let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
+            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
+                moved_keys += self.move_values_inner(keys, dest_keys).await?;
+            }
+            Ok(moved_keys)
         } else {
             self.move_values_inner(&keys, &dest_keys).await
         }
```
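Note on the chunking arithmetic: `move_values` splits work into chunks of `max_txn_ops() / 2`, presumably because each moved key contributes two operations to the transaction (a put of the tombstone plus a delete of the source key). The rewrite also fixes the old bug where `dest_keys_chunks` was built from `keys` instead of `dest_keys`. A self-contained sketch of the chunked pairing (illustrative only, not the GreptimeDB API):

```rust
// Pairs two parallel key vectors and walks them in chunks that respect an
// operation budget, assuming each pair costs two ops in one transaction.
fn chunk_pairs<'a>(
    keys: &'a [Vec<u8>],
    dest_keys: &'a [Vec<u8>],
    max_txn_ops: usize,
) -> impl Iterator<Item = (&'a [Vec<u8>], &'a [Vec<u8>])> {
    assert_eq!(keys.len(), dest_keys.len());
    // A transaction of `max_txn_ops` operations fits `max_txn_ops / 2` pairs.
    let chunk_size = (max_txn_ops / 2).max(1);
    keys.chunks(chunk_size).zip(dest_keys.chunks(chunk_size))
}

fn main() {
    let keys: Vec<Vec<u8>> = (0u8..5).map(|i| vec![i]).collect();
    let dests: Vec<Vec<u8>> = (0u8..5).map(|i| vec![i + 100]).collect();
    // With max_txn_ops = 4, each chunk holds at most 2 pairs: 2 + 2 + 1.
    let sizes: Vec<usize> = chunk_pairs(&keys, &dests, 4)
        .map(|(k, _)| k.len())
        .collect();
    assert_eq!(vec![2, 2, 1], sizes);
}
```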
```diff
@@ -154,7 +199,7 @@ impl TombstoneManager {
             })
             .unzip();

-        self.move_values(keys, dest_keys).await
+        self.move_values(keys, dest_keys).await.map(|_| ())
     }

     /// Restores tombstones for keys.
```
```diff
@@ -171,20 +216,22 @@ impl TombstoneManager {
             })
             .unzip();

-        self.move_values(keys, dest_keys).await
+        self.move_values(keys, dest_keys).await.map(|_| ())
     }

     /// Deletes tombstones values for the specified `keys`.
-    pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
-        let operations = keys
-            .iter()
-            .map(|key| TxnOp::Delete(to_tombstone(key)))
-            .collect::<Vec<_>>();
+    ///
+    /// Returns the number of keys that were deleted.
+    pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
+        let keys = keys.iter().map(|key| to_tombstone(key)).collect::<Vec<_>>();

-        let txn = Txn::new().and_then(operations);
-        // Always success.
-        let _ = self.kv_backend.txn(txn).await?;
-        Ok(())
+        let num_keys = keys.len();
+        let _ = self
+            .kv_backend
+            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
+            .await?;
+
+        Ok(num_keys)
     }
 }
```
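Note: `delete` no longer wraps the deletions in a transaction. Tombstone deletion is idempotent, so a plain `BatchDeleteRequest` suffices and is not subject to the backend's `max_txn_ops` limit; the returned count is the number of requested keys, computed before the request, not the number of keys that actually existed. A sketch of the key-prefixing step, mirroring `to_tombstone` and the `__tombstone/` prefix above (illustrative, outside the real crate):

```rust
const TOMBSTONE_PREFIX: &str = "__tombstone/";

// Mirrors `to_tombstone`: prefix every key before handing the batch to the
// kv backend's batch-delete call.
fn to_tombstone(key: &[u8]) -> Vec<u8> {
    [TOMBSTONE_PREFIX.as_bytes(), key].concat()
}

fn main() {
    let keys = vec![b"foo".to_vec(), b"bar".to_vec()];
    let tombstones: Vec<Vec<u8>> = keys.iter().map(|k| to_tombstone(k)).collect();
    // The count is fixed up front; the batch delete succeeds whether or not
    // the keys are present.
    let num_keys = tombstones.len();
    assert_eq!(2, num_keys);
    assert_eq!(b"__tombstone/foo".to_vec(), tombstones[0]);
}
```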
```diff
@@ -373,16 +420,73 @@ mod tests {
             .into_iter()
             .map(|kv| (kv.key, kv.dest_key))
             .unzip();
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
         check_moved_values(kv_backend.clone(), &move_values).await;
         // Moves again
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
+        assert_eq!(0, moved_keys);
         check_moved_values(kv_backend.clone(), &move_values).await;
     }

+    #[tokio::test]
+    async fn test_move_values_with_max_txn_ops() {
+        common_telemetry::init_default_ut_logging();
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
+        tombstone_manager.set_max_txn_ops(4);
+        let kvs = HashMap::from([
+            (b"bar".to_vec(), b"baz".to_vec()),
+            (b"foo".to_vec(), b"hi".to_vec()),
+            (b"baz".to_vec(), b"hello".to_vec()),
+            (b"qux".to_vec(), b"world".to_vec()),
+            (b"quux".to_vec(), b"world".to_vec()),
+            (b"quuux".to_vec(), b"world".to_vec()),
+            (b"quuuux".to_vec(), b"world".to_vec()),
+            (b"quuuuux".to_vec(), b"world".to_vec()),
+            (b"quuuuuux".to_vec(), b"world".to_vec()),
+        ]);
+        for (key, value) in &kvs {
+            kv_backend
+                .put(
+                    PutRequest::new()
+                        .with_key(key.clone())
+                        .with_value(value.clone()),
+                )
+                .await
+                .unwrap();
+        }
+        let move_values = kvs
+            .iter()
+            .map(|(key, value)| MoveValue {
+                key: key.clone(),
+                dest_key: to_tombstone(key),
+                value: value.clone(),
+            })
+            .collect::<Vec<_>>();
+        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
+            .clone()
+            .into_iter()
+            .map(|kv| (kv.key, kv.dest_key))
+            .unzip();
+        let moved_keys = tombstone_manager
+            .move_values(keys.clone(), dest_keys.clone())
+            .await
+            .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
+        check_moved_values(kv_backend.clone(), &move_values).await;
+        // Moves again
+        let moved_keys = tombstone_manager
+            .move_values(keys.clone(), dest_keys.clone())
+            .await
+            .unwrap();
+        assert_eq!(0, moved_keys);
+        check_moved_values(kv_backend.clone(), &move_values).await;
+    }
```
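Note: with `set_max_txn_ops(4)` the chunk size becomes `4 / 2 = 2`, so the nine keys above are moved in five transactions (2 + 2 + 2 + 2 + 1); the second call moves nothing because every source key is already tombstoned. The split can be checked in isolation:

```rust
fn main() {
    let max_txn_ops = 4;
    // Each moved key costs two ops, so two pairs fit per transaction.
    let chunk_size = max_txn_ops / 2;
    let keys = 9;
    // ceil(9 / 2) = 5 transactions for the first move_values call.
    let txns = (keys + chunk_size - 1) / chunk_size;
    assert_eq!(5, txns);
}
```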
```diff
@@ -420,17 +524,19 @@ mod tests {
             .unzip();
         keys.push(b"non-exists".to_vec());
         dest_keys.push(b"hi/non-exists".to_vec());
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
+        assert_eq!(3, moved_keys);
         // Moves again
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
+        assert_eq!(0, moved_keys);
     }

     #[tokio::test]
```
```diff
@@ -471,10 +577,11 @@ mod tests {
             .into_iter()
             .map(|kv| (kv.key, kv.dest_key))
             .unzip();
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys, dest_keys)
             .await
             .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
     }

     #[tokio::test]
```
```diff
@@ -552,4 +659,24 @@ mod tests {
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
     }
+
+    #[tokio::test]
+    async fn test_move_values_with_different_lengths() {
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
+
+        let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
+        let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
+
+        let err = tombstone_manager
+            .move_values(keys, dest_keys)
+            .await
+            .unwrap_err();
+        assert!(err
+            .to_string()
+            .contains("The length of keys(2) does not match the length of dest_keys(3)."));
+
+        let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
+        assert_eq!(0, moved_keys);
+    }
 }
```
```diff
@@ -897,7 +897,7 @@ impl StreamingEngine {
         let rows_send = self.run_available(true).await?;
         let row = self.send_writeback_requests().await?;
         debug!(
-            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
+            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
             flow_id, flushed_input_rows, rows_send, row
         );
         Ok(row)
```
```diff
@@ -34,7 +34,8 @@ use store_api::metric_engine_consts::{
     METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
 };
 use store_api::mito_engine_options::{
-    APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, SKIP_WAL_KEY, TTL_KEY,
+    APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, MEMTABLE_TYPE, SKIP_WAL_KEY,
+    TTL_KEY,
 };
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
@@ -564,6 +565,7 @@ pub(crate) fn region_options_for_metadata_region(
     original.remove(APPEND_MODE_KEY);
     // Don't allow to set primary key encoding for metadata region.
     original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
+    original.remove(MEMTABLE_TYPE);
     original.insert(TTL_KEY.to_string(), FOREVER.to_string());
     original.remove(SKIP_WAL_KEY);
     original
```
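Note: `region_options_for_metadata_region` scrubs user-supplied mito options that must not apply to the metadata region; the new line also drops a user-set memtable type. A reduced sketch of the scrubbing, with hypothetical plain-string key names standing in for the `store_api` constants:

```rust
use std::collections::HashMap;

// Hypothetical key names; the real code uses constants from
// store_api::mito_engine_options (APPEND_MODE_KEY, MEMTABLE_TYPE, ...).
fn region_options_for_metadata_region(
    mut original: HashMap<String, String>,
) -> HashMap<String, String> {
    for key in [
        "append_mode",
        "memtable.partition_tree.primary_key_encoding",
        "memtable.type",
        "skip_wal",
    ] {
        original.remove(key);
    }
    // The metadata region always keeps its data.
    original.insert("ttl".to_string(), "forever".to_string());
    original
}

fn main() {
    let opts = HashMap::from([("memtable.type".to_string(), "partition_tree".to_string())]);
    let scrubbed = region_options_for_metadata_region(opts);
    assert!(!scrubbed.contains_key("memtable.type"));
    assert_eq!(Some(&"forever".to_string()), scrubbed.get("ttl"));
}
```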
```diff
@@ -17,6 +17,7 @@ use std::any::Any;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
+use common_time::timestamp::TimeUnit;
 use datafusion::error::DataFusionError;
 use promql::error::Error as PromqlError;
 use promql_parser::parser::token::TokenType;
@@ -192,6 +193,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Timestamp out of range: {} of {:?}", timestamp, unit))]
+    TimestampOutOfRange {
+        timestamp: i64,
+        unit: TimeUnit,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 impl ErrorExt for Error {
@@ -211,7 +220,8 @@ impl ErrorExt for Error {
             | UnsupportedVectorMatch { .. }
             | CombineTableColumnMismatch { .. }
             | UnexpectedPlanExpr { .. }
-            | UnsupportedMatcherOp { .. } => StatusCode::InvalidArguments,
+            | UnsupportedMatcherOp { .. }
+            | TimestampOutOfRange { .. } => StatusCode::InvalidArguments,

             UnknownTable { .. } => StatusCode::Internal,
```
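Note: the new `TimestampOutOfRange` variant follows the crate's snafu conventions, so a call site can turn a failed conversion (`Option::None`) into a structured `InvalidArguments` error through the generated `TimestampOutOfRangeSnafu` context selector. A minimal standalone sketch of that pattern (only `snafu` as a dependency; `TimeUnit` simplified here, and the `location` field omitted):

```rust
use snafu::{OptionExt, Snafu};

#[derive(Debug, Clone, Copy)]
enum TimeUnit {
    Nanosecond,
}

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Timestamp out of range: {} of {:?}", timestamp, unit))]
    TimestampOutOfRange { timestamp: i64, unit: TimeUnit },
}

// `context` converts a `None` from a checked conversion into the structured
// error, the same shape the promql planner uses for its time index.
fn to_nanos(ts_ms: i64) -> Result<i64, Error> {
    ts_ms
        .checked_mul(1_000_000)
        .context(TimestampOutOfRangeSnafu {
            timestamp: ts_ms,
            unit: TimeUnit::Nanosecond,
        })
}

fn main() {
    assert_eq!(1_000_000, to_nanos(1).unwrap());
    assert!(to_nanos(i64::MAX / 2).is_err());
}
```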
```diff
@@ -14,80 +14,73 @@

 use std::time::{SystemTime, UNIX_EPOCH};

+use common_time::timestamp::TimeUnit;
+use common_time::Timestamp;
 use datafusion_common::{Column, ScalarValue};
-use datafusion_expr::expr::Alias;
 use datafusion_expr::utils::conjunction;
-use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
-use datafusion_sql::TableReference;
-use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
-use datatypes::prelude::ConcreteDataType;
+use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder};
 use snafu::{OptionExt, ResultExt};
 use table::TableRef;

-use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu};
+use crate::promql::error::{
+    DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu,
+};

-fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr {
+fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr {
     time_index_expr
         .clone()
-        .gt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
-            Some(start),
-            None,
-        )))
-        .and(
-            time_index_expr.lt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
-                Some(end),
-                None,
-            ))),
-        )
+        .gt_eq(Expr::Literal(timestamp_to_scalar_value(start)))
+        .and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end))))
 }

+fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
+    let value = timestamp.value();
+    match timestamp.unit() {
+        TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
+        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
+        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
+        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
+    }
+}
+
 /// Rewrite label values query to DataFusion logical plan.
 pub fn rewrite_label_values_query(
     table: TableRef,
-    mut scan_plan: LogicalPlan,
+    scan_plan: LogicalPlan,
     mut conditions: Vec<Expr>,
     label_name: String,
     start: SystemTime,
     end: SystemTime,
 ) -> Result<LogicalPlan> {
-    let table_ref = TableReference::partial(
-        table.table_info().schema_name.as_str(),
-        table.table_info().name.as_str(),
-    );
     let schema = table.schema();
     let ts_column = schema
         .timestamp_column()
         .with_context(|| TimeIndexNotFoundSnafu {
             table: table.table_info().full_table_name(),
         })?;
+    let unit = ts_column
+        .data_type
+        .as_timestamp()
+        .map(|data_type| data_type.unit())
+        .with_context(|| TimeIndexNotFoundSnafu {
+            table: table.table_info().full_table_name(),
+        })?;

-    let is_time_index_ms =
-        ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype();
+    // We only support millisecond precision at most.
+    let start =
+        Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
+    let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu {
+        timestamp: start.value(),
+        unit,
+    })?;
+    let end =
+        Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
+    let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu {
+        timestamp: end.value(),
+        unit,
+    })?;
     let time_index_expr = col(Column::from_name(ts_column.name.clone()));

-    if !is_time_index_ms {
-        // cast to ms if time_index not in Millisecond precision
-        let expr = vec![
-            col(Column::from_name(label_name.clone())),
-            Expr::Alias(Alias {
-                expr: Box::new(Expr::Cast(Cast {
-                    expr: Box::new(time_index_expr.clone()),
-                    data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
-                })),
-                relation: Some(table_ref),
-                name: ts_column.name.clone(),
-            }),
-        ];
-        scan_plan = LogicalPlanBuilder::from(scan_plan)
-            .project(expr)
-            .context(DataFusionPlanningSnafu)?
-            .build()
-            .context(DataFusionPlanningSnafu)?;
-    };
-
-    let start = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
-    let end = end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
-
     conditions.push(build_time_filter(time_index_expr, start, end));
     // Safety: `conditions` is not empty.
     let filter = conjunction(conditions).unwrap();
```
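Note: instead of projecting a cast-to-millisecond time index (the deleted `Expr::Cast` branch), the planner now converts the query's millisecond range into the table's native time-index unit and pushes the filter down unchanged; `convert_to` yields `None` when the value does not fit the target unit, which surfaces as `TimestampOutOfRange`. A simplified stand-in for that conversion (illustrative, not the `common_time` API):

```rust
#[derive(Clone, Copy, Debug)]
enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

// Converts a millisecond timestamp into `target` units, returning `None`
// when a finer-grained value would overflow i64.
fn convert_ms(value_ms: i64, target: TimeUnit) -> Option<i64> {
    match target {
        TimeUnit::Second => Some(value_ms.div_euclid(1000)),
        TimeUnit::Millisecond => Some(value_ms),
        TimeUnit::Microsecond => value_ms.checked_mul(1_000),
        TimeUnit::Nanosecond => value_ms.checked_mul(1_000_000),
    }
}

fn main() {
    // A second-precision time index (timestamp(0)) gets a seconds filter.
    assert_eq!(Some(600), convert_ms(600_000, TimeUnit::Second));
    // A nanosecond-precision index can push the value out of range.
    assert_eq!(None, convert_ms(i64::MAX / 2, TimeUnit::Nanosecond));
}
```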
```diff
@@ -492,11 +492,15 @@ pub async fn setup_test_prom_app_with_frontend(
     // build physical table
     let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
     run_sql(sql, &instance).await;
+    let sql = "CREATE TABLE phy_ns (ts timestamp(0) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
+    run_sql(sql, &instance).await;
     // build metric tables
     let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')";
     run_sql(sql, &instance).await;
     let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
     run_sql(sql, &instance).await;
+    let sql = "CREATE TABLE multi_labels (ts timestamp(0) time index, val double, idc string, env string, host string, primary key (idc, env, host)) engine=metric with ('on_physical_table' = 'phy_ns')";
+    run_sql(sql, &instance).await;

     // insert rows
     let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
@@ -508,6 +512,10 @@ pub async fn setup_test_prom_app_with_frontend(
     let sql = "INSERT INTO demo_metrics(val, ts) VALUES (1.1, 0)";
     run_sql(sql, &instance).await;

+    // insert rows to multi_labels
+    let sql = "INSERT INTO multi_labels(idc, env, host, val, ts) VALUES ('idc1', 'dev', 'host1', 1.1, 0), ('idc1', 'dev', 'host2', 2.1, 0), ('idc2', 'dev', 'host1', 1.1, 0), ('idc2', 'test', 'host3', 2.1, 0)";
+    run_sql(sql, &instance).await;
+
     // build physical table
     let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
     run_sql(sql, &instance).await;
```
```diff
@@ -604,8 +604,10 @@ pub async fn test_prom_http_api(store_type: StorageType) {
     assert_eq!(body.status, "success");
     assert_eq!(
         body.data,
-        serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host", "idc", "number",]))
-            .unwrap()
+        serde_json::from_value::<PrometheusResponse>(json!([
+            "__name__", "env", "host", "idc", "number",
+        ]))
+        .unwrap()
     );

     // labels query with multiple match[] params
```
```diff
@@ -726,6 +728,19 @@ pub async fn test_prom_http_api(store_type: StorageType) {
         serde_json::from_value::<PrometheusResponse>(json!(["idc1"])).unwrap()
     );

+    // match labels.
+    let res = client
+        .get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
+    assert_eq!(body.status, "success");
+    assert_eq!(
+        body.data,
+        serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
+    );
+
     // search field name
     let res = client
         .get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
```
```diff
@@ -815,6 +830,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
         "demo_metrics_with_nanos".to_string(),
         "logic_table".to_string(),
         "mito".to_string(),
+        "multi_labels".to_string(),
         "numbers".to_string()
     ])
 );
```