feat: Use LocalScheduler framework to implement FlushScheduler (#1531)

* test: simplify countdownlatch

* feat: impl Drop for LocalScheduler

* feat(storage): Impl FlushRequest and FlushHandler

* feat(storage): Use scheduler to handle flush job

* chore(storage): remove unused code

* feat(storage): Use new type pattern for RegionMap

* feat(storage): Remove on_success callback

* feat(storage): Address CR comments and add some metrics to flush
Yingwen authored on 2023-05-10 15:16:51 +08:00, committed by GitHub
parent aa0c5b888c
commit ee3e1dbdaa
14 changed files with 463 additions and 377 deletions
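For orientation before the diffs: the old JobPool/FlushSchedulerImpl background-job path is replaced by a FlushScheduler built on the LocalScheduler framework, and flush results now come back through a FlushHandle backed by a oneshot channel. A minimal sketch of the new call path, pieced together from the diffs below; the surrounding setup and variable names are illustrative and not part of the commit:

// Engine side: one FlushScheduler per engine, sharing the compaction
// scheduler so a successful flush can enqueue a follow-up compaction.
let flush_scheduler = Arc::new(FlushScheduler::new(
    SchedulerConfig::default(),
    compaction_scheduler.clone(),
));

// Writer side: submit a FlushRequest (its `sender` left as None) and keep the handle.
let flush_handle = flush_scheduler.schedule_flush(flush_req)?;

// Later, e.g. when FlushContext::wait is true or before closing the region,
// wait on the oneshot result instead of joining a background JobHandle.
flush_handle.wait().await?;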

Cargo.lock (generated)
View File

@@ -8323,6 +8323,7 @@ dependencies = [
"futures-util",
"lazy_static",
"log-store",
"metrics",
"object-store",
"parquet",
"paste",

View File

@@ -26,6 +26,7 @@ datafusion-expr.workspace = true
futures.workspace = true
futures-util.workspace = true
lazy_static = "1.4"
metrics.workspace = true
object-store = { path = "../object-store" }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true

View File

@@ -1,120 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Background job management.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_runtime::{self, JoinHandle};
use snafu::ResultExt;
use crate::error::{self, Result};
/// Background job context.
#[derive(Clone, Debug, Default)]
pub struct Context {
inner: Arc<ContextInner>,
}
impl Context {
fn new() -> Context {
Context::default()
}
/// Marks this context as cancelled.
///
/// Job accessing this context should check `is_cancelled()` and exit if it
/// returns true.
pub fn cancel(&self) {
self.inner.cancelled.store(true, Ordering::Relaxed);
}
/// Returns true if this context is cancelled.
pub fn is_cancelled(&self) -> bool {
self.inner.cancelled.load(Ordering::Relaxed)
}
}
#[derive(Debug, Default)]
struct ContextInner {
cancelled: AtomicBool,
}
/// Handle to the background job.
#[derive(Debug)]
pub struct JobHandle {
ctx: Context,
handle: JoinHandle<Result<()>>,
}
impl JobHandle {
/// Waits until this background job is finished.
pub async fn join(self) -> Result<()> {
self.handle.await.context(error::JoinTaskSnafu)?
}
/// Cancels this background job gracefully and waits until it exits.
#[allow(unused)]
pub async fn cancel(self) -> Result<()> {
// Tokio also provides an [`abort()`](https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html#method.abort)
// method to abort current task, consider using it if we need to abort a background job.
self.ctx.cancel();
self.join().await
}
}
#[async_trait]
pub trait Job: Send {
async fn run(&mut self, ctx: &Context) -> Result<()>;
}
type BoxedJob = Box<dyn Job>;
/// Thread pool that runs all background jobs.
#[async_trait]
pub trait JobPool: Send + Sync + std::fmt::Debug {
/// Submit a job to run in background.
///
/// Returns the [JobHandle] to the job.
async fn submit(&self, job: BoxedJob) -> Result<JobHandle>;
/// Shutdown the manager, pending background jobs may be discarded.
async fn shutdown(&self) -> Result<()>;
}
pub type JobPoolRef = Arc<dyn JobPool>;
#[derive(Debug)]
pub struct JobPoolImpl {}
#[async_trait]
impl JobPool for JobPoolImpl {
async fn submit(&self, mut job: BoxedJob) -> Result<JobHandle> {
// TODO(yingwen): [flush] Schedule background jobs to background workers, controlling parallelism.
let ctx = Context::new();
let job_ctx = ctx.clone();
let handle = common_runtime::spawn_bg(async move { job.run(&job_ctx).await });
Ok(JobHandle { ctx, handle })
}
async fn shutdown(&self) -> Result<()> {
// TODO(yingwen): [flush] Stop background workers.
unimplemented!()
}
}

View File

@@ -44,7 +44,8 @@ impl<S: LogStore> Request for CompactionRequestImpl<S> {
fn complete(self, result: Result<()>) {
if let Some(sender) = self.sender {
// We don't care the send result as
// We don't care about the send result as callers might not
// wait for the result.
let _ = sender.send(result);
}
}

View File

@@ -26,12 +26,11 @@ use store_api::storage::{
CreateOptions, EngineContext, OpenOptions, Region, RegionDescriptor, StorageEngine,
};
use crate::background::JobPoolImpl;
use crate::compaction::CompactionSchedulerRef;
use crate::config::EngineConfig;
use crate::error::{self, Error, Result};
use crate::file_purger::{FilePurgeHandler, FilePurgerRef};
use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
use crate::flush::{FlushScheduler, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
use crate::manifest::region::RegionManifest;
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::metadata::RegionMetadata;
@@ -132,7 +131,6 @@ enum RegionSlot<S: LogStore> {
Opening,
/// The region is ready for access.
Ready(RegionImpl<S>),
// TODO(yingwen): Closing state.
}
impl<S: LogStore> RegionSlot<S> {
@@ -181,12 +179,12 @@ impl<S: LogStore> Clone for RegionSlot<S> {
/// Used to update slot or clean the slot on failure.
struct SlotGuard<'a, S: LogStore> {
name: &'a str,
regions: &'a RwLock<RegionMap<S>>,
regions: &'a RegionMap<S>,
skip_clean: bool,
}
impl<'a, S: LogStore> SlotGuard<'a, S> {
fn new(name: &'a str, regions: &'a RwLock<RegionMap<S>>) -> SlotGuard<'a, S> {
fn new(name: &'a str, regions: &'a RegionMap<S>) -> SlotGuard<'a, S> {
SlotGuard {
name,
regions,
@@ -196,13 +194,7 @@ impl<'a, S: LogStore> SlotGuard<'a, S> {
/// Update the slot and skip cleaning on drop.
fn update(&mut self, slot: RegionSlot<S>) {
{
let mut regions = self.regions.write().unwrap();
if let Some(old) = regions.get_mut(self.name) {
*old = slot;
}
}
self.regions.update(self.name, slot);
self.skip_clean = true;
}
}
@@ -210,20 +202,70 @@ impl<'a, S: LogStore> SlotGuard<'a, S> {
impl<'a, S: LogStore> Drop for SlotGuard<'a, S> {
fn drop(&mut self) {
if !self.skip_clean {
let mut regions = self.regions.write().unwrap();
regions.remove(self.name);
self.regions.remove(self.name)
}
}
}
type RegionMap<S> = HashMap<String, RegionSlot<S>>;
/// Region slot map.
struct RegionMap<S: LogStore>(RwLock<HashMap<String, RegionSlot<S>>>);
impl<S: LogStore> RegionMap<S> {
/// Returns a new region map.
fn new() -> RegionMap<S> {
RegionMap(RwLock::new(HashMap::new()))
}
/// Returns `Some(slot)` if there is an existing slot with the given `name`, or inserts
/// the given `slot` and returns `None`.
fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot<S>) -> Option<RegionSlot<S>> {
{
// Try to get the region under read lock.
let regions = self.0.read().unwrap();
if let Some(slot) = regions.get(name) {
return Some(slot.clone());
}
}
// Get the region under write lock.
let mut regions = self.0.write().unwrap();
if let Some(slot) = regions.get(name) {
return Some(slot.clone());
}
// No slot in map, we can insert the slot now.
regions.insert(name.to_string(), slot);
None
}
/// Gets the region by the given name.
fn get_region(&self, name: &str) -> Option<RegionImpl<S>> {
let slot = self.0.read().unwrap().get(name).cloned()?;
slot.get_ready_region()
}
/// Updates the slot with the given name.
fn update(&self, name: &str, slot: RegionSlot<S>) {
let mut regions = self.0.write().unwrap();
if let Some(old) = regions.get_mut(name) {
*old = slot;
}
}
/// Removes the region by name.
fn remove(&self, name: &str) {
let mut regions = self.0.write().unwrap();
regions.remove(name);
}
}
struct EngineInner<S: LogStore> {
object_store: ObjectStore,
log_store: Arc<S>,
regions: RwLock<RegionMap<S>>,
regions: Arc<RegionMap<S>>,
memtable_builder: MemtableBuilderRef,
flush_scheduler: FlushSchedulerRef,
flush_scheduler: FlushSchedulerRef<S>,
flush_strategy: FlushStrategyRef,
compaction_scheduler: CompactionSchedulerRef<S>,
file_purger: FilePurgerRef,
@@ -237,8 +279,11 @@ impl<S: LogStore> EngineInner<S> {
object_store: ObjectStore,
compaction_scheduler: CompactionSchedulerRef<S>,
) -> Self {
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
// TODO(yingwen): max inflight flush tasks.
let flush_scheduler = Arc::new(FlushScheduler::new(
SchedulerConfig::default(),
compaction_scheduler.clone(),
));
let file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig {
@@ -249,7 +294,7 @@ impl<S: LogStore> EngineInner<S> {
Self {
object_store,
log_store,
regions: RwLock::new(Default::default()),
regions: Arc::new(RegionMap::new()),
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
@@ -259,33 +304,10 @@ impl<S: LogStore> EngineInner<S> {
}
}
/// Returns the `Some(slot)` if there is existing slot with given `name`, or insert
/// given `slot` and returns `None`.
fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot<S>) -> Option<RegionSlot<S>> {
{
// Try to get the region under read lock.
let regions = self.regions.read().unwrap();
if let Some(slot) = regions.get(name) {
return Some(slot.clone());
}
}
// Get the region under write lock.
let mut regions = self.regions.write().unwrap();
if let Some(slot) = regions.get(name) {
return Some(slot.clone());
}
// No slot in map, we can insert the slot now.
regions.insert(name.to_string(), slot);
None
}
async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result<Option<RegionImpl<S>>> {
// We can wait until the state of the slot has been changed to ready, but this will
// make the code more complicated, so we just return the error here.
if let Some(slot) = self.get_or_occupy_slot(name, RegionSlot::Opening) {
if let Some(slot) = self.regions.get_or_occupy_slot(name, RegionSlot::Opening) {
return slot.try_get_ready_region().map(Some);
}
@@ -320,7 +342,10 @@ impl<S: LogStore> EngineInner<S> {
descriptor: RegionDescriptor,
opts: &CreateOptions,
) -> Result<RegionImpl<S>> {
if let Some(slot) = self.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) {
if let Some(slot) = self
.regions
.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating)
{
return slot.try_get_ready_region();
}
@@ -359,8 +384,7 @@ impl<S: LogStore> EngineInner<S> {
}
fn get_region(&self, name: &str) -> Option<RegionImpl<S>> {
let slot = self.regions.read().unwrap().get(name).cloned()?;
slot.get_ready_region()
self.regions.get_region(name)
}
async fn region_store_config(

View File

@@ -138,21 +138,17 @@ pub enum Error {
source: std::io::Error,
},
#[snafu(display("Failed to join task, source: {}", source))]
JoinTask {
source: common_runtime::JoinError,
#[snafu(display(
"Failed to wait flushing, region_id: {}, source: {}",
region_id,
source
))]
WaitFlush {
region_id: RegionId,
source: tokio::sync::oneshot::error::RecvError,
location: Location,
},
#[snafu(display("Task already cancelled"))]
Cancelled { location: Location },
#[snafu(display("Failed to cancel flush, source: {}", source))]
CancelFlush {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display(
"Manifest protocol forbid to read, min_version: {}, supported_version: {}",
min_version,
@@ -455,6 +451,17 @@ pub enum Error {
region_id: RegionId,
source: tokio::sync::oneshot::error::RecvError,
},
#[snafu(display(
"The flush request is duplicate, region_id: {}, sequence: {}",
region_id,
sequence
))]
DuplicateFlush {
region_id: RegionId,
sequence: SequenceNumber,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -492,9 +499,7 @@ impl ErrorExt for Error {
Utf8 { .. }
| EncodeJson { .. }
| DecodeJson { .. }
| JoinTask { .. }
| Cancelled { .. }
| CancelFlush { .. }
| WaitFlush { .. }
| DecodeMetaActionList { .. }
| Readline { .. }
| WalDataCorrupted { .. }
@@ -546,7 +551,8 @@ impl ErrorExt for Error {
StartManifestGcTask { .. }
| StopManifestGcTask { .. }
| IllegalSchedulerState { .. } => StatusCode::Unexpected,
| IllegalSchedulerState { .. }
| DuplicateFlush { .. } => StatusCode::Unexpected,
TtlCalculation { source, .. } => source.status_code(),
}

View File

@@ -12,22 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use std::pin::Pin;
mod scheduler;
use std::sync::Arc;
use async_trait::async_trait;
use common_telemetry::logging;
use common_telemetry::{logging, timer};
pub use scheduler::{FlushHandle, FlushRequest, FlushScheduler, FlushSchedulerRef};
use store_api::logstore::LogStore;
use store_api::storage::consts::WRITE_ROW_GROUP_SIZE;
use store_api::storage::SequenceNumber;
use crate::background::{Context, Job, JobHandle, JobPoolRef};
use crate::config::EngineConfig;
use crate::error::{CancelledSnafu, Result};
use crate::error::Result;
use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
use crate::memtable::{IterContext, MemtableId, MemtableRef};
use crate::metrics::FLUSH_ELAPSED;
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions};
use crate::wal::Wal;
@@ -127,34 +127,6 @@ impl FlushStrategy for SizeBasedStrategy {
}
}
#[async_trait]
pub trait FlushScheduler: Send + Sync + std::fmt::Debug {
async fn schedule_flush(&self, flush_job: Box<dyn Job>) -> Result<JobHandle>;
}
#[derive(Debug)]
pub struct FlushSchedulerImpl {
job_pool: JobPoolRef,
}
impl FlushSchedulerImpl {
pub fn new(job_pool: JobPoolRef) -> FlushSchedulerImpl {
FlushSchedulerImpl { job_pool }
}
}
#[async_trait]
impl FlushScheduler for FlushSchedulerImpl {
async fn schedule_flush(&self, flush_job: Box<dyn Job>) -> Result<JobHandle> {
// TODO(yingwen): [flush] Implements flush schedule strategy, controls max background flushes.
self.job_pool.submit(flush_job).await
}
}
pub type FlushSchedulerRef = Arc<dyn FlushScheduler>;
pub type FlushCallback = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
pub struct FlushJob<S: LogStore> {
/// Max memtable id in these memtables,
/// used to remove immutable memtables in current version.
@@ -173,18 +145,22 @@ pub struct FlushJob<S: LogStore> {
pub wal: Wal<S>,
/// Region manifest service, used to persist metadata.
pub manifest: RegionManifest,
/// Callbacks that get invoked on flush success.
pub on_success: Option<FlushCallback>,
/// Storage engine config
pub engine_config: Arc<EngineConfig>,
}
impl<S: LogStore> FlushJob<S> {
async fn write_memtables_to_layer(&mut self, ctx: &Context) -> Result<Vec<FileMeta>> {
if ctx.is_cancelled() {
return CancelledSnafu {}.fail();
}
/// Execute the flush job.
async fn run(&mut self) -> Result<()> {
let _timer = timer!(FLUSH_ELAPSED);
let file_metas = self.write_memtables_to_layer().await?;
self.write_manifest_and_apply(&file_metas).await?;
Ok(())
}
async fn write_memtables_to_layer(&mut self) -> Result<Vec<FileMeta>> {
let region_id = self.shared.id();
let mut futures = Vec::with_capacity(self.memtables.len());
let iter_ctx = IterContext {
@@ -258,20 +234,6 @@ impl<S: LogStore> FlushJob<S> {
}
}
#[async_trait]
impl<S: LogStore> Job for FlushJob<S> {
// TODO(yingwen): [flush] Support in-job parallelism (Flush memtables concurrently)
async fn run(&mut self, ctx: &Context) -> Result<()> {
let file_metas = self.write_memtables_to_layer(ctx).await?;
self.write_manifest_and_apply(&file_metas).await?;
if let Some(cb) = self.on_success.take() {
cb.await;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -0,0 +1,252 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::logging;
use metrics::increment_counter;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::{oneshot, Notify};
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
use crate::config::EngineConfig;
use crate::error::{DuplicateFlushSnafu, Result, WaitFlushSnafu};
use crate::flush::FlushJob;
use crate::manifest::region::RegionManifest;
use crate::memtable::{MemtableId, MemtableRef};
use crate::metrics::{FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
use crate::region;
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::scheduler::rate_limit::BoxedRateLimitToken;
use crate::scheduler::{Handler, LocalScheduler, Request, Scheduler, SchedulerConfig};
use crate::sst::AccessLayerRef;
use crate::wal::Wal;
/// Key for [FlushRequest], consisting of a region id and the flush
/// sequence.
type FlushKey = (RegionId, SequenceNumber);
/// Region flush request.
pub struct FlushRequest<S: LogStore> {
/// Max memtable id in these memtables,
/// used to remove immutable memtables in current version.
pub max_memtable_id: MemtableId,
/// Memtables to be flushed.
pub memtables: Vec<MemtableRef>,
/// Last sequence of data to be flushed.
pub flush_sequence: SequenceNumber,
/// Shared data of region to be flushed.
pub shared: SharedDataRef,
/// Sst access layer of the region.
pub sst_layer: AccessLayerRef,
/// Region writer, used to persist log entry that points to the latest manifest file.
pub writer: RegionWriterRef,
/// Region write-ahead logging, used to write data/meta to the log file.
pub wal: Wal<S>,
/// Region manifest service, used to persist metadata.
pub manifest: RegionManifest,
/// Storage engine config
pub engine_config: Arc<EngineConfig>,
/// Flush result sender. Callers should leave this as `None`; `schedule_flush` sets it.
pub sender: Option<Sender<Result<()>>>,
// Compaction related options:
/// TTL of the region.
pub ttl: Option<Duration>,
/// Time window for compaction.
pub compaction_time_window: Option<i64>,
}
impl<S: LogStore> FlushRequest<S> {
#[inline]
fn region_id(&self) -> RegionId {
self.shared.id()
}
}
impl<S: LogStore> Request for FlushRequest<S> {
type Key = FlushKey;
#[inline]
fn key(&self) -> FlushKey {
(self.shared.id(), self.flush_sequence)
}
fn complete(self, result: Result<()>) {
if let Some(sender) = self.sender {
let _ = sender.send(result);
}
}
}
impl<S: LogStore> From<&FlushRequest<S>> for FlushJob<S> {
fn from(req: &FlushRequest<S>) -> FlushJob<S> {
FlushJob {
max_memtable_id: req.max_memtable_id,
memtables: req.memtables.clone(),
flush_sequence: req.flush_sequence,
shared: req.shared.clone(),
sst_layer: req.sst_layer.clone(),
writer: req.writer.clone(),
wal: req.wal.clone(),
manifest: req.manifest.clone(),
engine_config: req.engine_config.clone(),
}
}
}
impl<S: LogStore> From<&FlushRequest<S>> for CompactionRequestImpl<S> {
fn from(req: &FlushRequest<S>) -> CompactionRequestImpl<S> {
CompactionRequestImpl {
region_id: req.region_id(),
sst_layer: req.sst_layer.clone(),
writer: req.writer.clone(),
shared: req.shared.clone(),
manifest: req.manifest.clone(),
wal: req.wal.clone(),
ttl: req.ttl,
compaction_time_window: req.compaction_time_window,
sender: None,
sst_write_buffer_size: req.engine_config.sst_write_buffer_size,
}
}
}
/// A handle to get the flush result.
#[derive(Debug)]
pub struct FlushHandle {
region_id: RegionId,
receiver: Receiver<Result<()>>,
}
impl FlushHandle {
/// Waits until the flush job is finished.
pub async fn wait(self) -> Result<()> {
self.receiver.await.context(WaitFlushSnafu {
region_id: self.region_id,
})?
}
}
/// Flush scheduler.
pub struct FlushScheduler<S: LogStore> {
scheduler: LocalScheduler<FlushRequest<S>>,
}
pub type FlushSchedulerRef<S> = Arc<FlushScheduler<S>>;
impl<S: LogStore> FlushScheduler<S> {
/// Returns a new [FlushScheduler].
pub fn new(config: SchedulerConfig, compaction_scheduler: CompactionSchedulerRef<S>) -> Self {
let handler = FlushHandler {
compaction_scheduler,
};
Self {
scheduler: LocalScheduler::new(config, handler),
}
}
/// Schedules a flush request and returns the handle to the flush task.
///
/// # Panics
/// Panics if `sender` of the `req` is not `None`.
pub fn schedule_flush(&self, mut req: FlushRequest<S>) -> Result<FlushHandle> {
assert!(req.sender.is_none());
let region_id = req.region_id();
let sequence = req.flush_sequence;
let (sender, receiver) = oneshot::channel();
req.sender = Some(sender);
let scheduled = self.scheduler.schedule(req)?;
// Normally we should not have duplicate flush requests.
ensure!(
scheduled,
DuplicateFlushSnafu {
region_id,
sequence,
}
);
increment_counter!(FLUSH_REQUESTS_TOTAL);
Ok(FlushHandle {
region_id,
receiver,
})
}
}
struct FlushHandler<S: LogStore> {
compaction_scheduler: CompactionSchedulerRef<S>,
}
#[async_trait::async_trait]
impl<S: LogStore> Handler for FlushHandler<S> {
type Request = FlushRequest<S>;
async fn handle_request(
&self,
req: FlushRequest<S>,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> Result<()> {
let compaction_scheduler = self.compaction_scheduler.clone();
common_runtime::spawn_bg(async move {
execute_flush(req, compaction_scheduler).await;
// Release the rate limit token.
token.try_release();
// Notify the scheduler to schedule the next task once the current one finishes.
finish_notifier.notify_one();
});
Ok(())
}
}
async fn execute_flush<S: LogStore>(
req: FlushRequest<S>,
compaction_scheduler: CompactionSchedulerRef<S>,
) {
let mut flush_job = FlushJob::from(&req);
if let Err(e) = flush_job.run().await {
logging::error!(e; "Failed to flush regoin {}", req.region_id());
increment_counter!(FLUSH_ERRORS_TOTAL);
req.complete(Err(e));
} else {
logging::debug!("Successfully flush region: {}", req.region_id());
let compaction_request = CompactionRequestImpl::from(&req);
let max_files_in_l0 = req.engine_config.max_files_in_l0;
let shared_data = req.shared.clone();
// If the flush succeeded, schedule a compaction request for this region.
region::schedule_compaction(
shared_data,
compaction_scheduler,
compaction_request,
max_files_in_l0,
);
req.complete(Ok(()));
}
}
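To show what the LocalScheduler framework expects from the code above, here is a stripped-down, hypothetical request/handler pair. Only the trait shapes (Request, Handler, BoxedRateLimitToken, the token release and finish notification) come from this commit; the PingRequest/PingHandler names and field layout are made up, and trait bounds are elided:

use std::sync::Arc;

use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;
use tokio::sync::Notify;

use crate::error::Result;
use crate::scheduler::rate_limit::BoxedRateLimitToken;
use crate::scheduler::{Handler, Request};

/// A request is identified by a key; FlushRequest uses (region id, flush sequence).
struct PingRequest {
    region_id: RegionId,
    sender: Option<Sender<Result<()>>>,
}

impl Request for PingRequest {
    type Key = RegionId;

    fn key(&self) -> RegionId {
        self.region_id
    }

    /// Delivers the final result to whoever holds the receiving end.
    fn complete(self, result: Result<()>) {
        if let Some(sender) = self.sender {
            let _ = sender.send(result);
        }
    }
}

/// A handler runs the request off the scheduler task, then must release the
/// rate-limit token and wake the scheduler so the next queued request can run.
struct PingHandler;

#[async_trait::async_trait]
impl Handler for PingHandler {
    type Request = PingRequest;

    async fn handle_request(
        &self,
        req: PingRequest,
        token: BoxedRateLimitToken,
        finish_notifier: Arc<Notify>,
    ) -> Result<()> {
        // Run the work on the background runtime so the scheduler loop is not blocked.
        common_runtime::spawn_bg(async move {
            req.complete(Ok(()));
            token.try_release();
            finish_notifier.notify_one();
        });
        Ok(())
    }
}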

View File

@@ -14,7 +14,6 @@
//! Storage engine implementation.
mod background;
mod chunk;
pub mod codec;
pub mod compaction;

View File

@@ -16,3 +16,9 @@
/// Elapsed time of updating manifest when creating regions.
pub const CREATE_REGION_UPDATE_MANIFEST: &str = "storage.create_region.update_manifest";
/// Counter of scheduled flush requests.
pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total";
/// Counter of failed flush jobs.
pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total";
/// Elapsed time of a flush job.
pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed";
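The constants above are consumed elsewhere in this commit through the `metrics` and `common_telemetry` macros, roughly as follows (condensed from the flush scheduler and FlushJob changes):

use common_telemetry::timer;
use metrics::increment_counter;

// FlushScheduler::schedule_flush, once a request is accepted:
increment_counter!(FLUSH_REQUESTS_TOTAL);

// FlushJob::run, timing the whole flush:
let _timer = timer!(FLUSH_ELAPSED);

// execute_flush, when the flush job returns an error:
increment_counter!(FLUSH_ERRORS_TOTAL);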

View File

@@ -42,6 +42,7 @@ use crate::manifest::action::{
use crate::manifest::region::RegionManifest;
use crate::memtable::MemtableBuilderRef;
use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef};
pub(crate) use crate::region::writer::schedule_compaction;
pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext};
use crate::schema::compat::CompatWrite;
use crate::snapshot::SnapshotImpl;
@@ -72,7 +73,6 @@ impl<S: LogStore> fmt::Debug for RegionImpl<S> {
.field("name", &self.inner.shared.name)
.field("wal", &self.inner.wal)
.field("flush_strategy", &self.inner.flush_strategy)
.field("flush_scheduler", &self.inner.flush_scheduler)
.field("compaction_scheduler", &self.inner.compaction_scheduler)
.field("sst_layer", &self.inner.sst_layer)
.field("manifest", &self.inner.manifest)
@@ -150,7 +150,7 @@ pub struct StoreConfig<S: LogStore> {
pub sst_layer: AccessLayerRef,
pub manifest: RegionManifest,
pub memtable_builder: MemtableBuilderRef,
pub flush_scheduler: FlushSchedulerRef,
pub flush_scheduler: FlushSchedulerRef<S>,
pub flush_strategy: FlushStrategyRef,
pub compaction_scheduler: CompactionSchedulerRef<S>,
pub engine_config: Arc<EngineConfig>,
@@ -575,7 +575,7 @@ struct RegionInner<S: LogStore> {
writer: RegionWriterRef,
wal: Wal<S>,
flush_strategy: FlushStrategyRef,
flush_scheduler: FlushSchedulerRef,
flush_scheduler: FlushSchedulerRef<S>,
compaction_scheduler: CompactionSchedulerRef<S>,
sst_layer: AccessLayerRef,
manifest: RegionManifest,

View File

@@ -16,9 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_error::prelude::BoxedError;
use common_telemetry::tracing::log::{debug, info};
use common_telemetry::{error, logging};
use common_telemetry::logging;
use futures::TryStreamExt;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
@@ -26,11 +24,10 @@ use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
use store_api::storage::{AlterRequest, FlushContext, SequenceNumber, WriteContext, WriteResponse};
use tokio::sync::{oneshot, Mutex};
use crate::background::JobHandle;
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
use crate::config::EngineConfig;
use crate::error::{self, Result};
use crate::flush::{FlushCallback, FlushJob, FlushSchedulerRef, FlushStrategyRef};
use crate::flush::{FlushHandle, FlushRequest, FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList,
};
@@ -42,7 +39,7 @@ use crate::region::{
};
use crate::schema::compat::CompatWrite;
use crate::sst::AccessLayerRef;
use crate::version::{VersionControl, VersionControlRef, VersionEdit, VersionRef};
use crate::version::{VersionControl, VersionControlRef, VersionEdit};
use crate::wal::Wal;
use crate::write_batch::WriteBatch;
@@ -267,7 +264,7 @@ impl RegionWriter {
}
// We release the writer lock once so that any subsequent write requests are rejected immediately.
self.cancel_flush().await?;
self.wait_flush().await?;
// TODO: cancel the compaction task
@@ -288,7 +285,7 @@ impl RegionWriter {
if ctx.wait {
if let Some(handle) = inner.flush_handle.take() {
handle.join().await?;
handle.wait().await?;
}
}
@@ -310,15 +307,12 @@ impl RegionWriter {
.await
}
/// Cancel flush task if any
async fn cancel_flush(&self) -> Result<()> {
/// Waits for the flush task, if any.
async fn wait_flush(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
if let Some(task) = inner.flush_handle.take() {
task.cancel()
.await
.map_err(BoxedError::new)
.context(error::CancelFlushSnafu)?;
if let Some(handle) = inner.flush_handle.take() {
handle.wait().await?;
}
Ok(())
@@ -328,7 +322,7 @@ impl RegionWriter {
pub struct WriterContext<'a, S: LogStore> {
pub shared: &'a SharedDataRef,
pub flush_strategy: &'a FlushStrategyRef,
pub flush_scheduler: &'a FlushSchedulerRef,
pub flush_scheduler: &'a FlushSchedulerRef<S>,
pub compaction_scheduler: &'a CompactionSchedulerRef<S>,
pub sst_layer: &'a AccessLayerRef,
pub wal: &'a Wal<S>,
@@ -359,7 +353,7 @@ impl<'a, S: LogStore> AlterContext<'a, S> {
#[derive(Debug)]
struct WriterInner {
memtable_builder: MemtableBuilderRef,
flush_handle: Option<JobHandle>,
flush_handle: Option<FlushHandle>,
/// `WriterInner` will reject any future writing, if the closed flag is set.
///
@@ -625,15 +619,13 @@ impl WriterInner {
version_control.freeze_mutable(new_mutable);
if let Some(flush_handle) = self.flush_handle.take() {
// Previous flush job is incomplete, wait util it is finished (write stall).
// Previous flush job is incomplete, wait until it is finished.
// However, the last flush job may fail, in which case we just return the error
// and abort the current write request. The flush handle is left empty, so the
// next time we still have a chance to trigger a new flush.
logging::info!("Write stall, region: {}", ctx.shared.name);
// TODO(yingwen): We should release the write lock while waiting for the flush to finish,
// which needs something like an async condvar.
flush_handle.join().await.map_err(|e| {
flush_handle.wait().await.map_err(|e| {
logging::error!(e; "Previous flush job failed, region: {}", ctx.shared.name);
e
})?;
@@ -647,15 +639,7 @@ impl WriterInner {
return Ok(());
}
let cb = Self::build_flush_callback(
&current_version,
ctx,
&self.engine_config,
self.ttl,
self.compaction_time_window,
);
let flush_req = FlushJob {
let flush_req = FlushRequest {
max_memtable_id: max_memtable_id.unwrap(),
memtables: mem_to_flush,
// In write thread, safe to use current committed sequence.
@@ -665,14 +649,16 @@ impl WriterInner {
writer: ctx.writer.clone(),
wal: ctx.wal.clone(),
manifest: ctx.manifest.clone(),
on_success: cb,
engine_config: self.engine_config.clone(),
sender: None,
ttl: self.ttl,
compaction_time_window: self.compaction_time_window,
};
let flush_handle = ctx
.flush_scheduler
.schedule_flush(Box::new(flush_req))
.await?;
let flush_handle = ctx.flush_scheduler.schedule_flush(flush_req).map_err(|e| {
logging::error!(e; "Failed to schedule flush request");
e
})?;
self.flush_handle = Some(flush_handle);
Ok(())
@@ -711,106 +697,28 @@ impl WriterInner {
let (sender, receiver) = oneshot::channel();
compaction_request.sender = Some(sender);
if Self::schedule_compaction(
if schedule_compaction(
shared_data,
compaction_scheduler,
compaction_request,
compact_ctx.max_files_in_l0,
)
.await
{
) {
receiver
.await
.context(error::CompactTaskCancelSnafu { region_id })??;
}
} else {
Self::schedule_compaction(
schedule_compaction(
shared_data,
compaction_scheduler,
compaction_request,
compact_ctx.max_files_in_l0,
)
.await;
);
}
Ok(())
}
fn build_flush_callback<S: LogStore>(
version: &VersionRef,
ctx: &WriterContext<S>,
config: &Arc<EngineConfig>,
ttl: Option<Duration>,
compaction_time_window: Option<i64>,
) -> Option<FlushCallback> {
let region_id = version.metadata().id();
let compaction_request = CompactionRequestImpl {
region_id,
sst_layer: ctx.sst_layer.clone(),
writer: ctx.writer.clone(),
shared: ctx.shared.clone(),
manifest: ctx.manifest.clone(),
wal: ctx.wal.clone(),
ttl,
compaction_time_window,
sender: None,
sst_write_buffer_size: config.sst_write_buffer_size,
};
let compaction_scheduler = ctx.compaction_scheduler.clone();
let shared_data = ctx.shared.clone();
let max_files_in_l0 = config.max_files_in_l0;
let schedule_compaction_cb = Box::pin(async move {
Self::schedule_compaction(
shared_data,
compaction_scheduler,
compaction_request,
max_files_in_l0,
)
.await;
});
Some(schedule_compaction_cb)
}
/// Schedule compaction task, returns whether the task is scheduled.
async fn schedule_compaction<S: LogStore>(
shared_data: SharedDataRef,
compaction_scheduler: CompactionSchedulerRef<S>,
compaction_request: CompactionRequestImpl<S>,
max_files_in_l0: usize,
) -> bool {
let region_id = shared_data.id();
let level0_file_num = shared_data
.version_control
.current()
.ssts()
.level(0)
.file_num();
if level0_file_num <= max_files_in_l0 {
debug!(
"No enough SST files in level 0 (threshold: {}), skip compaction",
max_files_in_l0
);
return false;
}
match compaction_scheduler.schedule(compaction_request) {
Ok(scheduled) => {
info!(
"Schedule region {} compaction request result: {}",
region_id, scheduled
);
scheduled
}
Err(e) => {
error!(e;"Failed to schedule region compaction request {}", region_id);
false
}
}
}
async fn manual_flush<S: LogStore>(&mut self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
self.trigger_flush(&writer_ctx).await?;
Ok(())
@@ -826,3 +734,43 @@ impl WriterInner {
self.closed = true;
}
}
/// Schedules a compaction task and returns whether the task was scheduled.
pub(crate) fn schedule_compaction<S: LogStore>(
shared_data: SharedDataRef,
compaction_scheduler: CompactionSchedulerRef<S>,
compaction_request: CompactionRequestImpl<S>,
max_files_in_l0: usize,
) -> bool {
let region_id = shared_data.id();
let level0_file_num = shared_data
.version_control
.current()
.ssts()
.level(0)
.file_num();
if level0_file_num <= max_files_in_l0 {
logging::debug!(
"No enough SST files in level 0 (threshold: {}), skip compaction",
max_files_in_l0
);
return false;
}
match compaction_scheduler.schedule(compaction_request) {
Ok(scheduled) => {
logging::info!(
"Schedule region {} compaction request result: {}",
region_id,
scheduled
);
scheduled
}
Err(e) => {
logging::error!(e;"Failed to schedule region compaction request {}", region_id);
false
}
}
}

View File

@@ -110,7 +110,20 @@ where
R: Request + Send + Sync,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LocalScheduler<...>").finish()
f.debug_struct("LocalScheduler")
.field("state", &self.state)
.finish()
}
}
impl<R> Drop for LocalScheduler<R>
where
R: Request,
{
fn drop(&mut self) {
self.state.store(STATE_STOP, Ordering::Relaxed);
self.cancel_token.cancel();
}
}
@@ -310,14 +323,14 @@ mod tests {
struct CountdownLatch {
counter: std::sync::Mutex<usize>,
notifies: std::sync::RwLock<Vec<Arc<Notify>>>,
notify: Notify,
}
impl CountdownLatch {
fn new(size: usize) -> Self {
Self {
counter: std::sync::Mutex::new(size),
notifies: std::sync::RwLock::new(vec![]),
notify: Notify::new(),
}
}
@@ -326,22 +339,14 @@ mod tests {
if *counter >= 1 {
*counter -= 1;
if *counter == 0 {
let notifies = self.notifies.read().unwrap();
for waiter in notifies.iter() {
waiter.notify_one();
}
self.notify.notify_one();
}
}
}
/// Users should only call this once.
async fn wait(&self) {
let notify = Arc::new(Notify::new());
{
let notify = notify.clone();
let mut notifies = self.notifies.write().unwrap();
notifies.push(notify);
}
notify.notified().await
self.notify.notified().await
}
}

View File

@@ -20,11 +20,10 @@ use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::manifest::Manifest;
use crate::background::JobPoolImpl;
use crate::compaction::noop::NoopCompactionScheduler;
use crate::engine;
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::flush::{FlushSchedulerImpl, SizeBasedStrategy};
use crate::flush::{FlushScheduler, SizeBasedStrategy};
use crate::manifest::region::RegionManifest;
use crate::memtable::DefaultMemtableBuilder;
use crate::region::StoreConfig;
@@ -60,14 +59,16 @@ pub async fn new_store_config_with_object_store(
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store, None, None);
manifest.start().await.unwrap();
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
let log_config = LogConfig {
log_file_dir: log_store_dir(store_dir),
..Default::default()
};
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let flush_scheduler = Arc::new(FlushScheduler::new(
SchedulerConfig::default(),
compaction_scheduler.clone(),
));
let file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
NoopFilePurgeHandler,