Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-03 20:02:54 +00:00
refactor: refactor frontend cache (#3912)
* feat: implement the `TableCache`
* refactor: use `TableCache`
* refactor: replace `TableFlowManager` with `TableFlownodeSetCache`
* refactor: introduce cache crate
* chore: add comments
* chore: update comments
* chore: apply suggestions from CR
* chore: rename `cache_invalidator` to `local_cache_invalidator`
* chore(fuzz): set `acquire_timeout` to 30s
Cargo.lock (generated, 14 changed lines)
@@ -1218,6 +1218,15 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "cache"
+version = "0.7.2"
+dependencies = [
+ "catalog",
+ "common-meta",
+ "moka",
+]
+
 [[package]]
 name = "cactus"
 version = "1.0.7"
@@ -1592,6 +1601,7 @@ dependencies = [
  "async-trait",
  "auth",
  "base64 0.21.7",
+ "cache",
  "catalog",
  "chrono",
  "clap 4.5.4",
@@ -1624,6 +1634,7 @@ dependencies = [
  "meta-client",
  "meta-srv",
  "mito2",
+ "moka",
  "nu-ansi-term",
  "plugins",
  "prometheus",
@@ -3581,6 +3592,7 @@ dependencies = [
  "arc-swap",
  "async-trait",
  "auth",
+ "cache",
  "catalog",
  "client",
  "common-base",
@@ -10294,6 +10306,7 @@ dependencies = [
  "async-trait",
  "auth",
  "axum",
+ "cache",
  "catalog",
  "chrono",
  "clap 4.5.4",
@@ -10323,6 +10336,7 @@ dependencies = [
  "itertools 0.10.5",
  "meta-client",
  "meta-srv",
+ "moka",
  "mysql_async",
  "object-store",
  "opentelemetry-proto 0.5.0",
Cargo.toml
@@ -4,6 +4,7 @@ members = [
     "src/api",
     "src/auth",
     "src/catalog",
+    "src/cache",
     "src/client",
     "src/cmd",
     "src/common/base",
@@ -173,6 +174,7 @@ zstd = "0.13"
 ## workspaces members
 api = { path = "src/api" }
 auth = { path = "src/auth" }
+cache = { path = "src/cache" }
 catalog = { path = "src/catalog" }
 client = { path = "src/client" }
 cmd = { path = "src/cmd" }
src/cache/Cargo.toml (new file, 10 lines)
@@ -0,0 +1,10 @@
+[package]
+name = "cache"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+catalog.workspace = true
+common-meta.workspace = true
+moka.workspace = true
src/cache/src/lib.rs (new file, 86 lines)
@@ -0,0 +1,86 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use catalog::kvbackend::new_table_cache;
+use common_meta::cache::{
+    new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, CacheRegistryBuilder,
+};
+use common_meta::kv_backend::KvBackendRef;
+use moka::future::CacheBuilder;
+
+const DEFAULT_CACHE_MAX_CAPACITY: u64 = 65536;
+const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
+const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
+
+pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
+pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
+pub const TABLE_CACHE_NAME: &str = "table_cache";
+pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
+
+// TODO(weny): Make the cache configurable.
+pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistryBuilder {
+    // Builds table info cache
+    let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
+        .time_to_live(DEFAULT_CACHE_TTL)
+        .time_to_idle(DEFAULT_CACHE_TTI)
+        .build();
+    let table_info_cache = Arc::new(new_table_info_cache(
+        TABLE_INFO_CACHE_NAME.to_string(),
+        cache,
+        kv_backend.clone(),
+    ));
+
+    // Builds table name cache
+    let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
+        .time_to_live(DEFAULT_CACHE_TTL)
+        .time_to_idle(DEFAULT_CACHE_TTI)
+        .build();
+    let table_name_cache = Arc::new(new_table_name_cache(
+        TABLE_NAME_CACHE_NAME.to_string(),
+        cache,
+        kv_backend.clone(),
+    ));
+
+    // Builds table cache
+    let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
+        .time_to_live(DEFAULT_CACHE_TTL)
+        .time_to_idle(DEFAULT_CACHE_TTI)
+        .build();
+    let table_cache = Arc::new(new_table_cache(
+        TABLE_CACHE_NAME.to_string(),
+        cache,
+        table_info_cache.clone(),
+        table_name_cache.clone(),
+    ));
+
+    // Builds table flownode set cache
+    let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
+        .time_to_live(DEFAULT_CACHE_TTL)
+        .time_to_idle(DEFAULT_CACHE_TTI)
+        .build();
+    let table_flownode_set_cache = Arc::new(new_table_flownode_set_cache(
+        TABLE_FLOWNODE_SET_CACHE_NAME.to_string(),
+        cache,
+        kv_backend.clone(),
+    ));
+
+    CacheRegistryBuilder::default()
+        .add_cache(table_info_cache)
+        .add_cache(table_name_cache)
+        .add_cache(table_cache)
+        .add_cache(table_flownode_set_cache)
+}
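All four caches share the moka settings above (65536 entries, 10 minute TTL, 5 minute TTI) and are looked up by type once the registry is sealed. A minimal usage sketch, mirroring the call sites in the `cmd` diffs further down; the `Option` return of `CacheRegistry::get` is inferred from those call sites rather than documented here:

```rust
// Sketch only: mirrors cmd/standalone.rs below; not a drop-in build.
use std::sync::Arc;

use cache::default_cache_registry_builder;
use catalog::kvbackend::TableCacheRef;
use common_meta::kv_backend::KvBackendRef;

fn build_table_cache(kv_backend: KvBackendRef) -> Option<TableCacheRef> {
    // Seal the registry; every cache added by the builder is now registered.
    let cache_registry = Arc::new(default_cache_registry_builder(kv_backend).build());
    // Lookup is keyed by the requested type; `None` means the cache was never
    // registered, which the binaries surface as a `CacheRequired` error.
    let table_cache: Option<TableCacheRef> = cache_registry.get();
    table_cache
}
```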
src/catalog/src/error.rs
@@ -318,15 +318,12 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Get null from table cache, key: {}", key))]
-    TableCacheNotGet {
-        key: String,
+    #[snafu(display("Failed to get table cache"))]
+    GetTableCache {
+        source: common_meta::error::Error,
         #[snafu(implicit)]
         location: Location,
     },
-
-    #[snafu(display("Failed to get table cache, err: {}", err_msg))]
-    GetTableCache { err_msg: String },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -388,7 +385,7 @@ impl ErrorExt for Error {
             Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
             Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
             Error::TableMetadataManager { source, .. } => source.status_code(),
-            Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal,
+            Error::GetTableCache { .. } => StatusCode::Internal,
         }
     }
 
src/catalog/src/kvbackend/mod.rs
@@ -16,5 +16,7 @@ pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend}
 
 mod client;
 mod manager;
+mod table_cache;
 
 pub use manager::KvBackendCatalogManager;
+pub use table_cache::{new_table_cache, TableCache, TableCacheRef};
src/catalog/src/kvbackend/client.rs
@@ -350,6 +350,13 @@ pub struct MetaKvBackend {
     pub client: Arc<MetaClient>,
 }
 
+impl MetaKvBackend {
+    /// Constructs a [MetaKvBackend].
+    pub fn new(client: Arc<MetaClient>) -> MetaKvBackend {
+        MetaKvBackend { client }
+    }
+}
+
 impl TxnService for MetaKvBackend {
     type Error = Error;
 }
src/catalog/src/kvbackend/manager.rs
@@ -15,27 +15,23 @@
 use std::any::Any;
 use std::collections::BTreeSet;
 use std::sync::{Arc, Weak};
-use std::time::Duration;
 
 use async_stream::try_stream;
 use common_catalog::consts::{
     DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
 };
-use common_catalog::format_full_table_name;
 use common_config::Mode;
 use common_error::ext::BoxedError;
-use common_meta::cache_invalidator::{CacheInvalidator, Context, MultiCacheInvalidator};
-use common_meta::instruction::CacheIdent;
 use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
 use common_meta::key::table_info::TableInfoValue;
 use common_meta::key::table_name::TableNameKey;
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
+use common_meta::table_name::TableName;
 use futures_util::stream::BoxStream;
 use futures_util::{StreamExt, TryStreamExt};
 use meta_client::client::MetaClient;
-use moka::future::{Cache as AsyncCache, CacheBuilder};
 use moka::sync::Cache;
 use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
 use snafu::prelude::*;
@@ -43,12 +39,12 @@ use table::dist_table::DistTable;
 use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
 use table::TableRef;
 
-use crate::error::Error::{GetTableCache, TableCacheNotGet};
 use crate::error::{
-    InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result,
-    TableCacheNotGetSnafu, TableMetadataManagerSnafu,
+    GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu,
+    ListTablesSnafu, Result, TableMetadataManagerSnafu,
 };
 use crate::information_schema::InformationSchemaProvider;
+use crate::kvbackend::TableCacheRef;
 use crate::CatalogManager;
 
 /// Access all existing catalog, schema and tables.
@@ -64,60 +60,18 @@ pub struct KvBackendCatalogManager {
     table_metadata_manager: TableMetadataManagerRef,
     /// A sub-CatalogManager that handles system tables
     system_catalog: SystemCatalog,
-    table_cache: AsyncCache<String, TableRef>,
-}
-
-struct TableCacheInvalidator {
-    table_cache: AsyncCache<String, TableRef>,
-}
-
-impl TableCacheInvalidator {
-    pub fn new(table_cache: AsyncCache<String, TableRef>) -> Self {
-        Self { table_cache }
-    }
-}
-
-#[async_trait::async_trait]
-impl CacheInvalidator for TableCacheInvalidator {
-    async fn invalidate(
-        &self,
-        _ctx: &Context,
-        caches: &[CacheIdent],
-    ) -> common_meta::error::Result<()> {
-        for cache in caches {
-            if let CacheIdent::TableName(table_name) = cache {
-                let table_cache_key = format_full_table_name(
-                    &table_name.catalog_name,
-                    &table_name.schema_name,
-                    &table_name.table_name,
-                );
-                self.table_cache.invalidate(&table_cache_key).await;
-            }
-        }
-        Ok(())
-    }
+    table_cache: TableCacheRef,
 }
 
 const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
-const TABLE_CACHE_MAX_CAPACITY: u64 = 65536;
-const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
-const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
 
 impl KvBackendCatalogManager {
     pub async fn new(
         mode: Mode,
         meta_client: Option<Arc<MetaClient>>,
         backend: KvBackendRef,
-        multi_cache_invalidator: Arc<MultiCacheInvalidator>,
+        table_cache: TableCacheRef,
     ) -> Arc<Self> {
-        let table_cache: AsyncCache<String, TableRef> = CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
-            .time_to_live(TABLE_CACHE_TTL)
-            .time_to_idle(TABLE_CACHE_TTI)
-            .build();
-        multi_cache_invalidator
-            .add_invalidator(Arc::new(TableCacheInvalidator::new(table_cache.clone())))
-            .await;
-
         Arc::new_cyclic(|me| Self {
             mode,
             meta_client,
@@ -245,60 +199,25 @@ impl CatalogManager for KvBackendCatalogManager {
 
     async fn table(
         &self,
-        catalog: &str,
-        schema: &str,
+        catalog_name: &str,
+        schema_name: &str,
         table_name: &str,
     ) -> Result<Option<TableRef>> {
-        if let Some(table) = self.system_catalog.table(catalog, schema, table_name) {
+        if let Some(table) = self
+            .system_catalog
+            .table(catalog_name, schema_name, table_name)
+        {
            return Ok(Some(table));
         }
 
-        let init = async {
-            let table_name_key = TableNameKey::new(catalog, schema, table_name);
-            let Some(table_name_value) = self
-                .table_metadata_manager
-                .table_name_manager()
-                .get(table_name_key)
-                .await
-                .context(TableMetadataManagerSnafu)?
-            else {
-                return TableCacheNotGetSnafu {
-                    key: table_name_key.to_string(),
-                }
-                .fail();
-            };
-            let table_id = table_name_value.table_id();
-
-            let Some(table_info_value) = self
-                .table_metadata_manager
-                .table_info_manager()
-                .get(table_id)
-                .await
-                .context(TableMetadataManagerSnafu)?
-                .map(|v| v.into_inner())
-            else {
-                return TableCacheNotGetSnafu {
-                    key: table_name_key.to_string(),
-                }
-                .fail();
-            };
-            build_table(table_info_value)
-        };
-
-        match self
-            .table_cache
-            .try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init)
+        self.table_cache
+            .get_by_ref(&TableName {
+                catalog_name: catalog_name.to_string(),
+                schema_name: schema_name.to_string(),
+                table_name: table_name.to_string(),
+            })
             .await
-        {
-            Ok(table) => Ok(Some(table)),
-            Err(err) => match err.as_ref() {
-                TableCacheNotGet { .. } => Ok(None),
-                _ => Err(err),
-            },
-        }
-        .map_err(|err| GetTableCache {
-            err_msg: err.to_string(),
-        })
+            .context(GetTableCacheSnafu)
     }
 
     fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result<TableRef>> {
src/catalog/src/kvbackend/table_cache.rs (new file, 80 lines)
@@ -0,0 +1,80 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_meta::cache::{CacheContainer, Initializer, TableInfoCacheRef, TableNameCacheRef};
+use common_meta::error::{Result as MetaResult, ValueNotExistSnafu};
+use common_meta::instruction::CacheIdent;
+use common_meta::table_name::TableName;
+use futures::future::BoxFuture;
+use moka::future::Cache;
+use snafu::OptionExt;
+use table::dist_table::DistTable;
+use table::TableRef;
+
+pub type TableCacheRef = Arc<TableCache>;
+
+/// [TableCache] caches the [TableName] to [TableRef] mapping.
+pub type TableCache = CacheContainer<TableName, TableRef, CacheIdent>;
+
+/// Constructs a [TableCache].
+pub fn new_table_cache(
+    name: String,
+    cache: Cache<TableName, TableRef>,
+    table_info_cache: TableInfoCacheRef,
+    table_name_cache: TableNameCacheRef,
+) -> TableCache {
+    let init = init_factory(table_info_cache, table_name_cache);
+
+    CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+}
+
+fn init_factory(
+    table_info_cache: TableInfoCacheRef,
+    table_name_cache: TableNameCacheRef,
+) -> Initializer<TableName, TableRef> {
+    Arc::new(move |table_name| {
+        let table_info_cache = table_info_cache.clone();
+        let table_name_cache = table_name_cache.clone();
+        Box::pin(async move {
+            let table_id = table_name_cache
+                .get_by_ref(table_name)
+                .await?
+                .context(ValueNotExistSnafu)?;
+            let table_info = table_info_cache
+                .get_by_ref(&table_id)
+                .await?
+                .context(ValueNotExistSnafu)?;
+
+            Ok(Some(DistTable::table(table_info)))
+        })
+    })
+}
+
+fn invalidator<'a>(
+    cache: &'a Cache<TableName, TableRef>,
+    ident: &'a CacheIdent,
+) -> BoxFuture<'a, MetaResult<()>> {
+    Box::pin(async move {
+        if let CacheIdent::TableName(table_name) = ident {
+            cache.invalidate(table_name).await
+        }
+        Ok(())
+    })
+}
+
+fn filter(ident: &CacheIdent) -> bool {
+    matches!(ident, CacheIdent::TableName(_))
+}
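The initializer above makes table resolution two-hop: `TableNameCacheRef` maps the name to a `TableId`, `TableInfoCacheRef` maps the id to the table info, and only then is a `DistTable` handle built. A sketch of the lookup path `manager.rs` (diffed above) now drives, with placeholder names; it collapses `Err` and `Ok(None)` for brevity, since the initializer reports misses as `ValueNotExist`:

```rust
// Sketch only: the lookup manager.rs performs through the new cache.
use catalog::kvbackend::TableCacheRef;
use common_meta::table_name::TableName;
use table::TableRef;

async fn lookup_demo_table(table_cache: &TableCacheRef) -> Option<TableRef> {
    table_cache
        .get_by_ref(&TableName {
            // Placeholder identifiers, not part of this commit.
            catalog_name: "greptime".to_string(),
            schema_name: "public".to_string(),
            table_name: "demo".to_string(),
        })
        .await
        .ok()
        .flatten()
}
```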
src/cmd/Cargo.toml
@@ -19,6 +19,7 @@ workspace = true
 async-trait.workspace = true
 auth.workspace = true
 base64.workspace = true
+cache.workspace = true
 catalog.workspace = true
 chrono.workspace = true
 clap.workspace = true
@@ -52,6 +53,7 @@ lazy_static.workspace = true
 meta-client.workspace = true
 meta-srv.workspace = true
 mito2.workspace = true
+moka.workspace = true
 nu-ansi-term = "0.46"
 plugins.workspace = true
 prometheus.workspace = true
|
||||
location: Location,
|
||||
source: common_runtime::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to get cache from cache registry: {}", name))]
|
||||
CacheRequired {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
name: String,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -352,6 +359,8 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,
|
||||
|
||||
Error::CacheRequired { .. } => StatusCode::Internal,
|
||||
|
||||
Error::Other { source, .. } => source.status_code(),
|
||||
|
||||
Error::BuildRuntime { source, .. } => source.status_code(),
|
||||
|
||||
src/cmd/src/frontend.rs
@@ -16,10 +16,10 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use async_trait::async_trait;
-use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager};
+use cache::{default_cache_registry_builder, TABLE_CACHE_NAME};
+use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
 use clap::Parser;
 use client::client_manager::DatanodeClients;
-use common_meta::cache_invalidator::MultiCacheInvalidator;
 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
 use common_meta::heartbeat::handler::HandlerGroupExecutor;
 use common_telemetry::info;
@@ -242,22 +242,27 @@ impl StartCommand {
             .cache_tti(cache_tti)
             .build();
         let cached_meta_backend = Arc::new(cached_meta_backend);
-        let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
-            cached_meta_backend.clone(),
-        ]));
+        let cache_registry_builder =
+            default_cache_registry_builder(Arc::new(MetaKvBackend::new(meta_client.clone())));
+        let cache_registry = Arc::new(
+            cache_registry_builder
+                .add_cache(cached_meta_backend.clone())
+                .build(),
+        );
+        let table_cache = cache_registry.get().context(error::CacheRequiredSnafu {
+            name: TABLE_CACHE_NAME,
+        })?;
         let catalog_manager = KvBackendCatalogManager::new(
             opts.mode,
             Some(meta_client.clone()),
             cached_meta_backend.clone(),
-            multi_cache_invalidator.clone(),
+            table_cache,
         )
         .await;
 
         let executor = HandlerGroupExecutor::new(vec![
             Arc::new(ParseMailboxMessageHandler),
-            Arc::new(InvalidateTableCacheHandler::new(
-                multi_cache_invalidator.clone(),
-            )),
+            Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())),
         ]);
 
         let heartbeat_task = HeartbeatTask::new(
@@ -269,12 +274,13 @@ impl StartCommand {
 
         let mut instance = FrontendBuilder::new(
             cached_meta_backend.clone(),
+            cache_registry.clone(),
             catalog_manager,
             Arc::new(DatanodeClients::default()),
             meta_client,
         )
         .with_plugin(plugins.clone())
-        .with_cache_invalidator(multi_cache_invalidator)
+        .with_local_cache_invalidator(cache_registry)
         .with_heartbeat_task(heartbeat_task)
         .try_build()
         .await
src/cmd/src/standalone.rs
@@ -16,11 +16,12 @@ use std::sync::Arc;
 use std::{fs, path};
 
 use async_trait::async_trait;
+use cache::{default_cache_registry_builder, TABLE_CACHE_NAME};
 use catalog::kvbackend::KvBackendCatalogManager;
 use clap::Parser;
 use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
 use common_config::{metadata_store_dir, KvBackendConfig};
-use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
+use common_meta::cache_invalidator::CacheInvalidatorRef;
 use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
 use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
 use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
@@ -55,12 +56,13 @@ use servers::export_metrics::ExportMetricsOption;
 use servers::http::HttpOptions;
 use servers::tls::{TlsMode, TlsOption};
 use servers::Mode;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 
 use crate::error::{
-    CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu,
-    Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
-    StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
+    CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu,
+    InitTimezoneSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
+    StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu,
+    StopProcedureManagerSnafu,
 };
 use crate::options::{GlobalOptions, Options};
 use crate::App;
@@ -379,14 +381,12 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;
 
-        let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
-        let catalog_manager = KvBackendCatalogManager::new(
-            dn_opts.mode,
-            None,
-            kv_backend.clone(),
-            multi_cache_invalidator.clone(),
-        )
-        .await;
+        let cache_registry = Arc::new(default_cache_registry_builder(kv_backend.clone()).build());
+        let table_cache = cache_registry.get().context(CacheRequiredSnafu {
+            name: TABLE_CACHE_NAME,
+        })?;
+        let catalog_manager =
+            KvBackendCatalogManager::new(dn_opts.mode, None, kv_backend.clone(), table_cache).await;
 
         let builder =
             DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
@@ -424,7 +424,7 @@ impl StartCommand {
         let ddl_task_executor = Self::create_ddl_task_executor(
             procedure_manager.clone(),
             node_manager.clone(),
-            multi_cache_invalidator,
+            cache_registry.clone(),
             table_metadata_manager,
             table_meta_allocator,
             flow_metadata_manager,
@@ -432,12 +432,17 @@ impl StartCommand {
         )
         .await?;
 
-        let mut frontend =
-            FrontendBuilder::new(kv_backend, catalog_manager, node_manager, ddl_task_executor)
-                .with_plugin(fe_plugins.clone())
-                .try_build()
-                .await
-                .context(StartFrontendSnafu)?;
+        let mut frontend = FrontendBuilder::new(
+            kv_backend,
+            cache_registry,
+            catalog_manager,
+            node_manager,
+            ddl_task_executor,
+        )
+        .with_plugin(fe_plugins.clone())
+        .try_build()
+        .await
+        .context(StartFrontendSnafu)?;
 
         let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
             .build()
src/common/meta/src/cache/mod.rs
@@ -18,7 +18,7 @@ mod registry;
 mod table;
 
 pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter};
-pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache};
+pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache, TableFlownodeSetCacheRef};
 pub use registry::{CacheRegistry, CacheRegistryBuilder, CacheRegistryRef};
 pub use table::{
     new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache,
src/common/meta/src/cache/flow.rs (4 changed lines)
@@ -13,4 +13,6 @@
 // limitations under the License.
 
 mod table_flownode;
-pub use table_flownode::{new_table_flownode_set_cache, TableFlownodeSetCache};
+pub use table_flownode::{
+    new_table_flownode_set_cache, TableFlownodeSetCache, TableFlownodeSetCacheRef,
+};

src/common/meta/src/cache/flow/table_flownode.rs
@@ -30,6 +30,8 @@ use crate::FlownodeId;
 
 type FlownodeSet = HashSet<FlownodeId>;
 
+pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
+
 /// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
 pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
 
@@ -54,6 +56,10 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
                 .map_ok(|key| key.flownode_id())
                 .try_collect::<HashSet<_>>()
                 .await
+                // We must cache the `HashSet` even if it's empty,
+                // to avoid future requests to the remote storage next time;
+                // If the value is added to the remote storage,
+                // we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`.
                 .map(Some)
         })
     })
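The four-line comment above is the behavioral crux: the initializer caches `Some(empty set)` rather than `None`, so tables with no flows become cache hits too and repeated inserts stop re-querying the metadata backend; when a flow is later created, invalidation, not expiry, drops the stale empty entry. A standalone moka illustration of that negative-caching idea (generic types, not this crate's code):

```rust
// Standalone illustration of negative caching with moka; not GreptimeDB code.
use std::collections::HashSet;

use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<u32, HashSet<u64>> = Cache::new(1024);

    // First lookup: the "remote" source has no flownodes for table 42, and
    // we cache the empty set so the next lookup is a hit anyway.
    let flownodes = cache.get_with(42, async { HashSet::new() }).await;
    assert!(flownodes.is_empty());

    // When a flow is created for table 42, an explicit invalidation drops
    // the stale empty entry instead of waiting for TTL/TTI expiry.
    cache.invalidate(&42).await;
}
```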
src/common/meta/src/cache/registry.rs (2 changed lines)
@@ -50,7 +50,7 @@ pub struct CacheRegistry {
 }
 
 #[async_trait::async_trait]
-impl CacheInvalidator for Arc<CacheRegistry> {
+impl CacheInvalidator for CacheRegistry {
     async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
         let tasks = self
             .indexes
src/common/meta/src/cache_invalidator.rs
@@ -14,8 +14,6 @@
 
 use std::sync::Arc;
 
-use tokio::sync::RwLock;
-
 use crate::error::Result;
 use crate::instruction::CacheIdent;
 use crate::key::schema_name::SchemaNameKey;
@@ -61,34 +59,6 @@ impl CacheInvalidator for DummyCacheInvalidator {
     }
 }
 
-#[derive(Default)]
-pub struct MultiCacheInvalidator {
-    invalidators: RwLock<Vec<CacheInvalidatorRef>>,
-}
-
-impl MultiCacheInvalidator {
-    pub fn with_invalidators(invalidators: Vec<CacheInvalidatorRef>) -> Self {
-        Self {
-            invalidators: RwLock::new(invalidators),
-        }
-    }
-
-    pub async fn add_invalidator(&self, invalidator: CacheInvalidatorRef) {
-        self.invalidators.write().await.push(invalidator);
-    }
-}
-
-#[async_trait::async_trait]
-impl CacheInvalidator for MultiCacheInvalidator {
-    async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
-        let invalidators = self.invalidators.read().await;
-        for invalidator in invalidators.iter() {
-            invalidator.invalidate(ctx, caches).await?;
-        }
-        Ok(())
-    }
-}
-
 #[async_trait::async_trait]
 impl<T> CacheInvalidator for T
 where
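The deleted `MultiCacheInvalidator` was a sequential fan-out over registered invalidators; after this commit `CacheRegistry` plays that role (its `CacheInvalidator` impl, per the `registry.rs` hunk above, now hangs off `CacheRegistry` directly rather than `Arc<CacheRegistry>`). For reference, the fan-out pattern the registry inherits is essentially the removed body, re-expressed as a sketch:

```rust
// Sketch: the fan-out behavior CacheRegistry takes over (simplified; the
// real impl walks its `indexes` of registered caches).
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result;
use common_meta::instruction::CacheIdent;

struct FanOutInvalidator {
    targets: Vec<CacheInvalidatorRef>,
}

#[async_trait::async_trait]
impl CacheInvalidator for FanOutInvalidator {
    async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
        // Every target sees every ident; each cache's TokenFilter decides
        // whether a given ident is relevant to it.
        for target in &self.targets {
            target.invalidate(ctx, caches).await?;
        }
        Ok(())
    }
}
```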
src/common/meta/src/ddl/alter_table.rs
@@ -172,20 +172,16 @@ impl AlterTableProcedure {
 
     /// Broadcasts the invalidating table cache instructions.
     async fn on_broadcast(&mut self) -> Result<Status> {
-        // Safety: Checked in `AlterTableProcedure::new`.
-        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
         let cache_invalidator = &self.context.cache_invalidator;
-        let cache_keys = if matches!(alter_kind, Kind::RenameTable { .. }) {
-            vec![CacheIdent::TableName(self.data.table_ref().into())]
-        } else {
-            vec![
-                CacheIdent::TableId(self.data.table_id()),
-                CacheIdent::TableName(self.data.table_ref().into()),
-            ]
-        };
 
         cache_invalidator
-            .invalidate(&Context::default(), &cache_keys)
+            .invalidate(
+                &Context::default(),
+                &[
+                    CacheIdent::TableId(self.data.table_id()),
+                    CacheIdent::TableName(self.data.table_ref().into()),
+                ],
+            )
             .await?;
 
         Ok(Status::done())
src/frontend/Cargo.toml
@@ -17,6 +17,7 @@ api.workspace = true
 arc-swap = "1.0"
 async-trait = "0.1"
 auth.workspace = true
+cache.workspace = true
 catalog.workspace = true
 client.workspace = true
 common-base.workspace = true
src/frontend/src/error.rs
@@ -346,6 +346,13 @@ pub enum Error {
         #[snafu(source)]
         error: toml::ser::Error,
     },
+
+    #[snafu(display("Failed to get cache from cache registry: {}", name))]
+    CacheRequired {
+        #[snafu(implicit)]
+        location: Location,
+        name: String,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -397,7 +404,8 @@ impl ErrorExt for Error {
 
             Error::FindDatanode { .. }
             | Error::VectorToGrpcColumn { .. }
-            | Error::InvalidRegionRequest { .. } => StatusCode::Internal,
+            | Error::InvalidRegionRequest { .. }
+            | Error::CacheRequired { .. } => StatusCode::Internal,
 
             Error::ContextValueNotFound { .. } => StatusCode::Unexpected,
 
src/frontend/src/instance/builder.rs
@@ -14,11 +14,12 @@
 
 use std::sync::Arc;
 
+use cache::TABLE_FLOWNODE_SET_CACHE_NAME;
 use catalog::CatalogManagerRef;
 use common_base::Plugins;
+use common_meta::cache::CacheRegistryRef;
 use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
 use common_meta::ddl::ProcedureExecutorRef;
-use common_meta::key::flow::TableFlowManager;
 use common_meta::key::TableMetadataManager;
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::node_manager::NodeManagerRef;
@@ -31,8 +32,9 @@ use operator::table::TableMutationOperator;
 use partition::manager::PartitionRuleManager;
 use query::QueryEngineFactory;
 use servers::server::ServerHandlers;
+use snafu::OptionExt;
 
-use crate::error::Result;
+use crate::error::{self, Result};
 use crate::heartbeat::HeartbeatTask;
 use crate::instance::region_query::FrontendRegionQueryHandler;
 use crate::instance::{Instance, StatementExecutorRef};
@@ -41,7 +43,8 @@ use crate::script::ScriptExecutor;
 /// The frontend [`Instance`] builder.
 pub struct FrontendBuilder {
     kv_backend: KvBackendRef,
-    cache_invalidator: Option<CacheInvalidatorRef>,
+    cache_registry: CacheRegistryRef,
+    local_cache_invalidator: Option<CacheInvalidatorRef>,
     catalog_manager: CatalogManagerRef,
     node_manager: NodeManagerRef,
     plugins: Option<Plugins>,
@@ -52,13 +55,15 @@ pub struct FrontendBuilder {
 impl FrontendBuilder {
     pub fn new(
         kv_backend: KvBackendRef,
+        cache_registry: CacheRegistryRef,
         catalog_manager: CatalogManagerRef,
         node_manager: NodeManagerRef,
         procedure_executor: ProcedureExecutorRef,
     ) -> Self {
         Self {
             kv_backend,
-            cache_invalidator: None,
+            cache_registry,
+            local_cache_invalidator: None,
             catalog_manager,
             node_manager,
             plugins: None,
@@ -67,9 +72,9 @@ impl FrontendBuilder {
         }
     }
 
-    pub fn with_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
+    pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
         Self {
-            cache_invalidator: Some(cache_invalidator),
+            local_cache_invalidator: Some(cache_invalidator),
             ..self
         }
     }
@@ -95,20 +100,24 @@ impl FrontendBuilder {
 
         let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
 
-        let cache_invalidator = self
-            .cache_invalidator
+        let local_cache_invalidator = self
+            .local_cache_invalidator
             .unwrap_or_else(|| Arc::new(DummyCacheInvalidator));
 
         let region_query_handler =
             FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone());
 
-        let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend.clone()));
-
+        let table_flownode_cache =
+            self.cache_registry
+                .get()
+                .context(error::CacheRequiredSnafu {
+                    name: TABLE_FLOWNODE_SET_CACHE_NAME,
+                })?;
         let inserter = Arc::new(Inserter::new(
             self.catalog_manager.clone(),
             partition_manager.clone(),
             node_manager.clone(),
-            table_flow_manager,
+            table_flownode_cache,
         ));
         let deleter = Arc::new(Deleter::new(
             self.catalog_manager.clone(),
@@ -149,7 +158,7 @@ impl FrontendBuilder {
             query_engine.clone(),
             self.procedure_executor,
             kv_backend.clone(),
-            cache_invalidator,
+            local_cache_invalidator,
             inserter.clone(),
         ));
 
src/operator/src/insert.rs
@@ -26,7 +26,7 @@ use catalog::CatalogManagerRef;
 use client::{OutputData, OutputMeta};
 use common_catalog::consts::default_engine;
 use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
-use common_meta::key::flow::TableFlowManagerRef;
+use common_meta::cache::TableFlownodeSetCacheRef;
 use common_meta::node_manager::{AffectedRows, NodeManagerRef};
 use common_meta::peer::Peer;
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
@@ -34,7 +34,7 @@ use common_query::Output;
 use common_telemetry::tracing_context::TracingContext;
 use common_telemetry::{error, info, warn};
 use datatypes::schema::Schema;
-use futures_util::{future, TryStreamExt};
+use futures_util::future;
 use meter_macros::write_meter;
 use partition::manager::PartitionRuleManagerRef;
 use session::context::QueryContextRef;
@@ -62,7 +62,7 @@ pub struct Inserter {
     catalog_manager: CatalogManagerRef,
     partition_manager: PartitionRuleManagerRef,
     node_manager: NodeManagerRef,
-    table_flow_manager: TableFlowManagerRef,
+    table_flownode_set_cache: TableFlownodeSetCacheRef,
 }
 
 pub type InserterRef = Arc<Inserter>;
@@ -72,13 +72,13 @@ impl Inserter {
         catalog_manager: CatalogManagerRef,
         partition_manager: PartitionRuleManagerRef,
         node_manager: NodeManagerRef,
-        table_flow_manager: TableFlowManagerRef,
+        table_flownode_set_cache: TableFlownodeSetCacheRef,
     ) -> Self {
         Self {
            catalog_manager,
            partition_manager,
            node_manager,
-           table_flow_manager,
+           table_flownode_set_cache,
        }
     }
 
@@ -196,10 +196,10 @@
 impl Inserter {
     fn post_request(&self, requests: RegionInsertRequests) {
         let node_manager = self.node_manager.clone();
-        let table_flow_manager = self.table_flow_manager.clone();
+        let table_flownode_set_cache = self.table_flownode_set_cache.clone();
         // Spawn all tasks that do job for mirror insert requests for flownode
         common_runtime::spawn_bg(async move {
-            match Self::mirror_flow_node_requests(table_flow_manager, requests).await {
+            match Self::mirror_flow_node_requests(table_flownode_set_cache, requests).await {
                 Ok(flow_tasks) => {
                     let flow_tasks = flow_tasks.into_iter().map(|(peer, inserts)| {
                         let node_manager = node_manager.clone();
@@ -272,7 +272,7 @@ impl Inserter {
 
     /// Mirror requests for source table to flownode
     async fn mirror_flow_node_requests(
-        table_flow_manager: TableFlowManagerRef,
+        table_flownode_set_cache: TableFlownodeSetCacheRef,
         requests: RegionInsertRequests,
     ) -> Result<HashMap<Peer, RegionInsertRequests>> {
         // store partial source table requests used by flow node(only store what's used)
@@ -285,17 +285,15 @@
                 Some(None) => continue,
                 _ => {
                     let table_id = RegionId::from_u64(req.region_id).table_id();
-                    let peers = table_flow_manager
-                        .flows(table_id)
-                        // TODO(discord9): determine where to store the flow node address in distributed mode
-                        .map_ok(|key| Peer::new(key.flownode_id(), ""))
-                        .try_collect::<Vec<_>>()
+                    // TODO(discord9): determine where to store the flow node address in distributed mode
+                    let peers = table_flownode_set_cache
+                        .get(table_id)
                         .await
-                        .map(|mut v| {
-                            v.dedup();
-                            v
-                        })
-                        .context(RequestInsertsSnafu)?;
+                        .context(RequestInsertsSnafu)?
+                        .unwrap_or_default()
+                        .into_iter()
+                        .map(|id| Peer::new(id, ""))
+                        .collect::<Vec<_>>();
 
                     if !peers.is_empty() {
                         let mut reqs = RegionInsertRequests::default();
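A side effect of the hunk above: the old stream collected flownode ids into a `Vec` that could contain repeats (hence the `v.dedup()`), while `TableFlownodeSetCache` hands back a `HashSet<FlownodeId>`, so uniqueness is structural and the peers list is built directly. A small self-contained illustration of the difference:

```rust
// Standalone illustration: a set makes the dedup step structural.
use std::collections::HashSet;

fn main() {
    // Old shape: ids streamed into a Vec may repeat, so the code deduped
    // (Vec::dedup only drops consecutive repeats).
    let mut from_stream: Vec<u64> = vec![1, 1, 2];
    from_stream.dedup();

    // New shape: the cache returns a HashSet, so ids are already unique.
    let from_cache: HashSet<u64> = HashSet::from([1, 2]);
    let peers: Vec<u64> = from_cache.into_iter().collect();

    assert_eq!(from_stream, vec![1, 2]);
    assert_eq!(peers.len(), 2);
}
```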
tests-fuzz target
@@ -18,6 +18,7 @@ use std::collections::HashMap;
 use std::fs::create_dir_all;
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
+use std::time::Duration;
 
 use common_telemetry::info;
 use common_telemetry::tracing::warn;
@@ -96,6 +97,7 @@ fn generate_create_table_expr<R: Rng + 'static>(rng: &mut R) -> CreateTableExpr
 async fn connect_mysql(addr: &str) -> Pool<MySql> {
     loop {
         match MySqlPoolOptions::new()
+            .acquire_timeout(Duration::from_secs(30))
             .connect(&format!("mysql://{addr}/public"))
             .await
         {
tests-integration/Cargo.toml
@@ -17,6 +17,7 @@ async-stream.workspace = true
 async-trait = "0.1"
 auth.workspace = true
 axum.workspace = true
+cache.workspace = true
 catalog.workspace = true
 chrono.workspace = true
 clap.workspace = true
@@ -43,6 +44,7 @@ futures.workspace = true
 futures-util.workspace = true
 meta-client.workspace = true
 meta-srv = { workspace = true, features = ["mock"] }
+moka.workspace = true
 mysql_async = { version = "0.33", default-features = false, features = [
     "default-rustls",
 ] }
tests-integration/src/cluster.rs
@@ -19,12 +19,12 @@ use std::time::Duration;
 
 use api::v1::region::region_server::RegionServer;
 use arrow_flight::flight_service_server::FlightServiceServer;
+use cache::default_cache_registry_builder;
 use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
 use client::client_manager::DatanodeClients;
 use client::Client;
 use common_base::Plugins;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
-use common_meta::cache_invalidator::MultiCacheInvalidator;
 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
 use common_meta::heartbeat::handler::HandlerGroupExecutor;
 use common_meta::kv_backend::chroot::ChrootKvBackend;
@@ -346,22 +346,25 @@ impl GreptimeDbClusterBuilder {
 
         let cached_meta_backend =
             Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
-        let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
-            cached_meta_backend.clone(),
-        ]));
+        let cache_registry_builder =
+            default_cache_registry_builder(Arc::new(MetaKvBackend::new(meta_client.clone())));
+        let cache_registry = Arc::new(
+            cache_registry_builder
+                .add_cache(cached_meta_backend.clone())
+                .build(),
+        );
+        let table_cache = cache_registry.get().unwrap();
         let catalog_manager = KvBackendCatalogManager::new(
             Mode::Distributed,
             Some(meta_client.clone()),
             cached_meta_backend.clone(),
-            multi_cache_invalidator.clone(),
+            table_cache,
         )
         .await;
 
         let handlers_executor = HandlerGroupExecutor::new(vec![
             Arc::new(ParseMailboxMessageHandler),
-            Arc::new(InvalidateTableCacheHandler::new(
-                multi_cache_invalidator.clone(),
-            )),
+            Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())),
         ]);
 
         let heartbeat_task = HeartbeatTask::new(
@@ -373,11 +376,12 @@ impl GreptimeDbClusterBuilder {
 
         let instance = FrontendBuilder::new(
             cached_meta_backend.clone(),
+            cache_registry.clone(),
             catalog_manager,
             datanode_clients,
             meta_client,
         )
-        .with_cache_invalidator(multi_cache_invalidator)
+        .with_local_cache_invalidator(cache_registry)
         .with_heartbeat_task(heartbeat_task)
         .try_build()
         .await
tests-integration/src/standalone.rs
@@ -14,12 +14,12 @@
 
 use std::sync::Arc;
 
+use cache::default_cache_registry_builder;
 use catalog::kvbackend::KvBackendCatalogManager;
 use cmd::standalone::StandaloneOptions;
 use common_base::Plugins;
 use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
 use common_config::KvBackendConfig;
-use common_meta::cache_invalidator::MultiCacheInvalidator;
 use common_meta::ddl::flow_meta::FlowMetadataAllocator;
 use common_meta::ddl::table_meta::TableMetadataAllocator;
 use common_meta::ddl::DdlContext;
@@ -128,12 +128,12 @@ impl GreptimeDbStandaloneBuilder {
         let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
         table_metadata_manager.init().await.unwrap();
         let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
-        let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
+        let cache_registry = Arc::new(default_cache_registry_builder(kv_backend.clone()).build());
         let catalog_manager = KvBackendCatalogManager::new(
             Mode::Standalone,
             None,
             kv_backend.clone(),
-            multi_cache_invalidator.clone(),
+            cache_registry.get().unwrap(),
         )
         .await;
 
@@ -167,7 +167,7 @@ impl GreptimeDbStandaloneBuilder {
         DdlManager::try_new(
             DdlContext {
                 node_manager: node_manager.clone(),
-                cache_invalidator: multi_cache_invalidator,
+                cache_invalidator: cache_registry.clone(),
                 memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                 table_metadata_manager,
                 table_metadata_allocator,
@@ -182,6 +182,7 @@ impl GreptimeDbStandaloneBuilder {
 
         let instance = FrontendBuilder::new(
             kv_backend.clone(),
+            cache_registry,
             catalog_manager,
             node_manager,
             ddl_task_executor,