Compare commits


9 Commits

Author SHA1 Message Date
shuiyisong
66a784b58a fix: fix dest_keys chunks bug in TombstoneManager (#6432) (#6448)
* fix(meta): fix dest_keys_chunks bug in TombstoneManager



* chore: fix typo



* fix: fix sqlness tests



---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
2025-07-03 04:21:57 +00:00
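
The bug this commit fixes was chunking `keys` twice instead of chunking `dest_keys`, so from the second chunk onward the destination keys no longer lined up with the source keys. Below is a minimal standalone sketch of the corrected pairing, not the project's actual `move_values` API (the real change is in the TombstoneManager diff further down):

    /// Pairs each chunk of source keys with the matching chunk of destination
    /// keys, so a multi-chunk move keeps the key -> dest_key mapping intact.
    /// The original bug chunked `keys` twice instead of chunking `dest_keys`.
    fn paired_chunks<'a>(
        keys: &'a [Vec<u8>],
        dest_keys: &'a [Vec<u8>],
        chunk_size: usize,
    ) -> impl Iterator<Item = (&'a [Vec<u8>], &'a [Vec<u8>])> {
        assert_eq!(keys.len(), dest_keys.len());
        keys.chunks(chunk_size).zip(dest_keys.chunks(chunk_size))
    }

    fn main() {
        let keys: Vec<Vec<u8>> = vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()];
        let dest: Vec<Vec<u8>> = vec![b"d1".to_vec(), b"d2".to_vec(), b"d3".to_vec()];
        // With chunk_size = 2 the second pair is (["k3"], ["d3"]); chunking
        // `keys` twice would have produced (["k3"], ["k3"]) instead.
        for (k, d) in paired_chunks(&keys, &dest, 2) {
            println!("{:?} -> {:?}", k, d);
        }
    }
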
Yingwen
ce95e051ff fix: do not add projection to cast timestamp in label_values (#6040)
* fix: do not add projection for cast

Use cast to build time filter directly instead of adding a projection,
which will cause column not found

* feat: cast before creating plan
2025-06-17 11:52:45 -07:00
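
This fix converts the millisecond start/end bounds into the time index's own unit and builds the filter literal in that unit, instead of adding a projection that cast the column (which caused a column-not-found error). A rough sketch of just the unit math, using a hypothetical `Unit` enum in place of the real time-unit type; the actual code uses `Timestamp::convert_to` and reports overflow as `TimestampOutOfRange`:

    /// Hypothetical stand-in for the time-index column's unit.
    #[derive(Clone, Copy)]
    enum Unit {
        Second,
        Millisecond,
        Microsecond,
        Nanosecond,
    }

    /// Converts a millisecond bound into `unit`, returning None on overflow,
    /// which is the case the fix surfaces as a timestamp-out-of-range error.
    fn convert_ms(value_ms: i64, unit: Unit) -> Option<i64> {
        match unit {
            Unit::Second => Some(value_ms.div_euclid(1_000)),
            Unit::Millisecond => Some(value_ms),
            Unit::Microsecond => value_ms.checked_mul(1_000),
            Unit::Nanosecond => value_ms.checked_mul(1_000_000),
        }
    }

    fn main() {
        // A nanosecond time index can overflow: i64::MAX ms does not fit in ns.
        assert_eq!(convert_ms(1_000, Unit::Second), Some(1));
        assert_eq!(convert_ms(i64::MAX, Unit::Nanosecond), None);
    }
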
shuiyisong
de08ddafc8 fix: logical table missing column
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-06-17 11:43:48 -07:00
Zhenchi
e46efb3d6c chore: bump version to 0.14.4
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-06-04 15:59:41 +08:00
Yingwen
34af9580e0 fix: do not accommodate fields for multi-value protocol (#6237) 2025-06-04 15:59:41 +08:00
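
This change threads a new `is_single_value` flag through the insert path so that schema accommodation only reuses the table's field column when the protocol writes a single value per row. A simplified sketch of that rule with hypothetical names (the real logic is in `get_alter_table_expr_on_demand`, shown in the inserter diff below):

    /// Returns the existing field column to reuse, if any: only when the write
    /// protocol carries a single value per row and the table has exactly one
    /// field column. Multi-value protocols keep their own field names.
    fn field_to_accommodate(table_field_columns: &[String], is_single_value: bool) -> Option<&String> {
        if !is_single_value {
            return None;
        }
        match table_field_columns {
            [only] => Some(only),
            _ => None,
        }
    }

    fn main() {
        let cols = vec!["val".to_string()];
        assert_eq!(field_to_accommodate(&cols, true), Some(&"val".to_string()));
        assert_eq!(field_to_accommodate(&cols, false), None);
    }
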
Lei, HUANG
b19d23d665 fix(mito): revert initial builder capacity for TimeSeriesMemtable (#6231)
* fix/initial-builder-cap:
 ### Enhance Series Initialization and Capacity Management

 - **`simple_bulk_memtable.rs`**: Updated the `Series` initialization to use `with_capacity` with a specified capacity of 8192, improving memory management.
 - **`time_series.rs`**: Introduced `with_capacity` method in `Series` to allow custom initial capacity for `ValueBuilder`. Adjusted `INITIAL_BUILDER_CAPACITY` to 16 for more efficient memory usage. Added a new `new` method to maintain backward compatibility.

* fix/initial-builder-cap:
 ### Adjust Memory Allocation in Memtable

 - **`simple_bulk_memtable.rs`**: Reduced the initial capacity of `Series` from 8192 to 1024 to optimize memory usage.
 - **`time_series.rs`**: Decreased `INITIAL_BUILDER_CAPACITY` from 16 to 4 to improve efficiency in vector building.
2025-06-04 15:59:41 +08:00
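
The constructor split described above keeps existing `Series::new` call sites working while letting the bulk memtable choose a larger initial builder. A minimal sketch of the pattern, with a plain `Vec` standing in for the real `ValueBuilder`:

    /// Default capacity for the per-series active builder (after the revert).
    const INITIAL_BUILDER_CAPACITY: usize = 4;

    struct Series {
        // Stand-in for the real `ValueBuilder`; only the capacity handling matters here.
        active: Vec<i64>,
    }

    impl Series {
        /// Lets callers (e.g. a bulk memtable) choose the initial capacity.
        fn with_capacity(builder_cap: usize) -> Self {
            Self { active: Vec::with_capacity(builder_cap) }
        }

        /// Keeps the old constructor signature for existing call sites.
        fn new() -> Self {
            Self::with_capacity(INITIAL_BUILDER_CAPACITY)
        }
    }

    fn main() {
        assert!(Series::new().active.capacity() >= INITIAL_BUILDER_CAPACITY);
        assert!(Series::with_capacity(1024).active.capacity() >= 1024);
    }
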
dennis zhuang
209f15dd51 fix: set column index can't work in physical table (#6179) 2025-06-04 15:59:41 +08:00
discord9
0829fb204c chore: rm unnecessary depend for flow (#6047) 2025-06-04 15:59:41 +08:00
discord9
c8e470e8ed chore: upgrade hydroflow depend (#6011)
* chore: `hydroflow` -> `dfir`

* chore: refine log msg
2025-06-04 15:59:41 +08:00
33 changed files with 692 additions and 821 deletions

Cargo.lock (generated): 801 lines changed
File diff suppressed because it is too large.

View File

@@ -68,7 +68,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.14.3"
version = "0.14.4"
edition = "2021"
license = "Apache-2.0"

View File

@@ -875,7 +875,10 @@ impl TableMetadataManager {
) -> Result<()> {
let table_metadata_keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.delete(table_metadata_keys).await
self.tombstone_manager
.delete(table_metadata_keys)
.await
.map(|_| ())
}
/// Restores metadata for table.

View File

@@ -14,19 +14,23 @@
use std::collections::HashMap;
use common_telemetry::debug;
use snafu::ensure;
use crate::error::{self, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest};
/// [TombstoneManager] provides the ability to:
/// - logically delete values
/// - restore the deleted values
pub(crate) struct TombstoneManager {
kv_backend: KvBackendRef,
// Only used for testing.
#[cfg(test)]
max_txn_ops: Option<usize>,
}
const TOMBSTONE_PREFIX: &str = "__tombstone/";
@@ -38,7 +42,16 @@ fn to_tombstone(key: &[u8]) -> Vec<u8> {
impl TombstoneManager {
/// Returns [TombstoneManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
Self {
kv_backend,
#[cfg(test)]
max_txn_ops: None,
}
}
#[cfg(test)]
pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
self.max_txn_ops = Some(max_txn_ops);
}
/// Moves value to `dest_key`.
@@ -67,11 +80,15 @@ impl TombstoneManager {
(txn, TxnOpGetResponseSet::filter(src_key))
}
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
err_msg: "The length of keys does not match the length of dest_keys."
err_msg: format!(
"The length of keys({}) does not match the length of dest_keys({}).",
keys.len(),
dest_keys.len()
),
}
);
// The key -> dest key mapping.
@@ -102,7 +119,7 @@ impl TombstoneManager {
.unzip();
let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
if resp.succeeded {
return Ok(());
return Ok(keys.len());
}
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Updates results.
@@ -124,17 +141,45 @@ impl TombstoneManager {
.fail()
}
/// Moves values to `dest_key`.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
let chunk_size = self.kv_backend.max_txn_ops() / 2;
if keys.len() > chunk_size {
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
self.move_values_inner(keys, dest_keys).await?;
}
fn max_txn_ops(&self) -> usize {
#[cfg(test)]
if let Some(max_txn_ops) = self.max_txn_ops {
return max_txn_ops;
}
self.kv_backend.max_txn_ops()
}
Ok(())
/// Moves values to `dest_key`.
///
/// Returns the number of keys that were moved.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
err_msg: format!(
"The length of keys({}) does not match the length of dest_keys({}).",
keys.len(),
dest_keys.len()
),
}
);
if keys.is_empty() {
return Ok(0);
}
let chunk_size = self.max_txn_ops() / 2;
if keys.len() > chunk_size {
debug!(
"Moving values with multiple chunks, keys len: {}, chunk_size: {}",
keys.len(),
chunk_size
);
let mut moved_keys = 0;
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
moved_keys += self.move_values_inner(keys, dest_keys).await?;
}
Ok(moved_keys)
} else {
self.move_values_inner(&keys, &dest_keys).await
}
@@ -154,7 +199,7 @@ impl TombstoneManager {
})
.unzip();
self.move_values(keys, dest_keys).await
self.move_values(keys, dest_keys).await.map(|_| ())
}
/// Restores tombstones for keys.
@@ -171,20 +216,22 @@ impl TombstoneManager {
})
.unzip();
self.move_values(keys, dest_keys).await
self.move_values(keys, dest_keys).await.map(|_| ())
}
/// Deletes tombstones values for the specified `keys`.
pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
let operations = keys
.iter()
.map(|key| TxnOp::Delete(to_tombstone(key)))
.collect::<Vec<_>>();
///
/// Returns the number of keys that were deleted.
pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let keys = keys.iter().map(|key| to_tombstone(key)).collect::<Vec<_>>();
let txn = Txn::new().and_then(operations);
// Always success.
let _ = self.kv_backend.txn(txn).await?;
Ok(())
let num_keys = keys.len();
let _ = self
.kv_backend
.batch_delete(BatchDeleteRequest::new().with_keys(keys))
.await?;
Ok(num_keys)
}
}
@@ -373,16 +420,73 @@ mod tests {
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
// Moves again
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(0, moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_move_values_with_max_txn_ops() {
common_telemetry::init_default_ut_logging();
let kv_backend = Arc::new(MemoryKvBackend::default());
let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
tombstone_manager.set_max_txn_ops(4);
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
(b"qux".to_vec(), b"world".to_vec()),
(b"quux".to_vec(), b"world".to_vec()),
(b"quuux".to_vec(), b"world".to_vec()),
(b"quuuux".to_vec(), b"world".to_vec()),
(b"quuuuux".to_vec(), b"world".to_vec()),
(b"quuuuuux".to_vec(), b"world".to_vec()),
]);
for (key, value) in &kvs {
kv_backend
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
let move_values = kvs
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
.clone()
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
// Moves again
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(0, moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
}
@@ -420,17 +524,19 @@ mod tests {
.unzip();
keys.push(b"non-exists".to_vec());
dest_keys.push(b"hi/non-exists".to_vec());
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
assert_eq!(3, moved_keys);
// Moves again
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
assert_eq!(0, moved_keys);
}
#[tokio::test]
@@ -471,10 +577,11 @@ mod tests {
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
}
#[tokio::test]
@@ -552,4 +659,24 @@ mod tests {
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_move_values_with_different_lengths() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
let err = tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap_err();
assert!(err
.to_string()
.contains("The length of keys(2) does not match the length of dest_keys(3)."),);
let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
assert_eq!(0, moved_keys);
}
}

View File

@@ -16,6 +16,7 @@ async-trait.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true
@@ -39,16 +40,13 @@ datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
dfir_rs = { version = "0.13.0", default-features = false }
enum-as-inner = "0.6.0"
enum_dispatch = "0.3"
futures.workspace = true
get-size2 = "0.1.2"
greptime-proto.workspace = true
# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
# otherwise it is the same with upstream repo
chrono.workspace = true
http.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true

View File

@@ -897,7 +897,7 @@ impl StreamingEngine {
let rows_send = self.run_available(true).await?;
let row = self.send_writeback_requests().await?;
debug!(
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
flow_id, flushed_input_rows, rows_send, row
);
Ok(row)

View File

@@ -19,8 +19,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_telemetry::info;
use dfir_rs::scheduled::graph::Dfir;
use enum_as_inner::EnumAsInner;
use hydroflow::scheduled::graph::Hydroflow;
use snafu::ensure;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
@@ -49,9 +49,9 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
(worker_handle, worker)
}
/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
/// ActiveDataflowState is a wrapper around `Dfir` and `DataflowState`
pub(crate) struct ActiveDataflowState<'subgraph> {
df: Hydroflow<'subgraph>,
df: Dfir<'subgraph>,
state: DataflowState,
err_collector: ErrCollector,
}
@@ -59,7 +59,7 @@ pub(crate) struct ActiveDataflowState<'subgraph> {
impl std::fmt::Debug for ActiveDataflowState<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ActiveDataflowState")
.field("df", &"<Hydroflow>")
.field("df", &"<Dfir>")
.field("state", &self.state)
.field("err_collector", &self.err_collector)
.finish()
@@ -69,7 +69,7 @@ impl std::fmt::Debug for ActiveDataflowState<'_> {
impl Default for ActiveDataflowState<'_> {
fn default() -> Self {
ActiveDataflowState {
df: Hydroflow::new(),
df: Dfir::new(),
state: DataflowState::default(),
err_collector: ErrCollector::default(),
}

View File

@@ -304,7 +304,7 @@ impl BatchingEngine {
})
.transpose()?;
info!(
debug!(
"Flow id={}, found time window expr={}",
flow_id,
phy_expr

View File

@@ -179,7 +179,7 @@ impl BatchingTask {
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine).await? {
debug!("Generate new query: {:#?}", new_query);
debug!("Generate new query: {}", new_query);
self.execute_logical_plan(frontend_client, &new_query).await
} else {
debug!("Generate no query");

View File

@@ -138,9 +138,12 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
if let LogicalPlan::Aggregate(aggregate) = node {
self.group_exprs = Some(aggregate.group_expr.iter().cloned().collect());
debug!("Group by exprs: {:?}", self.group_exprs);
debug!(
"FindGroupByFinalName: Get Group by exprs from Aggregate: {:?}",
self.group_exprs
);
} else if let LogicalPlan::Distinct(distinct) = node {
debug!("Distinct: {:#?}", distinct);
debug!("FindGroupByFinalName: Distinct: {}", node);
match distinct {
Distinct::All(input) => {
if let LogicalPlan::TableScan(table_scan) = &**input {
@@ -162,7 +165,10 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
self.group_exprs = Some(distinct_on.on_expr.iter().cloned().collect())
}
}
debug!("Group by exprs: {:?}", self.group_exprs);
debug!(
"FindGroupByFinalName: Get Group by exprs from Distinct: {:?}",
self.group_exprs
);
}
Ok(TreeNodeRecursion::Continue)

View File

@@ -18,9 +18,9 @@
use std::collections::BTreeMap;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
@@ -38,7 +38,7 @@ mod src_sink;
/// The Context for build a Operator with id of `GlobalId`
pub struct Context<'referred, 'df> {
pub id: GlobalId,
pub df: &'referred mut Hydroflow<'df>,
pub df: &'referred mut Dfir<'df>,
pub compute_state: &'referred mut DataflowState,
/// a list of all collections being used in the operator
///
@@ -361,16 +361,16 @@ mod test {
use std::cell::RefCell;
use std::rc::Rc;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::handoff::VecHandoff;
use pretty_assertions::assert_eq;
use super::*;
use crate::repr::Row;
pub fn run_and_check(
state: &mut DataflowState,
df: &mut Hydroflow,
df: &mut Dfir,
time_range: std::ops::Range<i64>,
expected: BTreeMap<i64, Vec<DiffRow>>,
output: Rc<RefCell<Vec<DiffRow>>>,
@@ -416,7 +416,7 @@ mod test {
}
pub fn harness_test_ctx<'r, 'h>(
df: &'r mut Hydroflow<'h>,
df: &'r mut Dfir<'h>,
state: &'r mut DataflowState,
) -> Context<'r, 'h> {
let err_collector = state.get_err_collector();
@@ -436,7 +436,7 @@ mod test {
/// that is it only emit once, not multiple times
#[test]
fn test_render_constant() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -473,7 +473,7 @@ mod test {
/// a simple example to show how to use source and sink
#[test]
fn example_source_sink() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("test_handoff");
df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {
@@ -498,8 +498,8 @@ mod test {
#[test]
fn test_tee_auto_schedule() {
use hydroflow::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Hydroflow::new();
use dfir_rs::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Dfir::new();
let (send_port, recv_port) = df.make_edge::<_, Toff<i32>>("test_handoff");
let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {

View File

@@ -14,8 +14,8 @@
use std::collections::BTreeMap;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
@@ -256,7 +256,7 @@ fn eval_mfp_core(
mod test {
use datatypes::data_type::ConcreteDataType;
use hydroflow::scheduled::graph::Hydroflow;
use dfir_rs::scheduled::graph::Dfir;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
@@ -269,7 +269,7 @@ mod test {
/// namely: if mfp operator can schedule a delete at the correct time
#[test]
fn test_render_mfp_with_temporal() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -348,7 +348,7 @@ mod test {
/// that is it filter the rows correctly
#[test]
fn test_render_mfp() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -388,7 +388,7 @@ mod test {
/// test if mfp operator can run multiple times within same tick
#[test]
fn test_render_mfp_multiple_times() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);

View File

@@ -22,7 +22,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::{BooleanVector, NullVector};
use hydroflow::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
@@ -1212,7 +1212,7 @@ mod test {
use common_time::Timestamp;
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
use hydroflow::scheduled::graph::Hydroflow;
use dfir_rs::scheduled::graph::Dfir;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
@@ -1228,7 +1228,7 @@ mod test {
/// expected: sum(number), window_start, window_end
#[test]
fn test_tumble_group_by() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
const START: i64 = 1625097600000;
@@ -1389,7 +1389,7 @@ mod test {
/// select avg(number) from number;
#[test]
fn test_avg_eval() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1500,7 +1500,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_distinct() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1556,7 +1556,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_batch_reduce_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let now = state.current_time_ref();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1662,7 +1662,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_reduce_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1739,7 +1739,7 @@ mod test {
/// this test include even more insert/delete case to cover all case for eval_distinct_core
#[test]
fn test_delete_reduce_distinct_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1818,7 +1818,7 @@ mod test {
/// this test include insert and delete which should cover all case for eval_distinct_core
#[test]
fn test_basic_reduce_distinct_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1896,7 +1896,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_composite_reduce_distinct_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);

View File

@@ -17,7 +17,7 @@
use std::collections::BTreeMap;
use common_telemetry::{debug, trace};
use hydroflow::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::broadcast::error::TryRecvError;

View File

@@ -16,16 +16,16 @@ use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::SubgraphId;
use get_size2::GetSize;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::SubgraphId;
use crate::compute::types::ErrCollector;
use crate::repr::{self, Timestamp};
use crate::utils::{ArrangeHandler, Arrangement};
/// input/output of a dataflow
/// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
/// One `ComputeState` manage the input/output/schedule of one `Dfir`
#[derive(Debug, Default)]
pub struct DataflowState {
/// it is important to use a deque to maintain the order of subgraph here
@@ -38,7 +38,7 @@ pub struct DataflowState {
/// Which means it's also the current time in temporal filter to get current correct result
as_of: Rc<RefCell<Timestamp>>,
/// error collector local to this `ComputeState`,
/// useful for distinguishing errors from different `Hydroflow`
/// useful for distinguishing errors from different `Dfir`
err_collector: ErrCollector,
/// save all used arrange in this dataflow, since usually there is no delete operation
/// we can just keep track of all used arrange and schedule subgraph when they need to be updated
@@ -65,7 +65,7 @@ impl DataflowState {
/// schedule all subgraph that need to run with time <= `as_of` and run_available()
///
/// return true if any subgraph actually executed
pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
// first split keys <= as_of into another map
let mut before = self
.schedule_subgraph

View File

@@ -18,10 +18,10 @@ use std::rc::Rc;
use std::sync::Arc;
use common_error::ext::ErrorExt;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::handoff::TeeingHandoff;
use dfir_rs::scheduled::port::RecvPort;
use dfir_rs::scheduled::SubgraphId;
use itertools::Itertools;
use tokio::sync::Mutex;
@@ -46,7 +46,7 @@ impl<T: 'static + Clone> Collection<T> {
/// clone a collection, require a mutable reference to the hydroflow instance
///
/// Note: need to be the same hydroflow instance that this collection is created from
pub fn clone(&self, df: &mut Hydroflow) -> Self {
pub fn clone(&self, df: &mut Dfir) -> Self {
Collection {
stream: self.stream.tee(df),
}
@@ -151,7 +151,7 @@ impl<T: 'static> CollectionBundle<T> {
}
impl<T: 'static + Clone> CollectionBundle<T> {
pub fn clone(&self, df: &mut Hydroflow) -> Self {
pub fn clone(&self, df: &mut Dfir) -> Self {
Self {
collection: self.collection.clone(df),
arranged: self

View File

@@ -21,7 +21,7 @@ use common_error::ext::BoxedError;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
use hydroflow::lattices::cc_traits::Iter;
use dfir_rs::lattices::cc_traits::Iter;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};

View File

@@ -581,7 +581,7 @@ impl FrontendInvoker {
.start_timer();
self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor, false)
.handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)

View File

@@ -73,7 +73,7 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {
self.handle_row_inserts(requests, ctx.clone(), false)
self.handle_row_inserts(requests, ctx.clone(), false, false)
.await?
}
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
@@ -411,6 +411,7 @@ impl Instance {
requests: RowInsertRequests,
ctx: QueryContextRef,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(
@@ -418,6 +419,7 @@ impl Instance {
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
is_single_value,
)
.await
.context(TableOperationSnafu)
@@ -430,7 +432,14 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
.handle_last_non_null_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
true,
// Influx protocol may writes multiple fields (values).
false,
)
.await
.context(TableOperationSnafu)
}

View File

@@ -52,8 +52,9 @@ impl OpentsdbProtocolHandler for Instance {
None
};
// OpenTSDB is single value.
let output = self
.handle_row_inserts(requests, ctx, true)
.handle_row_inserts(requests, ctx, true, true)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;

View File

@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};
self.handle_row_inserts(requests, ctx, false)
self.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

View File

@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
self.handle_row_inserts(request, ctx.clone(), true)
self.handle_row_inserts(request, ctx.clone(), true, true)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?

View File

@@ -206,7 +206,9 @@ impl DataRegion {
) -> Result<AffectedRows> {
match request.kind {
AlterKind::SetRegionOptions { options: _ }
| AlterKind::UnsetRegionOptions { keys: _ } => {
| AlterKind::UnsetRegionOptions { keys: _ }
| AlterKind::SetIndex { options: _ }
| AlterKind::UnsetIndex { options: _ } => {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Alter(request))

View File

@@ -34,7 +34,8 @@ use store_api::metric_engine_consts::{
METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
};
use store_api::mito_engine_options::{
APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, SKIP_WAL_KEY, TTL_KEY,
APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, MEMTABLE_TYPE, SKIP_WAL_KEY,
TTL_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
@@ -564,6 +565,7 @@ pub(crate) fn region_options_for_metadata_region(
original.remove(APPEND_MODE_KEY);
// Don't allow to set primary key encoding for metadata region.
original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
original.remove(MEMTABLE_TYPE);
original.insert(TTL_KEY.to_string(), FOREVER.to_string());
original.remove(SKIP_WAL_KEY);
original

View File

@@ -57,7 +57,7 @@ use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 16;
const INITIAL_BUILDER_CAPACITY: usize = 4;
/// Vector builder capacity.
const BUILDER_CAPACITY: usize = 512;
@@ -645,15 +645,19 @@ struct Series {
}
impl Series {
fn new(region_metadata: &RegionMetadataRef) -> Self {
pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
Self {
pk_cache: None,
active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
active: ValueBuilder::new(region_metadata, builder_cap),
frozen: vec![],
region_metadata: region_metadata.clone(),
}
}
pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
}
/// Pushes a row of values into Series. Return the size of values.
fn push<'a>(
&mut self,

View File

@@ -147,7 +147,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
.await
}
@@ -158,6 +158,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
preprocess_row_insert_requests(&mut requests.inserts)?;
self.handle_row_inserts_with_create_type(
@@ -166,6 +167,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Physical,
accommodate_existing_schema,
is_single_value,
)
.await
}
@@ -183,6 +185,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Log,
false,
false,
)
.await
}
@@ -199,6 +202,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Trace,
false,
false,
)
.await
}
@@ -210,6 +214,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
@@ -217,6 +222,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::LastNonNull,
accommodate_existing_schema,
is_single_value,
)
.await
}
@@ -229,6 +235,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
@@ -249,6 +256,7 @@ impl Inserter {
create_type,
statement_executor,
accommodate_existing_schema,
is_single_value,
)
.await?;
@@ -299,6 +307,7 @@ impl Inserter {
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
true,
true,
)
.await?;
let name_to_info = table_infos
@@ -464,9 +473,10 @@ impl Inserter {
/// This mapping is used in the conversion of RowToRegion.
///
/// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
/// It only works for TIME_INDEX and VALUE columns. This is for the case where the user creates a table with
/// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
/// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
/// remote write. This will modify the `RowInsertRequests` in place.
/// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
async fn create_or_alter_tables_on_demand(
&self,
requests: &mut RowInsertRequests,
@@ -474,6 +484,7 @@ impl Inserter {
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
@@ -537,6 +548,7 @@ impl Inserter {
&table,
ctx,
accommodate_existing_schema,
is_single_value,
)? {
alter_tables.push(alter_expr);
}
@@ -811,12 +823,15 @@ impl Inserter {
/// When `accommodate_existing_schema` is true, it may modify the input `req` to
/// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
/// for more details.
/// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
/// input `req`.
fn get_alter_table_expr_on_demand(
&self,
req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Option<AlterTableExpr>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
@@ -834,18 +849,20 @@ impl Inserter {
let table_schema = table.schema();
// Find timestamp column name
let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
// Find field column name if there is only one
// Find field column name if there is only one and `is_single_value` is true.
let mut field_col_name = None;
let mut multiple_field_cols = false;
table.field_columns().for_each(|col| {
if field_col_name.is_none() {
field_col_name = Some(col.name.clone());
} else {
multiple_field_cols = true;
if is_single_value {
let mut multiple_field_cols = false;
table.field_columns().for_each(|col| {
if field_col_name.is_none() {
field_col_name = Some(col.name.clone());
} else {
multiple_field_cols = true;
}
});
if multiple_field_cols {
field_col_name = None;
}
});
if multiple_field_cols {
field_col_name = None;
}
// Update column name in request schema for Timestamp/Field columns
@@ -871,11 +888,11 @@ impl Inserter {
}
}
// Remove from add_columns any column that is timestamp or field (if there is only one field column)
// Only keep columns that are tags or non-single field.
add_columns.add_columns.retain(|col| {
let def = col.column_def.as_ref().unwrap();
def.semantic_type != SemanticType::Timestamp as i32
&& (def.semantic_type != SemanticType::Field as i32 && field_col_name.is_some())
def.semantic_type == SemanticType::Tag as i32
|| (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
});
if add_columns.add_columns.is_empty() {
@@ -1227,7 +1244,7 @@ mod tests {
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
.unwrap();
assert!(alter_expr.is_none());

View File

@@ -250,6 +250,7 @@ impl PipelineTable {
Self::query_ctx(&table_info),
&self.statement_executor,
false,
false,
)
.await
.context(InsertPipelineSnafu)?;

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use datafusion::error::DataFusionError;
use promql::error::Error as PromqlError;
use promql_parser::parser::token::TokenType;
@@ -192,6 +193,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Timestamp out of range: {} of {:?}", timestamp, unit))]
TimestampOutOfRange {
timestamp: i64,
unit: TimeUnit,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -211,7 +220,8 @@ impl ErrorExt for Error {
| UnsupportedVectorMatch { .. }
| CombineTableColumnMismatch { .. }
| UnexpectedPlanExpr { .. }
| UnsupportedMatcherOp { .. } => StatusCode::InvalidArguments,
| UnsupportedMatcherOp { .. }
| TimestampOutOfRange { .. } => StatusCode::InvalidArguments,
UnknownTable { .. } => StatusCode::Internal,

View File

@@ -14,80 +14,73 @@
use std::time::{SystemTime, UNIX_EPOCH};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::TableReference;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::prelude::ConcreteDataType;
use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder};
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu};
use crate::promql::error::{
DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu,
};
fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr {
fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr {
time_index_expr
.clone()
.gt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
Some(start),
None,
)))
.and(
time_index_expr.lt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
Some(end),
None,
))),
)
.gt_eq(Expr::Literal(timestamp_to_scalar_value(start)))
.and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end))))
}
fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
let value = timestamp.value();
match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
}
}
/// Rewrite label values query to DataFusion logical plan.
pub fn rewrite_label_values_query(
table: TableRef,
mut scan_plan: LogicalPlan,
scan_plan: LogicalPlan,
mut conditions: Vec<Expr>,
label_name: String,
start: SystemTime,
end: SystemTime,
) -> Result<LogicalPlan> {
let table_ref = TableReference::partial(
table.table_info().schema_name.as_str(),
table.table_info().name.as_str(),
);
let schema = table.schema();
let ts_column = schema
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table.table_info().full_table_name(),
})?;
let unit = ts_column
.data_type
.as_timestamp()
.map(|data_type| data_type.unit())
.with_context(|| TimeIndexNotFoundSnafu {
table: table.table_info().full_table_name(),
})?;
let is_time_index_ms =
ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype();
// We only support millisecond precision at most.
let start =
Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu {
timestamp: start.value(),
unit,
})?;
let end =
Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu {
timestamp: end.value(),
unit,
})?;
let time_index_expr = col(Column::from_name(ts_column.name.clone()));
if !is_time_index_ms {
// cast to ms if time_index not in Millisecond precision
let expr = vec![
col(Column::from_name(label_name.clone())),
Expr::Alias(Alias {
expr: Box::new(Expr::Cast(Cast {
expr: Box::new(time_index_expr.clone()),
data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
})),
relation: Some(table_ref),
name: ts_column.name.clone(),
}),
];
scan_plan = LogicalPlanBuilder::from(scan_plan)
.project(expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
};
let start = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
let end = end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
conditions.push(build_time_filter(time_index_expr, start, end));
// Safety: `conditions` is not empty.
let filter = conjunction(conditions).unwrap();

View File

@@ -458,6 +458,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
))
.with_log_ingest_handler(instance.fe_instance().clone(), None, None)
.with_logs_handler(instance.fe_instance().clone())
.with_influxdb_handler(instance.fe_instance().clone())
.with_otlp_handler(instance.fe_instance().clone())
.with_jaeger_handler(instance.fe_instance().clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());
@@ -491,11 +492,15 @@ pub async fn setup_test_prom_app_with_frontend(
// build physical table
let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE phy_ns (ts timestamp(0) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;
// build metric tables
let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE multi_labels (ts timestamp(0) time index, val double, idc string, env string, host string, primary key (idc, env, host)) engine=metric with ('on_physical_table' = 'phy_ns')";
run_sql(sql, &instance).await;
// insert rows
let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
@@ -507,6 +512,10 @@ pub async fn setup_test_prom_app_with_frontend(
let sql = "INSERT INTO demo_metrics(val, ts) VALUES (1.1, 0)";
run_sql(sql, &instance).await;
// insert rows to multi_labels
let sql = "INSERT INTO multi_labels(idc, env, host, val, ts) VALUES ('idc1', 'dev', 'host1', 1.1, 0), ('idc1', 'dev', 'host2', 2.1, 0), ('idc2', 'dev', 'host1', 1.1, 0), ('idc2', 'test', 'host3', 2.1, 0)";
run_sql(sql, &instance).await;
// build physical table
let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;

View File

@@ -113,6 +113,8 @@ macro_rules! http_tests {
test_log_query,
test_jaeger_query_api,
test_jaeger_query_api_for_trace_v1,
test_influxdb_write,
);
)*
};
@@ -602,8 +604,10 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host", "idc", "number",]))
.unwrap()
serde_json::from_value::<PrometheusResponse>(json!([
"__name__", "env", "host", "idc", "number",
]))
.unwrap()
);
// labels query with multiple match[] params
@@ -724,6 +728,19 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["idc1"])).unwrap()
);
// match labels.
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// search field name
let res = client
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
@@ -813,6 +830,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
"demo_metrics_with_nanos".to_string(),
"logic_table".to_string(),
"mito".to_string(),
"multi_labels".to_string(),
"numbers".to_string()
])
);
@@ -3937,6 +3955,52 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_influxdb_write(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_influxdb_write").await;
let client = TestClient::new(app).await;
// Only write field cpu.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 cpu=1.2 1664370459457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// Only write field mem.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 mem=10240.0 1664370469457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// Write field cpu & mem.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 cpu=3.2,mem=20480.0 1664370479457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
let expected = r#"[["host1",1.2,1664370459457010101,null],["host1",null,1664370469457010101,10240.0],["host1",3.2,1664370479457010101,20480.0]]"#;
validate_data(
"test_influxdb_write",
&client,
"select * from test_alter order by ts;",
expected,
)
.await;
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())

View File

@@ -0,0 +1,109 @@
CREATE TABLE phy (ts timestamp time index, val double, a_label STRING, PRIMARY KEY(a_label)) engine=metric with ("physical_metric_table" = "");
Affected Rows: 0
ALTER TABLE phy ADD COLUMN b_label STRING PRIMARY KEY;
Error: 1001(Unsupported), Alter request to physical region is forbidden
ALTER TABLE phy DROP COLUMN a_label;
Error: 1004(InvalidArguments), Not allowed to remove index column a_label from table phy
ALTER TABLE phy SET 'ttl'='1d';
Affected Rows: 0
SHOW CREATE TABLE phy;
+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "a_label" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a_label") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '', |
| | ttl = '1day' |
| | ) |
+-------+------------------------------------+
ALTER TABLE phy UNSET 'ttl';
Affected Rows: 0
SHOW CREATE TABLE phy;
+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "a_label" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a_label") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------+------------------------------------+
ALTER TABLE phy MODIFY COLUMN a_label SET INVERTED INDEX;
Affected Rows: 0
SHOW CREATE TABLE phy;
+-------+-----------------------------------------+
| Table | Create Table |
+-------+-----------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "a_label" STRING NULL INVERTED INDEX, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a_label") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------+-----------------------------------------+
ALTER TABLE phy MODIFY COLUMN a_label UNSET INVERTED INDEX;
Affected Rows: 0
SHOW CREATE TABLE phy;
+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "a_label" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a_label") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------+------------------------------------+
DROP TABLE phy;
Affected Rows: 0

View File

@@ -0,0 +1,23 @@
CREATE TABLE phy (ts timestamp time index, val double, a_label STRING, PRIMARY KEY(a_label)) engine=metric with ("physical_metric_table" = "");
ALTER TABLE phy ADD COLUMN b_label STRING PRIMARY KEY;
ALTER TABLE phy DROP COLUMN a_label;
ALTER TABLE phy SET 'ttl'='1d';
SHOW CREATE TABLE phy;
ALTER TABLE phy UNSET 'ttl';
SHOW CREATE TABLE phy;
ALTER TABLE phy MODIFY COLUMN a_label SET INVERTED INDEX;
SHOW CREATE TABLE phy;
ALTER TABLE phy MODIFY COLUMN a_label UNSET INVERTED INDEX;
SHOW CREATE TABLE phy;
DROP TABLE phy;