diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index cf0008ad80..3e8f204ed3 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -251,6 +251,12 @@ pub enum Error { source: common_meta::error::Error, location: Location, }, + + #[snafu(display("Get null from table cache, key: {}", key))] + TableCacheNotGet { key: String, location: Location }, + + #[snafu(display("Failed to get table cache, err: {}", err_msg))] + GetTableCache { err_msg: String }, } pub type Result = std::result::Result; @@ -311,6 +317,7 @@ impl ErrorExt for Error { Error::QueryAccessDenied { .. } => StatusCode::AccessDenied, Error::Datafusion { .. } => StatusCode::EngineExecuteQuery, Error::TableMetadataManager { source, .. } => source.status_code(), + Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal, } } diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index c0f3547609..9535ab7c77 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -82,12 +82,10 @@ impl CachedMetaKvBackendBuilder { let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL); let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI); - let cache = Arc::new( - CacheBuilder::new(cache_max_capacity) - .time_to_live(cache_ttl) - .time_to_idle(cache_tti) - .build(), - ); + let cache = CacheBuilder::new(cache_max_capacity) + .time_to_live(cache_ttl) + .time_to_idle(cache_tti) + .build(); let kv_backend = Arc::new(MetaKvBackend { client: self.meta_client, @@ -104,7 +102,7 @@ impl CachedMetaKvBackendBuilder { } } -pub type CacheBackendRef = Arc, KeyValue>>; +pub type CacheBackend = Cache, KeyValue>; /// A wrapper of `MetaKvBackend` with cache support. /// @@ -117,7 +115,7 @@ pub type CacheBackendRef = Arc, KeyValue>>; /// TTL and TTI for cache. pub struct CachedMetaKvBackend { kv_backend: KvBackendRef, - cache: CacheBackendRef, + cache: CacheBackend, name: String, version: AtomicUsize, } @@ -317,12 +315,10 @@ impl CachedMetaKvBackend { // only for test #[cfg(test)] fn wrap(kv_backend: KvBackendRef) -> Self { - let cache = Arc::new( - CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) - .time_to_live(DEFAULT_CACHE_TTL) - .time_to_idle(DEFAULT_CACHE_TTI) - .build(), - ); + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) + .build(); let name = format!("CachedKvBackend({})", kv_backend.name()); Self { @@ -333,7 +329,7 @@ impl CachedMetaKvBackend { } } - pub fn cache(&self) -> &CacheBackendRef { + pub fn cache(&self) -> &CacheBackend { &self.cache } diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 4e3d4ec3cd..05c2431a4a 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -15,11 +15,13 @@ use std::any::Any; use std::collections::BTreeSet; use std::sync::{Arc, Weak}; +use std::time::Duration; use async_stream::try_stream; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, }; +use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context}; use common_meta::error::Result as MetaResult; @@ -32,6 +34,7 @@ use common_meta::kv_backend::KvBackendRef; use common_meta::table_name::TableName; use futures_util::stream::BoxStream; use futures_util::{StreamExt, TryStreamExt}; +use moka::future::{Cache as AsyncCache, CacheBuilder}; use moka::sync::Cache; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; @@ -40,9 +43,10 @@ use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; +use crate::error::Error::{GetTableCache, TableCacheNotGet}; use crate::error::{ self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, - Result as CatalogResult, TableMetadataManagerSnafu, + Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu, }; use crate::information_schema::InformationSchemaProvider; use crate::CatalogManager; @@ -62,6 +66,7 @@ pub struct KvBackendCatalogManager { table_metadata_manager: TableMetadataManagerRef, /// A sub-CatalogManager that handles system tables system_catalog: SystemCatalog, + table_cache: AsyncCache, } fn make_table(table_info_value: TableInfoValue) -> CatalogResult { @@ -81,13 +86,24 @@ impl CacheInvalidator for KvBackendCatalogManager { } async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { + let table_cache_key = format_full_table_name( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ); self.cache_invalidator .invalidate_table_name(ctx, table_name) - .await + .await?; + self.table_cache.invalidate(&table_cache_key).await; + + Ok(()) } } -const DEFAULT_CACHED_CATALOG: u64 = 128; +const CATALOG_CACHE_MAX_CAPACITY: u64 = 128; +const TABLE_CACHE_MAX_CAPACITY: u64 = 65536; +const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60); +const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60); impl KvBackendCatalogManager { pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc { @@ -97,12 +113,16 @@ impl KvBackendCatalogManager { cache_invalidator, system_catalog: SystemCatalog { catalog_manager: me.clone(), - catalog_cache: Cache::new(DEFAULT_CACHED_CATALOG), + catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), information_schema_provider: Arc::new(InformationSchemaProvider::new( DEFAULT_CATALOG_NAME.to_string(), me.clone(), )), }, + table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY) + .time_to_live(TABLE_CACHE_TTL) + .time_to_idle(TABLE_CACHE_TTI) + .build(), }) } @@ -217,29 +237,52 @@ impl CatalogManager for KvBackendCatalogManager { return Ok(Some(table)); } - let key = TableNameKey::new(catalog, schema, table_name); - let Some(table_name_value) = self - .table_metadata_manager - .table_name_manager() - .get(key) - .await - .context(TableMetadataManagerSnafu)? - else { - return Ok(None); - }; - let table_id = table_name_value.table_id(); + let init = async { + let table_name_key = TableNameKey::new(catalog, schema, table_name); + let Some(table_name_value) = self + .table_metadata_manager + .table_name_manager() + .get(table_name_key) + .await + .context(TableMetadataManagerSnafu)? + else { + return TableCacheNotGetSnafu { + key: table_name_key.to_string(), + } + .fail(); + }; + let table_id = table_name_value.table_id(); - let Some(table_info_value) = self - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? - .map(|v| v.into_inner()) - else { - return Ok(None); + let Some(table_info_value) = self + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(TableMetadataManagerSnafu)? + .map(|v| v.into_inner()) + else { + return TableCacheNotGetSnafu { + key: table_name_key.to_string(), + } + .fail(); + }; + make_table(table_info_value) }; - make_table(table_info_value).map(Some) + + match self + .table_cache + .try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init) + .await + { + Ok(table) => Ok(Some(table)), + Err(err) => match err.as_ref() { + TableCacheNotGet { .. } => Ok(None), + _ => Err(err), + }, + } + .map_err(|err| GetTableCache { + err_msg: err.to_string(), + }) } async fn tables<'a>( diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 4a760cafce..5f712fbdd8 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; -use common_telemetry::{info, tracing}; +use common_telemetry::{debug, info, tracing}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::TableId; @@ -545,7 +545,7 @@ impl ProcedureExecutor for DdlManager { .attach(tracing::info_span!("DdlManager::submit_ddl_task")); async move { let cluster_id = ctx.cluster_id.unwrap_or_default(); - info!("Submitting Ddl task: {:?}", request.task); + debug!("Submitting Ddl task: {:?}", request.task); match request.task { CreateTable(create_table_task) => { handle_create_table_task(self, cluster_id, create_table_task).await diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index e9d97f7b23..a6b74d324b 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -128,13 +128,18 @@ pub async fn remote_read( } async fn decode_remote_write_request(body: Body) -> Result { + let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); let body = hyper::body::to_bytes(body) .await .context(error::HyperSnafu)?; let buf = snappy_decompress(&body[..])?; - WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu) + let request = WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)?; + crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES + .observe(request.timeseries.len() as f64); + + Ok(request) } async fn decode_remote_read_request(body: Body) -> Result { diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 87d8bd7a9f..e9aac792d4 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -78,6 +78,21 @@ lazy_static! { &[METRIC_DB_LABEL] ) .unwrap(); + pub static ref METRIC_HTTP_PROM_STORE_DECODE_ELAPSED: Histogram = register_histogram!( + "greptime_servers_http_prometheus_decode_elapsed", + "servers http prometheus decode elapsed", + ) + .unwrap(); + pub static ref METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES: Histogram = register_histogram!( + "greptime_servers_http_prometheus_decode_num_series", + "servers http prometheus decode num series", + ) + .unwrap(); + pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = register_histogram!( + "greptime_servers_http_prometheus_convert_elapsed", + "servers http prometheus convert to gRPC request elapsed", + ) + .unwrap(); pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_prometheus_read_elapsed", "servers http prometheus read elapsed", diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 52591678b6..3c114f335d 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -299,6 +299,8 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result Result<(RowInsertRequests, usize)> { + let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer(); + let mut multi_table_data = MultiTableData::new(); for series in &request.timeseries { diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index b4daad3ecd..c11f3bfc9f 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -212,7 +212,7 @@ fn write_by_semantic_type( let index = column_indexes.entry(name.clone()).or_insert(schema.len()); if *index == schema.len() { schema.push(ColumnSchema { - column_name: name.to_string(), + column_name: name, datatype: datatype as i32, semantic_type: semantic_type as i32, ..Default::default()