mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
chore: start plugins during standalone startup & comply with current catalog while changing database (#3282)
* chore: start plugins in standalone * chore: respect current catalog in use statement for mysql * chore: reduce unnecessory convert to string * chore: reduce duplicate code
This commit is contained in:
@@ -213,6 +213,10 @@ impl App for Instance {
|
||||
.await
|
||||
.context(StartWalOptionsAllocatorSnafu)?;
|
||||
|
||||
plugins::start_frontend_plugins(self.frontend.plugins().clone())
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
self.frontend.start().await.context(StartFrontendSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -56,11 +56,22 @@ pub fn build_db_string(catalog: &str, schema: &str) -> String {
|
||||
/// - if `[<catalog>-]` is provided, we split database name with `-` and use
|
||||
/// `<catalog>` and `<schema>`.
|
||||
pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (&str, &str) {
|
||||
match parse_optional_catalog_and_schema_from_db_string(db) {
|
||||
(Some(catalog), schema) => (catalog, schema),
|
||||
(None, schema) => (DEFAULT_CATALOG_NAME, schema),
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to parse catalog and schema from given database name
|
||||
///
|
||||
/// Similar to [`parse_catalog_and_schema_from_db_string`] but returns an optional
|
||||
/// catalog if it's not provided in the database name.
|
||||
pub fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option<&str>, &str) {
|
||||
let parts = db.splitn(2, '-').collect::<Vec<&str>>();
|
||||
if parts.len() == 2 {
|
||||
(parts[0], parts[1])
|
||||
(Some(parts[0]), parts[1])
|
||||
} else {
|
||||
(DEFAULT_CATALOG_NAME, db)
|
||||
(None, db)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,5 +101,20 @@ mod tests {
|
||||
("catalog", "schema1-schema2"),
|
||||
parse_catalog_and_schema_from_db_string("catalog-schema1-schema2")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
(None, "fullschema"),
|
||||
parse_optional_catalog_and_schema_from_db_string("fullschema")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
(Some("catalog"), "schema"),
|
||||
parse_optional_catalog_and_schema_from_db_string("catalog-schema")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
(Some("catalog"), "schema1-schema2"),
|
||||
parse_optional_catalog_and_schema_from_db_string("catalog-schema1-schema2")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use std::time::Duration;
|
||||
use ::auth::{Identity, Password, UserProviderRef};
|
||||
use async_trait::async_trait;
|
||||
use chrono::{NaiveDate, NaiveDateTime};
|
||||
use common_catalog::parse_catalog_and_schema_from_db_string;
|
||||
use common_catalog::parse_optional_catalog_and_schema_from_db_string;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_query::Output;
|
||||
use common_telemetry::{debug, error, logging, tracing, warn};
|
||||
@@ -351,9 +351,14 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
|
||||
}
|
||||
|
||||
async fn on_init<'a>(&'a mut self, database: &'a str, w: InitWriter<'a, W>) -> Result<()> {
|
||||
let (catalog, schema) = parse_catalog_and_schema_from_db_string(database);
|
||||
let (catalog_from_db, schema) = parse_optional_catalog_and_schema_from_db_string(database);
|
||||
let catalog = if let Some(catalog) = catalog_from_db {
|
||||
catalog.to_owned()
|
||||
} else {
|
||||
self.session.get_catalog()
|
||||
};
|
||||
|
||||
if !self.query_handler.is_valid_schema(catalog, schema).await? {
|
||||
if !self.query_handler.is_valid_schema(&catalog, schema).await? {
|
||||
return w
|
||||
.error(
|
||||
ErrorKind::ER_WRONG_DB_NAME,
|
||||
@@ -366,7 +371,10 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
|
||||
let user_info = &self.session.user_info();
|
||||
|
||||
if let Some(schema_validator) = &self.user_provider {
|
||||
if let Err(e) = schema_validator.authorize(catalog, schema, user_info).await {
|
||||
if let Err(e) = schema_validator
|
||||
.authorize(&catalog, schema, user_info)
|
||||
.await
|
||||
{
|
||||
METRIC_AUTH_FAILURE
|
||||
.with_label_values(&[e.status_code().as_ref()])
|
||||
.inc();
|
||||
@@ -380,7 +388,9 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
|
||||
}
|
||||
}
|
||||
|
||||
self.session.set_catalog(catalog.into());
|
||||
if catalog_from_db.is_some() {
|
||||
self.session.set_catalog(catalog)
|
||||
}
|
||||
self.session.set_schema(schema.into());
|
||||
|
||||
w.ok().await.map_err(|e| e.into())
|
||||
|
||||
@@ -98,6 +98,11 @@ impl Session {
|
||||
self.catalog.store(Arc::new(catalog));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_catalog(&self) -> String {
|
||||
self.catalog.load().as_ref().clone()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn set_schema(&self, schema: String) {
|
||||
self.schema.store(Arc::new(schema));
|
||||
|
||||
Reference in New Issue
Block a user