Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-03 20:02:54 +00:00

Compare commits: 9 commits, v0.14.3 ... release/v0
| Author | SHA1 | Date |
|---|---|---|
| | 66a784b58a | |
| | ce95e051ff | |
| | de08ddafc8 | |
| | e46efb3d6c | |
| | 34af9580e0 | |
| | b19d23d665 | |
| | 209f15dd51 | |
| | 0829fb204c | |
| | c8e470e8ed | |
Cargo.lock (generated, 801 lines changed): file diff suppressed because it is too large.
@@ -68,7 +68,7 @@ members = [
 resolver = "2"

 [workspace.package]
-version = "0.14.3"
+version = "0.14.4"
 edition = "2021"
 license = "Apache-2.0"

@@ -875,7 +875,10 @@ impl TableMetadataManager {
     ) -> Result<()> {
         let table_metadata_keys =
             self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
-        self.tombstone_manager.delete(table_metadata_keys).await
+        self.tombstone_manager
+            .delete(table_metadata_keys)
+            .await
+            .map(|_| ())
     }

     /// Restores metadata for table.

@@ -14,19 +14,23 @@

 use std::collections::HashMap;

+use common_telemetry::debug;
 use snafu::ensure;

 use crate::error::{self, Result};
 use crate::key::txn_helper::TxnOpGetResponseSet;
 use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
-use crate::rpc::store::BatchGetRequest;
+use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest};

 /// [TombstoneManager] provides the ability to:
 /// - logically delete values
 /// - restore the deleted values
 pub(crate) struct TombstoneManager {
     kv_backend: KvBackendRef,
+    // Only used for testing.
+    #[cfg(test)]
+    max_txn_ops: Option<usize>,
 }

 const TOMBSTONE_PREFIX: &str = "__tombstone/";
@@ -38,7 +42,16 @@ fn to_tombstone(key: &[u8]) -> Vec<u8> {
 impl TombstoneManager {
     /// Returns [TombstoneManager].
     pub fn new(kv_backend: KvBackendRef) -> Self {
-        Self { kv_backend }
+        Self {
+            kv_backend,
+            #[cfg(test)]
+            max_txn_ops: None,
+        }
     }

+    #[cfg(test)]
+    pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
+        self.max_txn_ops = Some(max_txn_ops);
+    }
+
     /// Moves value to `dest_key`.
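The new `#[cfg(test)]` field and `set_max_txn_ops` exist so unit tests can shrink the backend's transaction-size limit and exercise the chunked code path with only a handful of keys. A minimal, self-contained sketch of the same pattern; the `Backend` trait and the numbers below are stand-ins, not GreptimeDB APIs:

```rust
trait Backend {
    fn max_txn_ops(&self) -> usize;
}

struct Manager<B> {
    backend: B,
    // Only compiled into test builds, so production code always asks the backend.
    #[cfg(test)]
    max_txn_ops_override: Option<usize>,
}

impl<B: Backend> Manager<B> {
    fn max_txn_ops(&self) -> usize {
        // In test builds, an explicit override wins; otherwise defer to the backend.
        #[cfg(test)]
        if let Some(n) = self.max_txn_ops_override {
            return n;
        }
        self.backend.max_txn_ops()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    struct Mem;
    impl Backend for Mem {
        fn max_txn_ops(&self) -> usize {
            128
        }
    }

    #[test]
    fn override_applies_in_tests() {
        let m = Manager { backend: Mem, max_txn_ops_override: Some(4) };
        assert_eq!(m.max_txn_ops(), 4);
    }
}
```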
@@ -67,11 +80,15 @@ impl TombstoneManager {
         (txn, TxnOpGetResponseSet::filter(src_key))
     }

-    async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
+    async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
         ensure!(
             keys.len() == dest_keys.len(),
             error::UnexpectedSnafu {
-                err_msg: "The length of keys does not match the length of dest_keys."
+                err_msg: format!(
+                    "The length of keys({}) does not match the length of dest_keys({}).",
+                    keys.len(),
+                    dest_keys.len()
+                ),
             }
         );
         // The key -> dest key mapping.
@@ -102,7 +119,7 @@ impl TombstoneManager {
             .unzip();
         let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
         if resp.succeeded {
-            return Ok(());
+            return Ok(keys.len());
         }
         let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
         // Updates results.
@@ -124,17 +141,45 @@ impl TombstoneManager {
             .fail()
     }

-    /// Moves values to `dest_key`.
-    async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
-        let chunk_size = self.kv_backend.max_txn_ops() / 2;
-        if keys.len() > chunk_size {
-            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
-            let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
-            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
-                self.move_values_inner(keys, dest_keys).await?;
-            }
-            Ok(())
+    fn max_txn_ops(&self) -> usize {
+        #[cfg(test)]
+        if let Some(max_txn_ops) = self.max_txn_ops {
+            return max_txn_ops;
+        }
+        self.kv_backend.max_txn_ops()
+    }
+
+    /// Moves values to `dest_key`.
+    ///
+    /// Returns the number of keys that were moved.
+    async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
+        ensure!(
+            keys.len() == dest_keys.len(),
+            error::UnexpectedSnafu {
+                err_msg: format!(
+                    "The length of keys({}) does not match the length of dest_keys({}).",
+                    keys.len(),
+                    dest_keys.len()
+                ),
+            }
+        );
+        if keys.is_empty() {
+            return Ok(0);
+        }
+        let chunk_size = self.max_txn_ops() / 2;
+        if keys.len() > chunk_size {
+            debug!(
+                "Moving values with multiple chunks, keys len: {}, chunk_size: {}",
+                keys.len(),
+                chunk_size
+            );
+            let mut moved_keys = 0;
+            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
+            let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
+            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
+                moved_keys += self.move_values_inner(keys, dest_keys).await?;
+            }
+            Ok(moved_keys)
         } else {
             self.move_values_inner(&keys, &dest_keys).await
         }
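The rewritten `move_values` validates the key/destination lengths up front, short-circuits on empty input, and splits large batches into chunks of `max_txn_ops() / 2`, presumably because each moved key contributes two operations to the merged transaction. It also sums the per-chunk counts and, unlike the old code (which reused `keys.chunks(...)` when building `dest_keys_chunks`), it now chunks `dest_keys` itself. A small sketch of just the chunk arithmetic, with illustrative names and no kv-backend involved:

```rust
// Pair up source and destination keys chunk by chunk so that no single
// transaction exceeds `max_txn_ops` operations (two per moved key).
fn chunk_pairs<'a>(
    keys: &'a [Vec<u8>],
    dest_keys: &'a [Vec<u8>],
    max_txn_ops: usize,
) -> impl Iterator<Item = (&'a [Vec<u8>], &'a [Vec<u8>])> {
    let chunk_size = (max_txn_ops / 2).max(1);
    keys.chunks(chunk_size).zip(dest_keys.chunks(chunk_size))
}

fn main() {
    let keys: Vec<Vec<u8>> = (0..5).map(|i| vec![i]).collect();
    let dest_keys: Vec<Vec<u8>> = (0..5).map(|i| vec![b'_', i]).collect();
    // With max_txn_ops = 4 the 5 keys are moved in chunks of 2: [2, 2, 1].
    let sizes: Vec<usize> = chunk_pairs(&keys, &dest_keys, 4)
        .map(|(k, _)| k.len())
        .collect();
    assert_eq!(sizes, vec![2, 2, 1]);
    println!("moved {} keys in {} chunks", keys.len(), sizes.len());
}
```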
@@ -154,7 +199,7 @@ impl TombstoneManager {
             })
             .unzip();

-        self.move_values(keys, dest_keys).await
+        self.move_values(keys, dest_keys).await.map(|_| ())
     }

     /// Restores tombstones for keys.
@@ -171,20 +216,22 @@ impl TombstoneManager {
             })
             .unzip();

-        self.move_values(keys, dest_keys).await
+        self.move_values(keys, dest_keys).await.map(|_| ())
     }

     /// Deletes tombstones values for the specified `keys`.
-    pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
-        let operations = keys
-            .iter()
-            .map(|key| TxnOp::Delete(to_tombstone(key)))
-            .collect::<Vec<_>>();
+    ///
+    /// Returns the number of keys that were deleted.
+    pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
+        let keys = keys.iter().map(|key| to_tombstone(key)).collect::<Vec<_>>();

-        let txn = Txn::new().and_then(operations);
-        // Always success.
-        let _ = self.kv_backend.txn(txn).await?;
-        Ok(())
+        let num_keys = keys.len();
+        let _ = self
+            .kv_backend
+            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
+            .await?;
+
+        Ok(num_keys)
     }
 }

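`delete` now rewrites every key into its tombstone form and issues a single `BatchDeleteRequest` instead of building a transaction, returning how many keys it deleted. A hedged sketch of the key prefixing: the body of `to_tombstone` is not shown in this diff, so the concatenation below is an assumption based on the `TOMBSTONE_PREFIX` constant.

```rust
const TOMBSTONE_PREFIX: &str = "__tombstone/";

// Assumed shape of `to_tombstone`: prepend the prefix to the raw key bytes.
fn to_tombstone(key: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(TOMBSTONE_PREFIX.len() + key.len());
    buf.extend_from_slice(TOMBSTONE_PREFIX.as_bytes());
    buf.extend_from_slice(key);
    buf
}

fn main() {
    let keys: Vec<Vec<u8>> = vec![b"foo".to_vec(), b"bar".to_vec()];
    // The new `delete` maps every key to its tombstone form up front, remembers
    // the count, and hands the whole batch to `batch_delete` in one request.
    let tombstone_keys: Vec<Vec<u8>> = keys.iter().map(|k| to_tombstone(k)).collect();
    let num_keys = tombstone_keys.len();
    assert_eq!(tombstone_keys[0], b"__tombstone/foo".to_vec());
    println!("would batch-delete {num_keys} tombstone keys");
}
```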
@@ -373,16 +420,73 @@ mod tests {
             .into_iter()
             .map(|kv| (kv.key, kv.dest_key))
             .unzip();
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
         check_moved_values(kv_backend.clone(), &move_values).await;
         // Moves again
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
+        assert_eq!(0, moved_keys);
         check_moved_values(kv_backend.clone(), &move_values).await;
     }

+    #[tokio::test]
+    async fn test_move_values_with_max_txn_ops() {
+        common_telemetry::init_default_ut_logging();
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
+        tombstone_manager.set_max_txn_ops(4);
+        let kvs = HashMap::from([
+            (b"bar".to_vec(), b"baz".to_vec()),
+            (b"foo".to_vec(), b"hi".to_vec()),
+            (b"baz".to_vec(), b"hello".to_vec()),
+            (b"qux".to_vec(), b"world".to_vec()),
+            (b"quux".to_vec(), b"world".to_vec()),
+            (b"quuux".to_vec(), b"world".to_vec()),
+            (b"quuuux".to_vec(), b"world".to_vec()),
+            (b"quuuuux".to_vec(), b"world".to_vec()),
+            (b"quuuuuux".to_vec(), b"world".to_vec()),
+        ]);
+        for (key, value) in &kvs {
+            kv_backend
+                .put(
+                    PutRequest::new()
+                        .with_key(key.clone())
+                        .with_value(value.clone()),
+                )
+                .await
+                .unwrap();
+        }
+        let move_values = kvs
+            .iter()
+            .map(|(key, value)| MoveValue {
+                key: key.clone(),
+                dest_key: to_tombstone(key),
+                value: value.clone(),
+            })
+            .collect::<Vec<_>>();
+        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
+            .clone()
+            .into_iter()
+            .map(|kv| (kv.key, kv.dest_key))
+            .unzip();
+        let moved_keys = tombstone_manager
+            .move_values(keys.clone(), dest_keys.clone())
+            .await
+            .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
+        check_moved_values(kv_backend.clone(), &move_values).await;
+        // Moves again
+        let moved_keys = tombstone_manager
+            .move_values(keys.clone(), dest_keys.clone())
+            .await
+            .unwrap();
+        assert_eq!(0, moved_keys);
+        check_moved_values(kv_backend.clone(), &move_values).await;
+    }
+
@@ -420,17 +524,19 @@ mod tests {
             .unzip();
         keys.push(b"non-exists".to_vec());
         dest_keys.push(b"hi/non-exists".to_vec());
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
+        assert_eq!(3, moved_keys);
         // Moves again
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
+        assert_eq!(0, moved_keys);
     }

     #[tokio::test]
@@ -471,10 +577,11 @@ mod tests {
             .into_iter()
             .map(|kv| (kv.key, kv.dest_key))
             .unzip();
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys, dest_keys)
             .await
             .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
     }

     #[tokio::test]
@@ -552,4 +659,24 @@ mod tests {
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
     }
+
+    #[tokio::test]
+    async fn test_move_values_with_different_lengths() {
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
+
+        let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
+        let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
+
+        let err = tombstone_manager
+            .move_values(keys, dest_keys)
+            .await
+            .unwrap_err();
+        assert!(err
+            .to_string()
+            .contains("The length of keys(2) does not match the length of dest_keys(3)."),);
+
+        let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
+        assert_eq!(0, moved_keys);
+    }
 }

@@ -16,6 +16,7 @@ async-trait.workspace = true
 bytes.workspace = true
 cache.workspace = true
 catalog.workspace = true
+chrono.workspace = true
 client.workspace = true
 common-base.workspace = true
 common-config.workspace = true

@@ -39,16 +40,13 @@ datafusion-expr.workspace = true
 datafusion-physical-expr.workspace = true
 datafusion-substrait.workspace = true
 datatypes.workspace = true
+dfir_rs = { version = "0.13.0", default-features = false }
 enum-as-inner = "0.6.0"
 enum_dispatch = "0.3"
 futures.workspace = true
 get-size2 = "0.1.2"
 greptime-proto.workspace = true
-# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
-# otherwise it is the same with upstream repo
-chrono.workspace = true
 http.workspace = true
-hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
 itertools.workspace = true
 lazy_static.workspace = true
 meta-client.workspace = true

@@ -897,7 +897,7 @@ impl StreamingEngine {
         let rows_send = self.run_available(true).await?;
         let row = self.send_writeback_requests().await?;
         debug!(
-            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
+            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
             flow_id, flushed_input_rows, rows_send, row
         );
         Ok(row)

@@ -19,8 +19,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;

 use common_telemetry::info;
+use dfir_rs::scheduled::graph::Dfir;
 use enum_as_inner::EnumAsInner;
-use hydroflow::scheduled::graph::Hydroflow;
 use snafu::ensure;
 use tokio::sync::{broadcast, mpsc, oneshot, Mutex};

@@ -49,9 +49,9 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
     (worker_handle, worker)
 }

-/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
+/// ActiveDataflowState is a wrapper around `Dfir` and `DataflowState`
 pub(crate) struct ActiveDataflowState<'subgraph> {
-    df: Hydroflow<'subgraph>,
+    df: Dfir<'subgraph>,
     state: DataflowState,
     err_collector: ErrCollector,
 }

@@ -59,7 +59,7 @@ pub(crate) struct ActiveDataflowState<'subgraph> {
 impl std::fmt::Debug for ActiveDataflowState<'_> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("ActiveDataflowState")
-            .field("df", &"<Hydroflow>")
+            .field("df", &"<Dfir>")
             .field("state", &self.state)
             .field("err_collector", &self.err_collector)
             .finish()

@@ -69,7 +69,7 @@ impl std::fmt::Debug for ActiveDataflowState<'_> {
 impl Default for ActiveDataflowState<'_> {
     fn default() -> Self {
         ActiveDataflowState {
-            df: Hydroflow::new(),
+            df: Dfir::new(),
             state: DataflowState::default(),
             err_collector: ErrCollector::default(),
         }

@@ -304,7 +304,7 @@ impl BatchingEngine {
             })
             .transpose()?;

-        info!(
+        debug!(
             "Flow id={}, found time window expr={}",
             flow_id,
             phy_expr

@@ -179,7 +179,7 @@ impl BatchingTask {
         frontend_client: &Arc<FrontendClient>,
     ) -> Result<Option<(u32, Duration)>, Error> {
         if let Some(new_query) = self.gen_insert_plan(engine).await? {
-            debug!("Generate new query: {:#?}", new_query);
+            debug!("Generate new query: {}", new_query);
             self.execute_logical_plan(frontend_client, &new_query).await
         } else {
             debug!("Generate no query");

@@ -138,9 +138,12 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
     fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
         if let LogicalPlan::Aggregate(aggregate) = node {
             self.group_exprs = Some(aggregate.group_expr.iter().cloned().collect());
-            debug!("Group by exprs: {:?}", self.group_exprs);
+            debug!(
+                "FindGroupByFinalName: Get Group by exprs from Aggregate: {:?}",
+                self.group_exprs
+            );
         } else if let LogicalPlan::Distinct(distinct) = node {
-            debug!("Distinct: {:#?}", distinct);
+            debug!("FindGroupByFinalName: Distinct: {}", node);
             match distinct {
                 Distinct::All(input) => {
                     if let LogicalPlan::TableScan(table_scan) = &**input {

@@ -162,7 +165,10 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
                     self.group_exprs = Some(distinct_on.on_expr.iter().cloned().collect())
                 }
             }
-            debug!("Group by exprs: {:?}", self.group_exprs);
+            debug!(
+                "FindGroupByFinalName: Get Group by exprs from Distinct: {:?}",
+                self.group_exprs
+            );
         }

         Ok(TreeNodeRecursion::Continue)

@@ -18,9 +18,9 @@
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use hydroflow::scheduled::graph_ext::GraphExt;
|
||||
use hydroflow::scheduled::port::{PortCtx, SEND};
|
||||
use dfir_rs::scheduled::graph::Dfir;
|
||||
use dfir_rs::scheduled::graph_ext::GraphExt;
|
||||
use dfir_rs::scheduled::port::{PortCtx, SEND};
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
|
||||
@@ -38,7 +38,7 @@ mod src_sink;
|
||||
/// The Context for build a Operator with id of `GlobalId`
|
||||
pub struct Context<'referred, 'df> {
|
||||
pub id: GlobalId,
|
||||
pub df: &'referred mut Hydroflow<'df>,
|
||||
pub df: &'referred mut Dfir<'df>,
|
||||
pub compute_state: &'referred mut DataflowState,
|
||||
/// a list of all collections being used in the operator
|
||||
///
|
||||
@@ -361,16 +361,16 @@ mod test {
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use hydroflow::scheduled::graph_ext::GraphExt;
|
||||
use hydroflow::scheduled::handoff::VecHandoff;
|
||||
use dfir_rs::scheduled::graph::Dfir;
|
||||
use dfir_rs::scheduled::graph_ext::GraphExt;
|
||||
use dfir_rs::scheduled::handoff::VecHandoff;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
use crate::repr::Row;
|
||||
pub fn run_and_check(
|
||||
state: &mut DataflowState,
|
||||
df: &mut Hydroflow,
|
||||
df: &mut Dfir,
|
||||
time_range: std::ops::Range<i64>,
|
||||
expected: BTreeMap<i64, Vec<DiffRow>>,
|
||||
output: Rc<RefCell<Vec<DiffRow>>>,
|
||||
@@ -416,7 +416,7 @@ mod test {
|
||||
}
|
||||
|
||||
pub fn harness_test_ctx<'r, 'h>(
|
||||
df: &'r mut Hydroflow<'h>,
|
||||
df: &'r mut Dfir<'h>,
|
||||
state: &'r mut DataflowState,
|
||||
) -> Context<'r, 'h> {
|
||||
let err_collector = state.get_err_collector();
|
||||
@@ -436,7 +436,7 @@ mod test {
|
||||
/// that is it only emit once, not multiple times
|
||||
#[test]
|
||||
fn test_render_constant() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
@@ -473,7 +473,7 @@ mod test {
|
||||
/// a simple example to show how to use source and sink
|
||||
#[test]
|
||||
fn example_source_sink() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("test_handoff");
|
||||
df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
|
||||
for i in 0..10 {
|
||||
@@ -498,8 +498,8 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_tee_auto_schedule() {
|
||||
use hydroflow::scheduled::handoff::TeeingHandoff as Toff;
|
||||
let mut df = Hydroflow::new();
|
||||
use dfir_rs::scheduled::handoff::TeeingHandoff as Toff;
|
||||
let mut df = Dfir::new();
|
||||
let (send_port, recv_port) = df.make_edge::<_, Toff<i32>>("test_handoff");
|
||||
let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
|
||||
for i in 0..10 {
|
||||
|
||||
@@ -14,8 +14,8 @@
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use hydroflow::scheduled::graph_ext::GraphExt;
|
||||
use hydroflow::scheduled::port::{PortCtx, SEND};
|
||||
use dfir_rs::scheduled::graph_ext::GraphExt;
|
||||
use dfir_rs::scheduled::port::{PortCtx, SEND};
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
|
||||
@@ -256,7 +256,7 @@ fn eval_mfp_core(
|
||||
mod test {
|
||||
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use dfir_rs::scheduled::graph::Dfir;
|
||||
|
||||
use super::*;
|
||||
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
|
||||
@@ -269,7 +269,7 @@ mod test {
|
||||
/// namely: if mfp operator can schedule a delete at the correct time
|
||||
#[test]
|
||||
fn test_render_mfp_with_temporal() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
@@ -348,7 +348,7 @@ mod test {
|
||||
/// that is it filter the rows correctly
|
||||
#[test]
|
||||
fn test_render_mfp() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
@@ -388,7 +388,7 @@ mod test {
|
||||
/// test if mfp operator can run multiple times within same tick
|
||||
#[test]
|
||||
fn test_render_mfp_multiple_times() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::value::{ListValue, Value};
|
||||
use datatypes::vectors::{BooleanVector, NullVector};
|
||||
use hydroflow::scheduled::graph_ext::GraphExt;
|
||||
use dfir_rs::scheduled::graph_ext::GraphExt;
|
||||
use itertools::Itertools;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
@@ -1212,7 +1212,7 @@ mod test {
|
||||
|
||||
use common_time::Timestamp;
|
||||
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use dfir_rs::scheduled::graph::Dfir;
|
||||
|
||||
use super::*;
|
||||
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
|
||||
@@ -1228,7 +1228,7 @@ mod test {
|
||||
/// expected: sum(number), window_start, window_end
|
||||
#[test]
|
||||
fn test_tumble_group_by() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
const START: i64 = 1625097600000;
|
||||
@@ -1389,7 +1389,7 @@ mod test {
|
||||
/// select avg(number) from number;
|
||||
#[test]
|
||||
fn test_avg_eval() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
@@ -1500,7 +1500,7 @@ mod test {
|
||||
/// | col | Int64 |
|
||||
#[test]
|
||||
fn test_basic_distinct() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
@@ -1556,7 +1556,7 @@ mod test {
|
||||
/// | col | Int64 |
|
||||
#[test]
|
||||
fn test_basic_batch_reduce_accum() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let now = state.current_time_ref();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
@@ -1662,7 +1662,7 @@ mod test {
|
||||
/// | col | Int64 |
|
||||
#[test]
|
||||
fn test_basic_reduce_accum() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
@@ -1739,7 +1739,7 @@ mod test {
|
||||
/// this test include even more insert/delete case to cover all case for eval_distinct_core
|
||||
#[test]
|
||||
fn test_delete_reduce_distinct_accum() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
@@ -1818,7 +1818,7 @@ mod test {
|
||||
/// this test include insert and delete which should cover all case for eval_distinct_core
|
||||
#[test]
|
||||
fn test_basic_reduce_distinct_accum() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
@@ -1896,7 +1896,7 @@ mod test {
|
||||
/// | col | Int64 |
|
||||
#[test]
|
||||
fn test_composite_reduce_distinct_accum() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut df = Dfir::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use common_telemetry::{debug, trace};
|
||||
use hydroflow::scheduled::graph_ext::GraphExt;
|
||||
use dfir_rs::scheduled::graph_ext::GraphExt;
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
use tokio::sync::broadcast::error::TryRecvError;
|
||||
|
||||
@@ -16,16 +16,16 @@ use std::cell::RefCell;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::rc::Rc;
|
||||
|
||||
use dfir_rs::scheduled::graph::Dfir;
|
||||
use dfir_rs::scheduled::SubgraphId;
|
||||
use get_size2::GetSize;
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use hydroflow::scheduled::SubgraphId;
|
||||
|
||||
use crate::compute::types::ErrCollector;
|
||||
use crate::repr::{self, Timestamp};
|
||||
use crate::utils::{ArrangeHandler, Arrangement};
|
||||
|
||||
/// input/output of a dataflow
|
||||
/// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
|
||||
/// One `ComputeState` manage the input/output/schedule of one `Dfir`
|
||||
#[derive(Debug, Default)]
|
||||
pub struct DataflowState {
|
||||
/// it is important to use a deque to maintain the order of subgraph here
|
||||
@@ -38,7 +38,7 @@ pub struct DataflowState {
|
||||
/// Which means it's also the current time in temporal filter to get current correct result
|
||||
as_of: Rc<RefCell<Timestamp>>,
|
||||
/// error collector local to this `ComputeState`,
|
||||
/// useful for distinguishing errors from different `Hydroflow`
|
||||
/// useful for distinguishing errors from different `Dfir`
|
||||
err_collector: ErrCollector,
|
||||
/// save all used arrange in this dataflow, since usually there is no delete operation
|
||||
/// we can just keep track of all used arrange and schedule subgraph when they need to be updated
|
||||
@@ -65,7 +65,7 @@ impl DataflowState {
|
||||
/// schedule all subgraph that need to run with time <= `as_of` and run_available()
|
||||
///
|
||||
/// return true if any subgraph actually executed
|
||||
pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
|
||||
pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
|
||||
// first split keys <= as_of into another map
|
||||
let mut before = self
|
||||
.schedule_subgraph
|
||||
|
||||
@@ -18,10 +18,10 @@ use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use hydroflow::scheduled::handoff::TeeingHandoff;
|
||||
use hydroflow::scheduled::port::RecvPort;
|
||||
use hydroflow::scheduled::SubgraphId;
|
||||
use dfir_rs::scheduled::graph::Dfir;
|
||||
use dfir_rs::scheduled::handoff::TeeingHandoff;
|
||||
use dfir_rs::scheduled::port::RecvPort;
|
||||
use dfir_rs::scheduled::SubgraphId;
|
||||
use itertools::Itertools;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
@@ -46,7 +46,7 @@ impl<T: 'static + Clone> Collection<T> {
|
||||
/// clone a collection, require a mutable reference to the hydroflow instance
|
||||
///
|
||||
/// Note: need to be the same hydroflow instance that this collection is created from
|
||||
pub fn clone(&self, df: &mut Hydroflow) -> Self {
|
||||
pub fn clone(&self, df: &mut Dfir) -> Self {
|
||||
Collection {
|
||||
stream: self.stream.tee(df),
|
||||
}
|
||||
@@ -151,7 +151,7 @@ impl<T: 'static> CollectionBundle<T> {
|
||||
}
|
||||
|
||||
impl<T: 'static + Clone> CollectionBundle<T> {
|
||||
pub fn clone(&self, df: &mut Hydroflow) -> Self {
|
||||
pub fn clone(&self, df: &mut Dfir) -> Self {
|
||||
Self {
|
||||
collection: self.collection.clone(df),
|
||||
arranged: self
|
||||
|
||||
@@ -21,7 +21,7 @@ use common_error::ext::BoxedError;
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
|
||||
use hydroflow::lattices::cc_traits::Iter;
|
||||
use dfir_rs::lattices::cc_traits::Iter;
|
||||
use itertools::Itertools;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
|
||||
@@ -581,7 +581,7 @@ impl FrontendInvoker {
             .start_timer();

         self.inserter
-            .handle_row_inserts(requests, ctx, &self.statement_executor, false)
+            .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
             .await
             .map_err(BoxedError::new)
             .context(common_frontend::error::ExternalSnafu)

@@ -73,7 +73,7 @@ impl GrpcQueryHandler for Instance {
         let output = match request {
             Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
             Request::RowInserts(requests) => {
-                self.handle_row_inserts(requests, ctx.clone(), false)
+                self.handle_row_inserts(requests, ctx.clone(), false, false)
                     .await?
             }
             Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
@@ -411,6 +411,7 @@ impl Instance {
         requests: RowInsertRequests,
         ctx: QueryContextRef,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Output> {
         self.inserter
             .handle_row_inserts(
@@ -418,6 +419,7 @@ impl Instance {
                 ctx,
                 self.statement_executor.as_ref(),
                 accommodate_existing_schema,
+                is_single_value,
             )
             .await
             .context(TableOperationSnafu)
@@ -430,7 +432,14 @@ impl Instance {
         ctx: QueryContextRef,
     ) -> Result<Output> {
         self.inserter
-            .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
+            .handle_last_non_null_inserts(
+                requests,
+                ctx,
+                self.statement_executor.as_ref(),
+                true,
+                // Influx protocol may writes multiple fields (values).
+                false,
+            )
             .await
             .context(TableOperationSnafu)
     }

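The handlers in the hunks below each pick their own combination of `accommodate_existing_schema` and `is_single_value`: OpenTSDB and Prometheus remote write pass `true, true`, the InfluxDB last-non-null path passes `true, false` because a line may carry several fields, and OTLP and plain gRPC row inserts pass `false, false`. A compact, illustrative-only restatement of those choices; the types here are stand-ins, not GreptimeDB's:

```rust
#[derive(Debug)]
struct InsertFlags {
    accommodate_existing_schema: bool,
    is_single_value: bool,
}

// Summarizes the flag combinations visible in this diff, keyed by an
// informal protocol label (the labels themselves are made up for the sketch).
fn flags_for(protocol: &str) -> InsertFlags {
    match protocol {
        // OpenTSDB and Prometheus remote write carry one value per series.
        "opentsdb" | "prom_remote_write" => InsertFlags {
            accommodate_existing_schema: true,
            is_single_value: true,
        },
        // InfluxDB line protocol may write multiple fields per row.
        "influxdb_last_non_null" => InsertFlags {
            accommodate_existing_schema: true,
            is_single_value: false,
        },
        // OTLP and plain gRPC row inserts keep both off.
        _ => InsertFlags {
            accommodate_existing_schema: false,
            is_single_value: false,
        },
    }
}

fn main() {
    println!("{:?}", flags_for("opentsdb"));
}
```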
@@ -52,8 +52,9 @@ impl OpentsdbProtocolHandler for Instance {
             None
         };

+        // OpenTSDB is single value.
         let output = self
-            .handle_row_inserts(requests, ctx, true)
+            .handle_row_inserts(requests, ctx, true, true)
             .await
             .map_err(BoxedError::new)
             .context(servers::error::ExecuteGrpcQuerySnafu)?;

@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
             None
         };

-        self.handle_row_inserts(requests, ctx, false)
+        self.handle_row_inserts(requests, ctx, false, false)
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)

@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
                 .map_err(BoxedError::new)
                 .context(error::ExecuteGrpcQuerySnafu)?
         } else {
-            self.handle_row_inserts(request, ctx.clone(), true)
+            self.handle_row_inserts(request, ctx.clone(), true, true)
                 .await
                 .map_err(BoxedError::new)
                 .context(error::ExecuteGrpcQuerySnafu)?

@@ -206,7 +206,9 @@ impl DataRegion {
     ) -> Result<AffectedRows> {
         match request.kind {
             AlterKind::SetRegionOptions { options: _ }
-            | AlterKind::UnsetRegionOptions { keys: _ } => {
+            | AlterKind::UnsetRegionOptions { keys: _ }
+            | AlterKind::SetIndex { options: _ }
+            | AlterKind::UnsetIndex { options: _ } => {
                 let region_id = utils::to_data_region_id(region_id);
                 self.mito
                     .handle_request(region_id, RegionRequest::Alter(request))

@@ -34,7 +34,8 @@ use store_api::metric_engine_consts::{
     METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
 };
 use store_api::mito_engine_options::{
-    APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, SKIP_WAL_KEY, TTL_KEY,
+    APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, MEMTABLE_TYPE, SKIP_WAL_KEY,
+    TTL_KEY,
 };
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};

@@ -564,6 +565,7 @@ pub(crate) fn region_options_for_metadata_region(
     original.remove(APPEND_MODE_KEY);
     // Don't allow to set primary key encoding for metadata region.
     original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
+    original.remove(MEMTABLE_TYPE);
     original.insert(TTL_KEY.to_string(), FOREVER.to_string());
     original.remove(SKIP_WAL_KEY);
     original
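`region_options_for_metadata_region` now also strips `MEMTABLE_TYPE`, so a memtable choice made for the physical data region is not inherited by the metadata region. A sketch of the scrubbing with stand-in option strings; the real constants live in `store_api::mito_engine_options` and their exact string values are not shown in this diff:

```rust
use std::collections::HashMap;

// Illustrative stand-in values for the constants named in the diff.
const APPEND_MODE_KEY: &str = "append_mode";
const MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING: &str = "memtable.partition_tree.primary_key_encoding";
const MEMTABLE_TYPE: &str = "memtable.type";
const SKIP_WAL_KEY: &str = "skip_wal";
const TTL_KEY: &str = "ttl";
const FOREVER: &str = "forever";

/// Mirrors the option scrubbing above: the metadata region drops data-region
/// tuning options (now including the memtable type) and pins an infinite TTL.
fn scrub_metadata_region_options(
    mut original: HashMap<String, String>,
) -> HashMap<String, String> {
    original.remove(APPEND_MODE_KEY);
    original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
    original.remove(MEMTABLE_TYPE);
    original.insert(TTL_KEY.to_string(), FOREVER.to_string());
    original.remove(SKIP_WAL_KEY);
    original
}

fn main() {
    let opts = HashMap::from([
        (MEMTABLE_TYPE.to_string(), "partition_tree".to_string()),
        (TTL_KEY.to_string(), "7d".to_string()),
    ]);
    let cleaned = scrub_metadata_region_options(opts);
    assert!(!cleaned.contains_key(MEMTABLE_TYPE));
    assert_eq!(cleaned.get(TTL_KEY).map(String::as_str), Some(FOREVER));
}
```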
@@ -57,7 +57,7 @@ use crate::region::options::MergeMode;
 use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};

 /// Initial vector builder capacity.
-const INITIAL_BUILDER_CAPACITY: usize = 16;
+const INITIAL_BUILDER_CAPACITY: usize = 4;

 /// Vector builder capacity.
 const BUILDER_CAPACITY: usize = 512;

@@ -645,15 +645,19 @@ struct Series {
 }

 impl Series {
-    fn new(region_metadata: &RegionMetadataRef) -> Self {
+    pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
         Self {
             pk_cache: None,
-            active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
+            active: ValueBuilder::new(region_metadata, builder_cap),
             frozen: vec![],
             region_metadata: region_metadata.clone(),
         }
     }

+    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
+        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
+    }
+
     /// Pushes a row of values into Series. Return the size of values.
     fn push<'a>(
         &mut self,
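`Series` gains a `with_capacity` constructor and the default initial builder capacity drops from 16 to 4, presumably to cut up-front allocation when a memtable holds many small series. A minimal sketch of the constructor split, with a plain `Vec` standing in for `ValueBuilder`:

```rust
// Mirrors the smaller default from the diff; the value builder is simplified
// to a Vec<i64> for illustration only.
const INITIAL_BUILDER_CAPACITY: usize = 4;

struct Series {
    active: Vec<i64>,
}

impl Series {
    fn with_capacity(builder_cap: usize) -> Self {
        Self { active: Vec::with_capacity(builder_cap) }
    }

    fn new() -> Self {
        // Callers that expect many rows can ask for a larger builder explicitly.
        Self::with_capacity(INITIAL_BUILDER_CAPACITY)
    }
}

fn main() {
    let small = Series::new();
    assert!(small.active.capacity() >= INITIAL_BUILDER_CAPACITY);
    let big = Series::with_capacity(512);
    assert!(big.active.capacity() >= 512);
}
```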
@@ -147,7 +147,7 @@ impl Inserter {
         statement_executor: &StatementExecutor,
     ) -> Result<Output> {
         let row_inserts = ColumnToRow::convert(requests)?;
-        self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
+        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
             .await
     }

@@ -158,6 +158,7 @@ impl Inserter {
         ctx: QueryContextRef,
         statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Output> {
         preprocess_row_insert_requests(&mut requests.inserts)?;
         self.handle_row_inserts_with_create_type(
@@ -166,6 +167,7 @@ impl Inserter {
             statement_executor,
             AutoCreateTableType::Physical,
             accommodate_existing_schema,
+            is_single_value,
         )
         .await
     }
@@ -183,6 +185,7 @@ impl Inserter {
             statement_executor,
             AutoCreateTableType::Log,
             false,
+            false,
         )
         .await
     }
@@ -199,6 +202,7 @@ impl Inserter {
             statement_executor,
             AutoCreateTableType::Trace,
             false,
+            false,
         )
         .await
     }
@@ -210,6 +214,7 @@ impl Inserter {
         ctx: QueryContextRef,
         statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Output> {
         self.handle_row_inserts_with_create_type(
             requests,
@@ -217,6 +222,7 @@ impl Inserter {
             statement_executor,
             AutoCreateTableType::LastNonNull,
             accommodate_existing_schema,
+            is_single_value,
         )
         .await
     }
@@ -229,6 +235,7 @@ impl Inserter {
         statement_executor: &StatementExecutor,
         create_type: AutoCreateTableType,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Output> {
         // remove empty requests
         requests.inserts.retain(|req| {
@@ -249,6 +256,7 @@ impl Inserter {
             create_type,
             statement_executor,
             accommodate_existing_schema,
+            is_single_value,
         )
         .await?;

@@ -299,6 +307,7 @@ impl Inserter {
             AutoCreateTableType::Logical(physical_table.to_string()),
             statement_executor,
             true,
+            true,
         )
         .await?;
         let name_to_info = table_infos
@@ -464,9 +473,10 @@ impl Inserter {
     /// This mapping is used in the conversion of RowToRegion.
     ///
     /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
-    /// It only works for TIME_INDEX and VALUE columns. This is for the case where the user creates a table with
+    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
     /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
     /// remote write. This will modify the `RowInsertRequests` in place.
+    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
     async fn create_or_alter_tables_on_demand(
         &self,
         requests: &mut RowInsertRequests,
@@ -474,6 +484,7 @@ impl Inserter {
         auto_create_table_type: AutoCreateTableType,
         statement_executor: &StatementExecutor,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<CreateAlterTableResult> {
         let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
             .with_label_values(&[auto_create_table_type.as_str()])
@@ -537,6 +548,7 @@ impl Inserter {
             &table,
             ctx,
             accommodate_existing_schema,
+            is_single_value,
         )? {
             alter_tables.push(alter_expr);
         }
@@ -811,12 +823,15 @@ impl Inserter {
     /// When `accommodate_existing_schema` is true, it may modify the input `req` to
     /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
     /// for more details.
+    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
+    /// input `req`.
     fn get_alter_table_expr_on_demand(
         &self,
         req: &mut RowInsertRequest,
         table: &TableRef,
         ctx: &QueryContextRef,
         accommodate_existing_schema: bool,
+        is_single_value: bool,
     ) -> Result<Option<AlterTableExpr>> {
         let catalog_name = ctx.current_catalog();
         let schema_name = ctx.current_schema();
@@ -834,18 +849,20 @@ impl Inserter {
         let table_schema = table.schema();
         // Find timestamp column name
         let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
-        // Find field column name if there is only one
+        // Find field column name if there is only one and `is_single_value` is true.
         let mut field_col_name = None;
-        let mut multiple_field_cols = false;
-        table.field_columns().for_each(|col| {
-            if field_col_name.is_none() {
-                field_col_name = Some(col.name.clone());
-            } else {
-                multiple_field_cols = true;
-            }
-        });
-        if multiple_field_cols {
-            field_col_name = None;
-        }
+        if is_single_value {
+            let mut multiple_field_cols = false;
+            table.field_columns().for_each(|col| {
+                if field_col_name.is_none() {
+                    field_col_name = Some(col.name.clone());
+                } else {
+                    multiple_field_cols = true;
+                }
+            });
+            if multiple_field_cols {
+                field_col_name = None;
+            }
+        }

         // Update column name in request schema for Timestamp/Field columns
@@ -871,11 +888,11 @@ impl Inserter {
             }
         }

-        // Remove from add_columns any column that is timestamp or field (if there is only one field column)
+        // Only keep columns that are tags or non-single field.
         add_columns.add_columns.retain(|col| {
             let def = col.column_def.as_ref().unwrap();
-            def.semantic_type != SemanticType::Timestamp as i32
-                && (def.semantic_type != SemanticType::Field as i32 && field_col_name.is_some())
+            def.semantic_type == SemanticType::Tag as i32
+                || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
         });

         if add_columns.add_columns.is_empty() {
@@ -1227,7 +1244,7 @@ mod tests {
             )),
         );
         let alter_expr = inserter
-            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
+            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
         assert!(alter_expr.is_none());

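The retain predicate is the interesting part of this change: the old expression kept a new column only when it was neither timestamp nor field and the table had exactly one field column, so it never added field columns and dropped new tags on multi-field tables; the new predicate keeps tag columns unconditionally and keeps field columns exactly when no single value column was accommodated. A self-contained sketch of the new predicate with stand-in types (not the greptime-proto definitions):

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum SemanticType {
    Timestamp,
    Tag,
    Field,
}

#[derive(Debug)]
struct AddColumn {
    name: &'static str,
    semantic_type: SemanticType,
}

/// Keep only tag columns, plus field columns when no existing single value
/// column was accommodated (`field_col_name` is `None`).
fn retain_add_columns(columns: &mut Vec<AddColumn>, field_col_name: Option<&str>) {
    columns.retain(|col| {
        col.semantic_type == SemanticType::Tag
            || (col.semantic_type == SemanticType::Field && field_col_name.is_none())
    });
}

fn main() {
    let mut columns = vec![
        AddColumn { name: "ts", semantic_type: SemanticType::Timestamp },
        AddColumn { name: "idc", semantic_type: SemanticType::Tag },
        AddColumn { name: "val", semantic_type: SemanticType::Field },
    ];
    // With an accommodated single value column, only the new tag survives.
    retain_add_columns(&mut columns, Some("val"));
    assert_eq!(columns.len(), 1);
    assert_eq!(columns[0].name, "idc");
}
```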
@@ -250,6 +250,7 @@ impl PipelineTable {
                 Self::query_ctx(&table_info),
                 &self.statement_executor,
                 false,
+                false,
             )
             .await
             .context(InsertPipelineSnafu)?;

@@ -17,6 +17,7 @@ use std::any::Any;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
+use common_time::timestamp::TimeUnit;
 use datafusion::error::DataFusionError;
 use promql::error::Error as PromqlError;
 use promql_parser::parser::token::TokenType;

@@ -192,6 +193,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Timestamp out of range: {} of {:?}", timestamp, unit))]
+    TimestampOutOfRange {
+        timestamp: i64,
+        unit: TimeUnit,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 impl ErrorExt for Error {

@@ -211,7 +220,8 @@ impl ErrorExt for Error {
             | UnsupportedVectorMatch { .. }
             | CombineTableColumnMismatch { .. }
             | UnexpectedPlanExpr { .. }
-            | UnsupportedMatcherOp { .. } => StatusCode::InvalidArguments,
+            | UnsupportedMatcherOp { .. }
+            | TimestampOutOfRange { .. } => StatusCode::InvalidArguments,

             UnknownTable { .. } => StatusCode::Internal,

@@ -14,80 +14,73 @@

 use std::time::{SystemTime, UNIX_EPOCH};

+use common_time::timestamp::TimeUnit;
+use common_time::Timestamp;
 use datafusion_common::{Column, ScalarValue};
-use datafusion_expr::expr::Alias;
 use datafusion_expr::utils::conjunction;
-use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
-use datafusion_sql::TableReference;
-use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
-use datatypes::prelude::ConcreteDataType;
+use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder};
 use snafu::{OptionExt, ResultExt};
 use table::TableRef;

-use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu};
+use crate::promql::error::{
+    DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu,
+};

-fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr {
+fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr {
     time_index_expr
         .clone()
-        .gt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
-            Some(start),
-            None,
-        )))
-        .and(
-            time_index_expr.lt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
-                Some(end),
-                None,
-            ))),
-        )
+        .gt_eq(Expr::Literal(timestamp_to_scalar_value(start)))
+        .and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end))))
 }

+fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
+    let value = timestamp.value();
+    match timestamp.unit() {
+        TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
+        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
+        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
+        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
+    }
+}
+
 /// Rewrite label values query to DataFusion logical plan.
 pub fn rewrite_label_values_query(
     table: TableRef,
-    mut scan_plan: LogicalPlan,
+    scan_plan: LogicalPlan,
     mut conditions: Vec<Expr>,
     label_name: String,
     start: SystemTime,
     end: SystemTime,
 ) -> Result<LogicalPlan> {
-    let table_ref = TableReference::partial(
-        table.table_info().schema_name.as_str(),
-        table.table_info().name.as_str(),
-    );
     let schema = table.schema();
     let ts_column = schema
         .timestamp_column()
         .with_context(|| TimeIndexNotFoundSnafu {
             table: table.table_info().full_table_name(),
         })?;
+    let unit = ts_column
+        .data_type
+        .as_timestamp()
+        .map(|data_type| data_type.unit())
+        .with_context(|| TimeIndexNotFoundSnafu {
+            table: table.table_info().full_table_name(),
+        })?;

-    let is_time_index_ms =
-        ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype();
+    // We only support millisecond precision at most.
+    let start =
+        Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
+    let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu {
+        timestamp: start.value(),
+        unit,
+    })?;
+    let end =
+        Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
+    let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu {
+        timestamp: end.value(),
+        unit,
+    })?;
     let time_index_expr = col(Column::from_name(ts_column.name.clone()));

-    if !is_time_index_ms {
-        // cast to ms if time_index not in Millisecond precision
-        let expr = vec![
-            col(Column::from_name(label_name.clone())),
-            Expr::Alias(Alias {
-                expr: Box::new(Expr::Cast(Cast {
-                    expr: Box::new(time_index_expr.clone()),
-                    data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
-                })),
-                relation: Some(table_ref),
-                name: ts_column.name.clone(),
-            }),
-        ];
-        scan_plan = LogicalPlanBuilder::from(scan_plan)
-            .project(expr)
-            .context(DataFusionPlanningSnafu)?
-            .build()
-            .context(DataFusionPlanningSnafu)?;
-    };
-
-    let start = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
-    let end = end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
-
     conditions.push(build_time_filter(time_index_expr, start, end));
     // Safety: `conditions` is not empty.
     let filter = conjunction(conditions).unwrap();

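Instead of projecting the time index down to milliseconds, the rewritten query converts the requested start/end (millisecond timestamps) into the table's time-index unit via `convert_to`, and reports the new `TimestampOutOfRange` error when that conversion overflows. A sketch of the unit-conversion idea with plain integers standing in for `common_time::Timestamp`:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

fn factor(unit: TimeUnit) -> i64 {
    match unit {
        TimeUnit::Second => 1,
        TimeUnit::Millisecond => 1_000,
        TimeUnit::Microsecond => 1_000_000,
        TimeUnit::Nanosecond => 1_000_000_000,
    }
}

/// Convert a millisecond timestamp into `unit`, returning `None` on overflow;
/// the diff maps that case to `TimestampOutOfRange`.
fn convert_ms_to(value_ms: i64, unit: TimeUnit) -> Option<i64> {
    let ms = factor(TimeUnit::Millisecond);
    let target = factor(unit);
    if target >= ms {
        value_ms.checked_mul(target / ms)
    } else {
        Some(value_ms.div_euclid(ms / target))
    }
}

fn main() {
    // A second-precision time index truncates the millisecond bound,
    let t = 1_664_370_459_457_i64;
    assert_eq!(convert_ms_to(t, TimeUnit::Second), Some(1_664_370_459));
    // while converting a huge value to nanoseconds overflows and is rejected.
    assert_eq!(convert_ms_to(i64::MAX / 10, TimeUnit::Nanosecond), None);
}
```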
@@ -458,6 +458,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
|
||||
))
|
||||
.with_log_ingest_handler(instance.fe_instance().clone(), None, None)
|
||||
.with_logs_handler(instance.fe_instance().clone())
|
||||
.with_influxdb_handler(instance.fe_instance().clone())
|
||||
.with_otlp_handler(instance.fe_instance().clone())
|
||||
.with_jaeger_handler(instance.fe_instance().clone())
|
||||
.with_greptime_config_options(instance.opts.to_toml().unwrap());
|
||||
@@ -491,11 +492,15 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
// build physical table
|
||||
let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
|
||||
run_sql(sql, &instance).await;
|
||||
let sql = "CREATE TABLE phy_ns (ts timestamp(0) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
|
||||
run_sql(sql, &instance).await;
|
||||
// build metric tables
|
||||
let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')";
|
||||
run_sql(sql, &instance).await;
|
||||
let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
|
||||
run_sql(sql, &instance).await;
|
||||
let sql = "CREATE TABLE multi_labels (ts timestamp(0) time index, val double, idc string, env string, host string, primary key (idc, env, host)) engine=metric with ('on_physical_table' = 'phy_ns')";
|
||||
run_sql(sql, &instance).await;
|
||||
|
||||
// insert rows
|
||||
let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
|
||||
@@ -507,6 +512,10 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
let sql = "INSERT INTO demo_metrics(val, ts) VALUES (1.1, 0)";
|
||||
run_sql(sql, &instance).await;
|
||||
|
||||
// insert rows to multi_labels
|
||||
let sql = "INSERT INTO multi_labels(idc, env, host, val, ts) VALUES ('idc1', 'dev', 'host1', 1.1, 0), ('idc1', 'dev', 'host2', 2.1, 0), ('idc2', 'dev', 'host1', 1.1, 0), ('idc2', 'test', 'host3', 2.1, 0)";
|
||||
run_sql(sql, &instance).await;
|
||||
|
||||
// build physical table
|
||||
let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
|
||||
run_sql(sql, &instance).await;
|
||||
|
||||
@@ -113,6 +113,8 @@ macro_rules! http_tests {
|
||||
test_log_query,
|
||||
test_jaeger_query_api,
|
||||
test_jaeger_query_api_for_trace_v1,
|
||||
|
||||
test_influxdb_write,
|
||||
);
|
||||
)*
|
||||
};
|
||||
@@ -602,8 +604,10 @@ pub async fn test_prom_http_api(store_type: StorageType) {
|
||||
assert_eq!(body.status, "success");
|
||||
assert_eq!(
|
||||
body.data,
|
||||
serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host", "idc", "number",]))
|
||||
.unwrap()
|
||||
serde_json::from_value::<PrometheusResponse>(json!([
|
||||
"__name__", "env", "host", "idc", "number",
|
||||
]))
|
||||
.unwrap()
|
||||
);
|
||||
|
||||
// labels query with multiple match[] params
|
||||
@@ -724,6 +728,19 @@ pub async fn test_prom_http_api(store_type: StorageType) {
|
||||
serde_json::from_value::<PrometheusResponse>(json!(["idc1"])).unwrap()
|
||||
);
|
||||
|
||||
// match labels.
|
||||
let res = client
|
||||
.get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
|
||||
assert_eq!(body.status, "success");
|
||||
assert_eq!(
|
||||
body.data,
|
||||
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
|
||||
);
|
||||
|
||||
// search field name
|
||||
let res = client
|
||||
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
|
||||
@@ -813,6 +830,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
|
||||
"demo_metrics_with_nanos".to_string(),
|
||||
"logic_table".to_string(),
|
||||
"mito".to_string(),
|
||||
"multi_labels".to_string(),
|
||||
"numbers".to_string()
|
||||
])
|
||||
);
|
||||
@@ -3937,6 +3955,52 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_influxdb_write(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
setup_test_http_app_with_frontend(store_type, "test_influxdb_write").await;
|
||||
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
// Only write field cpu.
|
||||
let result = client
|
||||
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
|
||||
.body("test_alter,host=host1 cpu=1.2 1664370459457010101")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(result.status(), 204);
|
||||
assert!(result.text().await.is_empty());
|
||||
|
||||
// Only write field mem.
|
||||
let result = client
|
||||
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
|
||||
.body("test_alter,host=host1 mem=10240.0 1664370469457010101")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(result.status(), 204);
|
||||
assert!(result.text().await.is_empty());
|
||||
|
||||
// Write field cpu & mem.
|
||||
let result = client
|
||||
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
|
||||
.body("test_alter,host=host1 cpu=3.2,mem=20480.0 1664370479457010101")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(result.status(), 204);
|
||||
assert!(result.text().await.is_empty());
|
||||
|
||||
let expected = r#"[["host1",1.2,1664370459457010101,null],["host1",null,1664370469457010101,10240.0],["host1",3.2,1664370479457010101,20480.0]]"#;
|
||||
validate_data(
|
||||
"test_influxdb_write",
|
||||
&client,
|
||||
"select * from test_alter order by ts;",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
|
||||
let res = client
|
||||
.get(format!("/v1/sql?sql={sql}").as_str())
|
||||
|
||||
tests/cases/standalone/common/alter/alter_physical_table.result (new file, 109 lines)
@@ -0,0 +1,109 @@
|
||||
CREATE TABLE phy (ts timestamp time index, val double, a_label STRING, PRIMARY KEY(a_label)) engine=metric with ("physical_metric_table" = "");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
ALTER TABLE phy ADD COLUMN b_label STRING PRIMARY KEY;
|
||||
|
||||
Error: 1001(Unsupported), Alter request to physical region is forbidden
|
||||
|
||||
ALTER TABLE phy DROP COLUMN a_label;
|
||||
|
||||
Error: 1004(InvalidArguments), Not allowed to remove index column a_label from table phy
|
||||
|
||||
ALTER TABLE phy SET 'ttl'='1d';
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
+-------+------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+------------------------------------+
|
||||
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | "a_label" STRING NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("a_label") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | physical_metric_table = '', |
|
||||
| | ttl = '1day' |
|
||||
| | ) |
|
||||
+-------+------------------------------------+
|
||||
|
||||
ALTER TABLE phy UNSET 'ttl';
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
+-------+------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+------------------------------------+
|
||||
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | "a_label" STRING NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("a_label") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | physical_metric_table = '' |
|
||||
| | ) |
|
||||
+-------+------------------------------------+
|
||||
|
||||
ALTER TABLE phy MODIFY COLUMN a_label SET INVERTED INDEX;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
+-------+-----------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+-----------------------------------------+
|
||||
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | "a_label" STRING NULL INVERTED INDEX, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("a_label") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | physical_metric_table = '' |
|
||||
| | ) |
|
||||
+-------+-----------------------------------------+
|
||||
|
||||
ALTER TABLE phy MODIFY COLUMN a_label UNSET INVERTED INDEX;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
+-------+------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+------------------------------------+
|
||||
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | "a_label" STRING NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("a_label") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | physical_metric_table = '' |
|
||||
| | ) |
|
||||
+-------+------------------------------------+
|
||||
|
||||
DROP TABLE phy;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
tests/cases/standalone/common/alter/alter_physical_table.sql (new file, 23 lines)
@@ -0,0 +1,23 @@
|
||||
CREATE TABLE phy (ts timestamp time index, val double, a_label STRING, PRIMARY KEY(a_label)) engine=metric with ("physical_metric_table" = "");
|
||||
|
||||
ALTER TABLE phy ADD COLUMN b_label STRING PRIMARY KEY;
|
||||
|
||||
ALTER TABLE phy DROP COLUMN a_label;
|
||||
|
||||
ALTER TABLE phy SET 'ttl'='1d';
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
ALTER TABLE phy UNSET 'ttl';
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
ALTER TABLE phy MODIFY COLUMN a_label SET INVERTED INDEX;
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
ALTER TABLE phy MODIFY COLUMN a_label UNSET INVERTED INDEX;
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
DROP TABLE phy;
|
||||