Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-28 00:42:56 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date |
|---|---|---|
| | e46efb3d6c | |
| | 34af9580e0 | |
| | b19d23d665 | |
| | 209f15dd51 | |
| | 0829fb204c | |
| | c8e470e8ed | |
Cargo.lock (generated, 801 changed lines): file diff suppressed because it is too large.
@@ -68,7 +68,7 @@ members = [
 resolver = "2"
 
 [workspace.package]
-version = "0.14.3"
+version = "0.14.4"
 edition = "2021"
 license = "Apache-2.0"
@@ -16,6 +16,7 @@ async-trait.workspace = true
 bytes.workspace = true
 cache.workspace = true
 catalog.workspace = true
+chrono.workspace = true
 client.workspace = true
 common-base.workspace = true
 common-config.workspace = true

@@ -39,16 +40,13 @@ datafusion-expr.workspace = true
 datafusion-physical-expr.workspace = true
 datafusion-substrait.workspace = true
 datatypes.workspace = true
+dfir_rs = { version = "0.13.0", default-features = false }
 enum-as-inner = "0.6.0"
 enum_dispatch = "0.3"
 futures.workspace = true
 get-size2 = "0.1.2"
 greptime-proto.workspace = true
-# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
-# otherwise it is the same with upstream repo
-chrono.workspace = true
 http.workspace = true
-hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
 itertools.workspace = true
 lazy_static.workspace = true
 meta-client.workspace = true
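Context for the dependency swap above: the upstream hydroflow crate was renamed to dfir_rs, so the git-pinned GreptimeTeam fork is replaced by the published crate, and every `hydroflow::...` path in the flow crate becomes a `dfir_rs::...` path with the graph type renamed from `Hydroflow` to `Dfir`. A minimal sketch of what the rename means at a call site, using only the paths that appear in this diff:

```rust
// Before (git-pinned fork):
//     use hydroflow::scheduled::graph::Hydroflow;
//     let mut df = Hydroflow::new();

// After (published dfir_rs crate, as used throughout this diff):
use dfir_rs::scheduled::graph::Dfir;

fn main() {
    // `Dfir` is the same scheduled dataflow graph type previously named `Hydroflow`.
    let mut df = Dfir::new();
    // With nothing scheduled yet this returns immediately.
    df.run_available();
}
```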
@@ -19,8 +19,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
 use common_telemetry::info;
+use dfir_rs::scheduled::graph::Dfir;
 use enum_as_inner::EnumAsInner;
-use hydroflow::scheduled::graph::Hydroflow;
 use snafu::ensure;
 use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
 

@@ -49,9 +49,9 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
     (worker_handle, worker)
 }
 
-/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
+/// ActiveDataflowState is a wrapper around `Dfir` and `DataflowState`
 pub(crate) struct ActiveDataflowState<'subgraph> {
-    df: Hydroflow<'subgraph>,
+    df: Dfir<'subgraph>,
     state: DataflowState,
     err_collector: ErrCollector,
 }

@@ -59,7 +59,7 @@ pub(crate) struct ActiveDataflowState<'subgraph> {
 impl std::fmt::Debug for ActiveDataflowState<'_> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("ActiveDataflowState")
-            .field("df", &"<Hydroflow>")
+            .field("df", &"<Dfir>")
             .field("state", &self.state)
             .field("err_collector", &self.err_collector)
             .finish()

@@ -69,7 +69,7 @@ impl std::fmt::Debug for ActiveDataflowState<'_> {
 impl Default for ActiveDataflowState<'_> {
     fn default() -> Self {
         ActiveDataflowState {
-            df: Hydroflow::new(),
+            df: Dfir::new(),
             state: DataflowState::default(),
             err_collector: ErrCollector::default(),
         }
@@ -304,7 +304,7 @@ impl BatchingEngine {
             })
             .transpose()?;
 
-        info!(
+        debug!(
            "Flow id={}, found time window expr={}",
            flow_id,
            phy_expr

@@ -179,7 +179,7 @@ impl BatchingTask {
         frontend_client: &Arc<FrontendClient>,
     ) -> Result<Option<(u32, Duration)>, Error> {
         if let Some(new_query) = self.gen_insert_plan(engine).await? {
-            debug!("Generate new query: {:#?}", new_query);
+            debug!("Generate new query: {}", new_query);
             self.execute_logical_plan(frontend_client, &new_query).await
         } else {
             debug!("Generate no query");
@@ -138,9 +138,12 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
     fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
         if let LogicalPlan::Aggregate(aggregate) = node {
             self.group_exprs = Some(aggregate.group_expr.iter().cloned().collect());
-            debug!("Group by exprs: {:?}", self.group_exprs);
+            debug!(
+                "FindGroupByFinalName: Get Group by exprs from Aggregate: {:?}",
+                self.group_exprs
+            );
         } else if let LogicalPlan::Distinct(distinct) = node {
-            debug!("Distinct: {:#?}", distinct);
+            debug!("FindGroupByFinalName: Distinct: {}", node);
             match distinct {
                 Distinct::All(input) => {
                     if let LogicalPlan::TableScan(table_scan) = &**input {

@@ -162,7 +165,10 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
                     self.group_exprs = Some(distinct_on.on_expr.iter().cloned().collect())
                 }
             }
-            debug!("Group by exprs: {:?}", self.group_exprs);
+            debug!(
+                "FindGroupByFinalName: Get Group by exprs from Distinct: {:?}",
+                self.group_exprs
+            );
         }
 
         Ok(TreeNodeRecursion::Continue)
@@ -18,9 +18,9 @@
 
 use std::collections::BTreeMap;
 
-use hydroflow::scheduled::graph::Hydroflow;
-use hydroflow::scheduled::graph_ext::GraphExt;
-use hydroflow::scheduled::port::{PortCtx, SEND};
+use dfir_rs::scheduled::graph::Dfir;
+use dfir_rs::scheduled::graph_ext::GraphExt;
+use dfir_rs::scheduled::port::{PortCtx, SEND};
 use itertools::Itertools;
 use snafu::OptionExt;
 

@@ -38,7 +38,7 @@ mod src_sink;
 /// The Context for build a Operator with id of `GlobalId`
 pub struct Context<'referred, 'df> {
     pub id: GlobalId,
-    pub df: &'referred mut Hydroflow<'df>,
+    pub df: &'referred mut Dfir<'df>,
     pub compute_state: &'referred mut DataflowState,
     /// a list of all collections being used in the operator
     ///
@@ -361,16 +361,16 @@ mod test {
     use std::cell::RefCell;
     use std::rc::Rc;
 
-    use hydroflow::scheduled::graph::Hydroflow;
-    use hydroflow::scheduled::graph_ext::GraphExt;
-    use hydroflow::scheduled::handoff::VecHandoff;
+    use dfir_rs::scheduled::graph::Dfir;
+    use dfir_rs::scheduled::graph_ext::GraphExt;
+    use dfir_rs::scheduled::handoff::VecHandoff;
     use pretty_assertions::assert_eq;
 
     use super::*;
     use crate::repr::Row;
     pub fn run_and_check(
         state: &mut DataflowState,
-        df: &mut Hydroflow,
+        df: &mut Dfir,
         time_range: std::ops::Range<i64>,
         expected: BTreeMap<i64, Vec<DiffRow>>,
         output: Rc<RefCell<Vec<DiffRow>>>,

@@ -416,7 +416,7 @@ mod test {
     }
 
     pub fn harness_test_ctx<'r, 'h>(
-        df: &'r mut Hydroflow<'h>,
+        df: &'r mut Dfir<'h>,
         state: &'r mut DataflowState,
     ) -> Context<'r, 'h> {
         let err_collector = state.get_err_collector();

@@ -436,7 +436,7 @@ mod test {
     /// that is it only emit once, not multiple times
     #[test]
     fn test_render_constant() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -473,7 +473,7 @@ mod test {
     /// a simple example to show how to use source and sink
     #[test]
     fn example_source_sink() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("test_handoff");
         df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
             for i in 0..10 {
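The `example_source_sink` test above is the clearest surviving picture of the scheduled API across the rename. A self-contained sketch of the same source/sink pattern, assuming the dfir_rs 0.13 scheduled API (`make_edge`, `add_subgraph_source`, `add_subgraph_sink`, `send.give`, `recv.take_inner`, `run_available`) behaves like the hydroflow API it replaces; only `make_edge` and `add_subgraph_source` are shown verbatim in this diff, the rest is recalled hydroflow usage:

```rust
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::handoff::VecHandoff;

fn main() {
    let mut df = Dfir::new();
    // An edge is a named handoff connecting a send port to a receive port.
    let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("example_handoff");

    // Source subgraph: pushes 0..10 into the handoff each time it runs.
    df.add_subgraph_source("example_source", send_port, move |_ctx, send| {
        for i in 0..10 {
            send.give(Some(i));
        }
    });

    // Sink subgraph: drains whatever arrived during this tick.
    df.add_subgraph_sink("example_sink", recv_port, move |_ctx, recv| {
        let data = recv.take_inner();
        println!("received: {data:?}");
    });

    // Run every subgraph that currently has pending work.
    df.run_available();
}
```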
@@ -498,8 +498,8 @@ mod test {
 
     #[test]
     fn test_tee_auto_schedule() {
-        use hydroflow::scheduled::handoff::TeeingHandoff as Toff;
-        let mut df = Hydroflow::new();
+        use dfir_rs::scheduled::handoff::TeeingHandoff as Toff;
+        let mut df = Dfir::new();
         let (send_port, recv_port) = df.make_edge::<_, Toff<i32>>("test_handoff");
         let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
             for i in 0..10 {

@@ -14,8 +14,8 @@
 
 use std::collections::BTreeMap;
 
-use hydroflow::scheduled::graph_ext::GraphExt;
-use hydroflow::scheduled::port::{PortCtx, SEND};
+use dfir_rs::scheduled::graph_ext::GraphExt;
+use dfir_rs::scheduled::port::{PortCtx, SEND};
 use itertools::Itertools;
 use snafu::OptionExt;
 

@@ -256,7 +256,7 @@ fn eval_mfp_core(
 mod test {
 
     use datatypes::data_type::ConcreteDataType;
-    use hydroflow::scheduled::graph::Hydroflow;
+    use dfir_rs::scheduled::graph::Dfir;
 
     use super::*;
     use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};

@@ -269,7 +269,7 @@ mod test {
     /// namely: if mfp operator can schedule a delete at the correct time
     #[test]
     fn test_render_mfp_with_temporal() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -348,7 +348,7 @@ mod test {
     /// that is it filter the rows correctly
     #[test]
     fn test_render_mfp() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -388,7 +388,7 @@ mod test {
     /// test if mfp operator can run multiple times within same tick
     #[test]
     fn test_render_mfp_multiple_times() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -22,7 +22,7 @@ use datatypes::data_type::ConcreteDataType;
 use datatypes::prelude::DataType;
 use datatypes::value::{ListValue, Value};
 use datatypes::vectors::{BooleanVector, NullVector};
-use hydroflow::scheduled::graph_ext::GraphExt;
+use dfir_rs::scheduled::graph_ext::GraphExt;
 use itertools::Itertools;
 use snafu::{ensure, OptionExt, ResultExt};

@@ -1212,7 +1212,7 @@ mod test {
 
     use common_time::Timestamp;
     use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
-    use hydroflow::scheduled::graph::Hydroflow;
+    use dfir_rs::scheduled::graph::Dfir;
 
     use super::*;
     use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};

@@ -1228,7 +1228,7 @@ mod test {
     /// expected: sum(number), window_start, window_end
     #[test]
     fn test_tumble_group_by() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);
         const START: i64 = 1625097600000;

@@ -1389,7 +1389,7 @@ mod test {
     /// select avg(number) from number;
     #[test]
     fn test_avg_eval() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -1500,7 +1500,7 @@ mod test {
     /// | col | Int64 |
     #[test]
     fn test_basic_distinct() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -1556,7 +1556,7 @@ mod test {
     /// | col | Int64 |
     #[test]
     fn test_basic_batch_reduce_accum() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let now = state.current_time_ref();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -1662,7 +1662,7 @@ mod test {
     /// | col | Int64 |
     #[test]
     fn test_basic_reduce_accum() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -1739,7 +1739,7 @@ mod test {
     /// this test include even more insert/delete case to cover all case for eval_distinct_core
     #[test]
     fn test_delete_reduce_distinct_accum() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -1818,7 +1818,7 @@ mod test {
     /// this test include insert and delete which should cover all case for eval_distinct_core
     #[test]
     fn test_basic_reduce_distinct_accum() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -1896,7 +1896,7 @@ mod test {
     /// | col | Int64 |
     #[test]
     fn test_composite_reduce_distinct_accum() {
-        let mut df = Hydroflow::new();
+        let mut df = Dfir::new();
         let mut state = DataflowState::default();
         let mut ctx = harness_test_ctx(&mut df, &mut state);

@@ -17,7 +17,7 @@
 use std::collections::BTreeMap;
 
 use common_telemetry::{debug, trace};
-use hydroflow::scheduled::graph_ext::GraphExt;
+use dfir_rs::scheduled::graph_ext::GraphExt;
 use itertools::Itertools;
 use snafu::OptionExt;
 use tokio::sync::broadcast::error::TryRecvError;
@@ -16,16 +16,16 @@ use std::cell::RefCell;
 use std::collections::{BTreeMap, VecDeque};
 use std::rc::Rc;
 
+use dfir_rs::scheduled::graph::Dfir;
+use dfir_rs::scheduled::SubgraphId;
 use get_size2::GetSize;
-use hydroflow::scheduled::graph::Hydroflow;
-use hydroflow::scheduled::SubgraphId;
 
 use crate::compute::types::ErrCollector;
 use crate::repr::{self, Timestamp};
 use crate::utils::{ArrangeHandler, Arrangement};
 
 /// input/output of a dataflow
-/// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
+/// One `ComputeState` manage the input/output/schedule of one `Dfir`
 #[derive(Debug, Default)]
 pub struct DataflowState {
     /// it is important to use a deque to maintain the order of subgraph here

@@ -38,7 +38,7 @@ pub struct DataflowState {
     /// Which means it's also the current time in temporal filter to get current correct result
     as_of: Rc<RefCell<Timestamp>>,
     /// error collector local to this `ComputeState`,
-    /// useful for distinguishing errors from different `Hydroflow`
+    /// useful for distinguishing errors from different `Dfir`
     err_collector: ErrCollector,
     /// save all used arrange in this dataflow, since usually there is no delete operation
     /// we can just keep track of all used arrange and schedule subgraph when they need to be updated
@@ -65,7 +65,7 @@ impl DataflowState {
     /// schedule all subgraph that need to run with time <= `as_of` and run_available()
    ///
    /// return true if any subgraph actually executed
-    pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
+    pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
         // first split keys <= as_of into another map
         let mut before = self
             .schedule_subgraph
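The comment inside `run_available_with_schedule` ("first split keys <= as_of into another map") carries the interesting logic: scheduled subgraphs are keyed by the timestamp they should run at, and only entries at or before `as_of` should execute this tick. A minimal sketch of that split, assuming a `BTreeMap`-keyed schedule (the exact field types are not visible in this hunk):

```rust
use std::collections::BTreeMap;

type Timestamp = i64;

/// Return every entry with key <= `as_of`, leaving the still-future entries behind.
/// `BTreeMap::split_off(&k)` keeps keys >= k in the returned map, so split at
/// `as_of + 1` and swap the halves.
fn take_due<V>(schedule: &mut BTreeMap<Timestamp, V>, as_of: Timestamp) -> BTreeMap<Timestamp, V> {
    let future = schedule.split_off(&(as_of + 1)); // keys > as_of stay scheduled
    std::mem::replace(schedule, future)            // keys <= as_of are handed back
}

fn main() {
    let mut schedule: BTreeMap<Timestamp, &str> = BTreeMap::new();
    schedule.insert(1, "a");
    schedule.insert(5, "b");
    schedule.insert(9, "c");
    let due = take_due(&mut schedule, 5);
    assert_eq!(due.keys().copied().collect::<Vec<_>>(), vec![1, 5]);
    assert_eq!(schedule.keys().copied().collect::<Vec<_>>(), vec![9]);
}
```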
@@ -18,10 +18,10 @@ use std::rc::Rc;
 use std::sync::Arc;
 
 use common_error::ext::ErrorExt;
-use hydroflow::scheduled::graph::Hydroflow;
-use hydroflow::scheduled::handoff::TeeingHandoff;
-use hydroflow::scheduled::port::RecvPort;
-use hydroflow::scheduled::SubgraphId;
+use dfir_rs::scheduled::graph::Dfir;
+use dfir_rs::scheduled::handoff::TeeingHandoff;
+use dfir_rs::scheduled::port::RecvPort;
+use dfir_rs::scheduled::SubgraphId;
 use itertools::Itertools;
 use tokio::sync::Mutex;
 

@@ -46,7 +46,7 @@ impl<T: 'static + Clone> Collection<T> {
     /// clone a collection, require a mutable reference to the hydroflow instance
     ///
     /// Note: need to be the same hydroflow instance that this collection is created from
-    pub fn clone(&self, df: &mut Hydroflow) -> Self {
+    pub fn clone(&self, df: &mut Dfir) -> Self {
         Collection {
             stream: self.stream.tee(df),
         }

@@ -151,7 +151,7 @@ impl<T: 'static> CollectionBundle<T> {
 }
 
 impl<T: 'static + Clone> CollectionBundle<T> {
-    pub fn clone(&self, df: &mut Hydroflow) -> Self {
+    pub fn clone(&self, df: &mut Dfir) -> Self {
         Self {
             collection: self.collection.clone(df),
             arranged: self

@@ -21,7 +21,7 @@ use common_error::ext::BoxedError;
 use datatypes::prelude::{ConcreteDataType, DataType};
 use datatypes::value::{ListValue, Value};
 use datatypes::vectors::{BooleanVector, Helper, VectorRef};
-use hydroflow::lattices::cc_traits::Iter;
+use dfir_rs::lattices::cc_traits::Iter;
 use itertools::Itertools;
 use snafu::{ensure, OptionExt, ResultExt};
@@ -581,7 +581,7 @@ impl FrontendInvoker {
             .start_timer();
 
         self.inserter
-            .handle_row_inserts(requests, ctx, &self.statement_executor, false)
+            .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
             .await
             .map_err(BoxedError::new)
             .context(common_frontend::error::ExternalSnafu)

@@ -73,7 +73,7 @@ impl GrpcQueryHandler for Instance {
         let output = match request {
             Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
             Request::RowInserts(requests) => {
-                self.handle_row_inserts(requests, ctx.clone(), false)
+                self.handle_row_inserts(requests, ctx.clone(), false, false)
                     .await?
             }
             Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,

@@ -411,6 +411,7 @@ impl Instance {
         requests: RowInsertRequests,
         ctx: QueryContextRef,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Output> {
         self.inserter
             .handle_row_inserts(

@@ -418,6 +419,7 @@ impl Instance {
                 ctx,
                 self.statement_executor.as_ref(),
                 accommodate_existing_schema,
+                is_single_value,
             )
             .await
             .context(TableOperationSnafu)
@@ -430,7 +432,14 @@ impl Instance {
         ctx: QueryContextRef,
     ) -> Result<Output> {
         self.inserter
-            .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
+            .handle_last_non_null_inserts(
+                requests,
+                ctx,
+                self.statement_executor.as_ref(),
+                true,
+                // Influx protocol may write multiple fields (values).
+                false,
+            )
             .await
             .context(TableOperationSnafu)
     }
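Across these call sites the new boolean pair reads as `(accommodate_existing_schema, is_single_value)`. A hypothetical helper, written only to summarize how the call sites in this diff set the two flags; the function and its name are illustrative and do not exist in the codebase:

```rust
/// (accommodate_existing_schema, is_single_value) per ingest path, as set by the
/// call sites changed in this diff. Purely a summary sketch, not a GreptimeDB API.
fn insert_flags(protocol: &str) -> (bool, bool) {
    match protocol {
        // OpenTSDB is single value.
        "opentsdb" => (true, true),
        // Prometheus remote write carries one value per sample.
        "prometheus_remote_write" => (true, true),
        // InfluxDB line protocol may write multiple fields per line.
        "influxdb" => (true, false),
        // gRPC row inserts and OpenTelemetry keep the request schema as-is.
        _ => (false, false),
    }
}
```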
@@ -52,8 +52,9 @@ impl OpentsdbProtocolHandler for Instance {
             None
         };
 
+        // OpenTSDB is single value.
         let output = self
-            .handle_row_inserts(requests, ctx, true)
+            .handle_row_inserts(requests, ctx, true, true)
             .await
             .map_err(BoxedError::new)
             .context(servers::error::ExecuteGrpcQuerySnafu)?;

@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
             None
         };
 
-        self.handle_row_inserts(requests, ctx, false)
+        self.handle_row_inserts(requests, ctx, false, false)
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)

@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)?
         } else {
-            self.handle_row_inserts(request, ctx.clone(), true)
+            self.handle_row_inserts(request, ctx.clone(), true, true)
                 .await
                 .map_err(BoxedError::new)
                 .context(error::ExecuteGrpcQuerySnafu)?
@@ -206,7 +206,9 @@ impl DataRegion {
     ) -> Result<AffectedRows> {
         match request.kind {
             AlterKind::SetRegionOptions { options: _ }
-            | AlterKind::UnsetRegionOptions { keys: _ } => {
+            | AlterKind::UnsetRegionOptions { keys: _ }
+            | AlterKind::SetIndex { options: _ }
+            | AlterKind::UnsetIndex { options: _ } => {
                 let region_id = utils::to_data_region_id(region_id);
                 self.mito
                     .handle_request(region_id, RegionRequest::Alter(request))

@@ -57,7 +57,7 @@ use crate::region::options::MergeMode;
 use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
 
 /// Initial vector builder capacity.
-const INITIAL_BUILDER_CAPACITY: usize = 16;
+const INITIAL_BUILDER_CAPACITY: usize = 4;
 
 /// Vector builder capacity.
 const BUILDER_CAPACITY: usize = 512;

@@ -645,15 +645,19 @@ struct Series {
 }
 
 impl Series {
-    fn new(region_metadata: &RegionMetadataRef) -> Self {
+    pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
         Self {
             pk_cache: None,
-            active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
+            active: ValueBuilder::new(region_metadata, builder_cap),
             frozen: vec![],
             region_metadata: region_metadata.clone(),
         }
    }
 
+    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
+        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
+    }
+
     /// Pushes a row of values into Series. Return the size of values.
     fn push<'a>(
         &mut self,
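The capacity change above (INITIAL_BUILDER_CAPACITY 16 -> 4, plus the new `Series::with_capacity` constructor) trades a little builder regrowth for a much smaller footprint when a memtable holds many mostly-idle series. A rough, illustrative sketch of why the initial number matters less than it looks, using a plain `Vec` as a stand-in for the per-column value builders:

```rust
fn main() {
    // Each series pre-allocates builder space; with many small series the initial
    // capacity dominates memory, so starting at 4 instead of 16 is roughly 4x cheaper.
    let mut hot_series: Vec<i64> = Vec::with_capacity(4);

    // A series that actually receives data still grows geometrically, so the
    // smaller starting point only costs a handful of extra reallocations.
    for v in 0..1_000 {
        hot_series.push(v);
    }
    assert!(hot_series.capacity() >= 1_000);

    // An idle series never grows past its tiny initial allocation.
    let idle_series: Vec<i64> = Vec::with_capacity(4);
    assert!(idle_series.capacity() >= 4);
}
```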
@@ -147,7 +147,7 @@ impl Inserter {
         statement_executor: &StatementExecutor,
     ) -> Result<Output> {
         let row_inserts = ColumnToRow::convert(requests)?;
-        self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
+        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
             .await
     }

@@ -158,6 +158,7 @@ impl Inserter {
         ctx: QueryContextRef,
         statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Output> {
         preprocess_row_insert_requests(&mut requests.inserts)?;
         self.handle_row_inserts_with_create_type(

@@ -166,6 +167,7 @@ impl Inserter {
             statement_executor,
             AutoCreateTableType::Physical,
             accommodate_existing_schema,
+            is_single_value,
         )
         .await
     }

@@ -183,6 +185,7 @@ impl Inserter {
             statement_executor,
             AutoCreateTableType::Log,
             false,
+            false,
         )
         .await
     }

@@ -199,6 +202,7 @@ impl Inserter {
             statement_executor,
             AutoCreateTableType::Trace,
             false,
+            false,
         )
         .await
     }

@@ -210,6 +214,7 @@ impl Inserter {
         ctx: QueryContextRef,
         statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Output> {
         self.handle_row_inserts_with_create_type(
             requests,

@@ -217,6 +222,7 @@ impl Inserter {
             statement_executor,
             AutoCreateTableType::LastNonNull,
             accommodate_existing_schema,
+            is_single_value,
         )
         .await
     }

@@ -229,6 +235,7 @@ impl Inserter {
         statement_executor: &StatementExecutor,
         create_type: AutoCreateTableType,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Output> {
         // remove empty requests
         requests.inserts.retain(|req| {

@@ -249,6 +256,7 @@ impl Inserter {
                 create_type,
                 statement_executor,
                 accommodate_existing_schema,
+                is_single_value,
             )
             .await?;
 

@@ -299,6 +307,7 @@ impl Inserter {
                 AutoCreateTableType::Logical(physical_table.to_string()),
                 statement_executor,
                 true,
+                true,
             )
             .await?;
         let name_to_info = table_infos

@@ -464,9 +473,10 @@ impl Inserter {
     /// This mapping is used in the conversion of RowToRegion.
     ///
     /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
-    /// It only works for TIME_INDEX and VALUE columns. This is for the case where the user creates a table with
+    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
     /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
     /// remote write. This will modify the `RowInsertRequests` in place.
+    /// `is_single_value` indicates whether the default schema only contains a single value column so we can accommodate it.
     async fn create_or_alter_tables_on_demand(
         &self,
         requests: &mut RowInsertRequests,

@@ -474,6 +484,7 @@ impl Inserter {
         auto_create_table_type: AutoCreateTableType,
         statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<CreateAlterTableResult> {
         let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
             .with_label_values(&[auto_create_table_type.as_str()])

@@ -537,6 +548,7 @@ impl Inserter {
                 &table,
                 ctx,
                 accommodate_existing_schema,
+                is_single_value,
             )? {
                 alter_tables.push(alter_expr);
             }
@@ -811,12 +823,15 @@ impl Inserter {
     /// When `accommodate_existing_schema` is true, it may modify the input `req` to
     /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
     /// for more details.
+    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also considers fields when modifying the
+    /// input `req`.
     fn get_alter_table_expr_on_demand(
         &self,
         req: &mut RowInsertRequest,
         table: &TableRef,
         ctx: &QueryContextRef,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Option<AlterTableExpr>> {
         let catalog_name = ctx.current_catalog();
         let schema_name = ctx.current_schema();

@@ -834,18 +849,20 @@ impl Inserter {
         let table_schema = table.schema();
         // Find timestamp column name
         let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
-        // Find field column name if there is only one
+        // Find field column name if there is only one and `is_single_value` is true.
         let mut field_col_name = None;
-        let mut multiple_field_cols = false;
-        table.field_columns().for_each(|col| {
-            if field_col_name.is_none() {
-                field_col_name = Some(col.name.clone());
-            } else {
-                multiple_field_cols = true;
-            }
-        });
-        if multiple_field_cols {
-            field_col_name = None;
+        if is_single_value {
+            let mut multiple_field_cols = false;
+            table.field_columns().for_each(|col| {
+                if field_col_name.is_none() {
+                    field_col_name = Some(col.name.clone());
+                } else {
+                    multiple_field_cols = true;
+                }
+            });
+            if multiple_field_cols {
+                field_col_name = None;
+            }
         }
 
         // Update column name in request schema for Timestamp/Field columns
@@ -871,11 +888,11 @@ impl Inserter {
             }
         }
 
-        // Remove from add_columns any column that is timestamp or field (if there is only one field column)
+        // Only keep columns that are tags or non-single field.
         add_columns.add_columns.retain(|col| {
             let def = col.column_def.as_ref().unwrap();
-            def.semantic_type != SemanticType::Timestamp as i32
-                && (def.semantic_type != SemanticType::Field as i32 && field_col_name.is_some())
+            def.semantic_type == SemanticType::Tag as i32
+                || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
         });
 
         if add_columns.add_columns.is_empty() {
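Restated, the new retain predicate keeps a request column in the generated ALTER TABLE ADD COLUMN list only if it is a tag, or if it is a field and no single existing field column was found to accommodate it. A standalone sketch of that rule; the types here are simplified stand-ins, not the gRPC proto enums:

```rust
#[derive(PartialEq, Eq, Clone, Copy)]
enum Semantic {
    Tag,
    Field,
    Timestamp,
}

/// Keep the column as something to ADD only when it is a tag, or it is a field and
/// there is no single existing field column (`accommodated_field`) to map it onto.
fn should_add_column(semantic: Semantic, accommodated_field: Option<&str>) -> bool {
    semantic == Semantic::Tag
        || (semantic == Semantic::Field && accommodated_field.is_none())
}

fn main() {
    // Timestamp columns are always accommodated, never added.
    assert!(!should_add_column(Semantic::Timestamp, None));
    // A field is added only when the table exposes no single field column to reuse.
    assert!(should_add_column(Semantic::Field, None));
    assert!(!should_add_column(Semantic::Field, Some("val")));
    // Tags are always added when missing.
    assert!(should_add_column(Semantic::Tag, Some("val")));
}
```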
@@ -1227,7 +1244,7 @@ mod tests {
             )),
         );
         let alter_expr = inserter
-            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
+            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
             .unwrap();
         assert!(alter_expr.is_none());
 

@@ -250,6 +250,7 @@ impl PipelineTable {
                 Self::query_ctx(&table_info),
                 &self.statement_executor,
                 false,
+                false,
             )
             .await
             .context(InsertPipelineSnafu)?;
@@ -458,6 +458,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
         ))
         .with_log_ingest_handler(instance.fe_instance().clone(), None, None)
         .with_logs_handler(instance.fe_instance().clone())
+        .with_influxdb_handler(instance.fe_instance().clone())
         .with_otlp_handler(instance.fe_instance().clone())
         .with_jaeger_handler(instance.fe_instance().clone())
         .with_greptime_config_options(instance.opts.to_toml().unwrap());

@@ -113,6 +113,8 @@ macro_rules! http_tests {
         test_log_query,
         test_jaeger_query_api,
         test_jaeger_query_api_for_trace_v1,
+
+        test_influxdb_write,
     );
     )*
 };
@@ -3937,6 +3939,52 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
     guard.remove_all().await;
 }
 
+pub async fn test_influxdb_write(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_influxdb_write").await;
+
+    let client = TestClient::new(app).await;
+
+    // Only write field cpu.
+    let result = client
+        .post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
+        .body("test_alter,host=host1 cpu=1.2 1664370459457010101")
+        .send()
+        .await;
+    assert_eq!(result.status(), 204);
+    assert!(result.text().await.is_empty());
+
+    // Only write field mem.
+    let result = client
+        .post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
+        .body("test_alter,host=host1 mem=10240.0 1664370469457010101")
+        .send()
+        .await;
+    assert_eq!(result.status(), 204);
+    assert!(result.text().await.is_empty());
+
+    // Write field cpu & mem.
+    let result = client
+        .post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
+        .body("test_alter,host=host1 cpu=3.2,mem=20480.0 1664370479457010101")
+        .send()
+        .await;
+    assert_eq!(result.status(), 204);
+    assert!(result.text().await.is_empty());
+
+    let expected = r#"[["host1",1.2,1664370459457010101,null],["host1",null,1664370469457010101,10240.0],["host1",3.2,1664370479457010101,20480.0]]"#;
+    validate_data(
+        "test_influxdb_write",
+        &client,
+        "select * from test_alter order by ts;",
+        expected,
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
     let res = client
         .get(format!("/v1/sql?sql={sql}").as_str())
tests/cases/standalone/common/alter/alter_physical_table.result (new file, 109 lines)
@@ -0,0 +1,109 @@
CREATE TABLE phy (ts timestamp time index, val double, a_label STRING, PRIMARY KEY(a_label)) engine=metric with ("physical_metric_table" = "");

Affected Rows: 0

ALTER TABLE phy ADD COLUMN b_label STRING PRIMARY KEY;

Error: 1001(Unsupported), Alter request to physical region is forbidden

ALTER TABLE phy DROP COLUMN a_label;

Error: 1004(InvalidArguments), Not allowed to remove index column a_label from table phy

ALTER TABLE phy SET 'ttl'='1d';

Affected Rows: 0

SHOW CREATE TABLE phy;

+-------+------------------------------------+
| Table | Create Table                       |
+-------+------------------------------------+
| phy   | CREATE TABLE IF NOT EXISTS "phy" ( |
|       |   "ts" TIMESTAMP(3) NOT NULL,      |
|       |   "val" DOUBLE NULL,               |
|       |   "a_label" STRING NULL,           |
|       |   TIME INDEX ("ts"),               |
|       |   PRIMARY KEY ("a_label")          |
|       | )                                  |
|       |                                    |
|       | ENGINE=metric                      |
|       | WITH(                              |
|       |   physical_metric_table = '',      |
|       |   ttl = '1day'                     |
|       | )                                  |
+-------+------------------------------------+

ALTER TABLE phy UNSET 'ttl';

Affected Rows: 0

SHOW CREATE TABLE phy;

+-------+------------------------------------+
| Table | Create Table                       |
+-------+------------------------------------+
| phy   | CREATE TABLE IF NOT EXISTS "phy" ( |
|       |   "ts" TIMESTAMP(3) NOT NULL,      |
|       |   "val" DOUBLE NULL,               |
|       |   "a_label" STRING NULL,           |
|       |   TIME INDEX ("ts"),               |
|       |   PRIMARY KEY ("a_label")          |
|       | )                                  |
|       |                                    |
|       | ENGINE=metric                      |
|       | WITH(                              |
|       |   physical_metric_table = ''       |
|       | )                                  |
+-------+------------------------------------+

ALTER TABLE phy MODIFY COLUMN a_label SET INVERTED INDEX;

Affected Rows: 0

SHOW CREATE TABLE phy;

+-------+-----------------------------------------+
| Table | Create Table                            |
+-------+-----------------------------------------+
| phy   | CREATE TABLE IF NOT EXISTS "phy" (      |
|       |   "ts" TIMESTAMP(3) NOT NULL,           |
|       |   "val" DOUBLE NULL,                    |
|       |   "a_label" STRING NULL INVERTED INDEX, |
|       |   TIME INDEX ("ts"),                    |
|       |   PRIMARY KEY ("a_label")               |
|       | )                                       |
|       |                                         |
|       | ENGINE=metric                           |
|       | WITH(                                   |
|       |   physical_metric_table = ''            |
|       | )                                       |
+-------+-----------------------------------------+

ALTER TABLE phy MODIFY COLUMN a_label UNSET INVERTED INDEX;

Affected Rows: 0

SHOW CREATE TABLE phy;

+-------+------------------------------------+
| Table | Create Table                       |
+-------+------------------------------------+
| phy   | CREATE TABLE IF NOT EXISTS "phy" ( |
|       |   "ts" TIMESTAMP(3) NOT NULL,      |
|       |   "val" DOUBLE NULL,               |
|       |   "a_label" STRING NULL,           |
|       |   TIME INDEX ("ts"),               |
|       |   PRIMARY KEY ("a_label")          |
|       | )                                  |
|       |                                    |
|       | ENGINE=metric                      |
|       | WITH(                              |
|       |   physical_metric_table = ''       |
|       | )                                  |
+-------+------------------------------------+

DROP TABLE phy;

Affected Rows: 0
tests/cases/standalone/common/alter/alter_physical_table.sql (new file, 23 lines)
@@ -0,0 +1,23 @@
CREATE TABLE phy (ts timestamp time index, val double, a_label STRING, PRIMARY KEY(a_label)) engine=metric with ("physical_metric_table" = "");

ALTER TABLE phy ADD COLUMN b_label STRING PRIMARY KEY;

ALTER TABLE phy DROP COLUMN a_label;

ALTER TABLE phy SET 'ttl'='1d';

SHOW CREATE TABLE phy;

ALTER TABLE phy UNSET 'ttl';

SHOW CREATE TABLE phy;

ALTER TABLE phy MODIFY COLUMN a_label SET INVERTED INDEX;

SHOW CREATE TABLE phy;

ALTER TABLE phy MODIFY COLUMN a_label UNSET INVERTED INDEX;

SHOW CREATE TABLE phy;

DROP TABLE phy;