mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: tableref cache (#3420)
* feat: tableref cache * chore: minor refactor * chore: avoid to string * chore: change log level * feat: add metrics for prometheus remote write decode
This commit is contained in:
@@ -251,6 +251,12 @@ pub enum Error {
|
||||
source: common_meta::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Get null from table cache, key: {}", key))]
|
||||
TableCacheNotGet { key: String, location: Location },
|
||||
|
||||
#[snafu(display("Failed to get table cache, err: {}", err_msg))]
|
||||
GetTableCache { err_msg: String },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -311,6 +317,7 @@ impl ErrorExt for Error {
|
||||
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
|
||||
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
|
||||
Error::TableMetadataManager { source, .. } => source.status_code(),
|
||||
Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -82,12 +82,10 @@ impl CachedMetaKvBackendBuilder {
|
||||
let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
|
||||
let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);
|
||||
|
||||
let cache = Arc::new(
|
||||
CacheBuilder::new(cache_max_capacity)
|
||||
.time_to_live(cache_ttl)
|
||||
.time_to_idle(cache_tti)
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheBuilder::new(cache_max_capacity)
|
||||
.time_to_live(cache_ttl)
|
||||
.time_to_idle(cache_tti)
|
||||
.build();
|
||||
|
||||
let kv_backend = Arc::new(MetaKvBackend {
|
||||
client: self.meta_client,
|
||||
@@ -104,7 +102,7 @@ impl CachedMetaKvBackendBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
|
||||
pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
|
||||
|
||||
/// A wrapper of `MetaKvBackend` with cache support.
|
||||
///
|
||||
@@ -117,7 +115,7 @@ pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
|
||||
/// TTL and TTI for cache.
|
||||
pub struct CachedMetaKvBackend {
|
||||
kv_backend: KvBackendRef,
|
||||
cache: CacheBackendRef,
|
||||
cache: CacheBackend,
|
||||
name: String,
|
||||
version: AtomicUsize,
|
||||
}
|
||||
@@ -317,12 +315,10 @@ impl CachedMetaKvBackend {
|
||||
// only for test
|
||||
#[cfg(test)]
|
||||
fn wrap(kv_backend: KvBackendRef) -> Self {
|
||||
let cache = Arc::new(
|
||||
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
|
||||
.time_to_live(DEFAULT_CACHE_TTL)
|
||||
.time_to_idle(DEFAULT_CACHE_TTI)
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
|
||||
.time_to_live(DEFAULT_CACHE_TTL)
|
||||
.time_to_idle(DEFAULT_CACHE_TTI)
|
||||
.build();
|
||||
|
||||
let name = format!("CachedKvBackend({})", kv_backend.name());
|
||||
Self {
|
||||
@@ -333,7 +329,7 @@ impl CachedMetaKvBackend {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cache(&self) -> &CacheBackendRef {
|
||||
pub fn cache(&self) -> &CacheBackend {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
@@ -15,11 +15,13 @@
|
||||
use std::any::Any;
|
||||
use std::collections::BTreeSet;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use common_catalog::consts::{
|
||||
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
|
||||
};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
|
||||
use common_meta::error::Result as MetaResult;
|
||||
@@ -32,6 +34,7 @@ use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::table_name::TableName;
|
||||
use futures_util::stream::BoxStream;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use moka::future::{Cache as AsyncCache, CacheBuilder};
|
||||
use moka::sync::Cache;
|
||||
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
|
||||
use snafu::prelude::*;
|
||||
@@ -40,9 +43,10 @@ use table::metadata::TableId;
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::Error::{GetTableCache, TableCacheNotGet};
|
||||
use crate::error::{
|
||||
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
|
||||
Result as CatalogResult, TableMetadataManagerSnafu,
|
||||
Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu,
|
||||
};
|
||||
use crate::information_schema::InformationSchemaProvider;
|
||||
use crate::CatalogManager;
|
||||
@@ -62,6 +66,7 @@ pub struct KvBackendCatalogManager {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
/// A sub-CatalogManager that handles system tables
|
||||
system_catalog: SystemCatalog,
|
||||
table_cache: AsyncCache<String, TableRef>,
|
||||
}
|
||||
|
||||
fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
|
||||
@@ -81,13 +86,24 @@ impl CacheInvalidator for KvBackendCatalogManager {
|
||||
}
|
||||
|
||||
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
|
||||
let table_cache_key = format_full_table_name(
|
||||
&table_name.catalog_name,
|
||||
&table_name.schema_name,
|
||||
&table_name.table_name,
|
||||
);
|
||||
self.cache_invalidator
|
||||
.invalidate_table_name(ctx, table_name)
|
||||
.await
|
||||
.await?;
|
||||
self.table_cache.invalidate(&table_cache_key).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
const DEFAULT_CACHED_CATALOG: u64 = 128;
|
||||
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
|
||||
const TABLE_CACHE_MAX_CAPACITY: u64 = 65536;
|
||||
const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
|
||||
const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
impl KvBackendCatalogManager {
|
||||
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
|
||||
@@ -97,12 +113,16 @@ impl KvBackendCatalogManager {
|
||||
cache_invalidator,
|
||||
system_catalog: SystemCatalog {
|
||||
catalog_manager: me.clone(),
|
||||
catalog_cache: Cache::new(DEFAULT_CACHED_CATALOG),
|
||||
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
|
||||
information_schema_provider: Arc::new(InformationSchemaProvider::new(
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
me.clone(),
|
||||
)),
|
||||
},
|
||||
table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
|
||||
.time_to_live(TABLE_CACHE_TTL)
|
||||
.time_to_idle(TABLE_CACHE_TTI)
|
||||
.build(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -217,29 +237,52 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
return Ok(Some(table));
|
||||
}
|
||||
|
||||
let key = TableNameKey::new(catalog, schema, table_name);
|
||||
let Some(table_name_value) = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(key)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let table_id = table_name_value.table_id();
|
||||
let init = async {
|
||||
let table_name_key = TableNameKey::new(catalog, schema, table_name);
|
||||
let Some(table_name_value) = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(table_name_key)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
else {
|
||||
return TableCacheNotGetSnafu {
|
||||
key: table_name_key.to_string(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let table_id = table_name_value.table_id();
|
||||
|
||||
let Some(table_info_value) = self
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map(|v| v.into_inner())
|
||||
else {
|
||||
return Ok(None);
|
||||
let Some(table_info_value) = self
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map(|v| v.into_inner())
|
||||
else {
|
||||
return TableCacheNotGetSnafu {
|
||||
key: table_name_key.to_string(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
make_table(table_info_value)
|
||||
};
|
||||
make_table(table_info_value).map(Some)
|
||||
|
||||
match self
|
||||
.table_cache
|
||||
.try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init)
|
||||
.await
|
||||
{
|
||||
Ok(table) => Ok(Some(table)),
|
||||
Err(err) => match err.as_ref() {
|
||||
TableCacheNotGet { .. } => Ok(None),
|
||||
_ => Err(err),
|
||||
},
|
||||
}
|
||||
.map_err(|err| GetTableCache {
|
||||
err_msg: err.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn tables<'a>(
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::sync::Arc;
|
||||
|
||||
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
|
||||
use common_telemetry::tracing_context::{FutureExt, TracingContext};
|
||||
use common_telemetry::{info, tracing};
|
||||
use common_telemetry::{debug, info, tracing};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::TableId;
|
||||
|
||||
@@ -545,7 +545,7 @@ impl ProcedureExecutor for DdlManager {
|
||||
.attach(tracing::info_span!("DdlManager::submit_ddl_task"));
|
||||
async move {
|
||||
let cluster_id = ctx.cluster_id.unwrap_or_default();
|
||||
info!("Submitting Ddl task: {:?}", request.task);
|
||||
debug!("Submitting Ddl task: {:?}", request.task);
|
||||
match request.task {
|
||||
CreateTable(create_table_task) => {
|
||||
handle_create_table_task(self, cluster_id, create_table_task).await
|
||||
|
||||
@@ -128,13 +128,18 @@ pub async fn remote_read(
|
||||
}
|
||||
|
||||
async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
|
||||
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
|
||||
let body = hyper::body::to_bytes(body)
|
||||
.await
|
||||
.context(error::HyperSnafu)?;
|
||||
|
||||
let buf = snappy_decompress(&body[..])?;
|
||||
|
||||
WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
|
||||
let request = WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)?;
|
||||
crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES
|
||||
.observe(request.timeseries.len() as f64);
|
||||
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
async fn decode_remote_read_request(body: Body) -> Result<ReadRequest> {
|
||||
|
||||
@@ -78,6 +78,21 @@ lazy_static! {
|
||||
&[METRIC_DB_LABEL]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_HTTP_PROM_STORE_DECODE_ELAPSED: Histogram = register_histogram!(
|
||||
"greptime_servers_http_prometheus_decode_elapsed",
|
||||
"servers http prometheus decode elapsed",
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES: Histogram = register_histogram!(
|
||||
"greptime_servers_http_prometheus_decode_num_series",
|
||||
"servers http prometheus decode num series",
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = register_histogram!(
|
||||
"greptime_servers_http_prometheus_convert_elapsed",
|
||||
"servers http prometheus convert to gRPC request elapsed",
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!(
|
||||
"greptime_servers_http_prometheus_read_elapsed",
|
||||
"servers http prometheus read elapsed",
|
||||
|
||||
@@ -299,6 +299,8 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
|
||||
}
|
||||
|
||||
pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRequests, usize)> {
|
||||
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
|
||||
|
||||
let mut multi_table_data = MultiTableData::new();
|
||||
|
||||
for series in &request.timeseries {
|
||||
|
||||
@@ -212,7 +212,7 @@ fn write_by_semantic_type(
|
||||
let index = column_indexes.entry(name.clone()).or_insert(schema.len());
|
||||
if *index == schema.len() {
|
||||
schema.push(ColumnSchema {
|
||||
column_name: name.to_string(),
|
||||
column_name: name,
|
||||
datatype: datatype as i32,
|
||||
semantic_type: semantic_type as i32,
|
||||
..Default::default()
|
||||
|
||||
Reference in New Issue
Block a user