mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-28 00:42:56 +00:00
Compare commits
5 Commits
flow/admin
...
flow/lb_fe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d17d195a3 | ||
|
|
0d4f27a699 | ||
|
|
c4da8bb69d | ||
|
|
0bd8856e2f | ||
|
|
92c5a9f5f4 |
@@ -18,6 +18,7 @@ pub mod create_table;
|
||||
pub mod datanode_handler;
|
||||
pub mod flownode_handler;
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
@@ -75,8 +76,6 @@ pub async fn create_logical_table(
|
||||
physical_table_id: TableId,
|
||||
table_name: &str,
|
||||
) -> TableId {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
let tasks = vec![test_create_logical_table_task(table_name)];
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
pub mod flow_info;
|
||||
pub(crate) mod flow_name;
|
||||
pub(crate) mod flow_route;
|
||||
pub mod flow_route;
|
||||
pub mod flow_state;
|
||||
mod flownode_addr_helper;
|
||||
pub(crate) mod flownode_flow;
|
||||
|
||||
@@ -114,37 +114,37 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct FlowInfoValue {
|
||||
/// The source tables used by the flow.
|
||||
pub(crate) source_table_ids: Vec<TableId>,
|
||||
pub source_table_ids: Vec<TableId>,
|
||||
/// The sink table used by the flow.
|
||||
pub(crate) sink_table_name: TableName,
|
||||
pub sink_table_name: TableName,
|
||||
/// Which flow nodes this flow is running on.
|
||||
pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
|
||||
pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
|
||||
/// The catalog name.
|
||||
pub(crate) catalog_name: String,
|
||||
pub catalog_name: String,
|
||||
/// The query context used when create flow.
|
||||
/// Although flow doesn't belong to any schema, this query_context is needed to remember
|
||||
/// the query context when `create_flow` is executed
|
||||
/// for recovering flow using the same sql&query_context after db restart.
|
||||
/// if none, should use default query context
|
||||
#[serde(default)]
|
||||
pub(crate) query_context: Option<crate::rpc::ddl::QueryContext>,
|
||||
pub query_context: Option<crate::rpc::ddl::QueryContext>,
|
||||
/// The flow name.
|
||||
pub(crate) flow_name: String,
|
||||
pub flow_name: String,
|
||||
/// The raw sql.
|
||||
pub(crate) raw_sql: String,
|
||||
pub raw_sql: String,
|
||||
/// The expr of expire.
|
||||
/// Duration in seconds as `i64`.
|
||||
pub(crate) expire_after: Option<i64>,
|
||||
pub expire_after: Option<i64>,
|
||||
/// The comment.
|
||||
pub(crate) comment: String,
|
||||
pub comment: String,
|
||||
/// The options.
|
||||
pub(crate) options: HashMap<String, String>,
|
||||
pub options: HashMap<String, String>,
|
||||
/// The created time
|
||||
#[serde(default)]
|
||||
pub(crate) created_time: DateTime<Utc>,
|
||||
pub created_time: DateTime<Utc>,
|
||||
/// The updated time.
|
||||
#[serde(default)]
|
||||
pub(crate) updated_time: DateTime<Utc>,
|
||||
pub updated_time: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl FlowInfoValue {
|
||||
|
||||
@@ -133,6 +133,18 @@ pub enum Error {
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to downcast vector of type '{:?}' to type '{:?}'",
|
||||
from_type,
|
||||
to_type
|
||||
))]
|
||||
DowncastVector {
|
||||
from_type: ConcreteDataType,
|
||||
to_type: ConcreteDataType,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Error occurs when performing arrow computation"))]
|
||||
ArrowCompute {
|
||||
#[snafu(source)]
|
||||
@@ -192,6 +204,8 @@ impl ErrorExt for Error {
|
||||
| Error::PhysicalExpr { .. }
|
||||
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
|
||||
|
||||
Error::DowncastVector { .. } => StatusCode::Unexpected,
|
||||
|
||||
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
|
||||
|
||||
Error::ArrowCompute { .. } => StatusCode::IllegalState,
|
||||
|
||||
@@ -30,13 +30,16 @@ pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecord
|
||||
use datatypes::arrow::compute::SortOptions;
|
||||
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
|
||||
use datatypes::arrow::util::pretty;
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use datatypes::prelude::{ConcreteDataType, VectorRef};
|
||||
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::types::json_type_value_to_string;
|
||||
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
|
||||
use error::Result;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::{Stream, TryStreamExt};
|
||||
pub use recordbatch::RecordBatch;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
|
||||
fn name(&self) -> &str {
|
||||
@@ -58,6 +61,146 @@ pub struct OrderOption {
|
||||
pub options: SortOptions,
|
||||
}
|
||||
|
||||
/// A wrapper that maps a [RecordBatchStream] to a new [RecordBatchStream] by applying a function to each [RecordBatch].
|
||||
///
|
||||
/// The mapper function is applied to each [RecordBatch] in the stream.
|
||||
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
|
||||
/// The output ordering of the new [RecordBatchStream] is the same as the output ordering of the inner [RecordBatchStream].
|
||||
/// The metrics of the new [RecordBatchStream] is the same as the metrics of the inner [RecordBatchStream] if it is not `None`.
|
||||
pub struct SendableRecordBatchMapper {
|
||||
inner: SendableRecordBatchStream,
|
||||
/// The mapper function is applied to each [RecordBatch] in the stream.
|
||||
/// The original schema and the mapped schema are passed to the mapper function.
|
||||
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
|
||||
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
|
||||
schema: SchemaRef,
|
||||
/// Whether the mapper function is applied to each [RecordBatch] in the stream.
|
||||
apply_mapper: bool,
|
||||
}
|
||||
|
||||
/// Maps the json type to string in the batch.
|
||||
///
|
||||
/// The json type is mapped to string by converting the json value to string.
|
||||
/// The batch is updated to have the same number of columns as the original batch,
|
||||
/// but with the json type mapped to string.
|
||||
pub fn map_json_type_to_string(
|
||||
batch: RecordBatch,
|
||||
original_schema: &SchemaRef,
|
||||
mapped_schema: &SchemaRef,
|
||||
) -> Result<RecordBatch> {
|
||||
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
|
||||
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
|
||||
if let ConcreteDataType::Json(j) = schema.data_type {
|
||||
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
|
||||
let binary_vector = vector
|
||||
.as_any()
|
||||
.downcast_ref::<BinaryVector>()
|
||||
.with_context(|| error::DowncastVectorSnafu {
|
||||
from_type: schema.data_type.clone(),
|
||||
to_type: ConcreteDataType::binary_datatype(),
|
||||
})?;
|
||||
for value in binary_vector.iter_data() {
|
||||
let Some(value) = value else {
|
||||
string_vector_builder.push(None);
|
||||
continue;
|
||||
};
|
||||
let string_value =
|
||||
json_type_value_to_string(value, &j.format).with_context(|_| {
|
||||
error::CastVectorSnafu {
|
||||
from_type: schema.data_type.clone(),
|
||||
to_type: ConcreteDataType::string_datatype(),
|
||||
}
|
||||
})?;
|
||||
string_vector_builder.push(Some(string_value.as_str()));
|
||||
}
|
||||
|
||||
let string_vector = string_vector_builder.finish();
|
||||
vectors.push(Arc::new(string_vector) as VectorRef);
|
||||
} else {
|
||||
vectors.push(vector.clone());
|
||||
}
|
||||
}
|
||||
|
||||
RecordBatch::new(mapped_schema.clone(), vectors)
|
||||
}
|
||||
|
||||
/// Maps the json type to string in the schema.
|
||||
///
|
||||
/// The json type is mapped to string by converting the json value to string.
|
||||
/// The schema is updated to have the same number of columns as the original schema,
|
||||
/// but with the json type mapped to string.
|
||||
///
|
||||
/// Returns the new schema and whether the schema needs to be mapped to string.
|
||||
pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
|
||||
let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
|
||||
let mut apply_mapper = false;
|
||||
for column in schema.column_schemas() {
|
||||
if matches!(column.data_type, ConcreteDataType::Json(_)) {
|
||||
new_columns.push(ColumnSchema::new(
|
||||
column.name.to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
column.is_nullable(),
|
||||
));
|
||||
apply_mapper = true;
|
||||
} else {
|
||||
new_columns.push(column.clone());
|
||||
}
|
||||
}
|
||||
(Arc::new(Schema::new(new_columns)), apply_mapper)
|
||||
}
|
||||
|
||||
impl SendableRecordBatchMapper {
|
||||
/// Creates a new [SendableRecordBatchMapper] with the given inner [RecordBatchStream], mapper function, and schema mapper function.
|
||||
pub fn new(
|
||||
inner: SendableRecordBatchStream,
|
||||
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
|
||||
schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
|
||||
) -> Self {
|
||||
let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
|
||||
Self {
|
||||
inner,
|
||||
mapper,
|
||||
schema: mapped_schema,
|
||||
apply_mapper,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordBatchStream for SendableRecordBatchMapper {
|
||||
fn name(&self) -> &str {
|
||||
"SendableRecordBatchMapper"
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
||||
self.inner.output_ordering()
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
self.inner.metrics()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for SendableRecordBatchMapper {
|
||||
type Item = Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if self.apply_mapper {
|
||||
Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
|
||||
opt.map(|result| {
|
||||
result
|
||||
.and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
|
||||
})
|
||||
})
|
||||
} else {
|
||||
Pin::new(&mut self.inner).poll_next(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
|
||||
/// that will produce no results
|
||||
pub struct EmptyRecordBatchStream {
|
||||
|
||||
@@ -14,8 +14,9 @@
|
||||
|
||||
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::CreateTableExpr;
|
||||
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use common_query::Output;
|
||||
use common_telemetry::warn;
|
||||
use common_telemetry::{debug, warn};
|
||||
use itertools::Itertools;
|
||||
use meta_client::client::MetaClient;
|
||||
use rand::rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::batching_mode::task::BatchingTask;
|
||||
use crate::batching_mode::{
|
||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
||||
GRPC_MAX_RETRIES,
|
||||
};
|
||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
||||
use crate::{Error, FlowAuthHeader};
|
||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
|
||||
use crate::{Error, FlowAuthHeader, FlowId};
|
||||
|
||||
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
||||
///
|
||||
@@ -74,6 +76,105 @@ impl<
|
||||
|
||||
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
||||
|
||||
/// Statistics about running query on this frontend from flownode
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct FrontendStat {
|
||||
/// The query for flow id has been running since this timestamp
|
||||
since: HashMap<FlowId, Instant>,
|
||||
/// The average query time for each flow id
|
||||
/// This is used to calculate the average query time for each flow id
|
||||
past_query_avg: HashMap<FlowId, (usize, Duration)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct FrontendStats {
|
||||
/// The statistics for each flow id
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
}
|
||||
|
||||
impl FrontendStats {
|
||||
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let stat = stats.entry(frontend_addr.to_string()).or_default();
|
||||
stat.since.insert(flow_id, Instant::now());
|
||||
|
||||
FrontendStatsGuard {
|
||||
stats: self.stats.clone(),
|
||||
frontend_addr: frontend_addr.to_string(),
|
||||
cur: flow_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// return frontend addrs sorted by load, from lightest to heaviest
|
||||
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
|
||||
pub fn sort_by_load(&self) -> Vec<String> {
|
||||
let stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let fe_load_factor = stats
|
||||
.iter()
|
||||
.map(|(node_addr, stat)| {
|
||||
// total expected avg running time for all currently running queries
|
||||
let total_expect_avg_run_time = stat
|
||||
.since
|
||||
.keys()
|
||||
.map(|f| {
|
||||
let (count, total_duration) =
|
||||
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
|
||||
if *count == 0 {
|
||||
0.0
|
||||
} else {
|
||||
total_duration.as_secs_f64() / *count as f64
|
||||
}
|
||||
})
|
||||
.sum::<f64>();
|
||||
let total_cur_running_time = stat
|
||||
.since
|
||||
.values()
|
||||
.map(|since| since.elapsed().as_secs_f64())
|
||||
.sum::<f64>();
|
||||
(
|
||||
node_addr.to_string(),
|
||||
total_expect_avg_run_time + total_cur_running_time,
|
||||
)
|
||||
})
|
||||
.sorted_by(|(_, load_a), (_, load_b)| {
|
||||
load_a
|
||||
.partial_cmp(load_b)
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Frontend load factor: {:?}", fe_load_factor);
|
||||
for (node_addr, load) in &fe_load_factor {
|
||||
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
|
||||
.with_label_values(&[&node_addr.to_string()])
|
||||
.observe(*load);
|
||||
}
|
||||
fe_load_factor
|
||||
.into_iter()
|
||||
.map(|(addr, _)| addr)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FrontendStatsGuard {
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
frontend_addr: String,
|
||||
cur: FlowId,
|
||||
}
|
||||
|
||||
impl Drop for FrontendStatsGuard {
|
||||
fn drop(&mut self) {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
|
||||
if let Some(since) = stat.since.remove(&self.cur) {
|
||||
let elapsed = since.elapsed();
|
||||
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
|
||||
*count += 1;
|
||||
*total_duration += elapsed;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
///
|
||||
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
|
||||
@@ -83,6 +184,7 @@ pub enum FrontendClient {
|
||||
meta_client: Arc<MetaClient>,
|
||||
chnl_mgr: ChannelManager,
|
||||
auth: Option<FlowAuthHeader>,
|
||||
fe_stats: FrontendStats,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
@@ -114,6 +216,7 @@ impl FrontendClient {
|
||||
ChannelManager::with_config(cfg)
|
||||
},
|
||||
auth,
|
||||
fe_stats: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,6 +295,7 @@ impl FrontendClient {
|
||||
meta_client: _,
|
||||
chnl_mgr,
|
||||
auth,
|
||||
fe_stats,
|
||||
} = self
|
||||
else {
|
||||
return UnexpectedSnafu {
|
||||
@@ -208,8 +312,21 @@ impl FrontendClient {
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64;
|
||||
// shuffle the frontends to avoid always pick the same one
|
||||
frontends.shuffle(&mut rng());
|
||||
let node_addrs_by_load = fe_stats.sort_by_load();
|
||||
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
|
||||
let addr2load = node_addrs_by_load
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, id)| (id.clone(), i + 1))
|
||||
.collect::<HashMap<_, _>>();
|
||||
// sort frontends by load, from lightest to heaviest
|
||||
frontends.sort_by(|(_, a), (_, b)| {
|
||||
// if not even in stats, treat as 0 load since never been queried
|
||||
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
|
||||
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
|
||||
load_a.cmp(load_b)
|
||||
});
|
||||
debug!("Frontend nodes sorted by load: {:?}", frontends);
|
||||
|
||||
// found node with maximum last_activity_ts
|
||||
for (_, node_info) in frontends
|
||||
@@ -257,6 +374,7 @@ impl FrontendClient {
|
||||
create: CreateTableExpr,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
self.handle(
|
||||
Request::Ddl(api::v1::DdlRequest {
|
||||
@@ -265,6 +383,7 @@ impl FrontendClient {
|
||||
catalog,
|
||||
schema,
|
||||
&mut None,
|
||||
task,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -276,15 +395,19 @@ impl FrontendClient {
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
peer_desc: &mut Option<PeerDesc>,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
match self {
|
||||
FrontendClient::Distributed { .. } => {
|
||||
FrontendClient::Distributed { fe_stats, .. } => {
|
||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
||||
|
||||
*peer_desc = Some(PeerDesc::Dist {
|
||||
peer: db.peer.clone(),
|
||||
});
|
||||
|
||||
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
|
||||
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
||||
|
||||
db.database
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||
.await
|
||||
|
||||
@@ -280,7 +280,7 @@ impl BatchingTask {
|
||||
let catalog = &self.config.sink_table_name[0];
|
||||
let schema = &self.config.sink_table_name[1];
|
||||
frontend_client
|
||||
.create(expr.clone(), catalog, schema)
|
||||
.create(expr.clone(), catalog, schema, Some(self))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -361,7 +361,7 @@ impl BatchingTask {
|
||||
};
|
||||
|
||||
frontend_client
|
||||
.handle(req, catalog, schema, &mut peer_desc)
|
||||
.handle(req, catalog, schema, &mut peer_desc, Some(self))
|
||||
.await
|
||||
};
|
||||
|
||||
|
||||
@@ -58,6 +58,14 @@ lazy_static! {
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_guess_fe_load",
|
||||
"flow batching engine guessed frontend load",
|
||||
&["fe_addr"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||
|
||||
@@ -375,7 +375,7 @@ impl HeartbeatMailbox {
|
||||
|
||||
/// Parses the [Instruction] from [MailboxMessage].
|
||||
#[cfg(test)]
|
||||
pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
|
||||
pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
|
||||
let Payload::Json(payload) =
|
||||
msg.payload
|
||||
.as_ref()
|
||||
|
||||
@@ -25,7 +25,10 @@ use common_datasource::object_store::{build_backend, parse_url};
|
||||
use common_datasource::util::find_dir_and_filename;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_recordbatch::{
|
||||
map_json_type_to_string, map_json_type_to_string_schema, RecordBatchStream,
|
||||
SendableRecordBatchMapper, SendableRecordBatchStream,
|
||||
};
|
||||
use common_telemetry::{debug, tracing};
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion_common::TableReference as DfTableReference;
|
||||
@@ -57,6 +60,11 @@ impl StatementExecutor {
|
||||
) -> Result<usize> {
|
||||
let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;
|
||||
|
||||
let stream = Box::pin(SendableRecordBatchMapper::new(
|
||||
stream,
|
||||
map_json_type_to_string,
|
||||
map_json_type_to_string_schema,
|
||||
));
|
||||
match format {
|
||||
Format::Csv(_) => stream_to_csv(
|
||||
Box::pin(DfRecordBatchStreamAdapter::new(stream)),
|
||||
|
||||
@@ -187,24 +187,42 @@ impl StatementExecutor {
|
||||
}
|
||||
);
|
||||
|
||||
// Check if is creating logical table
|
||||
if create_table.engine == METRIC_ENGINE_NAME
|
||||
&& create_table
|
||||
.table_options
|
||||
.contains_key(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
return self
|
||||
.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
||||
// Create logical tables
|
||||
ensure!(
|
||||
partitions.is_none(),
|
||||
InvalidPartitionRuleSnafu {
|
||||
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
|
||||
}
|
||||
);
|
||||
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.next()
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: "expected to create a logical table",
|
||||
});
|
||||
violated: "expected to create logical tables",
|
||||
})
|
||||
} else {
|
||||
// Create other normal table
|
||||
self.create_non_logic_table(create_table, partitions, query_ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn create_non_logic_table(
|
||||
&self,
|
||||
create_table: &mut CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<TableRef> {
|
||||
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
|
||||
|
||||
// Check if schema exists
|
||||
let schema = self
|
||||
.table_metadata_manager
|
||||
.schema_manager()
|
||||
@@ -214,7 +232,6 @@ impl StatementExecutor {
|
||||
))
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
ensure!(
|
||||
schema.is_some(),
|
||||
SchemaNotFoundSnafu {
|
||||
|
||||
@@ -569,6 +569,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Convert SQL value error"))]
|
||||
ConvertSqlValue {
|
||||
source: datatypes::error::Error,
|
||||
|
||||
@@ -391,13 +391,16 @@ fn parse_string_options(parser: &mut Parser) -> std::result::Result<(String, Str
|
||||
parser.expect_token(&Token::Eq)?;
|
||||
let value = if parser.parse_keyword(Keyword::NULL) {
|
||||
"".to_string()
|
||||
} else if let Ok(v) = parser.parse_literal_string() {
|
||||
v
|
||||
} else {
|
||||
return Err(ParserError::ParserError(format!(
|
||||
"Unexpected option value for alter table statements, expect string literal or NULL, got: `{}`",
|
||||
parser.next_token()
|
||||
)));
|
||||
let next_token = parser.peek_token();
|
||||
if let Token::Number(number_as_string, _) = next_token.token {
|
||||
parser.advance_token();
|
||||
number_as_string
|
||||
} else {
|
||||
parser.parse_literal_string().map_err(|_|{
|
||||
ParserError::ParserError(format!("Unexpected option value for alter table statements, expect string literal, numeric literal or NULL, got: `{}`", next_token))
|
||||
})?
|
||||
}
|
||||
};
|
||||
Ok((name, value))
|
||||
}
|
||||
@@ -1088,4 +1091,38 @@ mod tests {
|
||||
)
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_alter_with_numeric_value() {
|
||||
for sql in [
|
||||
"ALTER TABLE test SET 'compaction.twcs.trigger_file_num'=8;",
|
||||
"ALTER TABLE test SET 'compaction.twcs.trigger_file_num'='8';",
|
||||
] {
|
||||
let mut result = ParserContext::create_with_dialect(
|
||||
sql,
|
||||
&GreptimeDbDialect {},
|
||||
ParseOptions::default(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(1, result.len());
|
||||
|
||||
let statement = result.remove(0);
|
||||
assert_matches!(statement, Statement::AlterTable { .. });
|
||||
match statement {
|
||||
Statement::AlterTable(alter_table) => {
|
||||
let alter_operation = alter_table.alter_operation();
|
||||
assert_matches!(alter_operation, AlterTableOperation::SetTableOptions { .. });
|
||||
match alter_operation {
|
||||
AlterTableOperation::SetTableOptions { options } => {
|
||||
assert_eq!(options.len(), 1);
|
||||
assert_eq!(options[0].key, "compaction.twcs.trigger_file_num");
|
||||
assert_eq!(options[0].value, "8");
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,6 +211,12 @@ impl ColumnExtensions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Partition on columns or values.
|
||||
///
|
||||
/// - `column_list` is the list of columns in `PARTITION ON COLUMNS` clause.
|
||||
/// - `exprs` is the list of expressions in `PARTITION ON VALUES` clause, like
|
||||
/// `host <= 'host1'`, `host > 'host1' and host <= 'host2'` or `host > 'host2'`.
|
||||
/// Each expression stands for a partition.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
|
||||
pub struct Partitions {
|
||||
pub column_list: Vec<Ident>,
|
||||
|
||||
@@ -1,15 +1,21 @@
|
||||
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, jsons, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 88.8, 333.3, 1655276558000),
|
||||
('host3', 99.9, 444.4, 1722077263000);
|
||||
|
||||
Affected Rows: 3
|
||||
Affected Rows: 1
|
||||
|
||||
Copy demo TO '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
|
||||
|
||||
@@ -32,6 +38,44 @@ select * from with_filename order by ts;
|
||||
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
|
||||
+-------+------+--------+---------------------+
|
||||
|
||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
||||
|
||||
Affected Rows: 3
|
||||
|
||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
||||
|
||||
+-------+------+--------+---------------------------------+---------------------+
|
||||
| host | cpu | memory | json_to_string(with_json.jsons) | ts |
|
||||
+-------+------+--------+---------------------------------+---------------------+
|
||||
| host1 | 66.6 | 1024.0 | {"foo":"bar"} | 2022-06-15T07:02:37 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15T07:02:38 |
|
||||
| host3 | 99.9 | 444.4 | | 2024-07-27T10:47:43 |
|
||||
+-------+------+--------+---------------------------------+---------------------+
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
| host | cpu | memory | jsons | ts |
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
|
||||
-- SQLNESS PROTOCOL POSTGRES
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
| host | cpu | memory | jsons | ts |
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37.000000 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
|
||||
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
||||
|
||||
Affected Rows: 0
|
||||
@@ -110,6 +154,10 @@ drop table with_filename;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table with_json;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table with_path;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, jsons, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 88.8, 333.3, 1655276558000),
|
||||
('host3', 99.9, 444.4, 1722077263000);
|
||||
|
||||
Copy demo TO '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
|
||||
@@ -15,6 +19,18 @@ Copy with_filename FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format=
|
||||
|
||||
select * from with_filename order by ts;
|
||||
|
||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
||||
|
||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
||||
|
||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
-- SQLNESS PROTOCOL POSTGRES
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
||||
|
||||
Copy with_path FROM '${SQLNESS_HOME}/demo/export/csv/' with (format='csv', start_time='2023-06-15 07:02:37');
|
||||
@@ -43,6 +59,8 @@ drop table demo;
|
||||
|
||||
drop table with_filename;
|
||||
|
||||
drop table with_json;
|
||||
|
||||
drop table with_path;
|
||||
|
||||
drop table with_pattern;
|
||||
|
||||
@@ -1,15 +1,21 @@
|
||||
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, ts)
|
||||
demo(host, cpu, memory, jsons, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 88.8, 333.3, 1655276558000),
|
||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, ts)
|
||||
values
|
||||
('host3', 99.9, 444.4, 1722077263000);
|
||||
|
||||
Affected Rows: 3
|
||||
Affected Rows: 1
|
||||
|
||||
Copy demo TO '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
||||
|
||||
@@ -32,6 +38,44 @@ select * from with_filename order by ts;
|
||||
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
|
||||
+-------+------+--------+---------------------+
|
||||
|
||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
||||
|
||||
Affected Rows: 3
|
||||
|
||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
||||
|
||||
+-------+------+--------+---------------------------------+---------------------+
|
||||
| host | cpu | memory | json_to_string(with_json.jsons) | ts |
|
||||
+-------+------+--------+---------------------------------+---------------------+
|
||||
| host1 | 66.6 | 1024.0 | {"foo":"bar"} | 2022-06-15T07:02:37 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15T07:02:38 |
|
||||
| host3 | 99.9 | 444.4 | | 2024-07-27T10:47:43 |
|
||||
+-------+------+--------+---------------------------------+---------------------+
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
| host | cpu | memory | jsons | ts |
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
|
||||
-- SQLNESS PROTOCOL POSTGRES
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
| host | cpu | memory | jsons | ts |
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37.000000 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
|
||||
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
||||
|
||||
Affected Rows: 0
|
||||
@@ -110,6 +154,10 @@ drop table with_filename;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table with_json;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table with_path;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, ts)
|
||||
demo(host, cpu, memory, jsons, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 88.8, 333.3, 1655276558000),
|
||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, ts)
|
||||
values
|
||||
('host3', 99.9, 444.4, 1722077263000);
|
||||
|
||||
Copy demo TO '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
||||
@@ -15,6 +19,18 @@ Copy with_filename FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (forma
|
||||
|
||||
select * from with_filename order by ts;
|
||||
|
||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
||||
|
||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
|
||||
|
||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
-- SQLNESS PROTOCOL POSTGRES
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
|
||||
|
||||
Copy with_path FROM '${SQLNESS_HOME}/demo/export/json/' with (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
|
||||
@@ -43,6 +59,8 @@ drop table demo;
|
||||
|
||||
drop table with_filename;
|
||||
|
||||
drop table with_json;
|
||||
|
||||
drop table with_path;
|
||||
|
||||
drop table with_pattern;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -6,14 +6,20 @@ CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time in
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, jsons, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 88.8, 333.3, 1655276558000),
|
||||
('host3', 111.1, 444.4, 1722077263000);
|
||||
|
||||
Affected Rows: 3
|
||||
Affected Rows: 1
|
||||
|
||||
insert into
|
||||
demo_2(host, cpu, memory, ts)
|
||||
@@ -70,6 +76,44 @@ select * from with_path order by ts;
|
||||
| host6 | 222.2 | 555.5 | 2024-07-27T10:47:44 |
|
||||
+-------+-------+--------+---------------------+
|
||||
|
||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
|
||||
|
||||
Affected Rows: 3
|
||||
|
||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
||||
|
||||
+-------+-------+--------+---------------------------------+---------------------+
|
||||
| host | cpu | memory | json_to_string(with_json.jsons) | ts |
|
||||
+-------+-------+--------+---------------------------------+---------------------+
|
||||
| host1 | 66.6 | 1024.0 | {"foo":"bar"} | 2022-06-15T07:02:37 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15T07:02:38 |
|
||||
| host3 | 111.1 | 444.4 | | 2024-07-27T10:47:43 |
|
||||
+-------+-------+--------+---------------------------------+---------------------+
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
| host | cpu | memory | jsons | ts |
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
|
||||
+-------+------+--------+------------------------+---------------------+
|
||||
|
||||
-- SQLNESS PROTOCOL POSTGRES
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
| host | cpu | memory | jsons | ts |
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
| host1 | 66.6 | 1024 | {"foo":"bar"} | 2022-06-15 07:02:37.000000 |
|
||||
| host2 | 88.8 | 333.3 | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
|
||||
+-------+------+--------+------------------------+----------------------------+
|
||||
|
||||
CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
|
||||
|
||||
Affected Rows: 0
|
||||
@@ -171,6 +215,10 @@ drop table with_filename;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table with_json;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table with_path;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
|
||||
|
||||
CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, jsons, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
|
||||
('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
||||
|
||||
insert into
|
||||
demo(host, cpu, memory, ts)
|
||||
values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 88.8, 333.3, 1655276558000),
|
||||
('host3', 111.1, 444.4, 1722077263000);
|
||||
|
||||
insert into
|
||||
@@ -32,6 +36,18 @@ Copy with_path FROM '${SQLNESS_HOME}/demo/export/parquet_files/';
|
||||
|
||||
select * from with_path order by ts;
|
||||
|
||||
CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
|
||||
|
||||
Copy with_json FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
|
||||
|
||||
select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
-- SQLNESS PROTOCOL POSTGRES
|
||||
select host, cpu, memory, jsons, ts from demo where host != 'host3';
|
||||
|
||||
CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
|
||||
|
||||
Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/parquet_files/' WITH (PATTERN = 'demo.*', start_time='2022-06-15 07:02:39');
|
||||
@@ -70,6 +86,8 @@ drop table demo_2;
|
||||
|
||||
drop table with_filename;
|
||||
|
||||
drop table with_json;
|
||||
|
||||
drop table with_path;
|
||||
|
||||
drop table with_pattern;
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, ts TIMESTAMP TIME INDEX);
|
||||
CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
|
||||
insert into demo(host, cpu, memory, jsons, ts) values ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000), ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
insert into demo(host, cpu, memory, ts) values ('host3', 111.1, 444.4, 1722077263000);
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
COPY demo TO '${SQLNESS_HOME}/export/demo.parquet' WITH (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
||||
|
||||
Affected Rows: 1
|
||||
@@ -18,15 +22,15 @@ COPY demo TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', start_time=
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
|
||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
|
||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
|
||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, ts TIMESTAMP TIME INDEX);
|
||||
CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
|
||||
|
||||
insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
|
||||
insert into demo(host, cpu, memory, jsons, ts) values ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000), ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
|
||||
|
||||
insert into demo(host, cpu, memory, ts) values ('host3', 111.1, 444.4, 1722077263000);
|
||||
|
||||
COPY demo TO '${SQLNESS_HOME}/export/demo.parquet' WITH (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
||||
|
||||
@@ -8,10 +10,10 @@ COPY demo TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', start_time='2
|
||||
|
||||
COPY demo TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
|
||||
|
||||
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
|
||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
|
||||
|
||||
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
|
||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
|
||||
|
||||
COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
|
||||
COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
|
||||
|
||||
drop table demo;
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
create table metric_engine_partition (
|
||||
ts timestamp time index,
|
||||
host string primary key,
|
||||
cpu double,
|
||||
)
|
||||
partition on columns (host) (
|
||||
host <= 'host1',
|
||||
host > 'host1' and host <= 'host2',
|
||||
host > 'host2'
|
||||
)
|
||||
engine = metric
|
||||
with (
|
||||
physical_metric_table = "true",
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
select count(*) from metric_engine_partition;
|
||||
|
||||
+----------+
|
||||
| count(*) |
|
||||
+----------+
|
||||
| 0 |
|
||||
+----------+
|
||||
|
||||
create table logical_table_1 (
|
||||
ts timestamp time index,
|
||||
host string primary key,
|
||||
cpu double,
|
||||
)
|
||||
partition on columns (host) ()
|
||||
engine = metric
|
||||
with (
|
||||
on_physical_table = "metric_engine_partition",
|
||||
);
|
||||
|
||||
Error: 1004(InvalidArguments), Invalid partition rule: logical table in metric engine should not have partition rule, it will be inherited from physical table
|
||||
|
||||
create table logical_table_2 (
|
||||
ts timestamp time index,
|
||||
host string primary key,
|
||||
cpu double,
|
||||
)
|
||||
engine = metric
|
||||
with (
|
||||
on_physical_table = "metric_engine_partition",
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
show create table logical_table_2;
|
||||
|
||||
+-----------------+-------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-----------------+-------------------------------------------------+
|
||||
| logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | "host" STRING NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | ) |
|
||||
| | PARTITION ON COLUMNS ("host") ( |
|
||||
| | host <= 'host1', |
|
||||
| | host > 'host2', |
|
||||
| | host > 'host1' AND host <= 'host2' |
|
||||
| | ) |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | on_physical_table = 'metric_engine_partition' |
|
||||
| | ) |
|
||||
+-----------------+-------------------------------------------------+
|
||||
|
||||
drop table logical_table_2;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table metric_engine_partition;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
create table metric_engine_partition (
|
||||
ts timestamp time index,
|
||||
host string primary key,
|
||||
cpu double,
|
||||
)
|
||||
partition on columns (host) (
|
||||
host <= 'host1',
|
||||
host > 'host1' and host <= 'host2',
|
||||
host > 'host2'
|
||||
)
|
||||
engine = metric
|
||||
with (
|
||||
physical_metric_table = "true",
|
||||
);
|
||||
|
||||
select count(*) from metric_engine_partition;
|
||||
|
||||
create table logical_table_1 (
|
||||
ts timestamp time index,
|
||||
host string primary key,
|
||||
cpu double,
|
||||
)
|
||||
partition on columns (host) ()
|
||||
engine = metric
|
||||
with (
|
||||
on_physical_table = "metric_engine_partition",
|
||||
);
|
||||
|
||||
create table logical_table_2 (
|
||||
ts timestamp time index,
|
||||
host string primary key,
|
||||
cpu double,
|
||||
)
|
||||
engine = metric
|
||||
with (
|
||||
on_physical_table = "metric_engine_partition",
|
||||
);
|
||||
|
||||
show create table logical_table_2;
|
||||
|
||||
drop table logical_table_2;
|
||||
|
||||
drop table metric_engine_partition;
|
||||
Reference in New Issue
Block a user