diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 58a58d62a1..8abeb3f96e 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -94,6 +94,18 @@ impl FrontendCatalogManager { self.datanode_clients.clone() } + pub async fn invalidate_schema(&self, catalog: &str, schema: &str) { + let schema_key = SchemaKey { + catalog_name: catalog.into(), + schema_name: schema.into(), + } + .to_string(); + + let key = schema_key.as_bytes(); + + self.backend_cache_invalidtor.invalidate_key(key).await; + } + pub async fn invalidate_table(&self, catalog: &str, schema: &str, table: &str) { let tg_key = TableGlobalKey { catalog_name: catalog.into(), @@ -263,6 +275,7 @@ impl CatalogManager for FrontendCatalogManager { catalog_name: catalog.to_string(), } .to_string(); + Ok(self.backend.get(key.as_bytes()).await?.map(|_| { Arc::new(FrontendCatalogProvider { catalog_name: catalog.to_string(), @@ -340,18 +353,27 @@ impl CatalogProvider for FrontendCatalogProvider { } async fn schema(&self, name: &str) -> catalog::error::Result> { - let all_schemas = self.schema_names().await?; - if all_schemas.contains(&name.to_string()) { - Ok(Some(Arc::new(FrontendSchemaProvider { - catalog_name: self.catalog_name.clone(), + let catalog = &self.catalog_name; + + let schema_key = SchemaKey { + catalog_name: catalog.clone(), + schema_name: name.to_string(), + } + .to_string(); + + let val = self.backend.get(schema_key.as_bytes()).await?; + + let provider = val.map(|_| { + Arc::new(FrontendSchemaProvider { + catalog_name: catalog.clone(), schema_name: name.to_string(), backend: self.backend.clone(), partition_manager: self.partition_manager.clone(), datanode_clients: self.datanode_clients.clone(), - }))) - } else { - Ok(None) - } + }) as Arc + }); + + Ok(provider) } } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index fbeecdb5b4..8d5cb9289d 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -463,8 +463,8 @@ impl DistInstance { } let key = SchemaKey { - catalog_name: catalog, - schema_name: expr.database_name, + catalog_name: catalog.clone(), + schema_name: expr.database_name.clone(), }; let value = SchemaValue {}; let client = self @@ -475,10 +475,12 @@ impl DistInstance { let request = CompareAndPutRequest::new() .with_key(key.to_string()) .with_value(value.as_bytes().context(CatalogEntrySerdeSnafu)?); + let response = client .compare_and_put(request.into()) .await .context(RequestMetaSnafu)?; + ensure!( response.success, SchemaExistsSnafu { @@ -486,6 +488,14 @@ impl DistInstance { } ); + // Since the database created on meta does not go through KvBackend, so we manually + // invalidate the cache here. + // + // TODO(fys): when the meta invalidation cache mechanism is established, remove it. + self.catalog_manager() + .invalidate_schema(&catalog, &expr.database_name) + .await; + Ok(Output::AffectedRows(1)) }