chore: reduce the number of requests for meta (#1647)

This commit is contained in:
fys
2023-05-26 17:25:18 +08:00
committed by GitHub
parent 89366ba939
commit f0a519b71b
2 changed files with 42 additions and 10 deletions

View File

@@ -94,6 +94,18 @@ impl FrontendCatalogManager {
self.datanode_clients.clone()
}
pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let schema_key = SchemaKey {
catalog_name: catalog.into(),
schema_name: schema.into(),
}
.to_string();
let key = schema_key.as_bytes();
self.backend_cache_invalidtor.invalidate_key(key).await;
}
pub async fn invalidate_table(&self, catalog: &str, schema: &str, table: &str) {
let tg_key = TableGlobalKey {
catalog_name: catalog.into(),
@@ -263,6 +275,7 @@ impl CatalogManager for FrontendCatalogManager {
catalog_name: catalog.to_string(),
}
.to_string();
Ok(self.backend.get(key.as_bytes()).await?.map(|_| {
Arc::new(FrontendCatalogProvider {
catalog_name: catalog.to_string(),
@@ -340,18 +353,27 @@ impl CatalogProvider for FrontendCatalogProvider {
}
async fn schema(&self, name: &str) -> catalog::error::Result<Option<SchemaProviderRef>> {
let all_schemas = self.schema_names().await?;
if all_schemas.contains(&name.to_string()) {
Ok(Some(Arc::new(FrontendSchemaProvider {
catalog_name: self.catalog_name.clone(),
let catalog = &self.catalog_name;
let schema_key = SchemaKey {
catalog_name: catalog.clone(),
schema_name: name.to_string(),
}
.to_string();
let val = self.backend.get(schema_key.as_bytes()).await?;
let provider = val.map(|_| {
Arc::new(FrontendSchemaProvider {
catalog_name: catalog.clone(),
schema_name: name.to_string(),
backend: self.backend.clone(),
partition_manager: self.partition_manager.clone(),
datanode_clients: self.datanode_clients.clone(),
})))
} else {
Ok(None)
}
}) as Arc<dyn SchemaProvider>
});
Ok(provider)
}
}

View File

@@ -463,8 +463,8 @@ impl DistInstance {
}
let key = SchemaKey {
catalog_name: catalog,
schema_name: expr.database_name,
catalog_name: catalog.clone(),
schema_name: expr.database_name.clone(),
};
let value = SchemaValue {};
let client = self
@@ -475,10 +475,12 @@ impl DistInstance {
let request = CompareAndPutRequest::new()
.with_key(key.to_string())
.with_value(value.as_bytes().context(CatalogEntrySerdeSnafu)?);
let response = client
.compare_and_put(request.into())
.await
.context(RequestMetaSnafu)?;
ensure!(
response.success,
SchemaExistsSnafu {
@@ -486,6 +488,14 @@ impl DistInstance {
}
);
// Since the database created on meta does not go through KvBackend, so we manually
// invalidate the cache here.
//
// TODO(fys): when the meta invalidation cache mechanism is established, remove it.
self.catalog_manager()
.invalidate_schema(&catalog, &expr.database_name)
.await;
Ok(Output::AffectedRows(1))
}