Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-05 21:02:58 +00:00
refactor: passing QueryContext to RegionServer (#3829)
* refactor: passing QueryContext to RegionServer
* refactor: change the return type of build() in QueryContextBuilder
* fix: update greptime-proto reference
* chore: apply suggestion
* chore: revert the last commit

Co-authored-by: dennis zhuang <killme2008@gmail.com>
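The change that drives most of the hunks below: QueryContextBuilder::build() now returns a plain QueryContext instead of an Arc-wrapped QueryContextRef, so each call site decides whether to share the context. A minimal sketch of the resulting call-site pattern (the function name is illustrative, not from this commit):

    use session::context::{QueryContextBuilder, QueryContextRef};

    // build() now yields an owned QueryContext; Arc-wrap it only where a
    // QueryContextRef (an Arc<QueryContext>) is actually required.
    fn shared_ctx() -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog("greptime".to_string())
            .current_schema("public".to_string())
            .build()
            .into() // or Arc::new(...); .into() uses std's blanket From<T> for Arc<T>
    }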
Cargo.lock (generated, 632 changes): file diff suppressed because it is too large.
@@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f699e240f7a6c83f139dabac8669714f08513120" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a191edaea1089362a86ebc7d8e98ee9a1bd522d1" }
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"
@@ -161,7 +161,10 @@ impl Repl {
         let start = Instant::now();
 
         let output = if let Some(query_engine) = &self.query_engine {
-            let query_ctx = QueryContext::with(self.database.catalog(), self.database.schema());
+            let query_ctx = Arc::new(QueryContext::with(
+                self.database.catalog(),
+                self.database.schema(),
+            ));
 
             let stmt = QueryLanguageParser::parse_sql(&sql, &query_ctx)
                 .with_context(|_| ParseSqlSnafu { sql: sql.clone() })?;
@@ -35,7 +35,7 @@ impl FunctionContext {
     #[cfg(any(test, feature = "testing"))]
     pub fn mock() -> Self {
         Self {
-            query_ctx: QueryContextBuilder::default().build(),
+            query_ctx: QueryContextBuilder::default().build().into(),
             state: Arc::new(FunctionState::mock()),
         }
     }
@@ -44,7 +44,7 @@ impl FunctionContext {
 impl Default for FunctionContext {
     fn default() -> Self {
         Self {
-            query_ctx: QueryContextBuilder::default().build(),
+            query_ctx: QueryContextBuilder::default().build().into(),
             state: Arc::new(FunctionState::default()),
         }
     }
@@ -89,7 +89,7 @@ mod tests {
     #[test]
     fn test_create_udf() {
         let f = Arc::new(TestAndFunction);
-        let query_ctx = QueryContextBuilder::default().build();
+        let query_ctx = QueryContextBuilder::default().build().into();
 
         let args: Vec<VectorRef> = vec![
             Arc::new(ConstantVector::new(
@@ -79,7 +79,8 @@ mod tests {
 
         let query_ctx = QueryContextBuilder::default()
             .current_schema("test_db".to_string())
-            .build();
+            .build()
+            .into();
 
         let func_ctx = FunctionContext {
             query_ctx,
@@ -77,7 +77,7 @@ mod tests {
             } if valid_types == vec![]
         ));
 
-        let query_ctx = QueryContextBuilder::default().build();
+        let query_ctx = QueryContextBuilder::default().build().into();
 
         let func_ctx = FunctionContext {
             query_ctx,
@@ -613,7 +613,7 @@ impl RegionServerInner {
         let ctx: QueryContextRef = header
            .as_ref()
            .map(|h| Arc::new(h.into()))
-            .unwrap_or_else(|| QueryContextBuilder::default().build());
+            .unwrap_or_else(|| QueryContextBuilder::default().build().into());
 
         // build dummy catalog list
         let region_status = self
@@ -658,7 +658,8 @@ mod tests {
         // query context with timezone `+08:00`
         let ctx = QueryContextBuilder::default()
             .timezone(Timezone::from_tz_string("+08:00").unwrap().into())
-            .build();
+            .build()
+            .into();
         let expr = create_to_expr(&create_table, ctx).unwrap();
         let ts_column = &expr.column_defs[1];
         let constraint = assert_ts_column(ts_column);
@@ -712,7 +713,8 @@ mod tests {
         // query context with timezone `+08:00`
         let ctx = QueryContextBuilder::default()
             .timezone(Timezone::from_tz_string("+08:00").unwrap().into())
-            .build();
+            .build()
+            .into();
         let expr = to_alter_expr(alter_table, ctx).unwrap();
         let kind = expr.kind.unwrap();
 
@@ -422,7 +422,8 @@ mod tests {
     fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
         let query_ctx = QueryContextBuilder::default()
             .timezone(Arc::new(Timezone::from_tz_string("Asia/Shanghai").unwrap()))
-            .build();
+            .build()
+            .into();
         let map = OptionMap::from(
             [
                 (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
@@ -1089,7 +1089,7 @@ ENGINE=mito",
            r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
         ),
     ];
-    let ctx = QueryContextBuilder::default().build();
+    let ctx = QueryContextBuilder::default().build().into();
     for (sql, expected) in cases {
         let result = ParserContext::create_with_dialect(
             sql,
@@ -41,7 +41,7 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion_physical_expr::EquivalenceProperties;
 use datatypes::schema::{Schema, SchemaRef};
 use futures_util::StreamExt;
-use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
+use greptime_proto::v1::region::{QueryContext, QueryRequest, RegionRequestHeader};
 use meter_core::data::ReadItem;
 use meter_macros::read_meter;
 use session::context::QueryContextRef;
@@ -179,7 +179,10 @@ impl MergeScanExec {
 
         let dbname = context.task_id().unwrap_or_default();
         let tracing_context = TracingContext::from_json(context.session_id().as_str());
-        let tz = self.query_ctx.timezone().to_string();
+        let current_catalog = self.query_ctx.current_catalog().to_string();
+        let current_schema = self.query_ctx.current_schema().to_string();
+        let timezone = self.query_ctx.timezone().to_string();
+        let extensions = self.query_ctx.extensions();
 
         let stream = Box::pin(stream!({
             MERGE_SCAN_REGIONS.observe(regions.len() as f64);
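Note why these values are captured into locals here: the stream! block below must own its captures for the lifetime of the returned stream, so it moves clones of the catalog, schema, timezone, and extensions instead of borrowing them from self.query_ctx across await points.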
@@ -192,7 +195,12 @@ impl MergeScanExec {
                     header: Some(RegionRequestHeader {
                         tracing_context: tracing_context.to_w3c(),
                         dbname: dbname.clone(),
-                        timezone: tz.clone(),
+                        query_context: Some(QueryContext {
+                            current_catalog: current_catalog.clone(),
+                            current_schema: current_schema.clone(),
+                            timezone: timezone.clone(),
+                            extensions: extensions.clone(),
+                        }),
                     }),
                     region_id: region_id.into(),
                     plan: substrait_plan.clone(),
@@ -42,13 +42,15 @@ pub fn validate_catalog_and_schema(
 #[cfg(test)]
 mod tests {
 
+    use std::sync::Arc;
+
     use session::context::QueryContext;
 
     use super::*;
 
     #[test]
     fn test_validate_catalog_and_schema() {
-        let context = QueryContext::with("greptime", "public");
+        let context = Arc::new(QueryContext::with("greptime", "public"));
 
         validate_catalog_and_schema("greptime", "public", &context).unwrap();
         let re = validate_catalog_and_schema("greptime", "private_schema", &context);
@@ -944,9 +944,11 @@ mod test {
         let stmt = ShowVariables {
             variable: ObjectName(vec![Ident::new(variable)]),
         };
-        let ctx = QueryContextBuilder::default()
-            .timezone(Arc::new(Timezone::from_tz_string(tz).unwrap()))
-            .build();
+        let ctx = Arc::new(
+            QueryContextBuilder::default()
+                .timezone(Arc::new(Timezone::from_tz_string(tz).unwrap()))
+                .build(),
+        );
         match show_variable(stmt, ctx) {
             Ok(Output {
                 data: OutputData::RecordBatches(record),
@@ -411,7 +411,7 @@ impl PyQueryEngine {
             let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
             let handle = rt.handle().clone();
             let res = handle.block_on(async {
-                let ctx = QueryContextBuilder::default().build();
+                let ctx = Arc::new(QueryContextBuilder::default().build());
                 let plan = engine
                     .planner()
                     .plan(stmt, ctx.clone())
@@ -339,6 +339,7 @@ fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
         .current_catalog(table_info.catalog_name.to_string())
         .current_schema(table_info.schema_name.to_string())
         .build()
+        .into()
 }
 
 /// Builds scripts schema, returns (time index, primary keys, column defs)
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 use std::time::Duration;
 
 use axum::http::HeaderValue;
@@ -247,7 +248,7 @@ pub async fn write_system_metric_by_handler(
     );
     // Pass the first tick. Because the first tick completes immediately.
     interval.tick().await;
-    let ctx = QueryContextBuilder::default().current_schema(db).build();
+    let ctx = Arc::new(QueryContextBuilder::default().current_schema(db).build());
     loop {
         interval.tick().await;
         let metric_families = prometheus::gather();
@@ -14,6 +14,7 @@
 
 use std::pin::Pin;
 use std::result::Result as StdResult;
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use auth::UserProviderRef;
@@ -104,7 +105,7 @@ async fn do_auth<T>(
 ) -> Result<(), tonic::Status> {
     let (catalog, schema) = extract_catalog_and_schema(req);
 
-    let query_ctx = QueryContext::with(&catalog, &schema);
+    let query_ctx = Arc::new(QueryContext::with(&catalog, &schema));
 
     let Some(user_provider) = user_provider else {
         query_ctx.set_current_user(Some(auth::userinfo_by_name(None)));
@@ -190,6 +190,7 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
         .current_schema(schema)
         .timezone(Arc::new(timezone))
         .build()
+        .into()
 }
 
 /// Histogram timer for handling gRPC request.
@@ -68,7 +68,7 @@ pub async fn inner_auth<B>(
         .current_schema(schema.clone())
         .timezone(timezone);
 
-    let query_ctx = query_ctx_builder.build();
+    let query_ctx = Arc::new(query_ctx_builder.build());
     let need_auth = need_auth(&req);
 
     // 2. check if auth is needed
@@ -14,6 +14,7 @@
 
 //! prom supply the prometheus HTTP API Server compliance
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
 use axum::extract::{Path, Query, State};
 use axum::{Extension, Form};
@@ -572,10 +573,12 @@ pub(crate) fn try_update_catalog_schema(
     schema: &str,
 ) -> QueryContextRef {
     if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
-        QueryContextBuilder::from_existing(&ctx)
-            .current_catalog(catalog.to_string())
-            .current_schema(schema.to_string())
-            .build()
+        Arc::new(
+            QueryContextBuilder::from_existing(&ctx)
+                .current_catalog(catalog.to_string())
+                .current_schema(schema.to_string())
+                .build(),
+        )
     } else {
         ctx
     }
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 use std::time::Instant;
 
 use axum::extract::{Query, RawBody, State};
@@ -76,7 +77,7 @@ pub async fn scripts(
         unwrap_or_json_err!(String::from_utf8(bytes.to_vec()).context(InvalidUtf8ValueSnafu));
 
     // Safety: schema and name are already checked above.
-    let query_ctx = QueryContext::with(&catalog, schema.unwrap());
+    let query_ctx = Arc::new(QueryContext::with(&catalog, schema.unwrap()));
     match script_handler
         .insert_script(query_ctx, name.unwrap(), &script)
         .await
@@ -128,7 +129,7 @@ pub async fn run_script(
     }
 
     // Safety: schema and name are already checked above.
-    let query_ctx = QueryContext::with(&catalog, schema.unwrap());
+    let query_ctx = Arc::new(QueryContext::with(&catalog, schema.unwrap()));
     let output = script_handler
         .execute_script(query_ctx, name.unwrap(), params.params)
         .await;
@@ -807,7 +807,8 @@ mod test {
         ];
         let query_context = QueryContextBuilder::default()
             .configuration_parameter(Default::default())
-            .build();
+            .build()
+            .into();
         let mut builder = DataRowEncoder::new(Arc::new(schema));
         for i in values.iter() {
             encode_value(&query_context, i, &mut builder).unwrap();
@@ -56,10 +56,12 @@ def hello() -> vector[str]:
 
     let table = MemTable::table("scripts", recordbatch);
 
-    let query_ctx = QueryContextBuilder::default()
-        .current_catalog(catalog.to_string())
-        .current_schema(schema.to_string())
-        .build();
+    let query_ctx = Arc::new(
+        QueryContextBuilder::default()
+            .current_catalog(catalog.to_string())
+            .current_schema(schema.to_string())
+            .build(),
+    );
 
     let instance = create_testing_instance(table);
     instance
@@ -44,7 +44,7 @@ pub struct QueryContext {
     timezone: ArcSwap<Timezone>,
     sql_dialect: Arc<dyn Dialect + Send + Sync>,
     #[builder(default)]
-    extension: HashMap<String, String>,
+    extensions: HashMap<String, String>,
     // The configuration parameter are used to store the parameters that are set by the user
     #[builder(default)]
     configuration_parameter: Arc<ConfigurationVariables>,
@@ -76,7 +76,7 @@ impl Clone for QueryContext {
             current_user: self.current_user.load().clone().into(),
             timezone: self.timezone.load().clone().into(),
             sql_dialect: self.sql_dialect.clone(),
-            extension: self.extension.clone(),
+            extensions: self.extensions.clone(),
             configuration_parameter: self.configuration_parameter.clone(),
         }
     }
@@ -84,32 +84,31 @@ impl Clone for QueryContext {
 
 impl From<&RegionRequestHeader> for QueryContext {
     fn from(value: &RegionRequestHeader) -> Self {
-        let (catalog, schema) = parse_catalog_and_schema_from_db_string(&value.dbname);
-        QueryContext {
-            current_catalog: catalog.to_string(),
-            current_schema: schema.to_string(),
-            current_user: Default::default(),
-            timezone: ArcSwap::new(Arc::new(parse_timezone(Some(&value.timezone)))),
-            sql_dialect: Arc::new(GreptimeDbDialect {}),
-            extension: Default::default(),
-            configuration_parameter: Default::default(),
-        }
+        let mut builder = QueryContextBuilder::default();
+        if let Some(ctx) = &value.query_context {
+            builder = builder
+                .current_catalog(ctx.current_catalog.clone())
+                .current_schema(ctx.current_schema.clone())
+                .timezone(Arc::new(parse_timezone(Some(&ctx.timezone))))
+                .extensions(ctx.extensions.clone());
+        }
+        builder.build()
     }
 }
 
 impl QueryContext {
     pub fn arc() -> QueryContextRef {
-        QueryContextBuilder::default().build()
+        Arc::new(QueryContextBuilder::default().build())
     }
 
-    pub fn with(catalog: &str, schema: &str) -> QueryContextRef {
+    pub fn with(catalog: &str, schema: &str) -> QueryContext {
         QueryContextBuilder::default()
             .current_catalog(catalog.to_string())
             .current_schema(schema.to_string())
             .build()
     }
 
-    pub fn with_db_name(db_name: Option<&str>) -> QueryContextRef {
+    pub fn with_db_name(db_name: Option<&str>) -> QueryContext {
         let (catalog, schema) = db_name
             .map(|db| {
                 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
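Together with the RegionServerInner hunk earlier, this rewritten From impl defines the datanode's single recovery path for the caller's context: the header's query_context field is preferred, and builder defaults apply when it is absent (the old dbname/timezone header fields are no longer consulted here). An illustrative helper showing that path (the name is hypothetical, not from this commit):

    // Mirrors the logic in RegionServerInner: prefer the header's
    // query_context, fall back to a default QueryContext otherwise.
    fn region_query_ctx(header: Option<&RegionRequestHeader>) -> QueryContextRef {
        header
            .map(|h| Arc::new(QueryContext::from(h)))
            .unwrap_or_else(|| QueryContextBuilder::default().build().into())
    }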
@@ -162,11 +161,15 @@ impl QueryContext {
     }
 
     pub fn set_extension<S1: Into<String>, S2: Into<String>>(&mut self, key: S1, value: S2) {
-        self.extension.insert(key.into(), value.into());
+        self.extensions.insert(key.into(), value.into());
     }
 
     pub fn extension<S: AsRef<str>>(&self, key: S) -> Option<&str> {
-        self.extension.get(key.as_ref()).map(|v| v.as_str())
+        self.extensions.get(key.as_ref()).map(|v| v.as_str())
     }
 
+    pub fn extensions(&self) -> HashMap<String, String> {
+        self.extensions.clone()
+    }
+
     /// SQL like `set variable` may change timezone or other info in `QueryContext`.
@@ -195,8 +198,8 @@ impl QueryContext {
 }
 
 impl QueryContextBuilder {
-    pub fn build(self) -> QueryContextRef {
-        Arc::new(QueryContext {
+    pub fn build(self) -> QueryContext {
+        QueryContext {
             current_catalog: self
                 .current_catalog
                 .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
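With build() returning an owned value, the many .into() calls in this diff need no bespoke conversion: QueryContextRef is Arc<QueryContext>, and the standard library provides a blanket impl From<T> for Arc<T>. A two-line fragment for any function body:

    let owned: QueryContext = QueryContextBuilder::default().build();
    let shared: QueryContextRef = owned.into(); // std's From<T> for Arc<T>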
@@ -212,13 +215,13 @@ impl QueryContextBuilder {
             sql_dialect: self
                 .sql_dialect
                 .unwrap_or_else(|| Arc::new(GreptimeDbDialect {})),
-            extension: self.extension.unwrap_or_default(),
+            extensions: self.extensions.unwrap_or_default(),
             configuration_parameter: self.configuration_parameter.unwrap_or_default(),
-        })
+        }
     }
 
     pub fn set_extension(mut self, key: String, value: String) -> Self {
-        self.extension
+        self.extensions
             .get_or_insert_with(HashMap::new)
             .insert(key, value);
         self
@@ -231,7 +234,7 @@ impl QueryContextBuilder {
             current_user: Some(context.current_user.load().clone().into()),
             timezone: Some(context.timezone.load().clone().into()),
             sql_dialect: Some(context.sql_dialect.clone()),
-            extension: Some(context.extension.clone()),
+            extensions: Some(context.extensions.clone()),
             configuration_parameter: Some(context.configuration_parameter.clone()),
         }
     }
@@ -70,6 +70,7 @@ impl Session {
             .configuration_parameter(self.configuration_variables.clone())
             .timezone(self.timezone())
             .build()
+            .into()
     }
 
     #[inline]
@@ -52,7 +52,7 @@ mod test {
     async fn test_otlp(instance: &Arc<Instance>) {
         let req = build_request();
         let db = "otlp";
-        let ctx = QueryContext::with(DEFAULT_CATALOG_NAME, db);
+        let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
 
         assert!(SqlQueryHandler::do_query(
             instance.as_ref(),
@@ -90,7 +90,7 @@ mod tests {
     };
 
     let db = "prometheus";
-    let mut ctx = Arc::into_inner(QueryContext::with(DEFAULT_CATALOG_NAME, db)).unwrap();
+    let mut ctx = Arc::into_inner(QueryContext::with(DEFAULT_CATALOG_NAME, db).into()).unwrap();
 
     // set physical table if provided
     if let Some(physical_table) = &physical_table {
@@ -43,7 +43,7 @@ async fn test_create_database_and_insert_query(
     let output = execute_sql_with(
         &instance,
         "create database test",
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await;
     assert_matches!(output.data, OutputData::AffectedRows(1));
@@ -57,7 +57,7 @@
             ts timestamp,
             TIME INDEX(ts)
         )"#,
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await;
     assert!(matches!(output.data, OutputData::AffectedRows(0)));
@@ -68,7 +68,7 @@
             ('host1', 66.6, 1024, 1655276557000),
             ('host2', 88.8, 333.3, 1655276558000)
             "#,
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await;
     assert!(matches!(output.data, OutputData::AffectedRows(2)));
@@ -76,7 +76,7 @@
     let query_output = execute_sql_with(
         &instance,
         "select ts from test.demo order by ts limit 1",
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await;
     match query_output.data {
@@ -112,7 +112,7 @@ async fn test_replay(rebuildable_instance: Option<Box<dyn RebuildableMockInstanc
     let output = execute_sql_with(
         &instance,
         "create database test",
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await;
     assert_matches!(output.data, OutputData::AffectedRows(1));
@@ -138,7 +138,7 @@ async fn test_flush_then_replay(rebuildable_instance: Option<Box<dyn Rebuildable
     let output = execute_sql_with(
         &instance,
         "create database test",
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await;
     assert_matches!(output.data, OutputData::AffectedRows(1));
@@ -256,7 +256,7 @@ async fn do_create(instance: &Arc<Instance>, table_name: &str) -> Output {
         )"#,
             table_name
         ),
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await
 }
@@ -266,7 +266,7 @@ async fn do_alter(instance: &Arc<Instance>, table_name: &str) -> Output {
     execute_sql_with(
         instance,
         &format!("alter table {} add column new_col STRING", table_name),
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await
 }
@@ -276,7 +276,7 @@ async fn do_insert(instance: &Arc<Instance>, table_name: &str, row: String) -> O
     execute_sql_with(
         instance,
         &format!("insert into test.{table_name}(host, cpu, memory, ts) values {row}"),
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await
 }
@@ -286,7 +286,7 @@ async fn do_query(instance: &Arc<Instance>, table_name: &str) -> Output {
     execute_sql_with(
         instance,
         &format!("select ts from test.{table_name} order by ts"),
-        QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
+        QueryContext::with(DEFAULT_CATALOG_NAME, "test").into(),
     )
     .await
 }
@@ -1204,7 +1204,7 @@ async fn test_rename_table(instance: Arc<dyn MockInstance>) {
     let output = execute_sql(&instance, "create database db").await.data;
     assert!(matches!(output, OutputData::AffectedRows(1)));
 
-    let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, "db");
+    let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db"));
     let output = execute_sql_with(
         &instance,
         "create table demo(host string, cpu double, memory double, ts timestamp, time index(ts))",
@@ -1277,7 +1277,7 @@ async fn test_create_table_after_rename_table(instance: Arc<dyn MockInstance>) {
 
     // create test table
     let table_name = "demo";
-    let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, "db");
+    let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db"));
     let output = execute_sql_with(
         &instance,
         &format!("create table {table_name}(host string, cpu double, memory double, ts timestamp, time index(ts))"),
@@ -1481,7 +1481,7 @@ async fn test_use_database(instance: Arc<dyn MockInstance>) {
     let output = execute_sql(&instance, "create database db1").await.data;
     assert!(matches!(output, OutputData::AffectedRows(1)));
 
-    let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, "db1");
+    let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db1"));
     let output = execute_sql_with(
         &instance,
         "create table tb1(col_i32 int, ts timestamp, TIME INDEX(ts))",
@@ -1845,7 +1845,7 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
 
     let sql = "create table another_table(i timestamp time index)";
-    let query_ctx = QueryContext::with("another_catalog", "another_schema");
+    let query_ctx = Arc::new(QueryContext::with("another_catalog", "another_schema"));
     let output = execute_sql_with(&instance, sql, query_ctx.clone())
         .await
         .data;
@@ -1885,7 +1885,7 @@ async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
 
     let sql = "create table another_table(i timestamp time index)";
-    let query_ctx = QueryContext::with("another_catalog", "another_schema");
+    let query_ctx = Arc::new(QueryContext::with("another_catalog", "another_schema"));
     let output = execute_sql_with(&instance, sql, query_ctx.clone())
         .await
         .data;
@@ -553,7 +553,7 @@ async fn cross_schema_query(instance: Arc<dyn MockInstance>) {
 
     ins.do_query(
         AGGREGATORS_CREATE_TABLE,
-        QueryContext::with_db_name(Some("greptime_private")),
+        QueryContext::with_db_name(Some("greptime_private")).into(),
     )
     .await
     .into_iter()
@@ -562,7 +562,7 @@ async fn cross_schema_query(instance: Arc<dyn MockInstance>) {
     });
     ins.do_query(
         AGGREGATORS_INSERT_DATA,
-        QueryContext::with_db_name(Some("greptime_private")),
+        QueryContext::with_db_name(Some("greptime_private")).into(),
    )
     .await
     .into_iter()
@@ -617,7 +617,7 @@ async fn cross_schema_query(instance: Arc<dyn MockInstance>) {
     let query_output = promql_query(
         ins.clone(),
         r#"http_requests"#,
-        QueryContext::with_db_name(Some("greptime_private")),
+        QueryContext::with_db_name(Some("greptime_private")).into(),
         start,
         end,
         interval,