mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-14 17:23:09 +00:00
fix: avoid acquiring lock during reading stats (#4070)
* fix: avoid acquiring lock during reading stats * chore: apply suggestions from CR * chore: apply suggestions from CR
This commit is contained in:
@@ -189,7 +189,7 @@ impl RegionServer {
|
||||
|
||||
pub async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
|
||||
match self.inner.region_map.get(®ion_id) {
|
||||
Some(e) => e.region_disk_usage(region_id).await,
|
||||
Some(e) => e.region_disk_usage(region_id),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,7 +200,7 @@ impl RegionEngine for MockRegionEngine {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
|
||||
fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
|
||||
@@ -107,7 +107,7 @@ impl RegionEngine for FileRegionEngine {
|
||||
self.inner.stop().await.map_err(BoxedError::new)
|
||||
}
|
||||
|
||||
async fn region_disk_usage(&self, _: RegionId) -> Option<i64> {
|
||||
fn region_disk_usage(&self, _: RegionId) -> Option<i64> {
|
||||
None
|
||||
}
|
||||
|
||||
|
||||
@@ -202,9 +202,9 @@ impl RegionEngine for MetricEngine {
|
||||
/// Retrieves region's disk usage.
|
||||
///
|
||||
/// Note: Returns `None` if it's a logical region.
|
||||
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
|
||||
fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
|
||||
if self.inner.is_physical_region(region_id) {
|
||||
self.inner.mito.region_disk_usage(region_id).await
|
||||
self.inner.mito.region_disk_usage(region_id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -383,15 +383,7 @@ mod test {
|
||||
let logical_region_id = env.default_logical_region_id();
|
||||
let physical_region_id = env.default_physical_region_id();
|
||||
|
||||
assert!(env
|
||||
.metric()
|
||||
.region_disk_usage(logical_region_id)
|
||||
.await
|
||||
.is_none());
|
||||
assert!(env
|
||||
.metric()
|
||||
.region_disk_usage(physical_region_id)
|
||||
.await
|
||||
.is_some());
|
||||
assert!(env.metric().region_disk_usage(logical_region_id).is_none());
|
||||
assert!(env.metric().region_disk_usage(physical_region_id).is_some());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,14 +110,14 @@ impl MitoEngine {
|
||||
}
|
||||
|
||||
/// Returns the region disk/memory usage information.
|
||||
pub async fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
|
||||
pub fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
|
||||
let region = self
|
||||
.inner
|
||||
.workers
|
||||
.get_region(region_id)
|
||||
.context(RegionNotFoundSnafu { region_id })?;
|
||||
|
||||
Ok(region.region_usage().await)
|
||||
Ok(region.region_usage())
|
||||
}
|
||||
|
||||
/// Handle substrait query and return a stream of record batches
|
||||
@@ -368,10 +368,9 @@ impl RegionEngine for MitoEngine {
|
||||
self.inner.stop().await.map_err(BoxedError::new)
|
||||
}
|
||||
|
||||
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
|
||||
fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
|
||||
let size = self
|
||||
.get_region_usage(region_id)
|
||||
.await
|
||||
.map(|usage| usage.disk_usage())
|
||||
.ok()?;
|
||||
size.try_into().ok()
|
||||
|
||||
@@ -524,7 +524,7 @@ async fn test_region_usage() {
|
||||
.unwrap();
|
||||
// region is empty now, check manifest size
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
let region_stat = region.region_usage().await;
|
||||
let region_stat = region.region_usage();
|
||||
assert_eq!(region_stat.manifest_usage, 686);
|
||||
|
||||
// put some rows
|
||||
@@ -535,7 +535,7 @@ async fn test_region_usage() {
|
||||
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
|
||||
let region_stat = region.region_usage().await;
|
||||
let region_stat = region.region_usage();
|
||||
assert!(region_stat.wal_usage > 0);
|
||||
|
||||
// delete some rows
|
||||
@@ -545,13 +545,13 @@ async fn test_region_usage() {
|
||||
};
|
||||
delete_rows(&engine, region_id, rows).await;
|
||||
|
||||
let region_stat = region.region_usage().await;
|
||||
let region_stat = region.region_usage();
|
||||
assert!(region_stat.wal_usage > 0);
|
||||
|
||||
// flush region
|
||||
flush_region(&engine, region_id, None).await;
|
||||
|
||||
let region_stat = region.region_usage().await;
|
||||
let region_stat = region.region_usage();
|
||||
assert_eq!(region_stat.sst_usage, 3010);
|
||||
|
||||
// region total usage
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_datasource::compression::CompressionType;
|
||||
@@ -121,12 +122,17 @@ pub struct RegionManifestManager {
|
||||
|
||||
impl RegionManifestManager {
|
||||
/// Constructs a region's manifest and persist it.
|
||||
pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
|
||||
pub async fn new(
|
||||
metadata: RegionMetadataRef,
|
||||
options: RegionManifestOptions,
|
||||
total_manifest_size: Arc<AtomicU64>,
|
||||
) -> Result<Self> {
|
||||
// construct storage
|
||||
let mut store = ManifestObjectStore::new(
|
||||
&options.manifest_dir,
|
||||
options.object_store.clone(),
|
||||
options.compress_type,
|
||||
total_manifest_size,
|
||||
);
|
||||
|
||||
info!(
|
||||
@@ -168,7 +174,10 @@ impl RegionManifestManager {
|
||||
/// Opens an existing manifest.
|
||||
///
|
||||
/// Returns `Ok(None)` if no such manifest.
|
||||
pub async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
|
||||
pub async fn open(
|
||||
options: RegionManifestOptions,
|
||||
total_manifest_size: Arc<AtomicU64>,
|
||||
) -> Result<Option<Self>> {
|
||||
let _t = MANIFEST_OP_ELAPSED
|
||||
.with_label_values(&["open"])
|
||||
.start_timer();
|
||||
@@ -178,6 +187,7 @@ impl RegionManifestManager {
|
||||
&options.manifest_dir,
|
||||
options.object_store.clone(),
|
||||
options.compress_type,
|
||||
total_manifest_size,
|
||||
);
|
||||
|
||||
// recover from storage
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
use std::collections::HashMap;
|
||||
use std::iter::Iterator;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_datasource::compression::CompressionType;
|
||||
use common_telemetry::debug;
|
||||
@@ -133,15 +135,22 @@ pub struct ManifestObjectStore {
|
||||
path: String,
|
||||
/// Stores the size of each manifest file.
|
||||
manifest_size_map: HashMap<FileKey, u64>,
|
||||
total_manifest_size: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl ManifestObjectStore {
|
||||
pub fn new(path: &str, object_store: ObjectStore, compress_type: CompressionType) -> Self {
|
||||
pub fn new(
|
||||
path: &str,
|
||||
object_store: ObjectStore,
|
||||
compress_type: CompressionType,
|
||||
total_manifest_size: Arc<AtomicU64>,
|
||||
) -> Self {
|
||||
Self {
|
||||
object_store,
|
||||
compress_type,
|
||||
path: util::normalize_dir(path),
|
||||
manifest_size_map: HashMap::new(),
|
||||
total_manifest_size,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -338,10 +347,9 @@ impl ManifestObjectStore {
|
||||
// delete manifest sizes
|
||||
for (_, is_checkpoint, version) in &del_entries {
|
||||
if *is_checkpoint {
|
||||
self.manifest_size_map
|
||||
.remove(&FileKey::Checkpoint(*version));
|
||||
self.unset_file_size(&FileKey::Checkpoint(*version));
|
||||
} else {
|
||||
self.manifest_size_map.remove(&FileKey::Delta(*version));
|
||||
self.unset_file_size(&FileKey::Delta(*version));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -564,12 +572,28 @@ impl ManifestObjectStore {
|
||||
/// Set the size of the delta file by delta version.
|
||||
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
|
||||
self.manifest_size_map.insert(FileKey::Delta(version), size);
|
||||
self.inc_total_manifest_size(size);
|
||||
}
|
||||
|
||||
/// Set the size of the checkpoint file by checkpoint version.
|
||||
pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) {
|
||||
self.manifest_size_map
|
||||
.insert(FileKey::Checkpoint(version), size);
|
||||
self.inc_total_manifest_size(size);
|
||||
}
|
||||
|
||||
fn unset_file_size(&mut self, key: &FileKey) {
|
||||
if let Some(val) = self.manifest_size_map.remove(key) {
|
||||
self.dec_total_manifest_size(val);
|
||||
}
|
||||
}
|
||||
|
||||
fn inc_total_manifest_size(&self, val: u64) {
|
||||
self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn dec_total_manifest_size(&self, val: u64) {
|
||||
self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -610,7 +634,12 @@ mod tests {
|
||||
let mut builder = Fs::default();
|
||||
let _ = builder.root(&tmp_dir.path().to_string_lossy());
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
ManifestObjectStore::new("/", object_store, CompressionType::Uncompressed)
|
||||
ManifestObjectStore::new(
|
||||
"/",
|
||||
object_store,
|
||||
CompressionType::Uncompressed,
|
||||
Default::default(),
|
||||
)
|
||||
}
|
||||
|
||||
fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
|
||||
|
||||
@@ -20,7 +20,7 @@ pub(crate) mod version;
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use common_telemetry::{error, info, warn};
|
||||
@@ -106,6 +106,8 @@ pub(crate) struct MitoRegion {
|
||||
time_provider: TimeProviderRef,
|
||||
/// Memtable builder for the region.
|
||||
pub(crate) memtable_builder: MemtableBuilderRef,
|
||||
/// manifest stats
|
||||
stats: ManifestStats,
|
||||
}
|
||||
|
||||
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
|
||||
@@ -233,7 +235,7 @@ impl MitoRegion {
|
||||
}
|
||||
|
||||
/// Returns the region usage in bytes.
|
||||
pub(crate) async fn region_usage(&self) -> RegionUsage {
|
||||
pub(crate) fn region_usage(&self) -> RegionUsage {
|
||||
let region_id = self.region_id;
|
||||
|
||||
let version = self.version();
|
||||
@@ -243,13 +245,7 @@ impl MitoRegion {
|
||||
let sst_usage = version.ssts.sst_usage();
|
||||
|
||||
let wal_usage = self.estimated_wal_usage(memtable_usage);
|
||||
|
||||
let manifest_usage = self
|
||||
.manifest_ctx
|
||||
.manifest_manager
|
||||
.read()
|
||||
.await
|
||||
.manifest_usage();
|
||||
let manifest_usage = self.stats.total_manifest_size();
|
||||
|
||||
RegionUsage {
|
||||
region_id,
|
||||
@@ -526,6 +522,18 @@ impl OpeningRegions {
|
||||
|
||||
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
|
||||
|
||||
/// Manifest stats.
|
||||
#[derive(Default, Debug, Clone)]
|
||||
pub(crate) struct ManifestStats {
|
||||
total_manifest_size: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl ManifestStats {
|
||||
fn total_manifest_size(&self) -> u64 {
|
||||
self.total_manifest_size.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crossbeam_utils::atomic::AtomicCell;
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::memtable::time_partition::TimePartitions;
|
||||
use crate::memtable::MemtableBuilderProvider;
|
||||
use crate::region::options::RegionOptions;
|
||||
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
|
||||
use crate::region::{ManifestContext, MitoRegion, RegionState};
|
||||
use crate::region::{ManifestContext, ManifestStats, MitoRegion, RegionState};
|
||||
use crate::region_write_ctx::RegionWriteCtx;
|
||||
use crate::request::OptionOutputTx;
|
||||
use crate::schedule::scheduler::SchedulerRef;
|
||||
@@ -63,6 +63,7 @@ pub(crate) struct RegionOpener {
|
||||
skip_wal_replay: bool,
|
||||
intermediate_manager: IntermediateManager,
|
||||
time_provider: Option<TimeProviderRef>,
|
||||
stats: ManifestStats,
|
||||
}
|
||||
|
||||
impl RegionOpener {
|
||||
@@ -87,6 +88,7 @@ impl RegionOpener {
|
||||
skip_wal_replay: false,
|
||||
intermediate_manager,
|
||||
time_provider: None,
|
||||
stats: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,8 +171,12 @@ impl RegionOpener {
|
||||
// Create a manifest manager for this region and writes regions to the manifest file.
|
||||
let region_manifest_options = self.manifest_options(config, &options)?;
|
||||
let metadata = Arc::new(self.metadata.unwrap());
|
||||
let manifest_manager =
|
||||
RegionManifestManager::new(metadata.clone(), region_manifest_options).await?;
|
||||
let manifest_manager = RegionManifestManager::new(
|
||||
metadata.clone(),
|
||||
region_manifest_options,
|
||||
self.stats.total_manifest_size.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let memtable_builder = self
|
||||
.memtable_builder_provider
|
||||
@@ -217,6 +223,7 @@ impl RegionOpener {
|
||||
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
|
||||
time_provider,
|
||||
memtable_builder,
|
||||
stats: self.stats,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -267,7 +274,11 @@ impl RegionOpener {
|
||||
let region_options = self.options.as_ref().unwrap().clone();
|
||||
|
||||
let region_manifest_options = self.manifest_options(config, ®ion_options)?;
|
||||
let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await?
|
||||
let Some(manifest_manager) = RegionManifestManager::open(
|
||||
region_manifest_options,
|
||||
self.stats.total_manifest_size.clone(),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -350,6 +361,7 @@ impl RegionOpener {
|
||||
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
|
||||
time_provider,
|
||||
memtable_builder,
|
||||
stats: self.stats.clone(),
|
||||
};
|
||||
Ok(Some(region))
|
||||
}
|
||||
|
||||
@@ -356,11 +356,11 @@ impl TestEnv {
|
||||
};
|
||||
|
||||
if let Some(metadata) = initial_metadata {
|
||||
RegionManifestManager::new(metadata, manifest_opts)
|
||||
RegionManifestManager::new(metadata, manifest_opts, Default::default())
|
||||
.await
|
||||
.map(Some)
|
||||
} else {
|
||||
RegionManifestManager::open(manifest_opts).await
|
||||
RegionManifestManager::open(manifest_opts, Default::default()).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -109,6 +109,7 @@ impl SchedulerEnv {
|
||||
compress_type: CompressionType::Uncompressed,
|
||||
checkpoint_distance: 10,
|
||||
},
|
||||
Default::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
|
||||
@@ -79,7 +79,7 @@ impl RegionEngine for MetaRegionEngine {
|
||||
})
|
||||
}
|
||||
|
||||
async fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
|
||||
fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
|
||||
None
|
||||
}
|
||||
|
||||
|
||||
@@ -200,7 +200,7 @@ pub trait RegionEngine: Send + Sync {
|
||||
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
|
||||
|
||||
/// Retrieves region's disk usage.
|
||||
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64>;
|
||||
fn region_disk_usage(&self, region_id: RegionId) -> Option<i64>;
|
||||
|
||||
/// Stops the engine
|
||||
async fn stop(&self) -> Result<(), BoxedError>;
|
||||
|
||||
Reference in New Issue
Block a user