Mirror of https://github.com/GreptimeTeam/greptimedb.git
feat: file ref mgr (#6844)
Squashed commits:

* feat: file ref manager
* refactor: put file ref mgr into a separate file
* chore: rename path
* test: ref path
* fix: pass node id
* chore: fix after rebase
* feat: exclude files already in manifest
* docs: explain why it can work
* feat: also include manifest versions
* refactor: per review
* rename func to imply what it's actually doing
* more docs
* docs: expect gc worker to do the job right
* refactor: partially per review
* refactor: per review
* chore: unused
* metrics: change to per datanode instead

Signed-off-by: discord9 <discord9@163.com>
@@ -7593,6 +7593,7 @@ dependencies = [
  "crc32fast",
  "criterion 0.4.0",
  "crossbeam-utils",
+ "dashmap",
  "datafusion",
  "datafusion-common",
  "datafusion-expr",
@@ -39,6 +39,7 @@ use meta_client::MetaClientRef;
 use metric_engine::engine::MetricEngine;
 use mito2::config::MitoConfig;
 use mito2::engine::{MitoEngine, MitoEngineBuilder};
+use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
 use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
 use object_store::util::normalize_dir;
 use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};

@@ -238,8 +239,13 @@ impl DatanodeBuilder {
             table_id_schema_cache,
             schema_cache,
         ));
+        let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
         let region_server = self
-            .new_region_server(schema_metadata_manager, region_event_listener)
+            .new_region_server(
+                schema_metadata_manager,
+                region_event_listener,
+                file_ref_manager,
+            )
             .await?;

         // TODO(weny): Considering introducing a readonly kv_backend trait.

@@ -361,6 +367,7 @@ impl DatanodeBuilder {
         &mut self,
         schema_metadata_manager: SchemaMetadataManagerRef,
         event_listener: RegionServerEventListenerRef,
+        file_ref_manager: FileReferenceManagerRef,
     ) -> Result<RegionServer> {
         let opts: &DatanodeOptions = &self.opts;

@@ -399,6 +406,7 @@ impl DatanodeBuilder {
             .build_store_engines(
                 object_store_manager,
                 schema_metadata_manager,
+                file_ref_manager,
                 self.plugins.clone(),
             )
             .await?;

@@ -419,6 +427,7 @@ impl DatanodeBuilder {
         &mut self,
         object_store_manager: ObjectStoreManagerRef,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        file_ref_manager: FileReferenceManagerRef,
         plugins: Plugins,
     ) -> Result<Vec<RegionEngineRef>> {
         let mut metric_engine_config = metric_engine::config::EngineConfig::default();

@@ -444,6 +453,7 @@ impl DatanodeBuilder {
                 object_store_manager.clone(),
                 mito_engine_config,
                 schema_metadata_manager.clone(),
+                file_ref_manager.clone(),
                 plugins.clone(),
             )
             .await?;

@@ -469,6 +479,7 @@ impl DatanodeBuilder {
         object_store_manager: ObjectStoreManagerRef,
         mut config: MitoConfig,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        file_ref_manager: FileReferenceManagerRef,
         plugins: Plugins,
     ) -> Result<MitoEngine> {
         let opts = &self.opts;

@@ -490,6 +501,7 @@ impl DatanodeBuilder {
             log_store,
             object_store_manager,
             schema_metadata_manager,
+            file_ref_manager,
             plugins,
         );

@@ -530,6 +542,7 @@ impl DatanodeBuilder {
             log_store,
             object_store_manager,
             schema_metadata_manager,
+            file_ref_manager,
             plugins,
         );
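Taken together, these hunks give each datanode exactly one reference manager, keyed by its node id, and thread the same handle through every engine. A minimal sketch of the resulting ownership model, with the builder machinery elided; only FileReferenceManager and the Arc sharing come from the diff:

use std::sync::Arc;

use mito2::sst::file_ref::FileReferenceManager;

// One manager per datanode; the node id makes its tmp ref file name unique.
// Every engine on the node shares the same Arc, so all regions report their
// file references into a single place.
fn wire_file_refs(node_id: u64) -> Arc<FileReferenceManager> {
    let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
    let for_region_server = file_ref_manager.clone(); // cheap Arc clone
    drop(for_region_server);
    file_ref_manager
}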
@@ -42,6 +42,7 @@ datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
 datatypes.workspace = true
+dashmap.workspace = true
 dotenv.workspace = true
 either.workspace = true
 futures.workspace = true
@@ -36,7 +36,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu
 use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
 use crate::read::Source;
 use crate::region::options::IndexOptions;
-use crate::sst::file::{FileHandle, FileId, FileMeta, RegionFileId};
+use crate::sst::file::{FileHandle, FileId, RegionFileId};
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::sst::index::IndexerBuilderImpl;

@@ -195,21 +195,21 @@ impl AccessLayer {
     }

     /// Deletes a SST file (and its index file if it has one) with given file id.
-    pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
-        let path = location::sst_file_path(&self.table_dir, file_meta.file_id(), self.path_type);
+    pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> {
+        let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
         self.object_store
             .delete(&path)
             .await
             .context(DeleteSstSnafu {
-                file_id: file_meta.file_id,
+                file_id: region_file_id.file_id(),
             })?;

-        let path = location::index_file_path(&self.table_dir, file_meta.file_id(), self.path_type);
+        let path = location::index_file_path(&self.table_dir, *region_file_id, self.path_type);
         self.object_store
             .delete(&path)
             .await
             .context(DeleteIndexSnafu {
-                file_id: file_meta.file_id,
+                file_id: region_file_id.file_id(),
             })?;

         Ok(())
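A small call-site sketch for the new signature, inside the mito2 crate (delete_sst is pub(crate)): the RegionFileId alone now locates both the SST and its index file, where previously the whole &FileMeta was required. The ids here are made up for illustration.

use store_api::storage::RegionId;

use crate::access_layer::AccessLayer;
use crate::error::Result;
use crate::sst::file::{FileId, RegionFileId};

async fn drop_one_sst(layer: &AccessLayer) -> Result<()> {
    // Deletes the SST and its index file for this region/file pair.
    let id = RegionFileId::new(RegionId::new(1024, 0), FileId::random());
    layer.delete_sst(&id).await
}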
@@ -97,7 +97,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
 use store_api::ManifestVersion;
 use tokio::sync::{oneshot, Semaphore};

-use crate::cache::CacheStrategy;
+use crate::cache::{CacheManagerRef, CacheStrategy};
 use crate::config::MitoConfig;
 use crate::error::{
     InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,

@@ -113,6 +113,7 @@ use crate::read::stream::ScanBatchStream;
 use crate::region::MitoRegionRef;
 use crate::request::{RegionEditRequest, WorkerRequest};
 use crate::sst::file::FileMeta;
+use crate::sst::file_ref::FileReferenceManagerRef;
 use crate::wal::entry_distributor::{
     build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
 };

@@ -127,6 +128,7 @@ pub struct MitoEngineBuilder<'a, S: LogStore> {
     log_store: Arc<S>,
     object_store_manager: ObjectStoreManagerRef,
     schema_metadata_manager: SchemaMetadataManagerRef,
+    file_ref_manager: FileReferenceManagerRef,
     plugins: Plugins,
     #[cfg(feature = "enterprise")]
     extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,

@@ -139,6 +141,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
         log_store: Arc<S>,
         object_store_manager: ObjectStoreManagerRef,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        file_ref_manager: FileReferenceManagerRef,
         plugins: Plugins,
     ) -> Self {
         Self {

@@ -147,6 +150,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
             log_store,
             object_store_manager,
             schema_metadata_manager,
+            file_ref_manager,
             plugins,
             #[cfg(feature = "enterprise")]
             extension_range_provider_factory: None,

@@ -174,6 +178,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
             self.log_store.clone(),
             self.object_store_manager,
             self.schema_metadata_manager,
+            self.file_ref_manager,
             self.plugins,
         )
         .await?;

@@ -210,6 +215,7 @@ impl MitoEngine {
         log_store: Arc<S>,
         object_store_manager: ObjectStoreManagerRef,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        file_ref_manager: FileReferenceManagerRef,
         plugins: Plugins,
     ) -> Result<MitoEngine> {
         let builder = MitoEngineBuilder::new(

@@ -218,11 +224,24 @@ impl MitoEngine {
             log_store,
             object_store_manager,
             schema_metadata_manager,
+            file_ref_manager,
             plugins,
         );
         builder.try_build().await
     }

     pub fn mito_config(&self) -> &MitoConfig {
         &self.inner.config
     }

+    pub fn cache_manager(&self) -> CacheManagerRef {
+        self.inner.workers.cache_manager()
+    }
+
+    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
+        self.inner.workers.file_ref_manager()
+    }
+
     /// Returns true if the specific region exists.
     pub fn is_region_exists(&self, region_id: RegionId) -> bool {
         self.inner.workers.is_region_exists(region_id)

@@ -319,7 +338,7 @@ impl MitoEngine {
         self.find_region(id)
     }

-    fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
+    pub(crate) fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
         self.inner.workers.get_region(region_id)
     }

@@ -926,6 +945,7 @@ impl MitoEngine {
         listener: Option<crate::engine::listener::EventListenerRef>,
         time_provider: crate::time_provider::TimeProviderRef,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        file_ref_manager: FileReferenceManagerRef,
     ) -> Result<MitoEngine> {
         config.sanitize(data_home)?;

@@ -940,6 +960,7 @@ impl MitoEngine {
             write_buffer_manager,
             listener,
             schema_metadata_manager,
+            file_ref_manager,
             time_provider,
         )
         .await?,
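With the builder changes above, the engine now exposes the datanode-level managers. A minimal usage sketch; the engine value is assumed to be a fully built MitoEngine:

use mito2::engine::MitoEngine;

fn inspect(engine: &MitoEngine) {
    // Both accessors were added in this hunk; a future GC worker can use the
    // reference manager to see which files are still pinned on this node.
    let _refs = engine.file_ref_manager();
    let _cache = engine.cache_manager();
}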
@@ -437,6 +437,11 @@ lazy_static! {
         "mito stalled write request in each worker",
         &[WORKER_LABEL]
     ).unwrap();
+    /// Number of ref files per table
+    pub static ref GC_REF_FILE_CNT: IntGauge = register_int_gauge!(
+        "greptime_gc_ref_file_count",
+        "gc ref file count",
+    ).unwrap();
     /// Total number of stalled write requests.
     pub static ref WRITE_STALL_TOTAL: IntCounter = register_int_counter!(
         "greptime_mito_write_stall_total",
@@ -780,7 +780,6 @@ impl ManifestContext {
         }
     }

-    #[cfg(test)]
     pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
         self.manifest_manager.read().await.manifest()
     }
@@ -60,7 +60,8 @@ use crate::region::{
 use crate::region_write_ctx::RegionWriteCtx;
 use crate::request::OptionOutputTx;
 use crate::schedule::scheduler::SchedulerRef;
-use crate::sst::file_purger::LocalFilePurger;
+use crate::sst::file_purger::create_local_file_purger;
+use crate::sst::file_ref::FileReferenceManagerRef;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::sst::location::region_dir_from_table_dir;

@@ -86,6 +87,7 @@ pub(crate) struct RegionOpener {
     stats: ManifestStats,
     wal_entry_reader: Option<Box<dyn WalEntryReader>>,
     replay_checkpoint: Option<u64>,
+    file_ref_manager: FileReferenceManagerRef,
 }

 impl RegionOpener {

@@ -102,6 +104,7 @@ impl RegionOpener {
         puffin_manager_factory: PuffinManagerFactory,
         intermediate_manager: IntermediateManager,
         time_provider: TimeProviderRef,
+        file_ref_manager: FileReferenceManagerRef,
     ) -> RegionOpener {
         RegionOpener {
             region_id,

@@ -120,6 +123,7 @@ impl RegionOpener {
             stats: Default::default(),
             wal_entry_reader: None,
             replay_checkpoint: None,
+            file_ref_manager,
         }
     }

@@ -287,11 +291,12 @@ impl RegionOpener {
                 manifest_manager,
                 RegionRoleState::Leader(RegionLeaderState::Writable),
             )),
-            file_purger: Arc::new(LocalFilePurger::new(
+            file_purger: create_local_file_purger(
                 self.purge_scheduler,
                 access_layer,
                 self.cache_manager,
-            )),
+                self.file_ref_manager.clone(),
+            ),
             provider,
             last_flush_millis: AtomicI64::new(now),
             last_compaction_millis: AtomicI64::new(now),

@@ -409,11 +414,12 @@ impl RegionOpener {
             self.puffin_manager_factory.clone(),
             self.intermediate_manager.clone(),
         ));
-        let file_purger = Arc::new(LocalFilePurger::new(
+        let file_purger = create_local_file_purger(
             self.purge_scheduler.clone(),
             access_layer.clone(),
             self.cache_manager.clone(),
-        ));
+            self.file_ref_manager.clone(),
+        );
         let memtable_builder = self.memtable_builder_provider.builder_for_options(
             region_options.memtable.as_ref(),
             region_options.need_dedup(),
@@ -30,6 +30,7 @@ use store_api::storage::consts::{

 pub mod file;
 pub mod file_purger;
+pub mod file_ref;
 pub mod index;
 pub mod location;
 pub mod parquet;
@@ -30,7 +30,7 @@ use store_api::region_request::PathType;
 use store_api::storage::RegionId;
 use uuid::Uuid;

-use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
+use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::location;

 /// Type to store SST level.

@@ -337,16 +337,14 @@ struct FileHandleInner {

 impl Drop for FileHandleInner {
     fn drop(&mut self) {
-        if self.deleted.load(Ordering::Relaxed) {
-            self.file_purger.send_request(PurgeRequest {
-                file_meta: self.meta.clone(),
-            });
-        }
+        self.file_purger
+            .remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed));
     }
 }

 impl FileHandleInner {
     fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
+        file_purger.new_file(&meta);
         FileHandleInner {
             meta,
             compacting: AtomicBool::new(false),
@@ -23,18 +23,21 @@ use crate::cache::CacheManagerRef;
 use crate::error::Result;
 use crate::schedule::scheduler::SchedulerRef;
 use crate::sst::file::FileMeta;

-/// Request to remove a file.
-#[derive(Debug)]
-pub struct PurgeRequest {
-    /// File meta.
-    pub file_meta: FileMeta,
-}
+use crate::sst::file_ref::FileReferenceManagerRef;

 /// A worker to delete files in background.
 pub trait FilePurger: Send + Sync + fmt::Debug {
-    /// Send a purge request to the background worker.
-    fn send_request(&self, request: PurgeRequest);
+    /// Send a request to remove the file.
+    /// If `is_delete` is true, the file will be deleted from the storage.
+    /// Otherwise, only the reference will be removed.
+    fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
+
+    /// Notify the purger of a new file created.
+    /// This is useful for object store based storage, where we need to track the file references.
+    /// The default implementation is a no-op.
+    fn new_file(&self, _: &FileMeta) {
+        // noop
+    }
 }

 pub type FilePurgerRef = Arc<dyn FilePurger>;
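To make the new contract concrete: together with the FileHandleInner hunks above, a handle registers itself via new_file when it is created and reports back through remove_file when it is dropped, with is_delete carrying the old "deleted" flag. A hedged toy implementation, assuming it lives inside the mito2 crate, that only counts live handles:

use std::sync::atomic::{AtomicI64, Ordering};

use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurger;

/// Toy purger: tracks how many FileHandleInner instances are alive.
#[derive(Debug, Default)]
struct CountingPurger {
    live: AtomicI64,
}

impl FilePurger for CountingPurger {
    fn remove_file(&self, _file_meta: FileMeta, is_delete: bool) {
        self.live.fetch_sub(1, Ordering::Relaxed);
        if is_delete {
            // A real purger would delete the SST and its index file here.
        }
    }

    fn new_file(&self, _file_meta: &FileMeta) {
        self.live.fetch_add(1, Ordering::Relaxed);
    }
}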
@@ -44,7 +47,7 @@ pub type FilePurgerRef = Arc<dyn FilePurger>;
 pub struct NoopFilePurger;

 impl FilePurger for NoopFilePurger {
-    fn send_request(&self, _: PurgeRequest) {
+    fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
         // noop
     }
 }

@@ -64,6 +67,42 @@ impl fmt::Debug for LocalFilePurger {
     }
 }

+pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
+    sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
+}
+
+/// Creates a file purger based on the storage type of the access layer.
+/// Should be used in combination with the GC worker.
+///
+/// If the storage is a local file system, a `LocalFilePurger` is created, which deletes
+/// the files from both the storage and the cache.
+///
+/// If the storage is an object store, an `ObjectStoreFilePurger` is created, which
+/// only manages the file references without deleting the actual files.
+pub fn create_file_purger(
+    scheduler: SchedulerRef,
+    sst_layer: AccessLayerRef,
+    cache_manager: Option<CacheManagerRef>,
+    file_ref_manager: FileReferenceManagerRef,
+) -> FilePurgerRef {
+    if is_local_fs(&sst_layer) {
+        Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
+    } else {
+        Arc::new(ObjectStoreFilePurger { file_ref_manager })
+    }
+}
+
+/// Creates a local file purger that deletes files from both the storage and the cache.
+pub fn create_local_file_purger(
+    scheduler: SchedulerRef,
+    sst_layer: AccessLayerRef,
+    cache_manager: Option<CacheManagerRef>,
+    _file_ref_manager: FileReferenceManagerRef,
+) -> FilePurgerRef {
+    Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
+}
+
 impl LocalFilePurger {
     /// Creates a new purger.
     pub fn new(

@@ -82,11 +121,9 @@ impl LocalFilePurger {
     pub async fn stop_scheduler(&self) -> Result<()> {
         self.scheduler.stop(true).await
     }
-}

-impl FilePurger for LocalFilePurger {
-    fn send_request(&self, request: PurgeRequest) {
-        let file_meta = request.file_meta;
+    /// Deletes the file (and its index, if any) from cache and storage.
+    fn delete_file(&self, file_meta: FileMeta) {
         let sst_layer = self.sst_layer.clone();

         // Remove meta of the file from cache.

@@ -96,7 +133,7 @@ impl FilePurger for LocalFilePurger {
         let cache_manager = self.cache_manager.clone();
         if let Err(e) = self.scheduler.schedule(Box::pin(async move {
-            if let Err(e) = sst_layer.delete_sst(&file_meta).await {
+            if let Err(e) = sst_layer.delete_sst(&file_meta.file_id()).await {
                 error!(e; "Failed to delete SST file, file_id: {}, region: {}",
                     file_meta.file_id, file_meta.region_id);
             } else {

@@ -151,6 +188,32 @@ impl FilePurger for LocalFilePurger {
     }
 }

+impl FilePurger for LocalFilePurger {
+    fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
+        if is_delete {
+            self.delete_file(file_meta);
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct ObjectStoreFilePurger {
+    file_ref_manager: FileReferenceManagerRef,
+}
+
+impl FilePurger for ObjectStoreFilePurger {
+    fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
+        // If not on a local file system, instead inform the global file purger to remove the file reference.
+        // Notice that no matter whether the file is deleted or not, we need to remove the reference,
+        // because the file is no longer in use nonetheless.
+        self.file_ref_manager.remove_file(&file_meta);
+    }
+
+    fn new_file(&self, file_meta: &FileMeta) {
+        self.file_ref_manager.add_file(file_meta);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::num::NonZeroU64;
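On object storage the purger never touches the actual files; it only keeps the reference manager in sync. A hedged sketch of the lifecycle, assuming it sits in the same module as ObjectStoreFilePurger (its file_ref_manager field is private) and that meta is a FileMeta like the one built in the file_ref tests below:

use std::sync::Arc;

use crate::sst::file::FileMeta;
use crate::sst::file_purger::{FilePurger, ObjectStoreFilePurger};
use crate::sst::file_ref::FileReferenceManager;

fn lifecycle(meta: FileMeta) {
    let refs = Arc::new(FileReferenceManager::new(None));
    let purger = ObjectStoreFilePurger {
        file_ref_manager: refs.clone(),
    };

    // Opening a FileHandle registers a reference...
    purger.new_file(&meta);
    // ...and dropping it removes the reference, whether or not the file was
    // marked deleted: either way it is no longer in use on this node.
    purger.remove_file(meta, false);
}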
src/mito2/src/sst/file_ref.rs (new file, 312 lines)
@@ -0,0 +1,312 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use common_telemetry::debug;
+use dashmap::{DashMap, Entry};
+use serde::{Deserialize, Serialize};
+use store_api::storage::{RegionId, TableId};
+use store_api::ManifestVersion;
+
+use crate::error::Result;
+use crate::metrics::GC_REF_FILE_CNT;
+use crate::region::RegionMapRef;
+use crate::sst::file::{FileId, FileMeta};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct FileRef {
+    pub region_id: RegionId,
+    pub file_id: FileId,
+}
+
+impl FileRef {
+    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
+        Self { region_id, file_id }
+    }
+}
+
+/// File references for a table.
+/// It contains all files referenced by the table.
+#[derive(Debug, Clone, Default)]
+pub struct TableFileRefs {
+    /// (FileRef, ref count) pairs, where the count is how many `FileHandleInner`s are opened for this file.
+    pub files: HashMap<FileRef, usize>,
+}
+
+/// Manages all file references in one datanode.
+/// It keeps track of which files are referenced, grouped by table id,
+/// and periodically writes the references to a tmp file in object storage.
+/// This is useful for ensuring that files are not deleted while they are still in use by any
+/// query.
+#[derive(Debug)]
+pub struct FileReferenceManager {
+    /// Datanode id, used to determine the tmp ref file name.
+    node_id: Option<u64>,
+    /// TODO(discord9): use no hash hasher since table id is sequential.
+    files_per_table: DashMap<TableId, TableFileRefs>,
+}
+
+pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
+
+/// The tmp file uploaded to object storage to record one table's file references.
+#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
+pub struct TableFileRefsManifest {
+    pub file_refs: HashSet<FileRef>,
+    /// Manifest version of each region at the time its files were read for this manifest.
+    pub manifest_version: HashMap<RegionId, ManifestVersion>,
+}
+
+impl FileReferenceManager {
+    pub fn new(node_id: Option<u64>) -> Self {
+        Self {
+            node_id,
+            files_per_table: Default::default(),
+        }
+    }
+
+    fn ref_file_set(&self, table_id: TableId) -> Option<HashSet<FileRef>> {
+        let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) {
+            file_refs.clone()
+        } else {
+            return None;
+        };
+
+        if file_refs.files.is_empty() {
+            // Still return an empty set to indicate no files are referenced,
+            // and to differentiate from the error case where table_id is not found.
+            return Some(HashSet::new());
+        }
+
+        let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
+
+        debug!(
+            "Get file refs for table {}, node {:?}, {} files",
+            table_id,
+            self.node_id,
+            ref_file_set.len(),
+        );
+
+        Some(ref_file_set)
+    }
+
+    /// Gets all ref files for the given table id, excluding those already in the region manifests.
+    ///
+    /// It is safe for the manifest version to become outdated by the time the gc worker runs:
+    /// the gc worker checks the changes between the two versions and acts accordingly, so it can
+    /// recover the true tmp ref file sets as of the old manifest version.
+    ///
+    /// TODO(discord9): Since a query can only refer to files that were in the latest manifest when
+    /// it started, the only real risk is files removed from the manifest between the old version
+    /// (when the refs were read) and the new version (seen by the gc worker). So when the manifest
+    /// version is outdated, the gc worker should make sure not to delete those files (until the next
+    /// gc round, which will use the latest manifest version and handle those files normally).
+    /// Alternatively, a two-phase commit style process could be used, where the worker proposes a
+    /// set of files for deletion and then verifies that no new references have appeared before
+    /// committing the delete.
+    ///
+    /// The gc worker could do this:
+    /// 1. If it can get the files that were removed between the old and the new manifest, it should
+    ///    not delete those files even if they are not in the tmp ref file; other files can be
+    ///    handled normally (deleted if not in use, kept otherwise). It then reports back so the next
+    ///    gc round can handle those files with newer tmp ref file sets.
+    /// 2. If it cannot get the files removed between the old and the new manifest (possible right
+    ///    after a checkpoint), it can do nothing, since it cannot tell whether a file is truly
+    ///    unused or the tmp ref file sets simply have not reported it yet. It needs to report back
+    ///    and let the next gc round handle those files with newer tmp ref file sets.
+    #[allow(unused)]
+    pub(crate) async fn get_snapshot_of_unmanifested_refs(
+        &self,
+        table_id: TableId,
+        region_map: &RegionMapRef,
+    ) -> Result<TableFileRefsManifest> {
+        let Some(ref_files) = self.ref_file_set(table_id) else {
+            return Ok(Default::default());
+        };
+        let region_list = region_map.list_regions();
+        let table_regions = region_list
+            .iter()
+            .filter(|r| r.region_id().table_id() == table_id)
+            .collect::<Vec<_>>();
+
+        let mut in_manifest_files = HashSet::new();
+        let mut manifest_version = HashMap::new();
+
+        for r in &table_regions {
+            let manifest = r.manifest_ctx.manifest().await;
+            let files = manifest.files.keys().cloned().collect::<Vec<_>>();
+            in_manifest_files.extend(files);
+            manifest_version.insert(r.region_id(), manifest.manifest_version);
+        }
+
+        let ref_files_excluding_in_manifest = ref_files
+            .iter()
+            .filter(|f| !in_manifest_files.contains(&f.file_id))
+            .cloned()
+            .collect::<HashSet<_>>();
+
+        Ok(TableFileRefsManifest {
+            file_refs: ref_files_excluding_in_manifest,
+            manifest_version,
+        })
+    }
+
+    /// Adds a new file reference.
+    /// Also records the access layer for the table if it does not exist.
+    /// The access layer will be used to upload the ref file to object storage.
+    pub fn add_file(&self, file_meta: &FileMeta) {
+        let table_id = file_meta.region_id.table_id();
+        let mut is_new = false;
+        {
+            let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
+            self.files_per_table
+                .entry(table_id)
+                .and_modify(|refs| {
+                    refs.files
+                        .entry(file_ref.clone())
+                        .and_modify(|count| *count += 1)
+                        .or_insert_with(|| {
+                            is_new = true;
+                            1
+                        });
+                })
+                .or_insert_with(|| TableFileRefs {
+                    files: HashMap::from_iter([(file_ref, 1)]),
+                });
+        }
+        if is_new {
+            GC_REF_FILE_CNT.inc();
+        }
+    }
+
+    /// Removes a file reference.
+    /// If the reference count reaches zero, the file reference will be removed from the manager.
+    pub fn remove_file(&self, file_meta: &FileMeta) {
+        let table_id = file_meta.region_id.table_id();
+        let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
+
+        let mut remove_table_entry = false;
+        let mut remove_file_ref = false;
+        let mut file_cnt = 0;
+
+        let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| {
+            let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
+                if *count > 0 {
+                    *count -= 1;
+                }
+                if *count == 0 {
+                    remove_file_ref = true;
+                }
+            });
+            if let std::collections::hash_map::Entry::Occupied(o) = entry
+                && remove_file_ref
+            {
+                o.remove_entry();
+            }
+
+            file_cnt = refs.files.len();
+
+            if refs.files.is_empty() {
+                remove_table_entry = true;
+            }
+        });
+
+        if let Entry::Occupied(o) = table_ref
+            && remove_table_entry
+        {
+            o.remove_entry();
+        }
+        if remove_file_ref {
+            GC_REF_FILE_CNT.dec();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::num::NonZeroU64;
+
+    use smallvec::SmallVec;
+    use store_api::storage::RegionId;
+
+    use super::*;
+    use crate::sst::file::{FileId, FileMeta, FileTimeRange, IndexType, RegionFileId};
+
+    #[tokio::test]
+    async fn test_file_ref_mgr() {
+        common_telemetry::init_default_ut_logging();
+
+        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+
+        let file_ref_mgr = FileReferenceManager::new(None);
+
+        let file_meta = FileMeta {
+            region_id: sst_file_id.region_id(),
+            file_id: sst_file_id.file_id(),
+            time_range: FileTimeRange::default(),
+            level: 0,
+            file_size: 4096,
+            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            index_file_size: 4096,
+            num_rows: 1024,
+            num_row_groups: 1,
+            sequence: NonZeroU64::new(4096),
+        };
+
+        file_ref_mgr.add_file(&file_meta);
+
+        assert_eq!(
+            file_ref_mgr.files_per_table.get(&0).unwrap().files,
+            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
+        );
+
+        file_ref_mgr.add_file(&file_meta);
+
+        let expected_table_ref_manifest =
+            HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
+
+        assert_eq!(
+            file_ref_mgr.ref_file_set(0).unwrap(),
+            expected_table_ref_manifest
+        );
+
+        assert_eq!(
+            file_ref_mgr.files_per_table.get(&0).unwrap().files,
+            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
+        );
+
+        assert_eq!(
+            file_ref_mgr.ref_file_set(0).unwrap(),
+            expected_table_ref_manifest
+        );
+
+        file_ref_mgr.remove_file(&file_meta);
+
+        assert_eq!(
+            file_ref_mgr.files_per_table.get(&0).unwrap().files,
+            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
+        );
+
+        assert_eq!(
+            file_ref_mgr.ref_file_set(0).unwrap(),
+            expected_table_ref_manifest
+        );
+
+        file_ref_mgr.remove_file(&file_meta);
+
+        assert!(
+            file_ref_mgr.files_per_table.get(&0).is_none(),
+            "{:?}",
+            file_ref_mgr.files_per_table
+        );
+
+        assert!(
+            file_ref_mgr.ref_file_set(0).is_none(),
+            "{:?}",
+            file_ref_mgr.files_per_table
+        );
+    }
+}
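The doc comment on get_snapshot_of_unmanifested_refs leaves the actual deletion policy to a future GC worker. A speculative sketch of that rule, not part of this PR: candidate_files and removed_between_versions are hypothetical inputs, and only TableFileRefsManifest and FileId come from the diff.

use std::collections::HashSet;

use crate::sst::file::FileId;
use crate::sst::file_ref::TableFileRefsManifest;

/// Given a (possibly stale) reference snapshot, the files we consider deleting,
/// and the set of files removed from the manifest since the snapshot's version,
/// keep anything that might still be referenced. If the removed set is unknown
/// (e.g. a checkpoint compacted the manifest away), delete nothing this round.
fn safe_to_delete(
    candidate_files: &HashSet<FileId>,
    refs: &TableFileRefsManifest,
    removed_between_versions: Option<&HashSet<FileId>>,
) -> HashSet<FileId> {
    let Some(removed) = removed_between_versions else {
        // Can't tell what changed: report back and retry with a fresher snapshot.
        return HashSet::new();
    };
    candidate_files
        .iter()
        .filter(|f| !removed.contains(f))
        .filter(|f| !refs.file_refs.iter().any(|r| r.file_id == **f))
        .cloned()
        .collect()
}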
@@ -74,6 +74,7 @@ use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
 use crate::read::{Batch, BatchBuilder, BatchReader};
 use crate::sst::file_purger::{FilePurgerRef, NoopFilePurger};
+use crate::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::time_provider::{StdTimeProvider, TimeProviderRef};

@@ -209,6 +210,7 @@ pub struct TestEnv {
     log_store_factory: LogStoreFactory,
     object_store_manager: Option<ObjectStoreManagerRef>,
     schema_metadata_manager: SchemaMetadataManagerRef,
+    file_ref_manager: FileReferenceManagerRef,
     kv_backend: KvBackendRef,
 }

@@ -243,6 +245,7 @@ impl TestEnv {
             log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
             object_store_manager: None,
             schema_metadata_manager,
+            file_ref_manager: Arc::new(FileReferenceManager::new(None)),
             kv_backend,
         }
     }

@@ -280,6 +283,7 @@ impl TestEnv {
             log_store,
             zelf.object_store_manager.as_ref().unwrap().clone(),
             zelf.schema_metadata_manager.clone(),
+            zelf.file_ref_manager.clone(),
             Plugins::new(),
         )
         .await

@@ -333,6 +337,7 @@ impl TestEnv {
                 listener,
                 Arc::new(StdTimeProvider),
                 self.schema_metadata_manager.clone(),
+                self.file_ref_manager.clone(),
             )
             .await
             .unwrap(),

@@ -345,6 +350,7 @@ impl TestEnv {
                 listener,
                 Arc::new(StdTimeProvider),
                 self.schema_metadata_manager.clone(),
+                self.file_ref_manager.clone(),
             )
             .await
             .unwrap(),

@@ -388,6 +394,7 @@ impl TestEnv {
                 listener,
                 Arc::new(StdTimeProvider),
                 self.schema_metadata_manager.clone(),
+                self.file_ref_manager.clone(),
             )
             .await
             .unwrap(),

@@ -400,6 +407,7 @@ impl TestEnv {
                 listener,
                 Arc::new(StdTimeProvider),
                 self.schema_metadata_manager.clone(),
+                self.file_ref_manager.clone(),
             )
             .await
             .unwrap(),

@@ -432,6 +440,7 @@ impl TestEnv {
                 listener,
                 time_provider.clone(),
                 self.schema_metadata_manager.clone(),
+                self.file_ref_manager.clone(),
             )
             .await
             .unwrap(),

@@ -444,6 +453,7 @@ impl TestEnv {
                 listener,
                 time_provider.clone(),
                 self.schema_metadata_manager.clone(),
+                self.file_ref_manager.clone(),
             )
             .await
             .unwrap(),

@@ -480,6 +490,7 @@ impl TestEnv {
             log_store,
             Arc::new(object_store_manager),
             self.schema_metadata_manager.clone(),
+            self.file_ref_manager.clone(),
             Plugins::new(),
         )
         .await

@@ -489,6 +500,7 @@ impl TestEnv {
             log_store,
             Arc::new(object_store_manager),
             self.schema_metadata_manager.clone(),
+            self.file_ref_manager.clone(),
             Plugins::new(),
         )
         .await
@@ -66,6 +66,7 @@ use crate::request::{
 };
 use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
 use crate::sst::file::FileId;
+use crate::sst::file_ref::FileReferenceManagerRef;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::time_provider::{StdTimeProvider, TimeProviderRef};

@@ -130,6 +131,8 @@ pub(crate) struct WorkerGroup {
     purge_scheduler: SchedulerRef,
     /// Cache.
     cache_manager: CacheManagerRef,
+    /// File reference manager.
+    file_ref_manager: FileReferenceManagerRef,
 }

 impl WorkerGroup {

@@ -141,6 +144,7 @@ impl WorkerGroup {
         log_store: Arc<S>,
         object_store_manager: ObjectStoreManagerRef,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        file_ref_manager: FileReferenceManagerRef,
         plugins: Plugins,
     ) -> Result<WorkerGroup> {
         let (flush_sender, flush_receiver) = watch::channel(());

@@ -204,6 +208,7 @@ impl WorkerGroup {
                 flush_receiver: flush_receiver.clone(),
                 plugins: plugins.clone(),
                 schema_metadata_manager: schema_metadata_manager.clone(),
+                file_ref_manager: file_ref_manager.clone(),
             }
             .start()
         })

@@ -215,6 +220,7 @@ impl WorkerGroup {
             compact_job_pool,
             purge_scheduler,
             cache_manager,
+            file_ref_manager,
         })
     }

@@ -266,6 +272,10 @@ impl WorkerGroup {
         self.cache_manager.clone()
     }

+    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
+        self.file_ref_manager.clone()
+    }
+
     /// Get worker for specific `region_id`.
     pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
         let index = region_id_to_index(region_id, self.workers.len());

@@ -286,6 +296,7 @@ impl WorkerGroup {
     /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
     ///
     /// The number of workers should be power of two.
+    #[allow(clippy::too_many_arguments)]
     pub(crate) async fn start_for_test<S: LogStore>(
         config: Arc<MitoConfig>,
         log_store: Arc<S>,

@@ -293,6 +304,7 @@ impl WorkerGroup {
         write_buffer_manager: Option<WriteBufferManagerRef>,
         listener: Option<crate::engine::listener::EventListenerRef>,
         schema_metadata_manager: SchemaMetadataManagerRef,
+        file_ref_manager: FileReferenceManagerRef,
         time_provider: TimeProviderRef,
     ) -> Result<WorkerGroup> {
         let (flush_sender, flush_receiver) = watch::channel(());

@@ -350,6 +362,7 @@ impl WorkerGroup {
                 flush_receiver: flush_receiver.clone(),
                 plugins: Plugins::new(),
                 schema_metadata_manager: schema_metadata_manager.clone(),
+                file_ref_manager: file_ref_manager.clone(),
             }
             .start()
         })

@@ -361,6 +374,7 @@ impl WorkerGroup {
             compact_job_pool,
             purge_scheduler,
             cache_manager,
+            file_ref_manager,
         })
     }

@@ -428,6 +442,7 @@ struct WorkerStarter<S> {
     flush_receiver: watch::Receiver<()>,
     plugins: Plugins,
     schema_metadata_manager: SchemaMetadataManagerRef,
+    file_ref_manager: FileReferenceManagerRef,
 }

 impl<S: LogStore> WorkerStarter<S> {

@@ -480,6 +495,7 @@ impl<S: LogStore> WorkerStarter<S> {
             request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
             region_edit_queues: RegionEditQueues::default(),
             schema_metadata_manager: self.schema_metadata_manager,
+            file_ref_manager: self.file_ref_manager.clone(),
         };
         let handle = common_runtime::spawn_global(async move {
             worker_thread.run().await;

@@ -729,6 +745,8 @@ struct RegionWorkerLoop<S> {
     region_edit_queues: RegionEditQueues,
     /// Database level metadata manager.
     schema_metadata_manager: SchemaMetadataManagerRef,
+    /// Datanode level file references manager.
+    file_ref_manager: FileReferenceManagerRef,
 }

 impl<S: LogStore> RegionWorkerLoop<S> {
@@ -157,6 +157,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             self.puffin_manager_factory.clone(),
             self.intermediate_manager.clone(),
             self.time_provider.clone(),
+            self.file_ref_manager.clone(),
         )
         .cache(Some(self.cache_manager.clone()))
         .options(region.version().options.clone())?

@@ -66,6 +66,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             self.puffin_manager_factory.clone(),
             self.intermediate_manager.clone(),
             self.time_provider.clone(),
+            self.file_ref_manager.clone(),
         )
         .metadata_builder(builder)
         .parse_options(request.options)?

@@ -103,6 +103,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             self.puffin_manager_factory.clone(),
             self.intermediate_manager.clone(),
             self.time_provider.clone(),
+            self.file_ref_manager.clone(),
         )
         .skip_wal_replay(request.skip_wal_replay)
         .cache(Some(self.cache_manager.clone()))
@@ -16,7 +16,7 @@ pub use opendal::raw::{Access, HttpClient};
 pub use opendal::{
     services, Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
     FuturesAsyncReader, FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result,
-    Writer,
+    Scheme, Writer,
 };

 pub mod config;
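The newly re-exported Scheme is exactly what the is_local_fs check in mito2's file_purger compares against. A minimal sketch using opendal's operator info API (ObjectStore is the re-exported Operator):

use object_store::{ObjectStore, Scheme};

/// True when the operator is backed by the local file system.
fn on_local_disk(store: &ObjectStore) -> bool {
    store.info().scheme() == Scheme::Fs
}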