Compare commits

6 Commits

Author SHA1 Message Date
Zhenchi
e46efb3d6c chore: bump version to 0.14.4
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-06-04 15:59:41 +08:00
Yingwen
34af9580e0 fix: do not accommodate fields for multi-value protocol (#6237) 2025-06-04 15:59:41 +08:00
Lei, HUANG
b19d23d665 fix(mito): revert initial builder capacity for TimeSeriesMemtable (#6231)
* fix/initial-builder-cap:
 ### Enhance Series Initialization and Capacity Management

 - **`simple_bulk_memtable.rs`**: Updated the `Series` initialization to use `with_capacity` with a capacity of 8192, improving memory management.
 - **`time_series.rs`**: Introduced a `with_capacity` method on `Series` to allow a custom initial capacity for `ValueBuilder`. Adjusted `INITIAL_BUILDER_CAPACITY` to 16 for more efficient memory usage. Added a new `new` method to maintain backward compatibility.

* fix/initial-builder-cap:
 ### Adjust Memory Allocation in Memtable

 - **`simple_bulk_memtable.rs`**: Reduced the initial capacity of `Series` from 8192 to 1024 to optimize memory usage.
 - **`time_series.rs`**: Decreased `INITIAL_BUILDER_CAPACITY` from 16 to 4 to improve efficiency in vector building.
2025-06-04 15:59:41 +08:00
dennis zhuang
209f15dd51 fix: set column index can't work in physical table (#6179) 2025-06-04 15:59:41 +08:00
discord9
0829fb204c chore: rm unnecessary depend for flow (#6047) 2025-06-04 15:59:41 +08:00
discord9
c8e470e8ed chore: upgrade hydroflow depend (#6011)
* chore: `hydroflow` -> `dfir`

* chore: refine log msg
2025-06-04 15:59:41 +08:00
27 changed files with 449 additions and 737 deletions

Cargo.lock (generated): diff suppressed because it is too large (801 lines changed).

View File

@@ -68,7 +68,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.14.3"
version = "0.14.4"
edition = "2021"
license = "Apache-2.0"

View File

@@ -16,6 +16,7 @@ async-trait.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true
@@ -39,16 +40,13 @@ datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
dfir_rs = { version = "0.13.0", default-features = false }
enum-as-inner = "0.6.0"
enum_dispatch = "0.3"
futures.workspace = true
get-size2 = "0.1.2"
greptime-proto.workspace = true
# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
# otherwise it is the same with upstream repo
chrono.workspace = true
http.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true

View File

@@ -19,8 +19,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_telemetry::info;
use dfir_rs::scheduled::graph::Dfir;
use enum_as_inner::EnumAsInner;
use hydroflow::scheduled::graph::Hydroflow;
use snafu::ensure;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
@@ -49,9 +49,9 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
(worker_handle, worker)
}
/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
/// ActiveDataflowState is a wrapper around `Dfir` and `DataflowState`
pub(crate) struct ActiveDataflowState<'subgraph> {
df: Hydroflow<'subgraph>,
df: Dfir<'subgraph>,
state: DataflowState,
err_collector: ErrCollector,
}
@@ -59,7 +59,7 @@ pub(crate) struct ActiveDataflowState<'subgraph> {
impl std::fmt::Debug for ActiveDataflowState<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ActiveDataflowState")
.field("df", &"<Hydroflow>")
.field("df", &"<Dfir>")
.field("state", &self.state)
.field("err_collector", &self.err_collector)
.finish()
@@ -69,7 +69,7 @@ impl std::fmt::Debug for ActiveDataflowState<'_> {
impl Default for ActiveDataflowState<'_> {
fn default() -> Self {
ActiveDataflowState {
df: Hydroflow::new(),
df: Dfir::new(),
state: DataflowState::default(),
err_collector: ErrCollector::default(),
}

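The hunks above are a mechanical rename: the `hydroflow` crate becomes `dfir_rs` and the `Hydroflow` graph type becomes `Dfir`, with the scheduled API otherwise untouched. As a minimal sketch of driving a graph under the new names (assuming the source/sink API shown in the render tests later in this compare carries over unchanged):

```rust
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::handoff::VecHandoff;

fn main() {
    let mut df = Dfir::new();
    let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("example");
    df.add_subgraph_source("source", send_port, move |_ctx, send| {
        for i in 0..3 {
            send.give(Some(i));
        }
    });
    df.add_subgraph_sink("sink", recv_port, move |_ctx, recv| {
        for v in recv.take_inner() {
            println!("got {v}");
        }
    });
    // Runs every subgraph with pending work, exactly as before the rename.
    df.run_available();
}
```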
View File

@@ -304,7 +304,7 @@ impl BatchingEngine {
})
.transpose()?;
info!(
debug!(
"Flow id={}, found time window expr={}",
flow_id,
phy_expr

View File

@@ -179,7 +179,7 @@ impl BatchingTask {
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine).await? {
debug!("Generate new query: {:#?}", new_query);
debug!("Generate new query: {}", new_query);
self.execute_logical_plan(frontend_client, &new_query).await
} else {
debug!("Generate no query");

View File

@@ -138,9 +138,12 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
if let LogicalPlan::Aggregate(aggregate) = node {
self.group_exprs = Some(aggregate.group_expr.iter().cloned().collect());
debug!("Group by exprs: {:?}", self.group_exprs);
debug!(
"FindGroupByFinalName: Get Group by exprs from Aggregate: {:?}",
self.group_exprs
);
} else if let LogicalPlan::Distinct(distinct) = node {
debug!("Distinct: {:#?}", distinct);
debug!("FindGroupByFinalName: Distinct: {}", node);
match distinct {
Distinct::All(input) => {
if let LogicalPlan::TableScan(table_scan) = &**input {
@@ -162,7 +165,10 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
self.group_exprs = Some(distinct_on.on_expr.iter().cloned().collect())
}
}
debug!("Group by exprs: {:?}", self.group_exprs);
debug!(
"FindGroupByFinalName: Get Group by exprs from Distinct: {:?}",
self.group_exprs
);
}
Ok(TreeNodeRecursion::Continue)

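`FindGroupByFinalName` is a DataFusion `TreeNodeVisitor`, and this hunk only makes its debug logging more specific. For context, a self-contained sketch of the same visitor pattern (illustrative names, not the flow crate's actual code):

```rust
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::{Expr, LogicalPlan};

/// Collects group-by expressions from the topmost Aggregate node.
#[derive(Default)]
struct GroupByCollector {
    group_exprs: Vec<Expr>,
}

impl TreeNodeVisitor<'_> for GroupByCollector {
    type Node = LogicalPlan;

    fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
        if let LogicalPlan::Aggregate(aggregate) = node {
            self.group_exprs = aggregate.group_expr.clone();
            // Stop at the first aggregate; `f_up` stays the default no-op.
            return Ok(TreeNodeRecursion::Stop);
        }
        Ok(TreeNodeRecursion::Continue)
    }
}

fn group_exprs_of(plan: &LogicalPlan) -> datafusion_common::Result<Vec<Expr>> {
    let mut collector = GroupByCollector::default();
    plan.visit(&mut collector)?; // walks the plan top-down
    Ok(collector.group_exprs)
}
```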
View File

@@ -18,9 +18,9 @@
use std::collections::BTreeMap;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
@@ -38,7 +38,7 @@ mod src_sink;
/// The Context for build a Operator with id of `GlobalId`
pub struct Context<'referred, 'df> {
pub id: GlobalId,
pub df: &'referred mut Hydroflow<'df>,
pub df: &'referred mut Dfir<'df>,
pub compute_state: &'referred mut DataflowState,
/// a list of all collections being used in the operator
///
@@ -361,16 +361,16 @@ mod test {
use std::cell::RefCell;
use std::rc::Rc;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::handoff::VecHandoff;
use pretty_assertions::assert_eq;
use super::*;
use crate::repr::Row;
pub fn run_and_check(
state: &mut DataflowState,
df: &mut Hydroflow,
df: &mut Dfir,
time_range: std::ops::Range<i64>,
expected: BTreeMap<i64, Vec<DiffRow>>,
output: Rc<RefCell<Vec<DiffRow>>>,
@@ -416,7 +416,7 @@ mod test {
}
pub fn harness_test_ctx<'r, 'h>(
df: &'r mut Hydroflow<'h>,
df: &'r mut Dfir<'h>,
state: &'r mut DataflowState,
) -> Context<'r, 'h> {
let err_collector = state.get_err_collector();
@@ -436,7 +436,7 @@ mod test {
/// that is it only emit once, not multiple times
#[test]
fn test_render_constant() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -473,7 +473,7 @@ mod test {
/// a simple example to show how to use source and sink
#[test]
fn example_source_sink() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("test_handoff");
df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {
@@ -498,8 +498,8 @@ mod test {
#[test]
fn test_tee_auto_schedule() {
use hydroflow::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Hydroflow::new();
use dfir_rs::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Dfir::new();
let (send_port, recv_port) = df.make_edge::<_, Toff<i32>>("test_handoff");
let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {

View File

@@ -14,8 +14,8 @@
use std::collections::BTreeMap;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
@@ -256,7 +256,7 @@ fn eval_mfp_core(
mod test {
use datatypes::data_type::ConcreteDataType;
use hydroflow::scheduled::graph::Hydroflow;
use dfir_rs::scheduled::graph::Dfir;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
@@ -269,7 +269,7 @@ mod test {
/// namely: if mfp operator can schedule a delete at the correct time
#[test]
fn test_render_mfp_with_temporal() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -348,7 +348,7 @@ mod test {
/// that is it filter the rows correctly
#[test]
fn test_render_mfp() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -388,7 +388,7 @@ mod test {
/// test if mfp operator can run multiple times within same tick
#[test]
fn test_render_mfp_multiple_times() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);

View File

@@ -22,7 +22,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::{BooleanVector, NullVector};
use hydroflow::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
@@ -1212,7 +1212,7 @@ mod test {
use common_time::Timestamp;
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
use hydroflow::scheduled::graph::Hydroflow;
use dfir_rs::scheduled::graph::Dfir;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
@@ -1228,7 +1228,7 @@ mod test {
/// expected: sum(number), window_start, window_end
#[test]
fn test_tumble_group_by() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
const START: i64 = 1625097600000;
@@ -1389,7 +1389,7 @@ mod test {
/// select avg(number) from number;
#[test]
fn test_avg_eval() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1500,7 +1500,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_distinct() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1556,7 +1556,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_batch_reduce_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let now = state.current_time_ref();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1662,7 +1662,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_reduce_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1739,7 +1739,7 @@ mod test {
/// this test include even more insert/delete case to cover all case for eval_distinct_core
#[test]
fn test_delete_reduce_distinct_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1818,7 +1818,7 @@ mod test {
/// this test include insert and delete which should cover all case for eval_distinct_core
#[test]
fn test_basic_reduce_distinct_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1896,7 +1896,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_composite_reduce_distinct_accum() {
let mut df = Hydroflow::new();
let mut df = Dfir::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);

View File

@@ -17,7 +17,7 @@
use std::collections::BTreeMap;
use common_telemetry::{debug, trace};
use hydroflow::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::broadcast::error::TryRecvError;

View File

@@ -16,16 +16,16 @@ use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::SubgraphId;
use get_size2::GetSize;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::SubgraphId;
use crate::compute::types::ErrCollector;
use crate::repr::{self, Timestamp};
use crate::utils::{ArrangeHandler, Arrangement};
/// input/output of a dataflow
/// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
/// One `ComputeState` manage the input/output/schedule of one `Dfir`
#[derive(Debug, Default)]
pub struct DataflowState {
/// it is important to use a deque to maintain the order of subgraph here
@@ -38,7 +38,7 @@ pub struct DataflowState {
/// Which means it's also the current time in temporal filter to get current correct result
as_of: Rc<RefCell<Timestamp>>,
/// error collector local to this `ComputeState`,
/// useful for distinguishing errors from different `Hydroflow`
/// useful for distinguishing errors from different `Dfir`
err_collector: ErrCollector,
/// save all used arrange in this dataflow, since usually there is no delete operation
/// we can just keep track of all used arrange and schedule subgraph when they need to be updated
@@ -65,7 +65,7 @@ impl DataflowState {
/// schedule all subgraph that need to run with time <= `as_of` and run_available()
///
/// return true if any subgraph actually executed
pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
// first split keys <= as_of into another map
let mut before = self
.schedule_subgraph

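`run_available_with_schedule` first drains every subgraph scheduled at or before `as_of`, then runs the graph. The "split keys <= as_of into another map" step maps directly onto `BTreeMap::split_off`; a small sketch of just that step, with illustrative names:

```rust
use std::collections::BTreeMap;

/// Drain all entries with key <= `as_of`, leaving strictly later
/// entries in `schedule`.
fn drain_due<V>(schedule: &mut BTreeMap<i64, V>, as_of: i64) -> BTreeMap<i64, V> {
    // `split_off` returns the part with keys >= as_of + 1 ...
    let later = schedule.split_off(&(as_of + 1));
    // ... so swap it back in and hand the caller the due entries.
    std::mem::replace(schedule, later)
}

fn main() {
    let mut schedule = BTreeMap::from([(1, "a"), (5, "b"), (9, "c")]);
    let due = drain_due(&mut schedule, 5);
    assert_eq!(due.len(), 2);      // keys 1 and 5
    assert_eq!(schedule.len(), 1); // key 9 remains
}
```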
View File

@@ -18,10 +18,10 @@ use std::rc::Rc;
use std::sync::Arc;
use common_error::ext::ErrorExt;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::handoff::TeeingHandoff;
use dfir_rs::scheduled::port::RecvPort;
use dfir_rs::scheduled::SubgraphId;
use itertools::Itertools;
use tokio::sync::Mutex;
@@ -46,7 +46,7 @@ impl<T: 'static + Clone> Collection<T> {
/// clone a collection, require a mutable reference to the hydroflow instance
///
/// Note: need to be the same hydroflow instance that this collection is created from
pub fn clone(&self, df: &mut Hydroflow) -> Self {
pub fn clone(&self, df: &mut Dfir) -> Self {
Collection {
stream: self.stream.tee(df),
}
@@ -151,7 +151,7 @@ impl<T: 'static> CollectionBundle<T> {
}
impl<T: 'static + Clone> CollectionBundle<T> {
pub fn clone(&self, df: &mut Hydroflow) -> Self {
pub fn clone(&self, df: &mut Dfir) -> Self {
Self {
collection: self.collection.clone(df),
arranged: self

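`Collection::clone` never copies data: it tees the underlying `TeeingHandoff`, which is why it needs a mutable handle to the same `Dfir` the stream was built on. A minimal sketch of the teeing primitive itself, assuming `tee` keeps its hydroflow signature after the rename:

```rust
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::handoff::TeeingHandoff;

fn main() {
    let mut df = Dfir::new();
    let (_send, recv) = df.make_edge::<_, TeeingHandoff<i32>>("shared");
    // Each tee is an independent reader over the same stream; both ports
    // must be wired into subgraphs on this same `df`.
    let _recv2 = recv.tee(&mut df);
}
```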
View File

@@ -21,7 +21,7 @@ use common_error::ext::BoxedError;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
use hydroflow::lattices::cc_traits::Iter;
use dfir_rs::lattices::cc_traits::Iter;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};

View File

@@ -581,7 +581,7 @@ impl FrontendInvoker {
.start_timer();
self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor, false)
.handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)

View File

@@ -73,7 +73,7 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {
self.handle_row_inserts(requests, ctx.clone(), false)
self.handle_row_inserts(requests, ctx.clone(), false, false)
.await?
}
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
@@ -411,6 +411,7 @@ impl Instance {
requests: RowInsertRequests,
ctx: QueryContextRef,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(
@@ -418,6 +419,7 @@ impl Instance {
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
is_single_value,
)
.await
.context(TableOperationSnafu)
@@ -430,7 +432,14 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
.handle_last_non_null_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
true,
// Influx protocol may writes multiple fields (values).
false,
)
.await
.context(TableOperationSnafu)
}

View File

@@ -52,8 +52,9 @@ impl OpentsdbProtocolHandler for Instance {
None
};
// OpenTSDB is single value.
let output = self
.handle_row_inserts(requests, ctx, true)
.handle_row_inserts(requests, ctx, true, true)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;

View File

@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};
self.handle_row_inserts(requests, ctx, false)
self.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

View File

@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
self.handle_row_inserts(request, ctx.clone(), true)
self.handle_row_inserts(request, ctx.clone(), true, true)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?

View File

@@ -206,7 +206,9 @@ impl DataRegion {
) -> Result<AffectedRows> {
match request.kind {
AlterKind::SetRegionOptions { options: _ }
| AlterKind::UnsetRegionOptions { keys: _ } => {
| AlterKind::UnsetRegionOptions { keys: _ }
| AlterKind::SetIndex { options: _ }
| AlterKind::UnsetIndex { options: _ } => {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Alter(request))

View File

@@ -57,7 +57,7 @@ use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 16;
const INITIAL_BUILDER_CAPACITY: usize = 4;
/// Vector builder capacity.
const BUILDER_CAPACITY: usize = 512;
@@ -645,15 +645,19 @@ struct Series {
}
impl Series {
fn new(region_metadata: &RegionMetadataRef) -> Self {
pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
Self {
pk_cache: None,
active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
active: ValueBuilder::new(region_metadata, builder_cap),
frozen: vec![],
region_metadata: region_metadata.clone(),
}
}
pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
}
/// Pushes a row of values into Series. Return the size of values.
fn push<'a>(
&mut self,

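`Series::new` now delegates to a `with_capacity` constructor, so the bulk memtable can preallocate (1024) while the regular time-series path starts at the lowered `INITIAL_BUILDER_CAPACITY` of 4 and grows on demand. A self-contained reduction of the pattern, where a `Vec` stands in for `ValueBuilder` and the real constructors also take a `RegionMetadataRef`:

```rust
const INITIAL_BUILDER_CAPACITY: usize = 4;

struct Series {
    active: Vec<u64>, // stand-in for mito2's ValueBuilder
}

impl Series {
    /// Explicit-capacity constructor; the bulk memtable passes 1024.
    fn with_capacity(builder_cap: usize) -> Self {
        Self { active: Vec::with_capacity(builder_cap) }
    }

    /// Default constructor keeps existing call sites unchanged.
    fn new() -> Self {
        Self::with_capacity(INITIAL_BUILDER_CAPACITY)
    }
}

fn main() {
    assert!(Series::new().active.capacity() >= 4);
    assert!(Series::with_capacity(1024).active.capacity() >= 1024);
}
```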
View File

@@ -147,7 +147,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
.await
}
@@ -158,6 +158,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
preprocess_row_insert_requests(&mut requests.inserts)?;
self.handle_row_inserts_with_create_type(
@@ -166,6 +167,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Physical,
accommodate_existing_schema,
is_single_value,
)
.await
}
@@ -183,6 +185,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Log,
false,
false,
)
.await
}
@@ -199,6 +202,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Trace,
false,
false,
)
.await
}
@@ -210,6 +214,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
@@ -217,6 +222,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::LastNonNull,
accommodate_existing_schema,
is_single_value,
)
.await
}
@@ -229,6 +235,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
@@ -249,6 +256,7 @@ impl Inserter {
create_type,
statement_executor,
accommodate_existing_schema,
is_single_value,
)
.await?;
@@ -299,6 +307,7 @@ impl Inserter {
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
true,
true,
)
.await?;
let name_to_info = table_infos
@@ -464,9 +473,10 @@ impl Inserter {
/// This mapping is used in the conversion of RowToRegion.
///
/// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
/// It only works for TIME_INDEX and VALUE columns. This is for the case where the user creates a table with
/// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
/// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
/// remote write. This will modify the `RowInsertRequests` in place.
/// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
async fn create_or_alter_tables_on_demand(
&self,
requests: &mut RowInsertRequests,
@@ -474,6 +484,7 @@ impl Inserter {
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
@@ -537,6 +548,7 @@ impl Inserter {
&table,
ctx,
accommodate_existing_schema,
is_single_value,
)? {
alter_tables.push(alter_expr);
}
@@ -811,12 +823,15 @@ impl Inserter {
/// When `accommodate_existing_schema` is true, it may modify the input `req` to
/// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
/// for more details.
/// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
/// input `req`.
fn get_alter_table_expr_on_demand(
&self,
req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Option<AlterTableExpr>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
@@ -834,18 +849,20 @@ impl Inserter {
let table_schema = table.schema();
// Find timestamp column name
let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
// Find field column name if there is only one
// Find field column name if there is only one and `is_single_value` is true.
let mut field_col_name = None;
let mut multiple_field_cols = false;
table.field_columns().for_each(|col| {
if field_col_name.is_none() {
field_col_name = Some(col.name.clone());
} else {
multiple_field_cols = true;
if is_single_value {
let mut multiple_field_cols = false;
table.field_columns().for_each(|col| {
if field_col_name.is_none() {
field_col_name = Some(col.name.clone());
} else {
multiple_field_cols = true;
}
});
if multiple_field_cols {
field_col_name = None;
}
});
if multiple_field_cols {
field_col_name = None;
}
// Update column name in request schema for Timestamp/Field columns
@@ -871,11 +888,11 @@ impl Inserter {
}
}
// Remove from add_columns any column that is timestamp or field (if there is only one field column)
// Only keep columns that are tags or non-single field.
add_columns.add_columns.retain(|col| {
let def = col.column_def.as_ref().unwrap();
def.semantic_type != SemanticType::Timestamp as i32
&& (def.semantic_type != SemanticType::Field as i32 && field_col_name.is_some())
def.semantic_type == SemanticType::Tag as i32
|| (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
});
if add_columns.add_columns.is_empty() {
@@ -1227,7 +1244,7 @@ mod tests {
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
.unwrap();
assert!(alter_expr.is_none());

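Reading the call sites across this compare, the new `is_single_value` flag separates "accommodate the existing timestamp column" from "also accommodate the single value column". A hedged distillation of that rule (my reading of the diff, not upstream documentation):

```rust
/// Flag pairs (accommodate_existing_schema, is_single_value) as passed
/// by the handlers in this change set:
///
///   OpenTSDB row inserts          -> (true,  true)
///   Prometheus remote write       -> (true,  true)
///   InfluxDB last-non-null writes -> (true,  false)  // multi-field lines
///   OpenTelemetry / gRPC rows     -> (false, false)
///
/// A field column is renamed to the table's single value column only
/// when both flags hold; otherwise unknown fields become ALTER ADD COLUMN.
fn accommodate_field(accommodate_existing_schema: bool, is_single_value: bool) -> bool {
    accommodate_existing_schema && is_single_value
}

fn main() {
    assert!(accommodate_field(true, true));   // OpenTSDB, Prometheus
    assert!(!accommodate_field(true, false)); // InfluxDB: timestamp only
}
```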
View File

@@ -250,6 +250,7 @@ impl PipelineTable {
Self::query_ctx(&table_info),
&self.statement_executor,
false,
false,
)
.await
.context(InsertPipelineSnafu)?;

View File

@@ -458,6 +458,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
))
.with_log_ingest_handler(instance.fe_instance().clone(), None, None)
.with_logs_handler(instance.fe_instance().clone())
.with_influxdb_handler(instance.fe_instance().clone())
.with_otlp_handler(instance.fe_instance().clone())
.with_jaeger_handler(instance.fe_instance().clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());

View File

@@ -113,6 +113,8 @@ macro_rules! http_tests {
test_log_query,
test_jaeger_query_api,
test_jaeger_query_api_for_trace_v1,
test_influxdb_write,
);
)*
};
@@ -3937,6 +3939,52 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_influxdb_write(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_influxdb_write").await;
let client = TestClient::new(app).await;
// Only write field cpu.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 cpu=1.2 1664370459457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// Only write field mem.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 mem=10240.0 1664370469457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// Write field cpu & mem.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 cpu=3.2,mem=20480.0 1664370479457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
let expected = r#"[["host1",1.2,1664370459457010101,null],["host1",null,1664370469457010101,10240.0],["host1",3.2,1664370479457010101,20480.0]]"#;
validate_data(
"test_influxdb_write",
&client,
"select * from test_alter order by ts;",
expected,
)
.await;
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())

View File

@@ -0,0 +1,109 @@
CREATE TABLE phy (ts timestamp time index, val double, a_label STRING, PRIMARY KEY(a_label)) engine=metric with ("physical_metric_table" = "");
Affected Rows: 0
ALTER TABLE phy ADD COLUMN b_label STRING PRIMARY KEY;
Error: 1001(Unsupported), Alter request to physical region is forbidden
ALTER TABLE phy DROP COLUMN a_label;
Error: 1004(InvalidArguments), Not allowed to remove index column a_label from table phy
ALTER TABLE phy SET 'ttl'='1d';
Affected Rows: 0
SHOW CREATE TABLE phy;
+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "a_label" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a_label") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '', |
| | ttl = '1day' |
| | ) |
+-------+------------------------------------+
ALTER TABLE phy UNSET 'ttl';
Affected Rows: 0
SHOW CREATE TABLE phy;
+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "a_label" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a_label") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------+------------------------------------+
ALTER TABLE phy MODIFY COLUMN a_label SET INVERTED INDEX;
Affected Rows: 0
SHOW CREATE TABLE phy;
+-------+-----------------------------------------+
| Table | Create Table |
+-------+-----------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "a_label" STRING NULL INVERTED INDEX, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a_label") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------+-----------------------------------------+
ALTER TABLE phy MODIFY COLUMN a_label UNSET INVERTED INDEX;
Affected Rows: 0
SHOW CREATE TABLE phy;
+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "a_label" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a_label") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------+------------------------------------+
DROP TABLE phy;
Affected Rows: 0

View File

@@ -0,0 +1,23 @@
CREATE TABLE phy (ts timestamp time index, val double, a_label STRING, PRIMARY KEY(a_label)) engine=metric with ("physical_metric_table" = "");
ALTER TABLE phy ADD COLUMN b_label STRING PRIMARY KEY;
ALTER TABLE phy DROP COLUMN a_label;
ALTER TABLE phy SET 'ttl'='1d';
SHOW CREATE TABLE phy;
ALTER TABLE phy UNSET 'ttl';
SHOW CREATE TABLE phy;
ALTER TABLE phy MODIFY COLUMN a_label SET INVERTED INDEX;
SHOW CREATE TABLE phy;
ALTER TABLE phy MODIFY COLUMN a_label UNSET INVERTED INDEX;
SHOW CREATE TABLE phy;
DROP TABLE phy;