feat: file purger (#1030)

* wip

* wip

* feat: file purger

* chore: add tests

* feat: delete removed file on sst merge

* chore: move MockAccessLayer to test_util

* fix: some cr comments

* feat: add await termination for scheduler

* fix: some cr comments

* chore: rename max_file_in_level0 to max_files_in_level0
This commit is contained in:
Lei, HUANG
2023-02-19 14:56:41 +08:00
committed by GitHub
parent a9c8584c98
commit af1f8d6101
25 changed files with 917 additions and 391 deletions

View File

@@ -26,5 +26,6 @@ connect_timeout_millis = 5000
tcp_nodelay = false
[compaction]
max_inflight_task = 4
max_file_in_level0 = 16
max_inflight_tasks = 4
max_files_in_level0 = 16
max_purge_tasks = 32

View File

@@ -184,8 +184,9 @@ mod tests {
assert_eq!(
CompactionConfig {
max_inflight_task: 4,
max_file_in_level0: 16,
max_inflight_tasks: 4,
max_files_in_level0: 16,
max_purge_tasks: 32,
},
options.compaction
);

View File

@@ -110,16 +110,19 @@ impl Default for WalConfig {
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CompactionConfig {
/// Max task number that can concurrently run.
pub max_inflight_task: usize,
pub max_inflight_tasks: usize,
/// Max files in level 0 to trigger compaction.
pub max_file_in_level0: usize,
pub max_files_in_level0: usize,
/// Max task number for SST purge task after compaction.
pub max_purge_tasks: usize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
max_inflight_task: 4,
max_file_in_level0: 8,
max_inflight_tasks: 4,
max_files_in_level0: 8,
max_purge_tasks: 32,
}
}
}
@@ -127,7 +130,7 @@ impl Default for CompactionConfig {
impl From<&DatanodeOptions> for SchedulerConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
max_inflight_task: value.compaction.max_inflight_task,
max_inflight_tasks: value.compaction.max_inflight_tasks,
}
}
}
@@ -135,7 +138,8 @@ impl From<&DatanodeOptions> for SchedulerConfig {
impl From<&DatanodeOptions> for StorageEngineConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
max_files_in_l0: value.compaction.max_file_in_level0,
max_files_in_l0: value.compaction.max_files_in_level0,
max_purge_tasks: value.compaction.max_purge_tasks,
}
}
}

View File

@@ -82,11 +82,11 @@ where
{
type Request = R;
async fn schedule(&self, _request: Self::Request) -> crate::error::Result<bool> {
fn schedule(&self, _request: Self::Request) -> crate::error::Result<bool> {
Ok(true)
}
async fn stop(&self) -> crate::error::Result<()> {
async fn stop(&self, _await_termination: bool) -> crate::error::Result<()> {
Ok(())
}
}

View File

@@ -109,253 +109,3 @@ where
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::scheduler::dedup_deque::DedupDeque;
use crate::scheduler::rate_limit::{
BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter,
};
use crate::scheduler::{HandlerLoop, LocalScheduler, Scheduler, SchedulerConfig};
struct CountdownLatch {
counter: std::sync::Mutex<usize>,
notifies: std::sync::RwLock<Vec<Arc<Notify>>>,
}
impl CountdownLatch {
fn new(size: usize) -> Self {
Self {
counter: std::sync::Mutex::new(size),
notifies: std::sync::RwLock::new(vec![]),
}
}
fn countdown(&self) {
let mut counter = self.counter.lock().unwrap();
if *counter >= 1 {
*counter -= 1;
if *counter == 0 {
let notifies = self.notifies.read().unwrap();
for waiter in notifies.iter() {
waiter.notify_one();
}
}
}
}
async fn wait(&self) {
let notify = Arc::new(Notify::new());
{
let notify = notify.clone();
let mut notifies = self.notifies.write().unwrap();
notifies.push(notify);
}
notify.notified().await
}
}
#[tokio::test]
async fn test_schedule_handler() {
common_telemetry::init_default_ut_logging();
let queue = Arc::new(std::sync::RwLock::new(DedupDeque::default()));
let latch = Arc::new(CountdownLatch::new(2));
let latch_cloned = latch.clone();
let handler = Arc::new(HandlerLoop {
req_queue: queue.clone(),
cancel_token: Default::default(),
task_notifier: Arc::new(Default::default()),
request_handler: MockHandler {
cb: move || {
latch_cloned.countdown();
},
},
limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new(
MaxInflightTaskLimiter::new(3),
)])),
});
let handler_cloned = handler.clone();
common_runtime::spawn_bg(async move { handler_cloned.run().await });
queue.write().unwrap().push_back(1, MockRequest::default());
handler.task_notifier.notify_one();
queue.write().unwrap().push_back(2, MockRequest::default());
handler.task_notifier.notify_one();
tokio::time::timeout(Duration::from_secs(1), latch.wait())
.await
.unwrap();
}
#[derive(Default, Debug)]
struct MockRequest {
region_id: RegionId,
}
struct MockHandler<F> {
cb: F,
}
#[async_trait::async_trait]
impl<F> Handler for MockHandler<F>
where
F: Fn() + Send + Sync,
{
type Request = MockRequest;
async fn handle_request(
&self,
_req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> Result<()> {
(self.cb)();
token.try_release();
finish_notifier.notify_one();
Ok(())
}
}
impl Request for MockRequest {
type Key = RegionId;
fn key(&self) -> Self::Key {
self.region_id
}
}
#[tokio::test]
async fn test_scheduler() {
let latch = Arc::new(CountdownLatch::new(2));
let latch_cloned = latch.clone();
let handler = MockHandler {
cb: move || {
latch_cloned.countdown();
},
};
let scheduler: LocalScheduler<MockRequest> = LocalScheduler::new(
SchedulerConfig {
max_inflight_task: 3,
},
handler,
);
scheduler
.schedule(MockRequest { region_id: 1 })
.await
.unwrap();
scheduler
.schedule(MockRequest { region_id: 2 })
.await
.unwrap();
tokio::time::timeout(Duration::from_secs(1), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_scheduler_many() {
common_telemetry::init_default_ut_logging();
let task_size = 100;
let latch = Arc::new(CountdownLatch::new(task_size));
let latch_clone = latch.clone();
let handler = MockHandler {
cb: move || {
latch_clone.countdown();
},
};
let config = SchedulerConfig {
max_inflight_task: 3,
};
let scheduler = LocalScheduler::new(config, handler);
for i in 0..task_size {
scheduler
.schedule(MockRequest {
region_id: i as RegionId,
})
.await
.unwrap();
}
tokio::time::timeout(Duration::from_secs(3), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_scheduler_interval() {
common_telemetry::init_default_ut_logging();
let task_size = 100;
let latch = Arc::new(CountdownLatch::new(task_size));
let latch_clone = latch.clone();
let handler = MockHandler {
cb: move || {
latch_clone.countdown();
},
};
let config = SchedulerConfig {
max_inflight_task: 3,
};
let scheduler = LocalScheduler::new(config, handler);
for i in 0..task_size / 2 {
scheduler
.schedule(MockRequest {
region_id: i as RegionId,
})
.await
.unwrap();
}
tokio::time::sleep(Duration::from_millis(100)).await;
for i in task_size / 2..task_size {
scheduler
.schedule(MockRequest {
region_id: i as RegionId,
})
.await
.unwrap();
}
tokio::time::timeout(Duration::from_secs(6), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_schedule_duplicate_tasks() {
common_telemetry::init_default_ut_logging();
let handler = MockHandler { cb: || {} };
let config = SchedulerConfig {
max_inflight_task: 3,
};
let scheduler = LocalScheduler::new(config, handler);
let mut scheduled_task = 0;
for _ in 0..10 {
if scheduler
.schedule(MockRequest { region_id: 1 })
.await
.unwrap()
{
scheduled_task += 1;
}
}
scheduler.stop().await.unwrap();
debug!("Schedule tasks: {}", scheduled_task);
assert!(scheduled_task < 10);
}
}

View File

@@ -165,9 +165,10 @@ fn fit_time_bucket(span_sec: i64) -> i64 {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use super::*;
use crate::file_purger::noop::new_noop_file_purger;
use crate::sst::FileMeta;
#[test]
@@ -227,14 +228,21 @@ mod tests {
}
fn new_file_handle(name: &str, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle {
FileHandle::new(FileMeta {
file_name: name.to_string(),
time_range: Some((
Timestamp::new_millisecond(start_ts_millis),
Timestamp::new_millisecond(end_ts_millis),
)),
level: 0,
})
let file_purger = new_noop_file_purger();
let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
FileHandle::new(
FileMeta {
region_id: 0,
file_name: name.to_string(),
time_range: Some((
Timestamp::new_millisecond(start_ts_millis),
Timestamp::new_millisecond(end_ts_millis),
)),
level: 0,
},
layer,
file_purger,
)
}
fn new_file_handles(input: &[(&str, i64, i64)]) -> Vec<FileHandle> {

View File

@@ -17,6 +17,7 @@ use std::fmt::{Debug, Formatter};
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::compaction::writer::build_sst_reader;
@@ -62,11 +63,12 @@ impl<S: LogStore> CompactionTaskImpl<S> {
async fn merge_ssts(&mut self) -> Result<(Vec<FileMeta>, Vec<FileMeta>)> {
let mut futs = Vec::with_capacity(self.outputs.len());
let mut compacted_inputs = HashSet::new();
let region_id = self.shared_data.id();
for output in self.outputs.drain(..) {
let schema = self.schema.clone();
let sst_layer = self.sst_layer.clone();
compacted_inputs.extend(output.inputs.iter().map(|f| FileMeta {
region_id,
file_name: f.file_name().to_string(),
time_range: *f.time_range(),
level: f.level(),
@@ -74,7 +76,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
// TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
futs.push(async move {
match output.build(schema, sst_layer).await {
match output.build(region_id, schema, sst_layer).await {
Ok(meta) => Ok(meta),
Err(e) => Err(e),
}
@@ -118,7 +120,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
fn mark_files_compacting(&self, compacting: bool) {
for o in &self.outputs {
for input in &o.inputs {
input.set_compacting(compacting);
input.mark_compacting(compacting);
}
}
}
@@ -157,7 +159,12 @@ pub struct CompactionOutput {
}
impl CompactionOutput {
async fn build(&self, schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Result<FileMeta> {
async fn build(
&self,
region_id: RegionId,
schema: RegionSchemaRef,
sst_layer: AccessLayerRef,
) -> Result<FileMeta> {
let reader = build_sst_reader(
schema,
sst_layer.clone(),
@@ -174,6 +181,7 @@ impl CompactionOutput {
.await?;
Ok(FileMeta {
region_id,
file_name: output_file_name,
time_range,
level: self.output_level,

View File

@@ -97,6 +97,7 @@ mod tests {
use tempdir::TempDir;
use super::*;
use crate::file_purger::noop::new_noop_file_purger;
use crate::memtable::{
DefaultMemtableBuilder, IterContext, KeyValues, Memtable, MemtableBuilder,
};
@@ -224,11 +225,16 @@ mod tests {
.write_sst(&sst::WriteOptions::default())
.await
.unwrap();
let handle = FileHandle::new(FileMeta {
file_name: sst_file_name.to_string(),
time_range,
level: 0,
});
let handle = FileHandle::new(
FileMeta {
region_id: 0,
file_name: sst_file_name.to_string(),
time_range,
level: 0,
},
Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}),
new_noop_file_purger(),
);
seq.fetch_add(1, Ordering::Relaxed);
handle
}
@@ -452,11 +458,16 @@ mod tests {
let output_files = ["o1.parquet", "o2.parquet", "o3.parquet"]
.into_iter()
.map(|f| {
FileHandle::new(FileMeta {
file_name: f.to_string(),
level: 1,
time_range: None,
})
FileHandle::new(
FileMeta {
region_id: 0,
file_name: f.to_string(),
level: 1,
time_range: None,
},
Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}),
new_noop_file_purger(),
)
})
.collect::<Vec<_>>();

View File

@@ -17,10 +17,14 @@
#[derive(Debug, Clone)]
pub struct EngineConfig {
pub max_files_in_l0: usize,
pub max_purge_tasks: usize,
}
impl Default for EngineConfig {
fn default() -> Self {
Self { max_files_in_l0: 8 }
Self {
max_files_in_l0: 8,
max_purge_tasks: 32,
}
}
}

View File

@@ -28,11 +28,13 @@ use crate::background::JobPoolImpl;
use crate::compaction::CompactionSchedulerRef;
use crate::config::EngineConfig;
use crate::error::{self, Error, Result};
use crate::file_purger::{FilePurgeHandler, FilePurgerRef};
use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
use crate::manifest::region::RegionManifest;
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::metadata::RegionMetadata;
use crate::region::{RegionImpl, StoreConfig};
use crate::scheduler::{LocalScheduler, SchedulerConfig};
use crate::sst::FsAccessLayer;
/// [StorageEngine] implementation.
@@ -222,6 +224,7 @@ struct EngineInner<S: LogStore> {
flush_scheduler: FlushSchedulerRef,
flush_strategy: FlushStrategyRef,
compaction_scheduler: CompactionSchedulerRef<S>,
file_purger: FilePurgerRef,
config: Arc<EngineConfig>,
}
@@ -234,6 +237,13 @@ impl<S: LogStore> EngineInner<S> {
) -> Self {
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
let file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig {
max_inflight_tasks: config.max_purge_tasks,
},
FilePurgeHandler,
));
Self {
object_store,
log_store,
@@ -242,6 +252,7 @@ impl<S: LogStore> EngineInner<S> {
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
compaction_scheduler,
file_purger,
config: Arc::new(config),
}
}
@@ -341,6 +352,7 @@ impl<S: LogStore> EngineInner<S> {
flush_strategy: self.flush_strategy.clone(),
compaction_scheduler: self.compaction_scheduler.clone(),
engine_config: self.config.clone(),
file_purger: self.file_purger.clone(),
}
}
}

View File

@@ -420,11 +420,20 @@ pub enum Error {
#[snafu(display("Scheduler rate limited, msg: {}", msg))]
RateLimited { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to stop scheduler, source: {:?}", source))]
#[snafu(display("Cannot schedule request, scheduler's already stopped"))]
IllegalSchedulerState { backtrace: Backtrace },
#[snafu(display("Failed to stop scheduler, source: {}", source))]
StopScheduler {
source: JoinError,
backtrace: Backtrace,
},
#[snafu(display("Failed to delete SST file, source: {}", source))]
DeleteSst {
source: object_store::Error,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -497,6 +506,8 @@ impl ErrorExt for Error {
DecodeParquetTimeRange { .. } => StatusCode::Unexpected,
RateLimited { .. } => StatusCode::Internal,
StopScheduler { .. } => StatusCode::Internal,
DeleteSst { .. } => StatusCode::StorageUnavailable,
IllegalSchedulerState { .. } => StatusCode::Unexpected,
}
}

View File

@@ -0,0 +1,233 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::{debug, error};
use store_api::storage::RegionId;
use tokio::sync::Notify;
use crate::scheduler::rate_limit::{BoxedRateLimitToken, RateLimitToken};
use crate::scheduler::{Handler, LocalScheduler, Request};
use crate::sst::AccessLayerRef;
/// Request to purge (delete) one SST file that is no longer referenced.
pub struct FilePurgeRequest {
    /// Id of the region the SST file belongs to (used for logging and keying).
    pub region_id: RegionId,
    /// Name of the SST file to delete.
    pub file_name: String,
    /// Access layer that owns the file; `delete_sst` is invoked on it.
    pub sst_layer: AccessLayerRef,
}
impl Request for FilePurgeRequest {
    type Key = String;

    /// Deduplication key for the scheduler queue: `<region_id>/<file_name>`.
    /// Prefixing with the region id keeps identically-named files from
    /// different regions from deduplicating against each other.
    fn key(&self) -> Self::Key {
        format!("{}/{}", self.region_id, self.file_name)
    }
}
/// Handler that performs the actual SST file deletion for purge requests.
pub struct FilePurgeHandler;

#[async_trait::async_trait]
impl Handler for FilePurgeHandler {
    type Request = FilePurgeRequest;

    /// Deletes the SST file referenced by `req` through its access layer.
    ///
    /// The rate-limit token is released and the scheduler loop is notified
    /// regardless of whether the deletion succeeded. The original code
    /// propagated a deletion error with `?` before releasing the token or
    /// notifying, which would leak inflight-task capacity in the rate
    /// limiter and could stall subsequently queued requests.
    async fn handle_request(
        &self,
        req: Self::Request,
        token: BoxedRateLimitToken,
        finish_notifier: Arc<Notify>,
    ) -> crate::error::Result<()> {
        let result = req.sst_layer.delete_sst(&req.file_name).await;
        // Always release capacity and wake the handler loop before
        // propagating any error; an early return here would leak the token.
        token.try_release();
        finish_notifier.notify_one();
        match result {
            Ok(()) => {
                debug!(
                    "Successfully deleted SST file: {}, region: {}",
                    req.file_name, req.region_id
                );
                Ok(())
            }
            Err(e) => {
                error!(e; "Failed to delete SST file, file: {}, region: {}", req.file_name, req.region_id);
                Err(e)
            }
        }
    }
}
pub type FilePurgerRef = Arc<LocalScheduler<FilePurgeRequest>>;
#[cfg(test)]
pub mod noop {
use std::sync::Arc;
use tokio::sync::Notify;
use crate::file_purger::{FilePurgeRequest, FilePurgerRef};
use crate::scheduler::rate_limit::{BoxedRateLimitToken, RateLimitToken};
use crate::scheduler::{Handler, LocalScheduler, SchedulerConfig};
pub fn new_noop_file_purger() -> FilePurgerRef {
Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
NoopFilePurgeHandler,
))
}
#[derive(Debug)]
pub struct NoopFilePurgeHandler;
#[async_trait::async_trait]
impl Handler for NoopFilePurgeHandler {
type Request = FilePurgeRequest;
async fn handle_request(
&self,
_req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> crate::error::Result<()> {
token.try_release();
finish_notifier.notify_one();
Ok(())
}
}
}
#[cfg(test)]
mod tests {
    use object_store::backend::fs::Builder;
    use object_store::ObjectStore;
    use store_api::storage::OpType;
    use tempdir::TempDir;

    use super::*;
    use crate::file_purger::noop::NoopFilePurgeHandler;
    use crate::memtable::tests::{schema_for_test, write_kvs};
    use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder};
    use crate::scheduler::{Scheduler, SchedulerConfig};
    use crate::sst::{AccessLayer, FileHandle, FileMeta, FsAccessLayer, Source, WriteOptions};

    // Token that does nothing on release; lets tests call `handle_request`
    // directly without a real rate limiter.
    struct MockRateLimitToken;

    impl RateLimitToken for MockRateLimitToken {
        fn try_release(&self) {}
    }

    /// Writes a small memtable to a real SST file under `os` and returns a
    /// handle to it, the SST directory path, and the access layer used.
    async fn create_sst_file(
        os: ObjectStore,
        sst_file_name: &str,
        file_purger: FilePurgerRef,
    ) -> (FileHandle, String, AccessLayerRef) {
        let schema = schema_for_test();
        let memtable = DefaultMemtableBuilder::default().build(schema.clone());
        write_kvs(
            &*memtable,
            10,
            OpType::Put,
            &[(1, 1), (2, 2)],
            &[(Some(1), Some(1)), (Some(2), Some(2))],
        );
        let iter = memtable.iter(&IterContext::default()).unwrap();
        let sst_path = "table1";
        let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone()));
        let _sst_info = layer
            .write_sst(sst_file_name, Source::Iter(iter), &WriteOptions {})
            .await
            .unwrap();
        (
            FileHandle::new(
                FileMeta {
                    region_id: 0,
                    file_name: sst_file_name.to_string(),
                    time_range: None,
                    level: 0,
                },
                layer.clone(),
                file_purger,
            ),
            sst_path.to_string(),
            layer as _,
        )
    }

    // Calling the purge handler directly must delete the file from storage.
    #[tokio::test]
    async fn test_file_purger_handler() {
        let dir = TempDir::new("file-purge").unwrap();
        let object_store = ObjectStore::new(
            Builder::default()
                .root(dir.path().to_str().unwrap())
                .build()
                .unwrap(),
        );

        let sst_file_name = "test-file-purge-handler.parquet";
        // The handle's own purger is a no-op so only the explicit
        // `handle_request` call below can delete the file.
        let noop_file_purger = Arc::new(LocalScheduler::new(
            SchedulerConfig::default(),
            NoopFilePurgeHandler,
        ));
        let (file, path, layer) =
            create_sst_file(object_store.clone(), sst_file_name, noop_file_purger).await;

        let request = FilePurgeRequest {
            region_id: 0,
            file_name: file.file_name().to_string(),
            sst_layer: layer,
        };

        let handler = FilePurgeHandler;
        let notify = Arc::new(Notify::new());
        handler
            .handle_request(request, Box::new(MockRateLimitToken {}), notify.clone())
            .await
            .unwrap();
        notify.notified().await;
        let object = object_store.object(&format!("{}/{}", path, sst_file_name));
        assert!(!object.is_exist().await.unwrap());
    }

    // End-to-end: dropping a handle marked deleted should enqueue a purge,
    // and `stop(true)` must drain the queue before the assertion runs.
    #[tokio::test]
    async fn test_file_purge_loop() {
        common_telemetry::init_default_ut_logging();
        let dir = TempDir::new("file-purge").unwrap();
        let object_store = ObjectStore::new(
            Builder::default()
                .root(dir.path().to_str().unwrap())
                .build()
                .unwrap(),
        );
        let sst_file_name = "test-file-purger.parquet";
        let scheduler = Arc::new(LocalScheduler::new(
            SchedulerConfig::default(),
            FilePurgeHandler,
        ));
        let (handle, path, _layer) =
            create_sst_file(object_store.clone(), sst_file_name, scheduler.clone()).await;

        {
            // mark file as deleted and drop the handle, we expect the file is deleted.
            handle.mark_deleted();
            drop(handle);
        }
        // await_termination = true: wait for the queued purge to finish.
        scheduler.stop(true).await.unwrap();
        assert!(!object_store
            .object(&format!("{}/{}", path, sst_file_name))
            .is_exist()
            .await
            .unwrap());
    }
}

View File

@@ -174,6 +174,7 @@ impl<S: LogStore> FlushJob<S> {
return CancelledSnafu {}.fail();
}
let region_id = self.shared.id();
let mut futures = Vec::with_capacity(self.memtables.len());
let iter_ctx = IterContext {
for_flush: true,
@@ -198,6 +199,7 @@ impl<S: LogStore> FlushJob<S> {
.await?;
Ok(FileMeta {
region_id,
file_name,
time_range,
level: 0,

View File

@@ -40,3 +40,4 @@ mod wal;
pub mod write_batch;
pub use engine::EngineImpl;
mod file_purger;

View File

@@ -41,6 +41,7 @@ pub fn build_region_edit(
files_to_add: files_to_add
.iter()
.map(|f| FileMeta {
region_id: 0,
file_name: f.to_string(),
time_range: None,
level: 0,
@@ -49,6 +50,7 @@ pub fn build_region_edit(
files_to_remove: files_to_remove
.iter()
.map(|f| FileMeta {
region_id: 0,
file_name: f.to_string(),
time_range: None,
level: 0,

View File

@@ -31,6 +31,7 @@ use store_api::storage::{
use crate::compaction::CompactionSchedulerRef;
use crate::config::EngineConfig;
use crate::error::{self, Error, Result};
use crate::file_purger::FilePurgerRef;
use crate::flush::{FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionMetaAction, RegionMetaActionList,
@@ -122,6 +123,7 @@ pub struct StoreConfig<S: LogStore> {
pub flush_strategy: FlushStrategyRef,
pub compaction_scheduler: CompactionSchedulerRef<S>,
pub engine_config: Arc<EngineConfig>,
pub file_purger: FilePurgerRef,
}
pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
@@ -151,7 +153,13 @@ impl<S: LogStore> RegionImpl<S> {
let mutable_memtable = store_config
.memtable_builder
.build(metadata.schema().clone());
let version = Version::with_manifest_version(metadata, manifest_version, mutable_memtable);
let version = Version::with_manifest_version(
metadata,
manifest_version,
mutable_memtable,
store_config.sst_layer.clone(),
store_config.file_purger.clone(),
);
let region = RegionImpl::new(version, store_config);
Ok(region)
@@ -198,6 +206,8 @@ impl<S: LogStore> RegionImpl<S> {
let (version, mut recovered_metadata) = match Self::recover_from_manifest(
&store_config.manifest,
&store_config.memtable_builder,
&store_config.sst_layer,
&store_config.file_purger,
)
.await?
{
@@ -289,6 +299,8 @@ impl<S: LogStore> RegionImpl<S> {
async fn recover_from_manifest(
manifest: &RegionManifest,
memtable_builder: &MemtableBuilderRef,
sst_layer: &AccessLayerRef,
file_purger: &FilePurgerRef,
) -> Result<(Option<Version>, RecoveredMetadataMap)> {
let (start, end) = Self::manifest_scan_range();
let mut iter = manifest.scan(start, end).await?;
@@ -316,6 +328,8 @@ impl<S: LogStore> RegionImpl<S> {
Arc::new(region_metadata),
last_manifest_version,
memtable,
sst_layer.clone(),
file_purger.clone(),
));
for (manifest_version, action) in actions.drain(..) {
version = Self::replay_edit(manifest_version, action, version);

View File

@@ -37,9 +37,12 @@ use store_api::storage::{
use tempdir::TempDir;
use super::*;
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::manifest::action::{RegionChange, RegionMetaActionList};
use crate::manifest::test_utils::*;
use crate::memtable::DefaultMemtableBuilder;
use crate::scheduler::{LocalScheduler, SchedulerConfig};
use crate::sst::FsAccessLayer;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::{self, config_util, schema_util, write_batch_util};
@@ -285,17 +288,25 @@ async fn test_recover_region_manifets() {
.unwrap(),
);
let manifest = RegionManifest::new("/manifest/", object_store);
let manifest = RegionManifest::new("/manifest/", object_store.clone());
let region_meta = Arc::new(build_region_meta());
let sst_layer = Arc::new(FsAccessLayer::new("sst", object_store)) as _;
let file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
NoopFilePurgeHandler,
));
// Recover from empty
assert!(
RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest, &memtable_builder)
.await
.unwrap()
.0
.is_none()
);
assert!(RegionImpl::<NoopLogStore>::recover_from_manifest(
&manifest,
&memtable_builder,
&sst_layer,
&file_purger
)
.await
.unwrap()
.0
.is_none());
{
// save some actions into region_meta
@@ -329,10 +340,14 @@ async fn test_recover_region_manifets() {
}
// try to recover
let (version, recovered_metadata) =
RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest, &memtable_builder)
.await
.unwrap();
let (version, recovered_metadata) = RegionImpl::<NoopLogStore>::recover_from_manifest(
&manifest,
&memtable_builder,
&sst_layer,
&file_purger,
)
.await
.unwrap();
assert_eq!(42, *recovered_metadata.first_key_value().unwrap().0);
let version = version.unwrap();

View File

@@ -652,7 +652,7 @@ impl WriterInner {
);
return;
}
match compaction_scheduler.schedule(compaction_request).await {
match compaction_scheduler.schedule(compaction_request) {
Ok(scheduled) => {
info!(
"Schedule region {} compaction request result: {}",

View File

@@ -14,17 +14,18 @@
use std::fmt::{Debug, Formatter};
use std::hash::Hash;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use common_telemetry::{debug, error, info};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error;
use crate::error::StopSchedulerSnafu;
use crate::error::{IllegalSchedulerStateSnafu, StopSchedulerSnafu};
use crate::scheduler::dedup_deque::DedupDeque;
use crate::scheduler::rate_limit::{
BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter, RateLimiter,
@@ -58,28 +59,35 @@ pub trait Handler {
#[async_trait]
pub trait Scheduler: Debug {
type Request;
/// Schedules a request.
/// Returns true if request is scheduled. Returns false if task queue already
/// contains the request with same key.
async fn schedule(&self, request: Self::Request) -> error::Result<bool>;
fn schedule(&self, request: Self::Request) -> error::Result<bool>;
/// Stops scheduler.
async fn stop(&self) -> error::Result<()>;
/// Stops scheduler. If `await_termination` is set to true, the scheduler will
/// wait until all queued requests are processed.
async fn stop(&self, await_termination: bool) -> error::Result<()>;
}
/// Scheduler config.
#[derive(Debug)]
pub struct SchedulerConfig {
pub max_inflight_task: usize,
pub max_inflight_tasks: usize,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
max_inflight_task: 4,
max_inflight_tasks: 4,
}
}
}
const STATE_RUNNING: u8 = 0;
const STATE_STOP: u8 = 1;
const STATE_AWAIT_TERMINATION: u8 = 2;
/// Request scheduler based on local state.
pub struct LocalScheduler<R: Request> {
/// Request FIFO with key deduplication.
@@ -90,6 +98,8 @@ pub struct LocalScheduler<R: Request> {
task_notifier: Arc<Notify>,
/// Join handle of spawned request handling loop.
join_handle: Mutex<Option<JoinHandle<()>>>,
/// State of scheduler.
state: Arc<AtomicU8>,
}
impl<R> Debug for LocalScheduler<R>
@@ -108,7 +118,8 @@ where
{
type Request = R;
async fn schedule(&self, request: Self::Request) -> error::Result<bool> {
fn schedule(&self, request: Self::Request) -> error::Result<bool> {
ensure!(self.running(), IllegalSchedulerStateSnafu);
debug!(
"Schedule request: {:?}, queue size: {}",
request.key(),
@@ -120,7 +131,14 @@ where
Ok(res)
}
async fn stop(&self) -> error::Result<()> {
async fn stop(&self, await_termination: bool) -> error::Result<()> {
let state = if await_termination {
STATE_AWAIT_TERMINATION
} else {
STATE_STOP
};
self.state.store(state, Ordering::Relaxed);
self.cancel_token.cancel();
let handle = { self.join_handle.lock().unwrap().take() };
if let Some(handle) = handle {
@@ -142,15 +160,16 @@ where
let request_queue = Arc::new(RwLock::new(DedupDeque::default()));
let cancel_token = CancellationToken::new();
let task_notifier = Arc::new(Notify::new());
let state = Arc::new(AtomicU8::new(STATE_RUNNING));
let handle_loop = HandlerLoop {
task_notifier: task_notifier.clone(),
req_queue: request_queue.clone(),
cancel_token: cancel_token.child_token(),
limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new(
MaxInflightTaskLimiter::new(config.max_inflight_task),
MaxInflightTaskLimiter::new(config.max_inflight_tasks),
)])),
request_handler: handler,
state: state.clone(),
};
let join_handle = common_runtime::spawn_bg(async move {
debug!("Task handler loop spawned");
@@ -161,6 +180,7 @@ where
request_queue,
cancel_token,
task_notifier,
state,
}
}
@@ -169,6 +189,11 @@ where
fn remaining_requests(&self) -> usize {
self.request_queue.read().unwrap().len()
}
#[inline]
fn running(&self) -> bool {
self.state.load(Ordering::Relaxed) == STATE_RUNNING
}
}
pub struct HandlerLoop<R: Request, H: Handler> {
@@ -177,6 +202,7 @@ pub struct HandlerLoop<R: Request, H: Handler> {
pub task_notifier: Arc<Notify>,
pub request_handler: H,
pub limiter: Arc<CascadeRateLimiter<R>>,
pub state: Arc<AtomicU8>,
}
impl<R, H> HandlerLoop<R, H>
@@ -186,36 +212,52 @@ where
{
/// Runs scheduled requests dispatch loop.
pub async fn run(&self) {
let task_notifier = self.task_notifier.clone();
let limiter = self.limiter.clone();
loop {
while self.running() {
tokio::select! {
_ = task_notifier.notified() => {
// poll requests as many as possible until rate limited, and then wait for
// notification (some task's finished).
debug!("Notified, queue size: {:?}", self.req_queue.read().unwrap().len());
while let Some((task_key, req)) = self.poll_task().await{
if let Ok(token) = limiter.acquire_token(&req) {
debug!("Executing request: {:?}", task_key);
if let Err(e) = self.handle_request(req, token, self.task_notifier.clone()).await {
error!(e; "Failed to submit request: {:?}", task_key);
} else {
info!("Submitted task: {:?}", task_key);
}
} else {
// rate limited, put back to req queue to wait for next schedule
debug!("Put back request {:?}, queue size: {}", task_key, self.req_queue.read().unwrap().len());
self.put_back_req(task_key, req).await;
break;
}
}
_ = self.task_notifier.notified() => {
debug!("Notified, queue size: {:?}",self.req_queue.read().unwrap().len());
self.poll_and_execute(&limiter).await;
}
_ = self.cancel_token.cancelled() => {
info!("Task scheduler stopped.");
return;
info!("Task scheduler cancelled.");
break;
}
}
}
// For correctness, we need to poll requests from fifo again.
if self.state.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION {
info!("Waiting for all pending tasks to finish.");
self.poll_and_execute(&limiter).await;
self.state.store(STATE_STOP, Ordering::Relaxed);
}
info!("Task scheduler stopped");
}
/// Polls and executes requests as many as possible until rate limited.
async fn poll_and_execute(&self, limiter: &Arc<CascadeRateLimiter<R>>) {
while let Some((task_key, req)) = self.poll_task().await {
if let Ok(token) = limiter.acquire_token(&req) {
debug!("Executing request: {:?}", task_key);
if let Err(e) = self
.handle_request(req, token, self.task_notifier.clone())
.await
{
error!(e; "Failed to submit request: {:?}", task_key);
} else {
info!("Submitted task: {:?}", task_key);
}
} else {
// rate limited, put back to req queue to wait for next schedule
debug!(
"Put back request {:?}, queue size: {}",
task_key,
self.req_queue.read().unwrap().len()
);
self.put_back_req(task_key, req).await;
break;
}
}
}
#[inline]
@@ -242,4 +284,288 @@ where
.handle_request(req, token, finish_notifier)
.await
}
#[inline]
fn running(&self) -> bool {
self.state.load(Ordering::Relaxed) == STATE_RUNNING
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicI32;
use std::time::Duration;
use store_api::storage::RegionId;
use super::*;
use crate::scheduler::dedup_deque::DedupDeque;
use crate::scheduler::rate_limit::{
BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter,
};
use crate::scheduler::{HandlerLoop, LocalScheduler, Scheduler, SchedulerConfig};
/// Test-only countdown latch: `wait` completes once `countdown` has been
/// invoked `size` times in total.
struct CountdownLatch {
    // Number of countdowns still required before waiters are released.
    counter: std::sync::Mutex<usize>,
    // Registered waiters; each is notified once when the counter reaches 0.
    notifies: std::sync::RwLock<Vec<Arc<Notify>>>,
}

impl CountdownLatch {
    /// Creates a latch that fires after `size` calls to `countdown`.
    fn new(size: usize) -> Self {
        Self {
            counter: std::sync::Mutex::new(size),
            notifies: std::sync::RwLock::new(vec![]),
        }
    }

    /// Decrements the counter; on reaching zero, wakes every waiter
    /// registered so far. Calls after the counter hits zero are no-ops.
    fn countdown(&self) {
        let mut counter = self.counter.lock().unwrap();
        if *counter >= 1 {
            *counter -= 1;
            if *counter == 0 {
                let notifies = self.notifies.read().unwrap();
                for waiter in notifies.iter() {
                    waiter.notify_one();
                }
            }
        }
    }

    /// Registers the caller as a waiter and blocks until the latch fires.
    /// NOTE(review): a waiter registered after the counter already reached
    /// zero is never notified — callers guard this with a timeout.
    async fn wait(&self) {
        let notify = Arc::new(Notify::new());
        {
            let notify = notify.clone();
            let mut notifies = self.notifies.write().unwrap();
            notifies.push(notify);
        }
        notify.notified().await
    }
}
#[tokio::test]
async fn test_schedule_handler() {
common_telemetry::init_default_ut_logging();
let queue = Arc::new(std::sync::RwLock::new(DedupDeque::default()));
let latch = Arc::new(CountdownLatch::new(2));
let latch_cloned = latch.clone();
let handler = Arc::new(HandlerLoop {
req_queue: queue.clone(),
cancel_token: Default::default(),
task_notifier: Arc::new(Default::default()),
request_handler: MockHandler {
cb: move || {
latch_cloned.countdown();
},
},
limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new(
MaxInflightTaskLimiter::new(3),
)])),
state: Arc::new(AtomicU8::default()),
});
let handler_cloned = handler.clone();
common_runtime::spawn_bg(async move { handler_cloned.run().await });
queue.write().unwrap().push_back(1, MockRequest::default());
handler.task_notifier.notify_one();
queue.write().unwrap().push_back(2, MockRequest::default());
handler.task_notifier.notify_one();
tokio::time::timeout(Duration::from_secs(1), latch.wait())
.await
.unwrap();
}
#[derive(Default, Debug)]
struct MockRequest {
region_id: RegionId,
}
struct MockHandler<F> {
cb: F,
}
#[async_trait::async_trait]
impl<F> Handler for MockHandler<F>
where
F: Fn() + Send + Sync,
{
type Request = MockRequest;
async fn handle_request(
&self,
_req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> error::Result<()> {
(self.cb)();
token.try_release();
finish_notifier.notify_one();
Ok(())
}
}
impl Request for MockRequest {
type Key = RegionId;
fn key(&self) -> Self::Key {
self.region_id
}
}
#[tokio::test]
async fn test_scheduler() {
let latch = Arc::new(CountdownLatch::new(2));
let latch_cloned = latch.clone();
let handler = MockHandler {
cb: move || {
latch_cloned.countdown();
},
};
let scheduler: LocalScheduler<MockRequest> = LocalScheduler::new(
SchedulerConfig {
max_inflight_tasks: 3,
},
handler,
);
scheduler.schedule(MockRequest { region_id: 1 }).unwrap();
scheduler.schedule(MockRequest { region_id: 2 }).unwrap();
tokio::time::timeout(Duration::from_secs(1), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_scheduler_many() {
common_telemetry::init_default_ut_logging();
let task_size = 100;
let latch = Arc::new(CountdownLatch::new(task_size));
let latch_clone = latch.clone();
let handler = MockHandler {
cb: move || {
latch_clone.countdown();
},
};
let config = SchedulerConfig {
max_inflight_tasks: 3,
};
let scheduler = LocalScheduler::new(config, handler);
for i in 0..task_size {
scheduler
.schedule(MockRequest {
region_id: i as RegionId,
})
.unwrap();
}
tokio::time::timeout(Duration::from_secs(3), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_scheduler_interval() {
common_telemetry::init_default_ut_logging();
let task_size = 100;
let latch = Arc::new(CountdownLatch::new(task_size));
let latch_clone = latch.clone();
let handler = MockHandler {
cb: move || {
latch_clone.countdown();
},
};
let config = SchedulerConfig {
max_inflight_tasks: 3,
};
let scheduler = LocalScheduler::new(config, handler);
for i in 0..task_size / 2 {
scheduler
.schedule(MockRequest {
region_id: i as RegionId,
})
.unwrap();
}
tokio::time::sleep(Duration::from_millis(100)).await;
for i in task_size / 2..task_size {
scheduler
.schedule(MockRequest {
region_id: i as RegionId,
})
.unwrap();
}
tokio::time::timeout(Duration::from_secs(6), latch.wait())
.await
.unwrap();
}
#[tokio::test]
async fn test_schedule_duplicate_tasks() {
common_telemetry::init_default_ut_logging();
let handler = MockHandler { cb: || {} };
let config = SchedulerConfig {
max_inflight_tasks: 30,
};
let scheduler = LocalScheduler::new(config, handler);
let mut scheduled_task = 0;
for _ in 0..10 {
if scheduler.schedule(MockRequest { region_id: 1 }).unwrap() {
scheduled_task += 1;
}
}
scheduler.stop(true).await.unwrap();
debug!("Schedule tasks: {}", scheduled_task);
assert!(scheduled_task < 10);
}
#[tokio::test]
async fn test_await_termination() {
common_telemetry::init_default_ut_logging();
let finished = Arc::new(AtomicI32::new(0));
let finished_clone = finished.clone();
let handler = MockHandler {
cb: move || {
finished_clone.fetch_add(1, Ordering::Relaxed);
},
};
let config = SchedulerConfig {
max_inflight_tasks: 3,
};
let scheduler = Arc::new(LocalScheduler::new(config, handler));
let scheduler_cloned = scheduler.clone();
let task_scheduled = Arc::new(AtomicI32::new(0));
let task_scheduled_cloned = task_scheduled.clone();
common_runtime::spawn_write(async move {
for i in 0..10000 {
if let Ok(res) = scheduler_cloned.schedule(MockRequest {
region_id: i as RegionId,
}) {
if res {
task_scheduled_cloned.fetch_add(1, Ordering::Relaxed);
}
}
}
});
tokio::time::sleep(Duration::from_millis(1)).await;
scheduler.stop(true).await.unwrap();
let finished = finished.load(Ordering::Relaxed);
assert_eq!(finished, task_scheduled.load(Ordering::Relaxed));
}
}

View File

@@ -45,16 +45,16 @@ pub type BoxedRateLimiter<R> = Box<dyn RateLimiter<Request = R> + Send + Sync>;
/// Limits max inflight tasks number.
pub struct MaxInflightTaskLimiter<R> {
max_inflight_task: usize,
inflight_task: Arc<AtomicUsize>,
max_inflight_tasks: usize,
inflight_tasks: Arc<AtomicUsize>,
_phantom_data: PhantomData<R>,
}
impl<R> MaxInflightTaskLimiter<R> {
pub fn new(max_inflight_task: usize) -> Self {
pub fn new(max_inflight_tasks: usize) -> Self {
Self {
max_inflight_task,
inflight_task: Arc::new(AtomicUsize::new(0)),
max_inflight_tasks,
inflight_tasks: Arc::new(AtomicUsize::new(0)),
_phantom_data: Default::default(),
}
}
@@ -64,20 +64,20 @@ impl<R> RateLimiter for MaxInflightTaskLimiter<R> {
type Request = R;
fn acquire_token(&self, _: &Self::Request) -> Result<BoxedRateLimitToken> {
if self.inflight_task.fetch_add(1, Ordering::Relaxed) >= self.max_inflight_task {
self.inflight_task.fetch_sub(1, Ordering::Relaxed);
if self.inflight_tasks.fetch_add(1, Ordering::Relaxed) >= self.max_inflight_tasks {
self.inflight_tasks.fetch_sub(1, Ordering::Relaxed);
return RateLimitedSnafu {
msg: format!(
"Max inflight task num exceeds, current: {}, max: {}",
self.inflight_task.load(Ordering::Relaxed),
self.max_inflight_task
self.inflight_tasks.load(Ordering::Relaxed),
self.max_inflight_tasks
),
}
.fail();
}
Ok(Box::new(MaxInflightLimiterToken::new(
self.inflight_task.clone(),
self.inflight_tasks.clone(),
)))
}
}
@@ -160,14 +160,14 @@ mod tests {
fn test_max_inflight_limiter() {
let limiter = MaxInflightTaskLimiter::new(3);
let t1 = limiter.acquire_token(&1).unwrap();
assert_eq!(1, limiter.inflight_task.load(Ordering::Relaxed));
assert_eq!(1, limiter.inflight_tasks.load(Ordering::Relaxed));
let _t2 = limiter.acquire_token(&1).unwrap();
assert_eq!(2, limiter.inflight_task.load(Ordering::Relaxed));
assert_eq!(2, limiter.inflight_tasks.load(Ordering::Relaxed));
let _t3 = limiter.acquire_token(&1).unwrap();
assert_eq!(3, limiter.inflight_task.load(Ordering::Relaxed));
assert_eq!(3, limiter.inflight_tasks.load(Ordering::Relaxed));
assert!(limiter.acquire_token(&1).is_err());
t1.try_release();
assert_eq!(2, limiter.inflight_task.load(Ordering::Relaxed));
assert_eq!(2, limiter.inflight_tasks.load(Ordering::Relaxed));
let _t4 = limiter.acquire_token(&1).unwrap();
}

View File

@@ -19,17 +19,21 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_telemetry::{error, info};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use object_store::{util, ObjectStore};
use serde::{Deserialize, Serialize};
use store_api::storage::ChunkReader;
use snafu::ResultExt;
use store_api::storage::{ChunkReader, RegionId};
use table::predicate::Predicate;
use crate::chunk::ChunkReaderImpl;
use crate::error::Result;
use crate::error::{DeleteSstSnafu, Result};
use crate::file_purger::{FilePurgeRequest, FilePurgerRef};
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BoxedBatchReader};
use crate::scheduler::Scheduler;
use crate::schema::ProjectedSchemaRef;
use crate::sst::parquet::{ParquetReader, ParquetWriter};
@@ -48,13 +52,17 @@ type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize];
#[derive(Debug, Clone)]
pub struct LevelMetas {
levels: LevelMetaVec,
sst_layer: AccessLayerRef,
file_purger: FilePurgerRef,
}
impl LevelMetas {
/// Create a new LevelMetas and initialized each level.
pub fn new() -> LevelMetas {
pub fn new(sst_layer: AccessLayerRef, file_purger: FilePurgerRef) -> LevelMetas {
LevelMetas {
levels: new_level_meta_vec(),
sst_layer,
file_purger,
}
}
@@ -75,18 +83,21 @@ impl LevelMetas {
/// Panics if level of [FileHandle] is greater than [MAX_LEVEL].
pub fn merge(
&self,
files_to_add: impl Iterator<Item = FileHandle>,
files_to_remove: impl Iterator<Item = FileHandle>,
files_to_add: impl Iterator<Item = FileMeta>,
files_to_remove: impl Iterator<Item = FileMeta>,
) -> LevelMetas {
let mut merged = self.clone();
for file in files_to_add {
let level = file.level();
merged.levels[level as usize].add_file(file);
let level = file.level;
let handle = FileHandle::new(file, self.sst_layer.clone(), self.file_purger.clone());
merged.levels[level as usize].add_file(handle);
}
for file in files_to_remove {
let level = file.level();
merged.levels[level as usize].remove_file(file);
let level = file.level;
if let Some(removed_file) = merged.levels[level as usize].remove_file(&file.file_name) {
removed_file.mark_deleted();
}
}
merged
}
@@ -96,12 +107,6 @@ impl LevelMetas {
}
}
impl Default for LevelMetas {
fn default() -> LevelMetas {
LevelMetas::new()
}
}
/// Metadata of files in same SST level.
#[derive(Debug, Default, Clone)]
pub struct LevelMeta {
@@ -124,8 +129,8 @@ impl LevelMeta {
self.files.insert(file.file_name().to_string(), file);
}
fn remove_file(&mut self, file_to_remove: FileHandle) {
self.files.remove(file_to_remove.file_name());
fn remove_file(&mut self, file_to_remove: &str) -> Option<FileHandle> {
self.files.remove(file_to_remove)
}
/// Returns the level of level meta.
@@ -160,9 +165,13 @@ pub struct FileHandle {
}
impl FileHandle {
pub fn new(meta: FileMeta) -> FileHandle {
pub fn new(
meta: FileMeta,
sst_layer: AccessLayerRef,
file_purger: FilePurgerRef,
) -> FileHandle {
FileHandle {
inner: Arc::new(FileHandleInner::new(meta)),
inner: Arc::new(FileHandleInner::new(meta, sst_layer, file_purger)),
}
}
@@ -190,9 +199,19 @@ impl FileHandle {
/// Sets the compacting flag.
#[inline]
pub fn set_compacting(&self, compacting: bool) {
pub fn mark_compacting(&self, compacting: bool) {
self.inner.compacting.store(compacting, Ordering::Relaxed);
}
#[inline]
pub fn deleted(&self) -> bool {
self.inner.deleted.load(Ordering::Relaxed)
}
#[inline]
pub fn mark_deleted(&self) {
self.inner.deleted.store(true, Ordering::Relaxed);
}
}
/// Actually data of [FileHandle].
@@ -202,13 +221,46 @@ impl FileHandle {
struct FileHandleInner {
    meta: FileMeta,
    // Flag toggled by `mark_compacting`; NOTE(review): presumably guards the
    // file against being picked by a concurrent compaction — confirm at callers.
    compacting: AtomicBool,
    // Set by `mark_deleted`; checked in `Drop` to schedule the SST purge once
    // the last handle to this file goes away.
    deleted: AtomicBool,
    sst_layer: AccessLayerRef,
    file_purger: FilePurgerRef,
}
impl Drop for FileHandleInner {
    fn drop(&mut self) {
        // Only files explicitly marked deleted (removed from the level metas,
        // e.g. after a compaction merge) are purged; dropping a handle to a
        // live file must not delete its data.
        if self.deleted.load(Ordering::Relaxed) {
            let request = FilePurgeRequest {
                sst_layer: self.sst_layer.clone(),
                file_name: self.meta.file_name.clone(),
                region_id: self.meta.region_id,
            };
            // Purging happens asynchronously via the purge scheduler; a failed
            // schedule is only logged since `drop` cannot propagate errors.
            match self.file_purger.schedule(request) {
                Ok(res) => {
                    info!(
                        "Scheduled SST purge task, region: {}, name: {}, res: {}",
                        self.meta.region_id, self.meta.file_name, res
                    );
                }
                Err(e) => {
                    error!(e; "Failed to schedule SST purge task, region: {}, name: {}", self.meta.region_id, self.meta.file_name);
                }
            }
        }
    }
}
impl FileHandleInner {
fn new(meta: FileMeta) -> FileHandleInner {
fn new(
meta: FileMeta,
sst_layer: AccessLayerRef,
file_purger: FilePurgerRef,
) -> FileHandleInner {
FileHandleInner {
meta,
compacting: AtomicBool::new(false),
deleted: AtomicBool::new(false),
sst_layer,
file_purger,
}
}
}
@@ -216,7 +268,11 @@ impl FileHandleInner {
/// Immutable metadata of a sst file.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileMeta {
/// Region of file.
pub region_id: RegionId,
/// File name
pub file_name: String,
/// Timestamp range of file.
pub time_range: Option<(Timestamp, Timestamp)>,
/// SST level of the file.
pub level: Level,
@@ -257,8 +313,8 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
/// Read SST file with given `file_name` and schema.
async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader>;
/// Returns backend object store.
fn object_store(&self) -> ObjectStore;
/// Deletes a SST file with given name.
async fn delete_sst(&self, file_name: &str) -> Result<()>;
}
pub type AccessLayerRef = Arc<dyn AccessLayer>;
@@ -340,8 +396,10 @@ impl AccessLayer for FsAccessLayer {
Ok(Box::new(stream))
}
fn object_store(&self) -> ObjectStore {
self.object_store.clone()
async fn delete_sst(&self, file_name: &str) -> Result<()> {
let path = self.sst_file_path(file_name);
let object = self.object_store.object(&path);
object.delete().await.context(DeleteSstSnafu)
}
}
@@ -350,20 +408,28 @@ mod tests {
use std::collections::HashSet;
use super::*;
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::scheduler::{LocalScheduler, SchedulerConfig};
fn create_handle(name: &str, level: Level) -> FileHandle {
FileHandle::new(FileMeta {
fn create_file_meta(name: &str, level: Level) -> FileMeta {
FileMeta {
region_id: 0,
file_name: name.to_string(),
time_range: None,
level,
})
}
}
#[test]
fn test_level_metas_add_and_remove() {
let metas = LevelMetas::new();
let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
let purger = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
NoopFilePurgeHandler,
));
let metas = LevelMetas::new(layer, purger);
let merged = metas.merge(
vec![create_handle("a", 0), create_handle("b", 0)].into_iter(),
vec![create_file_meta("a", 0), create_file_meta("b", 0)].into_iter(),
vec![].into_iter(),
);
@@ -377,7 +443,7 @@ mod tests {
);
let merged1 = merged.merge(
vec![create_handle("c", 1), create_handle("d", 1)].into_iter(),
vec![create_file_meta("c", 1), create_file_meta("d", 1)].into_iter(),
vec![].into_iter(),
);
assert_eq!(
@@ -400,7 +466,7 @@ mod tests {
let removed1 = merged1.merge(
vec![].into_iter(),
vec![create_handle("a", 0), create_handle("c", 0)].into_iter(),
vec![create_file_meta("a", 0), create_file_meta("c", 0)].into_iter(),
);
assert_eq!(
HashSet::from(["b".to_string()]),
@@ -422,7 +488,7 @@ mod tests {
let removed2 = removed1.merge(
vec![].into_iter(),
vec![create_handle("c", 1), create_handle("d", 1)].into_iter(),
vec![create_file_meta("c", 1), create_file_meta("d", 1)].into_iter(),
);
assert_eq!(
HashSet::from(["b".to_string()]),

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod access_layer_util;
pub mod config_util;
pub mod descriptor_util;
pub mod flush_switch;

View File

@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::read::BoxedBatchReader;
use crate::sst::{AccessLayer, ReadOptions, Source, SstInfo, WriteOptions};
/// Test double for [AccessLayer]: reads and writes are unsupported, while
/// deletes succeed without touching any storage backend.
#[derive(Debug)]
pub struct MockAccessLayer;

#[async_trait::async_trait]
impl AccessLayer for MockAccessLayer {
    // Not needed by current tests; panics if reached.
    async fn write_sst(
        &self,
        _file_name: &str,
        _source: Source,
        _opts: &WriteOptions,
    ) -> crate::error::Result<SstInfo> {
        unimplemented!()
    }

    // Not needed by current tests; panics if reached.
    async fn read_sst(
        &self,
        _file_name: &str,
        _opts: &ReadOptions,
    ) -> crate::error::Result<BoxedBatchReader> {
        unimplemented!()
    }

    // Pretend the file was deleted successfully so purge paths can be
    // exercised without an object store.
    async fn delete_sst(&self, _file_name: &str) -> crate::error::Result<()> {
        Ok(())
    }
}

View File

@@ -22,10 +22,12 @@ use object_store::ObjectStore;
use crate::background::JobPoolImpl;
use crate::compaction::noop::NoopCompactionScheduler;
use crate::engine;
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::flush::{FlushSchedulerImpl, SizeBasedStrategy};
use crate::manifest::region::RegionManifest;
use crate::memtable::DefaultMemtableBuilder;
use crate::region::StoreConfig;
use crate::scheduler::{LocalScheduler, SchedulerConfig};
use crate::sst::FsAccessLayer;
fn log_store_dir(store_dir: &str) -> String {
@@ -53,6 +55,10 @@ pub async fn new_store_config(
};
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
NoopFilePurgeHandler,
));
StoreConfig {
log_store,
sst_layer,
@@ -62,5 +68,6 @@ pub async fn new_store_config(
flush_strategy: Arc::new(SizeBasedStrategy::default()),
compaction_scheduler,
engine_config: Default::default(),
file_purger,
}
}

View File

@@ -28,12 +28,12 @@ use common_telemetry::info;
use store_api::manifest::ManifestVersion;
use store_api::storage::{SchemaRef, SequenceNumber};
use crate::file_purger::FilePurgerRef;
use crate::memtable::{MemtableId, MemtableRef, MemtableVersion};
use crate::metadata::RegionMetadataRef;
use crate::schema::RegionSchemaRef;
use crate::sst::{FileHandle, FileMeta, LevelMetas};
use crate::sst::{AccessLayerRef, FileMeta, LevelMetas};
use crate::sync::CowCell;
pub const INIT_COMMITTED_SEQUENCE: u64 = 0;
/// Controls version of in memory state for a region.
@@ -167,7 +167,12 @@ impl Version {
/// Create a new `Version` with given `metadata`.
#[cfg(test)]
pub fn new(metadata: RegionMetadataRef, memtable: MemtableRef) -> Version {
Version::with_manifest_version(metadata, 0, memtable)
let sst_layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer) as Arc<_>;
let file_purger = Arc::new(crate::scheduler::LocalScheduler::new(
crate::scheduler::SchedulerConfig::default(),
crate::file_purger::noop::NoopFilePurgeHandler,
));
Version::with_manifest_version(metadata, 0, memtable, sst_layer, file_purger)
}
/// Create a new `Version` with given `metadata` and initial `manifest_version`.
@@ -175,11 +180,13 @@ impl Version {
metadata: RegionMetadataRef,
manifest_version: ManifestVersion,
mutable_memtable: MemtableRef,
sst_layer: AccessLayerRef,
file_purger: FilePurgerRef,
) -> Version {
Version {
metadata,
memtables: Arc::new(MemtableVersion::new(mutable_memtable)),
ssts: Arc::new(LevelMetas::new()),
ssts: Arc::new(LevelMetas::new(sst_layer, file_purger)),
flushed_sequence: 0,
manifest_version,
}
@@ -236,11 +243,10 @@ impl Version {
self.memtables = Arc::new(removed);
}
let handles_to_add = edit.files_to_add.into_iter().map(FileHandle::new);
let merged_ssts = self.ssts.merge(
handles_to_add,
edit.files_to_remove.into_iter().map(FileHandle::new),
);
let handles_to_add = edit.files_to_add.into_iter();
let merged_ssts = self
.ssts
.merge(handles_to_add, edit.files_to_remove.into_iter());
info!(
"After apply edit, region: {}, SST files: {:?}",