feat: write manifests in background tasks (#3709)

* chore: truncate wip

* feat: truncate and edit write manifest in background

* refactor: wrap in manifest context

* feat: alter write manifest in background

* chore: fix compiler errors

* feat: flush update manifest in background

* feat: compaction update manifest in background

* feat: set dropping state

* feat: reset drop state

* feat: check state before updating manifest

* test: fix compaction test

* refactor: rename method

* chore: update comment

* chore: discard state guard

* refactor: use atomic cell to store state enum

* chore: fix clippy

* chore: update toml

* chore: remove unused type alias

* feat: check state after writing manifest

* chore: address CR comments

* chore: change status code

* chore: Update src/mito2/src/region.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* fix: executes applier

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
Yingwen
2024-04-24 11:09:48 +08:00
committed by GitHub
parent 86a989517e
commit 4685b59ef1
28 changed files with 818 additions and 367 deletions

View File

@@ -59,6 +59,7 @@ pub enum StatusCode {
RegionNotFound = 4005,
RegionAlreadyExists = 4006,
RegionReadonly = 4007,
/// Region is not in a proper state to handle specific request.
RegionNotReady = 4008,
// If mutually exclusive operations are reached at the same time,
// only one can be executed, another one will get region busy.

View File

@@ -32,6 +32,7 @@ common-test-util = { workspace = true, optional = true }
common-time.workspace = true
common-wal.workspace = true
crc32fast = "1"
crossbeam-utils.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true

View File

@@ -37,9 +37,11 @@ use crate::error::{
use crate::metrics::COMPACTION_STAGE_ELAPSED;
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::FilePurgerRef;
use crate::worker::WorkerListener;
/// Region compaction request.
pub struct CompactionRequest {
@@ -54,6 +56,9 @@ pub struct CompactionRequest {
/// Start time of compaction task.
pub(crate) start_time: Instant,
pub(crate) cache_manager: CacheManagerRef,
pub(crate) manifest_ctx: ManifestContextRef,
pub(crate) version_control: VersionControlRef,
pub(crate) listener: WorkerListener,
}
impl CompactionRequest {
@@ -88,6 +93,8 @@ pub(crate) struct CompactionScheduler {
/// Request sender of the worker that this scheduler belongs to.
request_sender: Sender<WorkerRequest>,
cache_manager: CacheManagerRef,
engine_config: Arc<MitoConfig>,
listener: WorkerListener,
}
impl CompactionScheduler {
@@ -95,12 +102,16 @@ impl CompactionScheduler {
scheduler: SchedulerRef,
request_sender: Sender<WorkerRequest>,
cache_manager: CacheManagerRef,
engine_config: Arc<MitoConfig>,
listener: WorkerListener,
) -> Self {
Self {
scheduler,
region_status: HashMap::new(),
request_sender,
cache_manager,
engine_config,
listener,
}
}
@@ -112,7 +123,7 @@ impl CompactionScheduler {
access_layer: &AccessLayerRef,
file_purger: &FilePurgerRef,
waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
manifest_ctx: &ManifestContextRef,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
@@ -130,8 +141,10 @@ impl CompactionScheduler {
let request = status.new_compaction_request(
self.request_sender.clone(),
waiter,
engine_config,
self.engine_config.clone(),
self.cache_manager.clone(),
manifest_ctx,
self.listener.clone(),
);
self.region_status.insert(region_id, status);
self.schedule_compaction_request(request)
@@ -141,7 +154,7 @@ impl CompactionScheduler {
pub(crate) fn on_compaction_finished(
&mut self,
region_id: RegionId,
engine_config: Arc<MitoConfig>,
manifest_ctx: &ManifestContextRef,
) {
let Some(status) = self.region_status.get_mut(&region_id) else {
return;
@@ -150,8 +163,10 @@ impl CompactionScheduler {
let request = status.new_compaction_request(
self.request_sender.clone(),
OptionOutputTx::none(),
engine_config,
self.engine_config.clone(),
self.cache_manager.clone(),
manifest_ctx,
self.listener.clone(),
);
// Try to schedule next compaction task for this region.
if let Err(e) = self.schedule_compaction_request(request) {
@@ -325,6 +340,8 @@ impl CompactionStatus {
waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
cache_manager: CacheManagerRef,
manifest_ctx: &ManifestContextRef,
listener: WorkerListener,
) -> CompactionRequest {
let current_version = self.version_control.current().version;
let start_time = Instant::now();
@@ -337,6 +354,9 @@ impl CompactionStatus {
file_purger: self.file_purger.clone(),
start_time,
cache_manager,
manifest_ctx: manifest_ctx.clone(),
version_control: self.version_control.clone(),
listener,
};
if let Some(pending) = self.pending_compaction.take() {
@@ -371,6 +391,9 @@ mod tests {
let version_control = Arc::new(builder.build());
let (output_tx, output_rx) = oneshot::channel();
let waiter = OptionOutputTx::from(output_tx);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
scheduler
.schedule_compaction(
builder.region_id(),
@@ -378,7 +401,7 @@ mod tests {
&env.access_layer,
&purger,
waiter,
Arc::new(MitoConfig::default()),
&manifest_ctx,
)
.unwrap();
let output = output_rx.await.unwrap().unwrap();
@@ -396,7 +419,7 @@ mod tests {
&env.access_layer,
&purger,
waiter,
Arc::new(MitoConfig::default()),
&manifest_ctx,
)
.unwrap();
let output = output_rx.await.unwrap().unwrap();
@@ -448,6 +471,9 @@ mod tests {
.push_l0_file(90, end)
.build(),
);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
scheduler
.schedule_compaction(
region_id,
@@ -455,7 +481,7 @@ mod tests {
&env.access_layer,
&purger,
OptionOutputTx::none(),
Arc::new(MitoConfig::default()),
&manifest_ctx,
)
.unwrap();
// Should schedule 1 compaction.
@@ -483,7 +509,7 @@ mod tests {
&env.access_layer,
&purger,
OptionOutputTx::none(),
Arc::new(MitoConfig::default()),
&manifest_ctx,
)
.unwrap();
assert_eq!(1, scheduler.region_status.len());
@@ -496,7 +522,7 @@ mod tests {
.is_some());
// On compaction finished and schedule next compaction.
scheduler.on_compaction_finished(region_id, Arc::new(MitoConfig::default()));
scheduler.on_compaction_finished(region_id, &manifest_ctx);
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
// 5 files for next compaction.
@@ -514,7 +540,7 @@ mod tests {
&env.access_layer,
&purger,
OptionOutputTx::none(),
Arc::new(MitoConfig::default()),
&manifest_ctx,
)
.unwrap();
assert_eq!(2, job_scheduler.num_jobs());

View File

@@ -34,12 +34,15 @@ use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::config::MitoConfig;
use crate::error::{self, CompactRegionSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, Source};
use crate::region::options::IndexOptions;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionState};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
@@ -47,6 +50,7 @@ use crate::sst::file::{FileHandle, FileId, FileMeta, IndexType, Level};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
const MAX_PARALLEL_COMPACTION: usize = 8;
@@ -140,6 +144,9 @@ impl Picker for TwcsPicker {
file_purger,
start_time,
cache_manager,
manifest_ctx,
version_control,
listener,
} = req;
let region_metadata = current_version.metadata.clone();
@@ -197,6 +204,9 @@ impl Picker for TwcsPicker {
storage: current_version.options.storage.clone(),
index_options: current_version.options.index_options.clone(),
append_mode: current_version.options.append_mode,
manifest_ctx,
version_control,
listener,
};
Some(Box::new(task))
}
@@ -341,6 +351,12 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) index_options: IndexOptions,
/// The region is using append mode.
pub(crate) append_mode: bool,
/// Manifest context.
pub(crate) manifest_ctx: ManifestContextRef,
/// Version control to update.
pub(crate) version_control: VersionControlRef,
/// Event listener.
pub(crate) listener: WorkerListener,
}
impl Debug for TwcsCompactionTask {
@@ -481,18 +497,55 @@ impl TwcsCompactionTask {
Ok((output_files, inputs))
}
async fn handle_compaction(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
async fn handle_compaction(&mut self) -> error::Result<()> {
self.mark_files_compacting(true);
let merge_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["merge"])
.start_timer();
let (output, mut compacted) = self.merge_ssts().await.map_err(|e| {
error!(e; "Failed to compact region: {}", self.region_id);
merge_timer.stop_and_discard();
e
})?;
compacted.extend(self.expired_ssts.iter().map(FileHandle::meta));
Ok((output, compacted))
let (added, mut deleted) = match self.merge_ssts().await {
Ok(v) => v,
Err(e) => {
error!(e; "Failed to compact region: {}", self.region_id);
merge_timer.stop_and_discard();
return Err(e);
}
};
deleted.extend(self.expired_ssts.iter().map(FileHandle::meta));
let merge_time = merge_timer.stop_and_record();
info!(
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
self.region_id,
deleted,
added,
self.compaction_time_window,
self.waiters.len(),
merge_time,
);
self.listener.on_merge_ssts_finished(self.region_id).await;
let _manifest_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["write_manifest"])
.start_timer();
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: added,
files_to_remove: deleted,
compaction_time_window: self
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
// We might leak files if we fail to update the manifest. We can add a cleanup task to
// remove them later.
self.manifest_ctx
.update_manifest(RegionState::Writable, action_list, || {
self.version_control
.apply_edit(edit, &[], self.file_purger.clone());
})
.await
}
/// Handles compaction failure, notifies all waiters.
@@ -520,27 +573,11 @@ impl TwcsCompactionTask {
impl CompactionTask for TwcsCompactionTask {
async fn run(&mut self) {
let notify = match self.handle_compaction().await {
Ok((added, deleted)) => {
info!(
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}",
deleted,
added,
self.compaction_time_window,
self.waiters.len(),
);
BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.region_id,
compaction_outputs: added,
compacted_files: deleted,
senders: std::mem::take(&mut self.waiters),
file_purger: self.file_purger.clone(),
compaction_time_window: self
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),
start_time: self.start_time,
})
}
Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.region_id,
senders: std::mem::take(&mut self.waiters),
start_time: self.start_time,
}),
Err(e) => {
error!(e; "Failed to compact region, region id: {}", self.region_id);
let err = Arc::new(e);

View File

@@ -345,7 +345,7 @@ async fn test_catchup_with_manifest_update() {
// Ensures the mutable is empty.
assert!(region.version().memtables.mutable.is_empty());
let manifest = region.manifest_manager.read().await.manifest();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.manifest_version, 0);
let resp = follower_engine
@@ -361,7 +361,7 @@ async fn test_catchup_with_manifest_update() {
// The inner region was replaced. We must get it again.
let region = follower_engine.get_region(region_id).unwrap();
let manifest = region.manifest_manager.read().await.manifest();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.manifest_version, 2);
assert!(!region.is_writable());

View File

@@ -51,9 +51,9 @@ pub trait EventListener: Send + Sync {
let _ = removed;
}
/// Notifies the listener that the region is going to handle the compaction
/// finished request.
async fn on_handle_compaction_finished(&self, region_id: RegionId) {
/// Notifies the listener that the SSTs have been merged and the region
/// is going to update its manifest.
async fn on_merge_ssts_finished(&self, region_id: RegionId) {
let _ = region_id;
}
}
@@ -201,7 +201,7 @@ impl CompactionListener {
#[async_trait]
impl EventListener for CompactionListener {
async fn on_handle_compaction_finished(&self, region_id: RegionId) {
async fn on_merge_ssts_finished(&self, region_id: RegionId) {
info!("Handle compaction finished request, region {region_id}");
self.handle_finished_notify.notify_one();

View File

@@ -127,7 +127,7 @@ async fn test_engine_open_readonly() {
)
.await
.unwrap_err();
assert_eq!(StatusCode::RegionReadonly, err.status_code());
assert_eq!(StatusCode::RegionNotReady, err.status_code());
assert_eq!(Some(RegionRole::Follower), engine.role(region_id));
// Set writable and write.

View File

@@ -66,7 +66,7 @@ async fn test_set_readonly_gracefully() {
.await
.unwrap_err();
assert_eq!(error.status_code(), StatusCode::RegionReadonly);
assert_eq!(error.status_code(), StatusCode::RegionNotReady);
engine.set_writable(region_id, true).unwrap();

View File

@@ -29,6 +29,7 @@ use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
use crate::cache::file_cache::FileType;
use crate::region::RegionState;
use crate::sst::file::FileId;
use crate::worker::WorkerId;
@@ -395,9 +396,11 @@ pub enum Error {
location: Location,
},
#[snafu(display("Region {} is read only", region_id))]
RegionReadonly {
#[snafu(display("Region {} is in {:?} state, expect: {:?}", region_id, state, expect))]
RegionState {
region_id: RegionId,
state: RegionState,
expect: RegionState,
location: Location,
},
@@ -669,7 +672,7 @@ impl ErrorExt for Error {
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionReadonly { .. } => StatusCode::RegionReadonly,
RegionState { .. } => StatusCode::RegionNotReady,
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,

View File

@@ -31,10 +31,12 @@ use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionState};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
SenderWriteRequest, WorkerRequest,
@@ -204,6 +206,7 @@ pub(crate) struct RegionFlushTask {
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) row_group_size: Option<usize>,
pub(crate) cache_manager: CacheManagerRef,
pub(crate) manifest_ctx: ManifestContextRef,
/// Index options for the region.
pub(crate) index_options: IndexOptions,
@@ -240,36 +243,30 @@ impl RegionFlushTask {
// Get a version of this region before creating a job to get current
// wal entry id, sequence and immutable memtables.
let version_data = version_control.current();
// This is used to update the version.
let version_control = version_control.clone();
Box::pin(async move {
self.do_flush(version_data).await;
self.do_flush(version_data, &version_control).await;
})
}
/// Runs the flush task.
async fn do_flush(&mut self, version_data: VersionControlData) {
async fn do_flush(
&mut self,
version_data: VersionControlData,
version_control: &VersionControlRef,
) {
let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
self.listener.on_flush_begin(self.region_id).await;
let worker_request = match self.flush_memtables(&version_data.version).await {
Ok(file_metas) => {
let memtables_to_remove = version_data
.version
.memtables
.immutables()
.iter()
.map(|m| m.id())
.collect();
let worker_request = match self.flush_memtables(&version_data, version_control).await {
Ok(()) => {
let flush_finished = FlushFinished {
region_id: self.region_id,
file_metas,
// The last entry has been flushed.
flushed_entry_id: version_data.last_entry_id,
flushed_sequence: version_data.committed_sequence,
memtables_to_remove,
senders: std::mem::take(&mut self.senders),
file_purger: self.file_purger.clone(),
_timer: timer,
};
WorkerRequest::Background {
@@ -293,8 +290,13 @@ impl RegionFlushTask {
self.send_worker_request(worker_request).await;
}
/// Flushes memtables to level 0 SSTs.
async fn flush_memtables(&self, version: &VersionRef) -> Result<Vec<FileMeta>> {
/// Flushes memtables to level 0 SSTs and updates the manifest.
async fn flush_memtables(
&self,
version_data: &VersionControlData,
version_control: &VersionControlRef,
) -> Result<()> {
let version = &version_data.version;
let timer = FLUSH_ELAPSED
.with_label_values(&["flush_memtables"])
.start_timer();
@@ -382,7 +384,31 @@ impl RegionFlushTask {
timer.stop_and_record(),
);
Ok(file_metas)
let memtables_to_remove: SmallVec<[_; 2]> = version_data
.version
.memtables
.immutables()
.iter()
.map(|m| m.id())
.collect();
let edit = RegionEdit {
files_to_add: file_metas,
files_to_remove: Vec::new(),
compaction_time_window: None,
// The last entry has been flushed.
flushed_entry_id: Some(version_data.last_entry_id),
flushed_sequence: Some(version_data.committed_sequence),
};
info!("Applying {edit:?} to region {}", self.region_id);
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
// We will leak files if the manifest update fails, but we ignore them for simplicity. We can
// add a cleanup job to remove them later.
self.manifest_ctx
.update_manifest(RegionState::Writable, action_list, || {
version_control.apply_edit(edit, &memtables_to_remove, self.file_purger.clone());
})
.await
}
/// Notify flush job status.
@@ -775,6 +801,9 @@ mod tests {
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
cache_manager: Arc::new(CacheManager::default()),
manifest_ctx: env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await,
index_options: IndexOptions::default(),
};
task.push_sender(OptionOutputTx::from(output_tx));

View File

@@ -257,9 +257,8 @@ impl RegionManifestManager {
}
/// Stops the manager.
pub async fn stop(&mut self) -> Result<()> {
pub async fn stop(&mut self) {
self.stopped = true;
Ok(())
}
/// Updates the manifest. Returns the current manifest version number.
@@ -524,7 +523,7 @@ mod test {
.unwrap()
.unwrap();
// Stops it.
manager.stop().await.unwrap();
manager.stop().await;
// Open it.
let manager = env
@@ -564,7 +563,7 @@ mod test {
manager.validate_manifest(&new_metadata, 1);
// Reopen the manager.
manager.stop().await.unwrap();
manager.stop().await;
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await
@@ -651,7 +650,7 @@ mod test {
// Reopen the manager,
// we just calculate the size from the latest checkpoint file
manager.stop().await.unwrap();
manager.stop().await;
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await

View File

@@ -152,7 +152,7 @@ async fn manager_with_checkpoint_distance_1() {
assert_eq!(expected_json, raw_json);
// reopen the manager
manager.stop().await.unwrap();
manager.stop().await;
let manager = reopen_manager(&env, 1, CompressionType::Uncompressed).await;
assert_eq!(10, manager.manifest().manifest_version);
}

View File

@@ -19,21 +19,21 @@ pub mod options;
pub(crate) mod version;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, RwLock};
use common_telemetry::info;
use common_telemetry::{error, info, warn};
use common_wal::options::WalOptions;
use crossbeam_utils::atomic::AtomicCell;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::RwLock as TokioRwLock;
use crate::access_layer::AccessLayerRef;
use crate::error::{RegionNotFoundSnafu, RegionReadonlySnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::error::{RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result};
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::{MemtableBuilderRef, MemtableId};
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::OnFailure;
use crate::sst::file_purger::FilePurgerRef;
@@ -57,6 +57,23 @@ impl RegionUsage {
}
}
/// State of the region.
///
/// NOTE(review): this enum is stored in a `crossbeam_utils::atomic::AtomicCell`
/// (see `ManifestContext`), and a unit test asserts `AtomicCell::<RegionState>`
/// is lock-free — keep the type small (plain fieldless variants) so that
/// guarantee continues to hold.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionState {
    /// The region is opened but is still read-only.
    ReadOnly,
    /// The region is opened and is writable.
    Writable,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
}
/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
@@ -71,19 +88,19 @@ pub(crate) struct MitoRegion {
pub(crate) region_id: RegionId,
/// Version controller for this region.
///
/// We MUST update the version control inside the write lock of the region manifest manager.
pub(crate) version_control: VersionControlRef,
/// SSTs accessor for this region.
pub(crate) access_layer: AccessLayerRef,
/// Manager to maintain manifest for this region.
pub(crate) manifest_manager: TokioRwLock<RegionManifestManager>,
/// Context to maintain manifest for this region.
pub(crate) manifest_ctx: ManifestContextRef,
/// SST file purger.
pub(crate) file_purger: FilePurgerRef,
/// Wal options of this region.
pub(crate) wal_options: WalOptions,
/// Last flush time in millis.
last_flush_millis: AtomicI64,
/// Whether the region is writable.
writable: AtomicBool,
/// Provider to get current time.
time_provider: TimeProviderRef,
/// Memtable builder for the region.
@@ -94,15 +111,18 @@ pub(crate) type MitoRegionRef = Arc<MitoRegion>;
impl MitoRegion {
/// Stop background managers for this region.
pub(crate) async fn stop(&self) -> Result<()> {
self.manifest_manager.write().await.stop().await?;
pub(crate) async fn stop(&self) {
self.manifest_ctx
.manifest_manager
.write()
.await
.stop()
.await;
info!(
"Stopped region manifest manager, region_id: {}",
self.region_id
);
Ok(())
}
/// Returns current metadata of the region.
@@ -128,19 +148,73 @@ impl MitoRegion {
self.last_flush_millis.store(now, Ordering::Relaxed);
}
/// Returns whether the region is writable.
pub(crate) fn is_writable(&self) -> bool {
self.writable.load(Ordering::Relaxed)
}
/// Returns the region dir.
pub(crate) fn region_dir(&self) -> &str {
self.access_layer.region_dir()
}
/// Sets the writable flag.
/// Returns whether the region is writable.
pub(crate) fn is_writable(&self) -> bool {
self.manifest_ctx.state.load() == RegionState::Writable
}
/// Returns the state of the region.
pub(crate) fn state(&self) -> RegionState {
self.manifest_ctx.state.load()
}
/// Sets the writable state.
pub(crate) fn set_writable(&self, writable: bool) {
self.writable.store(writable, Ordering::Relaxed);
if writable {
// Only sets the region to writable if it is read only.
// This prevents others from updating the manifest.
let _ = self
.manifest_ctx
.state
.compare_exchange(RegionState::ReadOnly, RegionState::Writable);
} else {
self.manifest_ctx.state.store(RegionState::ReadOnly);
}
}
/// Sets the altering state.
/// You should call this method in the worker loop.
pub(crate) fn set_altering(&self) -> Result<()> {
self.compare_exchange_state(RegionState::Writable, RegionState::Altering)
}
/// Sets the dropping state.
/// You should call this method in the worker loop.
pub(crate) fn set_dropping(&self) -> Result<()> {
self.compare_exchange_state(RegionState::Writable, RegionState::Dropping)
}
/// Sets the truncating state.
/// You should call this method in the worker loop.
pub(crate) fn set_truncating(&self) -> Result<()> {
self.compare_exchange_state(RegionState::Writable, RegionState::Truncating)
}
/// Sets the editing state.
/// You should call this method in the worker loop.
pub(crate) fn set_editing(&self) -> Result<()> {
self.compare_exchange_state(RegionState::Writable, RegionState::Editing)
}
/// Sets the region to readonly gracefully. This acquires the manifest write lock.
pub(crate) async fn set_readonly_gracefully(&self) {
let _manager = self.manifest_ctx.manifest_manager.write().await;
// We acquire the write lock of the manifest manager to ensure that no one is updating the manifest.
// Then we change the state.
self.set_writable(false);
}
/// Switches the region state to `RegionState::Writable` if the current state is `expect`.
/// Otherwise, logs an error.
pub(crate) fn switch_state_to_writable(&self, expect: RegionState) {
if let Err(e) = self.compare_exchange_state(expect, RegionState::Writable) {
error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
}
}
/// Returns the region usage in bytes.
@@ -155,7 +229,12 @@ impl MitoRegion {
let wal_usage = self.estimated_wal_usage(memtable_usage);
let manifest_usage = self.manifest_manager.read().await.manifest_usage();
let manifest_usage = self
.manifest_ctx
.manifest_manager
.read()
.await
.manifest_usage();
RegionUsage {
region_id,
@@ -171,28 +250,133 @@ impl MitoRegion {
((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
}
pub(crate) async fn apply_edit(
&self,
edit: RegionEdit,
memtables_to_remove: &[MemtableId],
) -> Result<()> {
info!("Applying {edit:?} to region {}", self.region_id);
self.manifest_manager
.write()
.await
.update(RegionMetaActionList::with_action(RegionMetaAction::Edit(
edit.clone(),
)))
.await?;
// Apply edit to region's version.
self.version_control
.apply_edit(edit, memtables_to_remove, self.file_purger.clone());
/// Sets the state of the region to given state if the current state equals to
/// the expected.
fn compare_exchange_state(&self, expect: RegionState, state: RegionState) -> Result<()> {
self.manifest_ctx
.state
.compare_exchange(expect, state)
.map_err(|actual| {
RegionStateSnafu {
region_id: self.region_id,
state: actual,
expect,
}
.build()
})?;
Ok(())
}
}
/// Context to update the region manifest.
///
/// Bundles the manifest manager with the region state so that callers can
/// check the state and update the manifest under the same write lock
/// (see `update_manifest`).
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// manifest.
    state: AtomicCell<RegionState>,
}
impl ManifestContext {
    /// Creates a new context wrapping `manager`, with the region initially in `state`.
    pub(crate) fn new(manager: RegionManifestManager, state: RegionState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    /// Delegates to the manifest manager under the read lock.
    // NOTE(review): semantics of `has_update` are defined by
    // `RegionManifestManager::has_update`; not visible here.
    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Updates the manifest if current state is `expect_state` and executes
    /// the `applier` if the manifest is updated.
    ///
    /// The whole sequence (state check, applicability checks, manifest write,
    /// `applier`) runs under the manifest write lock, so concurrent updaters
    /// and `set_readonly_gracefully()` are serialized against it.
    ///
    /// # Errors
    /// Returns `RegionState` error if the state does not match `expect_state`,
    /// `RegionTruncated` if an edit became inapplicable due to truncation, or
    /// the underlying manifest update error.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionState,
        action_list: RegionMetaActionList,
        applier: impl FnOnce(),
    ) -> Result<()> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets current manifest.
        let manifest = manager.manifest();
        // Checks state inside the lock. This is to ensure that we won't update the manifest
        // after `set_readonly_gracefully()` is called.
        let current_state = self.state.load();
        ensure!(
            current_state == expect_state,
            RegionStateSnafu {
                region_id: manifest.metadata.region_id,
                state: current_state,
                expect: expect_state,
            }
        );

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated. If the region was never
            // truncated, any edit is applicable.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                // The flush only covers entries up to `flushed_entry_id`; reject it
                // if truncation already advanced past that point.
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Input files of the compaction task have been truncated.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        manager.update(action_list).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        // Executes the applier. We MUST hold the write lock.
        applier();

        // The state may have flipped to read-only while we held the lock; the
        // manifest write already happened, so we can only warn.
        if self.state.load() == RegionState::ReadOnly {
            warn!(
                "Region {} becomes read-only while updating manifest which may cause inconsistency",
                manifest.metadata.region_id
            );
        }

        Ok(())
    }
}
#[cfg(test)]
impl ManifestContext {
    /// Test-only helper: returns the current region manifest,
    /// taking the read lock of the manifest manager.
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        let manager = self.manifest_manager.read().await;
        manager.manifest()
    }
}
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
@@ -225,7 +409,14 @@ impl RegionMap {
let region = self
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
ensure!(region.is_writable(), RegionReadonlySnafu { region_id });
ensure!(
region.is_writable(),
RegionStateSnafu {
region_id,
state: region.state(),
expect: RegionState::Writable,
}
);
Ok(region)
}
@@ -265,3 +456,15 @@ impl RegionMap {
}
pub(crate) type RegionMapRef = Arc<RegionMap>;
#[cfg(test)]
mod tests {
    use crossbeam_utils::atomic::AtomicCell;

    use crate::region::RegionState;

    /// `RegionState` must stay small enough for `AtomicCell` to be lock-free,
    /// since `ManifestContext` relies on atomic state transitions.
    #[test]
    fn test_region_state_lock_free() {
        let lock_free = AtomicCell::<RegionState>::is_lock_free();
        assert!(lock_free);
    }
}

View File

@@ -15,7 +15,7 @@
//! Region opener.
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use common_telemetry::{debug, error, info, warn};
@@ -27,7 +27,6 @@ use snafu::{ensure, OptionExt};
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{ColumnId, RegionId};
use tokio::sync::RwLock;
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
@@ -41,7 +40,7 @@ use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::MitoRegion;
use crate::region::{ManifestContext, MitoRegion, RegionState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
@@ -203,7 +202,11 @@ impl RegionOpener {
region_id,
version_control,
access_layer: access_layer.clone(),
manifest_manager: RwLock::new(manifest_manager),
// Region is writable after it is created.
manifest_ctx: Arc::new(ManifestContext::new(
manifest_manager,
RegionState::Writable,
)),
file_purger: Arc::new(LocalFilePurger::new(
self.purge_scheduler,
access_layer,
@@ -211,8 +214,6 @@ impl RegionOpener {
)),
wal_options,
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
// Region is writable after it is created.
writable: AtomicBool::new(true),
time_provider,
memtable_builder,
})
@@ -331,12 +332,14 @@ impl RegionOpener {
region_id: self.region_id,
version_control,
access_layer,
manifest_manager: RwLock::new(manifest_manager),
// Region is always opened in read only mode.
manifest_ctx: Arc::new(ManifestContext::new(
manifest_manager,
RegionState::ReadOnly,
)),
file_purger,
wal_options,
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
// Region is always opened in read only mode.
writable: AtomicBool::new(false),
time_provider,
memtable_builder,
};

View File

@@ -16,18 +16,17 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use api::helper::{
is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
ColumnDataTypeWrapper,
};
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_telemetry::{info, warn};
use common_telemetry::info;
use datatypes::prelude::DataType;
use prometheus::HistogramTimer;
use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_engine::SetReadonlyResponse;
@@ -44,10 +43,7 @@ use crate::error::{
FlushRegionSnafu, InvalidRequestSnafu, Result,
};
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableId;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::wal::EntryId;
/// Request to write a region.
@@ -620,6 +616,8 @@ pub(crate) enum BackgroundNotify {
CompactionFinished(CompactionFinished),
/// Compaction has failed.
CompactionFailed(CompactionFailed),
/// Truncate result.
Truncate(TruncateResult),
}
/// Notifies a flush job is finished.
@@ -627,18 +625,10 @@ pub(crate) enum BackgroundNotify {
pub(crate) struct FlushFinished {
/// Region id.
pub(crate) region_id: RegionId,
/// Meta of the flushed SSTs.
pub(crate) file_metas: Vec<FileMeta>,
/// Entry id of flushed data.
pub(crate) flushed_entry_id: EntryId,
/// Sequence of flushed data.
pub(crate) flushed_sequence: SequenceNumber,
/// Id of memtables to remove.
pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
/// Flush result senders.
pub(crate) senders: Vec<OutputTx>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
/// Flush timer.
pub(crate) _timer: HistogramTimer,
}
@@ -660,12 +650,6 @@ impl OnFailure for FlushFinished {
region_id: self.region_id,
}));
}
// Clean flushed files.
for file in &self.file_metas {
self.file_purger.send_request(PurgeRequest {
file_meta: file.clone(),
});
}
}
}
@@ -681,16 +665,8 @@ pub(crate) struct FlushFailed {
pub(crate) struct CompactionFinished {
/// Region id.
pub(crate) region_id: RegionId,
/// Compaction output files that are to be added to region version.
pub(crate) compaction_outputs: Vec<FileMeta>,
/// Compacted files that are to be removed from region version.
pub(crate) compacted_files: Vec<FileMeta>,
/// Compaction result senders.
pub(crate) senders: Vec<OutputTx>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
/// Inferred Compaction time window.
pub(crate) compaction_time_window: Option<Duration>,
/// Start time of compaction task.
pub(crate) start_time: Instant,
}
@@ -708,8 +684,7 @@ impl CompactionFinished {
}
impl OnFailure for CompactionFinished {
/// Compaction succeeded but failed to update manifest or region's already been dropped,
/// clean compaction output files.
/// Compaction succeeded but failed to update manifest or region's already been dropped.
fn on_failure(&mut self, err: Error) {
let err = Arc::new(err);
for sender in self.senders.drain(..) {
@@ -717,15 +692,6 @@ impl OnFailure for CompactionFinished {
region_id: self.region_id,
}));
}
for file in &self.compaction_outputs {
warn!(
"Cleaning region {} compaction output file: {}",
self.region_id, file.file_id
);
self.file_purger.send_request(PurgeRequest {
file_meta: file.clone(),
});
}
}
}
@@ -737,6 +703,21 @@ pub(crate) struct CompactionFailed {
pub(crate) err: Arc<Error>,
}
/// Notifies the truncate result of a region.
///
/// Sent as a [BackgroundNotify::Truncate] from the background task that
/// wrote the truncate action to the manifest, back to the region worker
/// loop, which finishes the truncation (WAL cleanup, scheduler
/// notifications) and replies to the caller via `sender`.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Truncated entry id.
    pub(crate) truncated_entry_id: EntryId,
    /// Truncated sequence.
    pub(crate) truncated_sequence: SequenceNumber,
}
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;

View File

@@ -16,19 +16,25 @@
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;
use tokio::sync::mpsc::Sender;
use crate::access_layer::{AccessLayer, AccessLayerRef};
use crate::cache::CacheManager;
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::flush::FlushScheduler;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{ManifestContext, ManifestContextRef, RegionState};
use crate::request::WorkerRequest;
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
use crate::worker::WorkerListener;
/// Scheduler mocker.
pub(crate) struct SchedulerEnv {
@@ -73,7 +79,13 @@ impl SchedulerEnv {
) -> CompactionScheduler {
let scheduler = self.get_scheduler();
CompactionScheduler::new(scheduler, request_sender, Arc::new(CacheManager::default()))
CompactionScheduler::new(
scheduler,
request_sender,
Arc::new(CacheManager::default()),
Arc::new(MitoConfig::default()),
WorkerListener::default(),
)
}
/// Creates a new flush scheduler.
@@ -83,6 +95,27 @@ impl SchedulerEnv {
FlushScheduler::new(scheduler)
}
/// Creates a new manifest context.
pub(crate) async fn mock_manifest_context(
&self,
metadata: RegionMetadataRef,
) -> ManifestContextRef {
Arc::new(ManifestContext::new(
RegionManifestManager::new(
metadata,
RegionManifestOptions {
manifest_dir: "".to_string(),
object_store: self.access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
},
)
.await
.unwrap(),
RegionState::Writable,
))
}
fn get_scheduler(&self) -> SchedulerRef {
self.scheduler
.clone()

View File

@@ -21,6 +21,7 @@ mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;
@@ -45,9 +46,8 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableBuilderProvider;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
@@ -367,7 +367,7 @@ impl<S: LogStore> WorkerStarter<S> {
running: running.clone(),
memtable_builder_provider: MemtableBuilderProvider::new(
Some(self.write_buffer_manager.clone()),
self.config,
self.config.clone(),
),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
@@ -376,6 +376,8 @@ impl<S: LogStore> WorkerStarter<S> {
self.scheduler,
sender.clone(),
self.cache_manager.clone(),
self.config,
self.listener.clone(),
),
stalled_requests: StalledRequests::default(),
listener: self.listener,
@@ -622,10 +624,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
edit,
tx,
} => {
let result = self.edit_region(region_id, edit).await;
if let Err(Err(e)) = tx.send(result) {
warn!("Failed to send edit region error to caller, error: {e:?}");
}
self.handle_region_edit(region_id, edit, tx).await;
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
@@ -669,7 +668,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_compaction_request(ddl.region_id, ddl.sender);
continue;
}
DdlRequest::Truncate(_) => self.handle_truncate_request(ddl.region_id).await,
DdlRequest::Truncate(_) => {
self.handle_truncate_request(ddl.region_id, ddl.sender)
.await;
continue;
}
DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
};
@@ -706,6 +709,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_compaction_finished(region_id, req).await
}
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
}
}
@@ -716,35 +720,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
sender: oneshot::Sender<SetReadonlyResponse>,
) {
if let Some(region) = self.regions.get_region(region_id) {
region.set_writable(false);
// We need to do this in background as we need the manifest lock.
common_runtime::spawn_bg(async move {
region.set_readonly_gracefully().await;
let last_entry_id = region.version_control.current().last_entry_id;
let _ = sender.send(SetReadonlyResponse::success(Some(last_entry_id)));
let last_entry_id = region.version_control.current().last_entry_id;
let _ = sender.send(SetReadonlyResponse::success(Some(last_entry_id)));
});
} else {
let _ = sender.send(SetReadonlyResponse::NotFound);
}
}
async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
let region = self.regions.writable_region(region_id)?;
for file_meta in &edit.files_to_add {
let is_exist = region.access_layer.is_exist(file_meta).await?;
ensure!(
is_exist,
InvalidRequestSnafu {
region_id,
reason: format!(
"trying to add a not exist file '{}' when editing region",
file_meta.file_id
)
}
);
}
// Applying region edit directly has nothing to do with memtables (at least for now).
region.apply_edit(edit, &[]).await
}
}
impl<S> RegionWorkerLoop<S> {
@@ -753,9 +739,7 @@ impl<S> RegionWorkerLoop<S> {
// Closes remaining regions.
let regions = self.regions.list_regions();
for region in regions {
if let Err(e) = region.stop().await {
error!(e; "Failed to stop region {}", region.region_id);
}
region.stop().await;
}
self.regions.clear();
@@ -825,10 +809,10 @@ impl WorkerListener {
let _ = removed;
}
pub(crate) async fn on_handle_compaction_finished(&self, region_id: RegionId) {
pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_handle_compaction_finished(region_id).await;
listener.on_merge_ssts_finished(region_id).await;
}
// Avoid compiler warning.
let _ = region_id;

View File

@@ -16,7 +16,7 @@
use std::sync::Arc;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, info};
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::region_request::RegionAlterRequest;
@@ -26,9 +26,7 @@ use crate::error::{
InvalidMetadataSnafu, InvalidRegionRequestSchemaVersionSnafu, InvalidRegionRequestSnafu, Result,
};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::region::version::Version;
use crate::region::MitoRegionRef;
use crate::manifest::action::RegionChange;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
use crate::worker::RegionWorkerLoop;
@@ -107,51 +105,28 @@ impl<S> RegionWorkerLoop<S> {
return;
}
// Now we can alter the region directly.
if let Err(e) = alter_region_schema(&region, &version, request).await {
error!(e; "Failed to alter region schema, region_id: {}", region_id);
sender.send(Err(e));
return;
}
info!(
"Schema of region {} is altered from {} to {}",
"Try to alter region {} from version {} to {}",
region_id,
version.metadata.schema_version,
region.metadata().schema_version
);
// Notifies waiters.
sender.send(Ok(0));
let new_meta = match metadata_after_alteration(&version.metadata, request) {
Ok(new_meta) => new_meta,
Err(e) => {
sender.send(Err(e));
return;
}
};
// Persist the metadata to region's manifest.
let change = RegionChange {
metadata: new_meta.clone(),
};
self.handle_manifest_region_change(region, change, sender)
}
}
/// Alter the schema of the region.
async fn alter_region_schema(
region: &MitoRegionRef,
version: &Version,
request: RegionAlterRequest,
) -> Result<()> {
let new_meta = metadata_after_alteration(&version.metadata, request)?;
// Persist the metadata to region's manifest.
let change = RegionChange {
metadata: new_meta.clone(),
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
region
.manifest_manager
.write()
.await
.update(action_list)
.await?;
// Apply the metadata to region's version.
region
.version_control
.alter_schema(new_meta, &region.memtable_builder);
Ok(())
}
/// Creates a metadata after applying the alter `request` to the old `metadata`.
///
/// Returns an error if the `request` is invalid.

View File

@@ -45,31 +45,30 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let is_mutable_empty = region.version().memtables.mutable.is_empty();
// Utilizes the short circuit evaluation.
let region =
if !is_mutable_empty || region.manifest_manager.read().await.has_update().await? {
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
region.region_dir(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.intermediate_manager.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?,
);
debug_assert!(!reopened_region.is_writable());
self.regions.insert_region(reopened_region.clone());
let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? {
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
region.region_dir(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.intermediate_manager.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?,
);
debug_assert!(!reopened_region.is_writable());
self.regions.insert_region(reopened_region.clone());
reopened_region
} else {
region
};
reopened_region
} else {
region
};
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");

View File

@@ -33,7 +33,7 @@ impl<S> RegionWorkerLoop<S> {
info!("Try to close region {}", region_id);
region.stop().await?;
region.stop().await;
self.regions.remove_region(region_id);
// Clean flush status.
self.flush_scheduler.on_region_closed(region_id);

View File

@@ -16,9 +16,8 @@ use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_REQUEST_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::metrics::COMPACTION_REQUEST_COUNT;
use crate::request::{CompactionFailed, CompactionFinished, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -38,7 +37,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&region.access_layer,
&region.file_purger,
sender,
self.config.clone(),
&region.manifest_ctx,
) {
error!(e; "Failed to schedule compaction task for region: {}", region_id);
} else {
@@ -55,8 +54,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: CompactionFinished,
) {
self.listener.on_handle_compaction_finished(region_id).await;
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
warn!(
"Unable to finish the compaction task for a read only region {}",
@@ -65,44 +62,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};
{
let manifest_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["write_manifest"])
.start_timer();
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.compaction_outputs),
files_to_remove: std::mem::take(&mut request.compacted_files),
compaction_time_window: request.compaction_time_window,
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region
.manifest_manager
.write()
.await
.update(action_list)
.await
{
error!(e; "Failed to update manifest, region: {}", region_id);
manifest_timer.stop_and_discard();
request.on_failure(e);
return;
}
// Apply edit to region's version.
region
.version_control
.apply_edit(edit, &[], region.file_purger.clone());
}
// compaction finished.
request.on_success();
// Schedule next compaction if necessary.
self.compaction_scheduler
.on_compaction_finished(region_id, self.config.clone());
.on_compaction_finished(region_id, &region.manifest_ctx);
}
/// When compaction fails, we simply log the error.

View File

@@ -16,7 +16,7 @@
use std::time::Duration;
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};
@@ -27,7 +27,7 @@ use tokio::time::sleep;
use crate::error::{OpenDalSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::RegionMapRef;
use crate::region::{RegionMapRef, RegionState};
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes
@@ -42,17 +42,27 @@ impl<S> RegionWorkerLoop<S> {
info!("Try to drop region: {}", region_id);
// write dropping marker
// Marks the region as dropping.
region.set_dropping()?;
// Writes dropping marker
// We rarely drop a region so we still operate in the worker loop.
let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE);
region
.access_layer
.object_store()
.write(&marker_path, vec![])
.await
.context(OpenDalSnafu)?;
.context(OpenDalSnafu)
.inspect_err(|e| {
error!(e; "Failed to write the drop marker file for region {}", region_id);
region.stop().await?;
// remove this region from region map to prevent other requests from accessing this region
// Sets the state back to writable. It's possible that the marker file has been written.
// We set the state back to writable so we can retry the drop operation.
region.switch_state_to_writable(RegionState::Dropping);
})?;
region.stop().await;
// Removes this region from region map to prevent other requests from accessing this region
self.regions.remove_region(region_id);
self.dropping_regions.insert_region(region.clone());
// Notifies flush scheduler.
@@ -60,7 +70,7 @@ impl<S> RegionWorkerLoop<S> {
// Notifies compaction scheduler.
self.compaction_scheduler.on_region_dropped(region_id);
// mark region version as dropped
// Marks region version as dropped
region
.version_control
.mark_dropped(&region.memtable_builder);
@@ -71,7 +81,7 @@ impl<S> RegionWorkerLoop<S> {
REGION_COUNT.dec();
// detach a background task to delete the region dir
// Detaches a background task to delete the region dir
let region_dir = region.access_layer.region_dir().to_owned();
let object_store = region.access_layer.object_store().clone();
let dropping_regions = self.dropping_regions.clone();

View File

@@ -22,9 +22,8 @@ use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RegionTruncatedSnafu, Result};
use crate::error::Result;
use crate::flush::{FlushReason, RegionFlushTask};
use crate::manifest::action::RegionEdit;
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
@@ -178,6 +177,7 @@ impl<S> RegionWorkerLoop<S> {
engine_config,
row_group_size,
cache_manager: self.cache_manager.clone(),
manifest_ctx: region.manifest_ctx.clone(),
index_options: region.version().options.index_options.clone(),
}
}
@@ -198,29 +198,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};
// The flush task before truncating the region fails immediately.
let version_data = region.version_control.current();
if let Some(truncated_entry_id) = version_data.version.truncated_entry_id {
if truncated_entry_id >= request.flushed_entry_id {
request.on_failure(RegionTruncatedSnafu { region_id }.build());
return;
}
}
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.file_metas),
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: Some(request.flushed_entry_id),
flushed_sequence: Some(request.flushed_sequence),
};
if let Err(e) = region.apply_edit(edit, &request.memtables_to_remove).await {
error!(e; "Failed to write manifest, region: {}", region_id);
request.on_failure(e);
return;
}
region.update_flush_millis();
// Delete wal.
@@ -263,7 +240,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&region.access_layer,
&region.file_purger,
OptionOutputTx::none(),
self.config.clone(),
&region.manifest_ctx,
) {
warn!(
"Failed to schedule compaction after flush, region: {}, err: {}",

View File

@@ -0,0 +1,200 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handles manifest.
//!
//! It updates the manifest and applies the changes to the region in the background.
use common_telemetry::{info, warn};
use snafu::ensure;
use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;
use crate::error::{InvalidRequestSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
};
use crate::region::{MitoRegionRef, RegionState};
use crate::request::{BackgroundNotify, OptionOutputTx, TruncateResult, WorkerRequest};
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
    /// Handles region edit request.
    ///
    /// Validates the region is writable, flips it into the `Editing` state,
    /// then writes the edit to the manifest in a background task so the
    /// worker loop is not blocked on manifest I/O. The result is sent
    /// through `sender`.
    pub(crate) async fn handle_region_edit(
        &self,
        region_id: RegionId,
        edit: RegionEdit,
        sender: Sender<Result<()>>,
    ) {
        let region = match self.regions.writable_region(region_id) {
            Ok(region) => region,
            Err(e) => {
                let _ = sender.send(Err(e));
                return;
            }
        };
        // Marks the region as editing.
        // If another state transition is in progress, fail fast instead of queueing.
        if let Err(e) = region.set_editing() {
            let _ = sender.send(Err(e));
            return;
        }

        // Now the region is in editing state.
        // Updates manifest in background.
        common_runtime::spawn_bg(async move {
            let result = edit_region(&region, edit).await;
            if let Err(res) = sender.send(result) {
                warn!(
                    "Failed to send result back to the worker, region_id: {}, res: {:?}",
                    region_id, res
                );
            }

            // Sets the region as writable. For simplicity, we don't send the result
            // back to the worker.
            // NOTE: this only succeeds if the state is still `Editing`; the guard
            // avoids clobbering a state changed elsewhere in the meantime.
            region.switch_state_to_writable(RegionState::Editing);
        });
    }

    /// Writes truncate action to the manifest and then applies it to the region in background.
    ///
    /// Unlike edit/alter, the state is NOT reset here: the background task
    /// reports back to the worker loop via a `TruncateResult` notification,
    /// and the worker restores the writable state when handling that result.
    pub(crate) fn handle_manifest_truncate_action(
        &self,
        region: MitoRegionRef,
        truncate: RegionTruncate,
        sender: OptionOutputTx,
    ) {
        // Marks the region as truncating.
        // This prevents the region from being accessed by other write requests.
        if let Err(e) = region.set_truncating() {
            sender.send(Err(e));
            return;
        }
        // Now the region is in truncating state.
        let request_sender = self.sender.clone();
        let manifest_ctx = region.manifest_ctx.clone();
        let version_control = region.version_control.clone();
        let memtable_builder = region.memtable_builder.clone();
        // Updates manifest in background.
        common_runtime::spawn_bg(async move {
            // Write region truncated to manifest.
            let action_list =
                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
            let result = manifest_ctx
                .update_manifest(RegionState::Truncating, action_list, || {
                    // Applies the truncate action to the region.
                    // Runs only after the manifest write succeeds, so the
                    // in-memory version never gets ahead of durable metadata.
                    version_control.truncate(
                        truncate.truncated_entry_id,
                        truncate.truncated_sequence,
                        &memtable_builder,
                    );
                })
                .await;

            // Sends the result back to the request sender.
            let truncate_result = TruncateResult {
                region_id: truncate.region_id,
                sender,
                result,
                truncated_entry_id: truncate.truncated_entry_id,
                truncated_sequence: truncate.truncated_sequence,
            };
            let _ = request_sender
                .send(WorkerRequest::Background {
                    region_id: truncate.region_id,
                    notify: BackgroundNotify::Truncate(truncate_result),
                })
                .await
                .inspect_err(|_| warn!("failed to send truncate result"));
        });
    }

    /// Writes region change action to the manifest and then applies it to the region in background.
    ///
    /// On success the caller receives `Ok(0)` (zero affected rows) through
    /// `sender`; the writable state is restored in the background task either way.
    pub(crate) fn handle_manifest_region_change(
        &self,
        region: MitoRegionRef,
        change: RegionChange,
        sender: OptionOutputTx,
    ) {
        // Marks the region as altering.
        if let Err(e) = region.set_altering() {
            sender.send(Err(e));
            return;
        }
        // Now the region is in altering state.
        common_runtime::spawn_bg(async move {
            let new_meta = change.metadata.clone();
            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
            let result = region
                .manifest_ctx
                .update_manifest(RegionState::Altering, action_list, || {
                    // Apply the metadata to region's version.
                    region
                        .version_control
                        .alter_schema(new_meta, &region.memtable_builder);
                })
                .await;
            // Sets the region as writable.
            region.switch_state_to_writable(RegionState::Altering);
            if result.is_ok() {
                info!(
                    "Region {} is altered, schema version is {}",
                    region.region_id,
                    region.metadata().schema_version
                );
            }
            sender.send(result.map(|_| 0));
        });
    }
}
/// Checks the edit, writes and applies it.
///
/// Rejects the edit if any SST file it adds is missing from the object
/// store; otherwise persists the edit to the manifest and, on success,
/// applies it to the region's in-memory version.
async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> {
    let region_id = region.region_id;

    // Every file the edit adds must already exist in storage; otherwise the
    // manifest would reference a dangling SST.
    for meta in &edit.files_to_add {
        ensure!(
            region.access_layer.is_exist(meta).await?,
            InvalidRequestSnafu {
                region_id,
                reason: format!(
                    "trying to add a not exist file '{}' when editing region",
                    meta.file_id
                )
            }
        );
    }

    info!("Applying {edit:?} to region {}", region_id);

    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
    region
        .manifest_ctx
        .update_manifest(RegionState::Editing, action_list, || {
            // The applier runs only after the manifest write succeeds.
            region
                .version_control
                .apply_edit(edit, &[], region.file_purger.clone());
        })
        .await
}

View File

@@ -16,19 +16,23 @@
use common_telemetry::info;
use store_api::logstore::LogStore;
use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate};
use crate::error::RegionNotFoundSnafu;
use crate::manifest::action::RegionTruncate;
use crate::region::RegionState;
use crate::request::{OptionOutputTx, TruncateResult};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_truncate_request(
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
let region = self.regions.writable_region(region_id)?;
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
info!("Try to truncate region {}", region_id);
@@ -42,36 +46,55 @@ impl<S: LogStore> RegionWorkerLoop<S> {
truncated_entry_id,
truncated_sequence,
};
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
region
.manifest_manager
.write()
.await
.update(action_list)
.await?;
self.handle_manifest_truncate_action(region, truncate, sender);
}
/// Handles truncate result.
pub(crate) async fn handle_truncate_result(&mut self, truncate_result: TruncateResult) {
let region_id = truncate_result.region_id;
let Some(region) = self.regions.get_region(region_id) else {
truncate_result.sender.send(
RegionNotFoundSnafu {
region_id: truncate_result.region_id,
}
.fail(),
);
return;
};
// We are already in the worker loop so we can set the state first.
region.switch_state_to_writable(RegionState::Truncating);
if let Err(e) = truncate_result.result {
// Unable to truncate the region.
truncate_result.sender.send(Err(e));
return;
}
// Notifies flush scheduler.
self.flush_scheduler.on_region_truncated(region_id);
// Notifies compaction scheduler.
self.compaction_scheduler.on_region_truncated(region_id);
// Reset region's version and mark all SSTs deleted.
region.version_control.truncate(
truncated_entry_id,
truncated_sequence,
&region.memtable_builder,
);
// Make all data obsolete.
self.wal
.obsolete(region_id, truncated_entry_id, &region.wal_options)
.await?;
if let Err(e) = self
.wal
.obsolete(
region_id,
truncate_result.truncated_entry_id,
&region.wal_options,
)
.await
{
truncate_result.sender.send(Err(e));
return;
}
info!(
"Complete truncating region: {}, entry id: {} and sequence: {}.",
region_id, truncated_entry_id, truncated_sequence
region_id, truncate_result.truncated_entry_id, truncate_result.truncated_sequence
);
Ok(0)
truncate_result.sender.send(Ok(0));
}
}

View File

@@ -42,7 +42,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
console = "0.15"
crossbeam-utils = "0.8.14"
crossbeam-utils.workspace = true
datafusion = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }