perf: change current schema and catalog to borrow, clone only when necessary (#2116)

Co-authored-by: gongzhengyang <gongzhengyang@bolean.com.cn>
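The pattern applied throughout this diff: QueryContext::current_catalog() and QueryContext::current_schema() now return a borrowed &str instead of a freshly cloned String, and call sites add .to_owned() only where an owned value is actually required. A minimal sketch of that idea, using a simplified stand-in for QueryContext (the struct and helper functions below are illustrative, not the project's real definitions):

// Illustrative stand-in for the real QueryContext.
struct QueryContext {
    current_catalog: String,
    current_schema: String,
}

impl QueryContext {
    // After this change the getters borrow instead of cloning.
    fn current_catalog(&self) -> &str {
        &self.current_catalog
    }

    fn current_schema(&self) -> &str {
        &self.current_schema
    }
}

// Hypothetical caller: borrow for read-only use ...
fn full_table_name(ctx: &QueryContext, table: &str) -> String {
    format!("{}.{}.{}", ctx.current_catalog(), ctx.current_schema(), table)
}

// ... and clone only when an owned String must be stored or moved elsewhere.
fn owned_defaults(ctx: &QueryContext) -> (String, String) {
    (
        ctx.current_catalog().to_owned(),
        ctx.current_schema().to_owned(),
    )
}
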
@@ -45,8 +45,8 @@ impl DfTableSourceProvider {
             catalog_manager,
             disallow_cross_schema_query,
             resolved_tables: HashMap::new(),
-            default_catalog: query_ctx.current_catalog(),
-            default_schema: query_ctx.current_schema(),
+            default_catalog: query_ctx.current_catalog().to_owned(),
+            default_schema: query_ctx.current_schema().to_owned(),
         }
     }

@@ -65,8 +65,8 @@ impl Instance {
         ctx: QueryContextRef,
     ) -> Result<Output> {
         let catalog_list = new_dummy_catalog_list(
-            &ctx.current_catalog(),
-            &ctx.current_schema(),
+            ctx.current_catalog(),
+            ctx.current_schema(),
             self.catalog_manager.clone(),
         )
         .await?;

@@ -75,8 +75,8 @@ impl Instance {
             .decode(
                 plan_bytes.as_slice(),
                 Arc::new(catalog_list) as Arc<_>,
-                &ctx.current_catalog(),
-                &ctx.current_schema(),
+                ctx.current_catalog(),
+                ctx.current_schema(),
             )
             .await
             .context(DecodeLogicalPlanSnafu)?;

@@ -131,8 +131,8 @@ impl Instance {
     ) -> Result<Output> {
         let results = future::try_join_all(requests.inserts.into_iter().map(|insert| {
             let catalog_manager = self.catalog_manager.clone();
-            let catalog = ctx.current_catalog();
-            let schema = ctx.current_schema();
+            let catalog = ctx.current_catalog().to_owned();
+            let schema = ctx.current_schema().to_owned();

             common_runtime::spawn_write(async move {
                 let table_name = &insert.table_name.clone();

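The .to_owned() calls in the hunk above are not only about the new &str return type: catalog and schema are moved into common_runtime::spawn_write(async move { ... }), and a spawned task needs owned ('static) captures, so a &str borrowed from ctx could not be carried across that boundary. A rough sketch of the same constraint, using std::thread::spawn as a stand-in for the project's runtime (the names below are hypothetical):

// Sketch only: std::thread::spawn stands in for common_runtime::spawn_write here;
// both require the moved closure to own its captures ('static bound).
fn spawn_insert_task(catalog: &str, schema: &str) -> std::thread::JoinHandle<()> {
    // A borrowed &str cannot cross the spawn boundary, so convert to owned Strings first.
    let catalog = catalog.to_owned();
    let schema = schema.to_owned();
    std::thread::spawn(move || {
        println!("insert into {catalog}.{schema}");
    })
}
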
@@ -163,8 +163,8 @@ impl Instance {
     }

     async fn handle_delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<Output> {
-        let catalog = &ctx.current_catalog();
-        let schema = &ctx.current_schema();
+        let catalog = ctx.current_catalog();
+        let schema = ctx.current_schema();
         let table_name = &request.table_name.clone();
         let table_ref = TableReference::full(catalog, schema, table_name);

@@ -233,12 +233,12 @@ pub fn table_idents_to_full_name(
 ) -> Result<(String, String, String)> {
     match &obj_name.0[..] {
         [table] => Ok((
-            query_ctx.current_catalog(),
-            query_ctx.current_schema(),
+            query_ctx.current_catalog().to_owned(),
+            query_ctx.current_schema().to_owned(),
             table.value.clone(),
         )),
         [schema, table] => Ok((
-            query_ctx.current_catalog(),
+            query_ctx.current_catalog().to_owned(),
             schema.value.clone(),
             table.value.clone(),
         )),

@@ -260,7 +260,10 @@ pub fn idents_to_full_database_name(
     query_ctx: &QueryContextRef,
 ) -> Result<(String, String)> {
     match &obj_name.0[..] {
-        [database] => Ok((query_ctx.current_catalog(), database.value.clone())),
+        [database] => Ok((
+            query_ctx.current_catalog().to_owned(),
+            database.value.clone(),
+        )),
         [catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
         _ => error::InvalidSqlSnafu {
             msg: format!(

@@ -48,7 +48,7 @@ impl SqlHandler {
         let schema = req.db_name;
         if self
             .catalog_manager
-            .schema_exist(&catalog, &schema)
+            .schema_exist(catalog, &schema)
             .await
             .context(CatalogSnafu)?
         {

@@ -60,7 +60,7 @@ impl SqlHandler {
         }

         let reg_req = RegisterSchemaRequest {
-            catalog,
+            catalog: catalog.to_owned(),
             schema: schema.clone(),
         };
         let _ = self

@@ -317,8 +317,8 @@ impl Instance {
         ctx: QueryContextRef,
         request: &InsertRequest,
     ) -> Result<()> {
-        let catalog_name = &ctx.current_catalog();
-        let schema_name = &ctx.current_schema();
+        let catalog_name = &ctx.current_catalog().to_owned();
+        let schema_name = &ctx.current_schema().to_owned();
         let table_name = &request.table_name;
         let columns = &request.columns;

@@ -374,8 +374,8 @@ impl Instance {
         columns: &[Column],
         engine: &str,
     ) -> Result<Output> {
-        let catalog_name = &ctx.current_catalog();
-        let schema_name = &ctx.current_schema();
+        let catalog_name = ctx.current_catalog();
+        let schema_name = ctx.current_schema();

         // Create table automatically, build schema from data.
         let create_expr = self

@@ -409,8 +409,8 @@ impl Instance {
             add_columns, table_name
         );
         let expr = AlterExpr {
-            catalog_name: ctx.current_catalog(),
-            schema_name: ctx.current_schema(),
+            catalog_name: ctx.current_catalog().to_owned(),
+            schema_name: ctx.current_schema().to_owned(),
             table_name: table_name.to_string(),
             kind: Some(Kind::AddColumns(add_columns)),
             ..Default::default()

@@ -643,7 +643,7 @@ pub fn check_permission(
         }
         Statement::ShowTables(stmt) => {
             if let Some(database) = &stmt.database {
-                validate_catalog_and_schema(&query_ctx.current_catalog(), database, query_ctx)
+                validate_catalog_and_schema(query_ctx.current_catalog(), database, query_ctx)
                     .map_err(BoxedError::new)
                     .context(SqlExecInterceptedSnafu)?;
             }

@@ -449,7 +449,7 @@ impl DistInstance {
         let catalog = query_ctx.current_catalog();
         if self
             .catalog_manager
-            .schema_exist(&catalog, &expr.database_name)
+            .schema_exist(catalog, &expr.database_name)
             .await
             .context(CatalogSnafu)?
         {

@@ -464,7 +464,7 @@ impl DistInstance {
         }

         let key = SchemaKey {
-            catalog_name: catalog.clone(),
+            catalog_name: catalog.to_owned(),
             schema_name: expr.database_name.clone(),
         };
         let value = SchemaValue {};

@@ -494,7 +494,7 @@ impl DistInstance {
         //
         // TODO(fys): when the meta invalidation cache mechanism is established, remove it.
         self.catalog_manager()
-            .invalidate_schema(&catalog, &expr.database_name)
+            .invalidate_schema(catalog, &expr.database_name)
             .await;

         Ok(Output::AffectedRows(1))

@@ -621,8 +621,8 @@ impl DistInstance {
         ctx: QueryContextRef,
     ) -> Result<Output> {
         let inserter = DistInserter::new(
-            ctx.current_catalog(),
-            ctx.current_schema(),
+            ctx.current_catalog().to_owned(),
+            ctx.current_schema().to_owned(),
             self.catalog_manager.clone(),
         );
         let affected_rows = inserter.grpc_insert(requests).await?;

@@ -634,8 +634,8 @@ impl DistInstance {
         request: DeleteRequest,
         ctx: QueryContextRef,
     ) -> Result<Output> {
-        let catalog = &ctx.current_catalog();
-        let schema = &ctx.current_schema();
+        let catalog = ctx.current_catalog();
+        let schema = ctx.current_schema();
         let table_name = &request.table_name;
         let table_ref = TableReference::full(catalog, schema, table_name);

@@ -132,7 +132,7 @@ impl Instance {
         let table_name = prom_store::table_name(query)?;

         let output = self
-            .handle_remote_query(&ctx, &catalog_name, &schema_name, &table_name, query)
+            .handle_remote_query(&ctx, catalog_name, schema_name, &table_name, query)
             .await
             .map_err(BoxedError::new)
             .with_context(|_| error::ExecuteQuerySnafu {

@@ -109,9 +109,9 @@ impl DatafusionQueryEngine {
             }
         );

-        let default_catalog = query_ctx.current_catalog();
-        let default_schema = query_ctx.current_schema();
-        let table_name = dml.table_name.resolve(&default_catalog, &default_schema);
+        let default_catalog = &query_ctx.current_catalog().to_owned();
+        let default_schema = &query_ctx.current_schema().to_owned();
+        let table_name = dml.table_name.resolve(default_catalog, default_schema);
         let table = self.find_table(&table_name).await?;

         let output = self

@@ -99,7 +99,7 @@ pub async fn show_databases(
     query_ctx: QueryContextRef,
 ) -> Result<Output> {
     let mut databases = catalog_manager
-        .schema_names(&query_ctx.current_catalog())
+        .schema_names(query_ctx.current_catalog())
         .await
         .context(error::CatalogSnafu)?;

@@ -143,11 +143,11 @@ pub async fn show_tables(
     let schema = if let Some(database) = stmt.database {
         database
     } else {
-        query_ctx.current_schema()
+        query_ctx.current_schema().to_owned()
     };
     // TODO(sunng87): move this function into query_ctx
     let mut tables = catalog_manager
-        .table_names(&query_ctx.current_catalog(), &schema)
+        .table_names(query_ctx.current_catalog(), &schema)
         .await
         .context(error::CatalogSnafu)?;

@@ -129,8 +129,8 @@ impl GreptimeRequestHandler {
             .auth(
                 Identity::UserId(&username, None),
                 Password::PlainText(password.into()),
-                &query_ctx.current_catalog(),
-                &query_ctx.current_schema(),
+                query_ctx.current_catalog(),
+                query_ctx.current_schema(),
             )
             .await
             .context(AuthSnafu),

@@ -286,7 +286,7 @@ fn check_others(query: &str, query_ctx: QueryContextRef) -> Option<Output> {
         Some(select_function("version()", &get_version()))
     } else if SELECT_DATABASE_PATTERN.is_match(query) {
         let schema = query_ctx.current_schema();
-        Some(select_function("database()", &schema))
+        Some(select_function("database()", schema))
     } else if SELECT_TIME_DIFF_FUNC_PATTERN.is_match(query) {
         Some(select_function(
             "TIMEDIFF(NOW(), UTC_TIMESTAMP())",

@@ -57,7 +57,10 @@ impl InfluxdbLineProtocolHandler for DummyInstance {
     async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
         let requests: InsertRequests = request.try_into()?;
         for expr in requests.inserts {
-            let _ = self.tx.send((ctx.current_schema(), expr.table_name)).await;
+            let _ = self
+                .tx
+                .send((ctx.current_schema().to_owned(), expr.table_name))
+                .await;
         }

         Ok(())

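The same reasoning applies to these DummyInstance test handlers: the tuple handed to self.tx.send(...) is moved into the channel and may outlive the QueryContext, so the borrowed schema name has to become an owned String at the send site. A small self-contained sketch with std::sync::mpsc (the channel type in the real test fixture may differ):

use std::sync::mpsc;

// Sketch: values sent through a channel must own their data, because the
// receiver can outlive the QueryContext the &str was borrowed from.
fn forward(tx: &mpsc::Sender<(String, String)>, schema: &str, table: String) {
    let _ = tx.send((schema.to_owned(), table));
}

fn main() {
    let (tx, rx) = mpsc::channel();
    forward(&tx, "public", "metrics".to_string());
    assert_eq!(
        rx.recv().unwrap(),
        ("public".to_string(), "metrics".to_string())
    );
}
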
@@ -59,7 +59,7 @@ impl PromStoreProtocolHandler for DummyInstance {
     async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()> {
         let _ = self
             .tx
-            .send((ctx.current_schema(), request.encode_to_vec()))
+            .send((ctx.current_schema().to_owned(), request.encode_to_vec()))
             .await;

         Ok(())

@@ -67,7 +67,7 @@ impl PromStoreProtocolHandler for DummyInstance {
     async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse> {
         let _ = self
             .tx
-            .send((ctx.current_schema(), request.encode_to_vec()))
+            .send((ctx.current_schema().to_owned(), request.encode_to_vec()))
             .await;

         let response = ReadResponse {

@@ -79,13 +79,13 @@ impl QueryContext {
     }

     #[inline]
-    pub fn current_schema(&self) -> String {
-        self.current_schema.clone()
+    pub fn current_schema(&self) -> &str {
+        &self.current_schema
     }

     #[inline]
-    pub fn current_catalog(&self) -> String {
-        self.current_catalog.clone()
+    pub fn current_catalog(&self) -> &str {
+        &self.current_catalog
     }

     #[inline]

@@ -96,7 +96,7 @@ impl QueryContext {
     pub fn get_db_string(&self) -> String {
         let catalog = self.current_catalog();
         let schema = self.current_schema();
-        build_db_string(&catalog, &schema)
+        build_db_string(catalog, schema)
     }

     #[inline]