mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 12:52:57 +00:00
Compare commits
12 Commits
wyze_recor
...
wyze_with_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f7b974e8a | ||
|
|
4875ace0d0 | ||
|
|
a847d96649 | ||
|
|
23a0a54e18 | ||
|
|
78eb8b53f6 | ||
|
|
2455f39e8e | ||
|
|
7fe0074202 | ||
|
|
e16bc203d0 | ||
|
|
9a3c26bb0a | ||
|
|
e1ff398c32 | ||
|
|
780e3000de | ||
|
|
2b5ddf8427 |
@@ -445,10 +445,16 @@ impl Pool {
|
||||
|
||||
async fn recycle_channel_in_loop(pool: Arc<Pool>, interval_secs: u64) {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
|
||||
|
||||
// use weak ref here to prevent pool being leaked
|
||||
let pool_weak = Arc::downgrade(&pool);
|
||||
loop {
|
||||
let _ = interval.tick().await;
|
||||
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
|
||||
if let Some(pool) = pool_weak.upgrade() {
|
||||
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
|
||||
} else {
|
||||
// no one is using this pool, so we can also let go
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -60,15 +60,13 @@ use crate::Error;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimeWindowExpr {
|
||||
expr: PhysicalExprRef,
|
||||
phy_expr: PhysicalExprRef,
|
||||
column_name: String,
|
||||
logical_expr: Expr,
|
||||
df_schema: DFSchema,
|
||||
}
|
||||
|
||||
impl TimeWindowExpr {
|
||||
pub fn new(expr: PhysicalExprRef, column_name: String) -> Self {
|
||||
Self { expr, column_name }
|
||||
}
|
||||
|
||||
pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result<Self, Error> {
|
||||
let phy_planner = DefaultPhysicalPlanner::default();
|
||||
|
||||
@@ -80,11 +78,24 @@ impl TimeWindowExpr {
|
||||
),
|
||||
})?;
|
||||
Ok(Self {
|
||||
expr: phy_expr,
|
||||
phy_expr,
|
||||
column_name: column_name.to_string(),
|
||||
logical_expr: expr.clone(),
|
||||
df_schema: df_schema.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn eval(
|
||||
&self,
|
||||
current: Timestamp,
|
||||
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
|
||||
let lower_bound =
|
||||
find_expr_time_window_lower_bound(&self.logical_expr, &self.df_schema, current)?;
|
||||
let upper_bound =
|
||||
find_expr_time_window_upper_bound(&self.logical_expr, &self.df_schema, current)?;
|
||||
Ok((lower_bound, upper_bound))
|
||||
}
|
||||
|
||||
/// Find timestamps from rows using time window expr
|
||||
pub async fn handle_rows(
|
||||
&self,
|
||||
@@ -131,12 +142,15 @@ impl TimeWindowExpr {
|
||||
),
|
||||
})?;
|
||||
|
||||
let eval_res = self.expr.evaluate(&rb).with_context(|_| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to evaluate physical expression {:?} on {rb:?}",
|
||||
self.expr
|
||||
),
|
||||
})?;
|
||||
let eval_res = self
|
||||
.phy_expr
|
||||
.evaluate(&rb)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to evaluate physical expression {:?} on {rb:?}",
|
||||
self.phy_expr
|
||||
),
|
||||
})?;
|
||||
|
||||
let res = columnar_to_ts_vector(&eval_res)?;
|
||||
|
||||
@@ -447,6 +461,7 @@ pub async fn find_plan_time_window_bound(
|
||||
///
|
||||
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
|
||||
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
|
||||
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
|
||||
/// of current time window given the current timestamp
|
||||
///
|
||||
/// if return None, meaning this time window have no lower bound
|
||||
@@ -576,7 +591,22 @@ fn eval_ts_to_ts(
|
||||
df_schema: &DFSchema,
|
||||
input_value: Timestamp,
|
||||
) -> Result<Timestamp, Error> {
|
||||
let ts_vector = match input_value.unit() {
|
||||
let schema_ty = df_schema.field(0).data_type();
|
||||
let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
|
||||
let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
|
||||
ts.unit()
|
||||
} else {
|
||||
return UnexpectedSnafu {
|
||||
reason: format!("Expect Timestamp, found {:?}", schema_cdt),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let input_value = input_value
|
||||
.convert_to(schema_unit)
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
|
||||
})?;
|
||||
let ts_vector = match schema_unit {
|
||||
TimeUnit::Second => {
|
||||
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
|
||||
}
|
||||
@@ -656,7 +686,19 @@ impl TreeNodeRewriter for AddFilterRewriter {
|
||||
}
|
||||
|
||||
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
|
||||
let unparser = Unparser::default();
|
||||
/// A dialect that forces all identifiers to be quoted
|
||||
struct ForceQuoteIdentifiers;
|
||||
impl datafusion::sql::unparser::dialect::Dialect for ForceQuoteIdentifiers {
|
||||
fn identifier_quote_style(&self, identifier: &str) -> Option<char> {
|
||||
if identifier.to_lowercase() != identifier {
|
||||
Some('"')
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
let unparser = Unparser::new(&ForceQuoteIdentifiers);
|
||||
// first make all column qualified
|
||||
let sql = unparser
|
||||
.plan_to_sql(plan)
|
||||
.with_context(|_e| DatafusionSnafu {
|
||||
@@ -675,6 +717,22 @@ mod test {
|
||||
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
|
||||
use crate::test_utils::create_test_query_engine;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sql_plan_convert() {
|
||||
let query_engine = create_test_query_engine();
|
||||
let ctx = QueryContext::arc();
|
||||
let old = r#"SELECT "NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#;
|
||||
let new = sql_to_df_plan(ctx.clone(), query_engine.clone(), old, false)
|
||||
.await
|
||||
.unwrap();
|
||||
let new_sql = df_plan_to_sql(&new).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
r#"SELECT "UPPERCASE_NUMBERS_WITH_TS"."NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#,
|
||||
new_sql
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_filter() {
|
||||
let testcases = vec![
|
||||
@@ -723,7 +781,7 @@ mod test {
|
||||
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
|
||||
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
|
||||
),
|
||||
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
|
||||
r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
|
||||
),
|
||||
// complex time window index
|
||||
(
|
||||
|
||||
@@ -45,7 +45,7 @@ use crate::error::{
|
||||
TimeSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
|
||||
use crate::recording_rules::{find_plan_time_window_bound, find_time_window_expr, sql_to_df_plan};
|
||||
use crate::recording_rules::{find_time_window_expr, sql_to_df_plan};
|
||||
use crate::Error;
|
||||
|
||||
/// TODO(discord9): make those constants configurable
|
||||
@@ -214,6 +214,8 @@ impl RecordingRuleEngine {
|
||||
.map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema))
|
||||
.transpose()?;
|
||||
|
||||
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);
|
||||
|
||||
let task = RecordingRuleTask::new(
|
||||
flow_id,
|
||||
&sql,
|
||||
@@ -268,11 +270,11 @@ impl RecordingRuleEngine {
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RecordingRuleTask {
|
||||
flow_id: FlowId,
|
||||
pub flow_id: FlowId,
|
||||
query: String,
|
||||
time_window_expr: Option<TimeWindowExpr>,
|
||||
pub time_window_expr: Option<TimeWindowExpr>,
|
||||
/// in seconds
|
||||
expire_after: Option<i64>,
|
||||
pub expire_after: Option<i64>,
|
||||
sink_table_name: [String; 3],
|
||||
source_table_names: HashSet<[String; 3]>,
|
||||
state: Arc<RwLock<RecordingRuleState>>,
|
||||
@@ -376,6 +378,8 @@ impl RecordingRuleTask {
|
||||
.write()
|
||||
.await
|
||||
.after_query_exec(elapsed, res.is_ok());
|
||||
// drop the result to free client-related resources
|
||||
drop(res);
|
||||
|
||||
let sleep_until = {
|
||||
let mut state = self.state.write().await;
|
||||
@@ -410,30 +414,40 @@ impl RecordingRuleTask {
|
||||
|
||||
let low_bound = Timestamp::new_second(low_bound as i64);
|
||||
|
||||
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
|
||||
|
||||
// TODO(discord9): find more time window!
|
||||
let (col_name, lower, upper) =
|
||||
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
|
||||
.await?;
|
||||
// TODO(discord9): use time window expr to get the precise expire lower bound
|
||||
let expire_time_window_bound = self
|
||||
.time_window_expr
|
||||
.as_ref()
|
||||
.map(|expr| expr.eval(low_bound))
|
||||
.transpose()?;
|
||||
|
||||
let new_sql = {
|
||||
let expr = {
|
||||
match (lower, upper) {
|
||||
(Some(l), Some(u)) => {
|
||||
match expire_time_window_bound {
|
||||
Some((Some(l), Some(u))) => {
|
||||
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Can't get window size from {u:?} - {l:?}"),
|
||||
})?;
|
||||
let col_name = self
|
||||
.time_window_expr
|
||||
.as_ref()
|
||||
.map(|expr| expr.column_name.clone())
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Flow id={:?}, Failed to get column name from time window expr",
|
||||
self.flow_id
|
||||
),
|
||||
})?;
|
||||
|
||||
self.state
|
||||
.write()
|
||||
.await
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(&col_name, lower, window_size)?
|
||||
.gen_filter_exprs(&col_name, Some(l), window_size, self)?
|
||||
}
|
||||
_ => {
|
||||
warn!(
|
||||
"Flow id = {:?}, can't get window size: lower={lower:?}, upper={upper:?}, using the same query", self.flow_id
|
||||
debug!(
|
||||
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.flow_id
|
||||
);
|
||||
// since no time window lower/upper bound is found, just return the original query
|
||||
return Ok(Some(self.query.clone()));
|
||||
@@ -530,19 +544,28 @@ impl DirtyTimeWindows {
|
||||
col_name: &str,
|
||||
expire_lower_bound: Option<Timestamp>,
|
||||
window_size: chrono::Duration,
|
||||
task_ctx: &RecordingRuleTask,
|
||||
) -> Result<Option<datafusion_expr::Expr>, Error> {
|
||||
debug!(
|
||||
"expire_lower_bound: {:?}, window_size: {:?}",
|
||||
expire_lower_bound.map(|t| t.to_iso8601_string()),
|
||||
window_size
|
||||
);
|
||||
self.merge_dirty_time_windows(window_size)?;
|
||||
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
|
||||
|
||||
if self.windows.len() > Self::MAX_FILTER_NUM {
|
||||
let first_time_window = self.windows.first_key_value();
|
||||
let last_time_window = self.windows.last_key_value();
|
||||
warn!(
|
||||
"Too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong",
|
||||
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
|
||||
task_ctx.flow_id,
|
||||
self.windows.len(),
|
||||
Self::MAX_FILTER_NUM
|
||||
Self::MAX_FILTER_NUM,
|
||||
task_ctx.time_window_expr,
|
||||
task_ctx.expire_after,
|
||||
first_time_window,
|
||||
last_time_window,
|
||||
task_ctx.query
|
||||
);
|
||||
}
|
||||
|
||||
@@ -570,18 +593,6 @@ impl DirtyTimeWindows {
|
||||
start.to_iso8601_string(),
|
||||
end.map(|t| t.to_iso8601_string())
|
||||
);
|
||||
let start = if let Some(expire) = expire_lower_bound {
|
||||
start.max(expire)
|
||||
} else {
|
||||
start
|
||||
};
|
||||
|
||||
// filter out expired time window
|
||||
if let Some(end) = end {
|
||||
if end <= start {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
use datafusion_expr::{col, lit};
|
||||
let lower = to_df_literal(start)?;
|
||||
@@ -600,11 +611,22 @@ impl DirtyTimeWindows {
|
||||
}
|
||||
|
||||
/// Merge time windows that overlaps or get too close
|
||||
pub fn merge_dirty_time_windows(&mut self, window_size: chrono::Duration) -> Result<(), Error> {
|
||||
pub fn merge_dirty_time_windows(
|
||||
&mut self,
|
||||
window_size: chrono::Duration,
|
||||
expire_lower_bound: Option<Timestamp>,
|
||||
) -> Result<(), Error> {
|
||||
let mut new_windows = BTreeMap::new();
|
||||
|
||||
let mut prev_tw = None;
|
||||
for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
|
||||
// filter out expired time window
|
||||
if let Some(expire_lower_bound) = expire_lower_bound {
|
||||
if lower_bound <= expire_lower_bound {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let Some(prev_tw) = &mut prev_tw else {
|
||||
prev_tw = Some((lower_bound, upper_bound));
|
||||
continue;
|
||||
@@ -705,7 +727,7 @@ mod test {
|
||||
.into_iter(),
|
||||
);
|
||||
dirty
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
|
||||
.unwrap();
|
||||
// just enough to merge
|
||||
assert_eq!(
|
||||
@@ -728,7 +750,7 @@ mod test {
|
||||
.into_iter(),
|
||||
);
|
||||
dirty
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
|
||||
.unwrap();
|
||||
// just enough to merge
|
||||
assert_eq!(
|
||||
@@ -757,7 +779,7 @@ mod test {
|
||||
.into_iter(),
|
||||
);
|
||||
dirty
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
|
||||
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
|
||||
.unwrap();
|
||||
// just enough to merge
|
||||
assert_eq!(
|
||||
@@ -769,5 +791,25 @@ mod test {
|
||||
),]),
|
||||
dirty.windows
|
||||
);
|
||||
|
||||
// expired
|
||||
let mut dirty = DirtyTimeWindows::default();
|
||||
dirty.add_lower_bounds(
|
||||
vec![
|
||||
Timestamp::new_second(0),
|
||||
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
|
||||
]
|
||||
.into_iter(),
|
||||
);
|
||||
dirty
|
||||
.merge_dirty_time_windows(
|
||||
chrono::Duration::seconds(5 * 60),
|
||||
Some(Timestamp::new_second(
|
||||
(DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
|
||||
)),
|
||||
)
|
||||
.unwrap();
|
||||
// just enough to merge
|
||||
assert_eq!(BTreeMap::from([]), dirty.windows);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ fn client_from_urls(addrs: Vec<String>) -> Client {
|
||||
pub enum FrontendClient {
|
||||
Distributed {
|
||||
meta_client: Arc<MetaClient>,
|
||||
channel_mgr: ChannelManager,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
@@ -66,7 +67,10 @@ impl DatabaseWithPeer {
|
||||
|
||||
impl FrontendClient {
|
||||
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
|
||||
Self::Distributed { meta_client }
|
||||
Self::Distributed {
|
||||
meta_client,
|
||||
channel_mgr: default_channel_mgr(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_static_grpc_addr(addr: String) -> Self {
|
||||
@@ -75,7 +79,8 @@ impl FrontendClient {
|
||||
addr: addr.clone(),
|
||||
};
|
||||
|
||||
let client = client_from_urls(vec![addr]);
|
||||
let mgr = default_channel_mgr();
|
||||
let client = Client::with_manager_and_urls(mgr.clone(), vec![addr]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Self::Standalone {
|
||||
database_client: DatabaseWithPeer::new(database, peer),
|
||||
@@ -119,32 +124,40 @@ impl FrontendClient {
|
||||
if let Self::Standalone { database_client } = self {
|
||||
return Ok(database_client.clone());
|
||||
}
|
||||
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut last_activity_ts = i64::MIN;
|
||||
let mut peer = None;
|
||||
for (_key, val) in frontends.iter() {
|
||||
if val.last_activity_ts > last_activity_ts {
|
||||
last_activity_ts = val.last_activity_ts;
|
||||
peer = Some(val.peer.clone());
|
||||
match &self {
|
||||
Self::Standalone { database_client } => Ok(database_client.clone()),
|
||||
Self::Distributed {
|
||||
meta_client: _,
|
||||
channel_mgr,
|
||||
} => {
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut last_activity_ts = i64::MIN;
|
||||
let mut peer = None;
|
||||
for (_key, val) in frontends.iter() {
|
||||
if val.last_activity_ts > last_activity_ts {
|
||||
last_activity_ts = val.last_activity_ts;
|
||||
peer = Some(val.peer.clone());
|
||||
}
|
||||
}
|
||||
let Some(peer) = peer else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("No frontend available: {:?}", frontends),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let client =
|
||||
Client::with_manager_and_urls(channel_mgr.clone(), vec![peer.addr.clone()]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Ok(DatabaseWithPeer::new(database, peer))
|
||||
}
|
||||
}
|
||||
let Some(peer) = peer else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("No frontend available: {:?}", frontends),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let client = client_from_urls(vec![peer.addr.clone()]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Ok(DatabaseWithPeer::new(database, peer))
|
||||
}
|
||||
|
||||
/// Get a database client, and possibly update it before returning.
|
||||
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
|
||||
match self {
|
||||
Self::Standalone { database_client } => Ok(database_client.clone()),
|
||||
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
|
||||
Self::Distributed { meta_client: _, .. } => self.get_last_active_frontend().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +115,37 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
|
||||
};
|
||||
catalog_list.register_table_sync(req_with_ts).unwrap();
|
||||
|
||||
let schema = vec![
|
||||
datatypes::schema::ColumnSchema::new("NUMBER", CDT::uint32_datatype(), false),
|
||||
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
|
||||
.with_time_index(true),
|
||||
];
|
||||
let mut columns = vec![];
|
||||
let numbers = (1..=10).collect_vec();
|
||||
let column: VectorRef = Arc::new(<u32 as Scalar>::VectorType::from_vec(numbers));
|
||||
columns.push(column);
|
||||
|
||||
let ts = (1..=10).collect_vec();
|
||||
let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10);
|
||||
ts.into_iter()
|
||||
.map(|v| builder.push(Some(TimestampMillisecond::new(v))))
|
||||
.count();
|
||||
let column: VectorRef = builder.to_vector_cloned();
|
||||
columns.push(column);
|
||||
|
||||
let schema = Arc::new(Schema::new(schema));
|
||||
let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
|
||||
let table = MemTable::table("UPPERCASE_NUMBERS_WITH_TS", recordbatch);
|
||||
|
||||
let req_with_ts = RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "UPPERCASE_NUMBERS_WITH_TS".to_string(),
|
||||
table_id: 1025,
|
||||
table,
|
||||
};
|
||||
catalog_list.register_table_sync(req_with_ts).unwrap();
|
||||
|
||||
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
|
||||
|
||||
let engine = factory.query_engine();
|
||||
|
||||
@@ -7,6 +7,7 @@ license.workspace = true
|
||||
[features]
|
||||
mock = []
|
||||
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]
|
||||
mysql_kvbackend = [] # placeholder features so CI can compile
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
Reference in New Issue
Block a user