mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 18:00:41 +00:00
feat: able to handle concurrent region edit requests (#4569)
* feat: able to handle concurrent region edit requests * resolve PR comments
This commit is contained in:
@@ -33,6 +33,8 @@ mod create_test;
|
||||
#[cfg(test)]
|
||||
mod drop_test;
|
||||
#[cfg(test)]
|
||||
mod edit_region_test;
|
||||
#[cfg(test)]
|
||||
mod filter_deleted_test;
|
||||
#[cfg(test)]
|
||||
mod flush_test;
|
||||
@@ -88,7 +90,7 @@ use crate::manifest::action::RegionEdit;
|
||||
use crate::metrics::HANDLE_REQUEST_ELAPSED;
|
||||
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
|
||||
use crate::region::RegionUsage;
|
||||
use crate::request::WorkerRequest;
|
||||
use crate::request::{RegionEditRequest, WorkerRequest};
|
||||
use crate::wal::entry_distributor::{
|
||||
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
|
||||
};
|
||||
@@ -196,11 +198,11 @@ impl MitoEngine {
|
||||
);
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let request = WorkerRequest::EditRegion {
|
||||
let request = WorkerRequest::EditRegion(RegionEditRequest {
|
||||
region_id,
|
||||
edit,
|
||||
tx,
|
||||
};
|
||||
});
|
||||
self.inner
|
||||
.workers
|
||||
.submit_to_worker(region_id, request)
|
||||
|
||||
120
src/mito2/src/engine/edit_region_test.rs
Normal file
120
src/mito2/src/engine/edit_region_test.rs
Normal file
@@ -0,0 +1,120 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use object_store::ObjectStore;
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::Barrier;
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::engine::MitoEngine;
|
||||
use crate::manifest::action::RegionEdit;
|
||||
use crate::region::MitoRegionRef;
|
||||
use crate::sst::file::{FileId, FileMeta};
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn test_edit_region_concurrently() {
|
||||
const EDITS_PER_TASK: usize = 10;
|
||||
let tasks_count = 10;
|
||||
|
||||
// A task that creates SST files and edits the region with them.
|
||||
struct Task {
|
||||
region: MitoRegionRef,
|
||||
ssts: Vec<FileMeta>,
|
||||
}
|
||||
|
||||
impl Task {
|
||||
async fn create_ssts(&mut self, object_store: &ObjectStore) {
|
||||
for _ in 0..EDITS_PER_TASK {
|
||||
let file = FileMeta {
|
||||
region_id: self.region.region_id,
|
||||
file_id: FileId::random(),
|
||||
level: 0,
|
||||
..Default::default()
|
||||
};
|
||||
object_store
|
||||
.write(
|
||||
&format!("{}/{}.parquet", self.region.region_dir(), file.file_id),
|
||||
b"x".as_slice(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
self.ssts.push(file);
|
||||
}
|
||||
}
|
||||
|
||||
async fn edit_region(self, engine: MitoEngine) {
|
||||
for sst in self.ssts {
|
||||
let edit = RegionEdit {
|
||||
files_to_add: vec![sst],
|
||||
files_to_remove: vec![],
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
};
|
||||
engine
|
||||
.edit_region(self.region.region_id, edit)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Create(CreateRequestBuilder::new().build()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
|
||||
let mut tasks = Vec::with_capacity(tasks_count);
|
||||
let object_store = env.get_object_store().unwrap();
|
||||
for _ in 0..tasks_count {
|
||||
let mut task = Task {
|
||||
region: region.clone(),
|
||||
ssts: Vec::new(),
|
||||
};
|
||||
task.create_ssts(&object_store).await;
|
||||
tasks.push(task);
|
||||
}
|
||||
|
||||
let mut futures = Vec::with_capacity(tasks_count);
|
||||
let barrier = Arc::new(Barrier::new(tasks_count));
|
||||
for task in tasks {
|
||||
futures.push(tokio::spawn({
|
||||
let barrier = barrier.clone();
|
||||
let engine = engine.clone();
|
||||
async move {
|
||||
barrier.wait().await;
|
||||
task.edit_region(engine).await;
|
||||
}
|
||||
}));
|
||||
}
|
||||
futures::future::join_all(futures).await;
|
||||
|
||||
assert_eq!(
|
||||
region.version().ssts.levels()[0].files.len(),
|
||||
tasks_count * EDITS_PER_TASK
|
||||
);
|
||||
}
|
||||
@@ -842,6 +842,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Region {} is busy", region_id))]
|
||||
RegionBusy {
|
||||
region_id: RegionId,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -973,6 +980,7 @@ impl ErrorExt for Error {
|
||||
| FulltextFinish { source, .. }
|
||||
| ApplyFulltextIndex { source, .. } => source.status_code(),
|
||||
DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal,
|
||||
RegionBusy { .. } => StatusCode::RegionBusy,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -494,11 +494,7 @@ pub(crate) enum WorkerRequest {
|
||||
Stop,
|
||||
|
||||
/// Use [RegionEdit] to edit a region directly.
|
||||
EditRegion {
|
||||
region_id: RegionId,
|
||||
edit: RegionEdit,
|
||||
tx: Sender<Result<()>>,
|
||||
},
|
||||
EditRegion(RegionEditRequest),
|
||||
}
|
||||
|
||||
impl WorkerRequest {
|
||||
@@ -762,6 +758,15 @@ pub(crate) struct RegionChangeResult {
|
||||
pub(crate) result: Result<()>,
|
||||
}
|
||||
|
||||
/// Request to edit a region directly.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct RegionEditRequest {
|
||||
pub(crate) region_id: RegionId,
|
||||
pub(crate) edit: RegionEdit,
|
||||
/// The sender to notify the result to the region engine.
|
||||
pub(crate) tx: Sender<Result<()>>,
|
||||
}
|
||||
|
||||
/// Notifies the regin the result of editing region.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct RegionEditResult {
|
||||
|
||||
@@ -61,6 +61,7 @@ use crate::sst::index::intermediate::IntermediateManager;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
|
||||
use crate::wal::Wal;
|
||||
use crate::worker::handle_manifest::RegionEditQueues;
|
||||
|
||||
/// Identifier for a worker.
|
||||
pub(crate) type WorkerId = u32;
|
||||
@@ -441,6 +442,7 @@ impl<S: LogStore> WorkerStarter<S> {
|
||||
flush_receiver: self.flush_receiver,
|
||||
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
|
||||
region_count: REGION_COUNT.with_label_values(&[&id_string]),
|
||||
region_edit_queues: RegionEditQueues::default(),
|
||||
};
|
||||
let handle = common_runtime::spawn_global(async move {
|
||||
worker_thread.run().await;
|
||||
@@ -629,6 +631,8 @@ struct RegionWorkerLoop<S> {
|
||||
stalled_count: IntGauge,
|
||||
/// Gauge of regions in the worker.
|
||||
region_count: IntGauge,
|
||||
/// Queues for region edit requests.
|
||||
region_edit_queues: RegionEditQueues,
|
||||
}
|
||||
|
||||
impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
@@ -727,12 +731,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
WorkerRequest::SetReadonlyGracefully { region_id, sender } => {
|
||||
self.set_readonly_gracefully(region_id, sender).await;
|
||||
}
|
||||
WorkerRequest::EditRegion {
|
||||
region_id,
|
||||
edit,
|
||||
tx,
|
||||
} => {
|
||||
self.handle_region_edit(region_id, edit, tx).await;
|
||||
WorkerRequest::EditRegion(request) => {
|
||||
self.handle_region_edit(request).await;
|
||||
}
|
||||
// We receive a stop signal, but we still want to process remaining
|
||||
// requests. The worker thread will then check the running flag and
|
||||
@@ -824,7 +824,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
|
||||
BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
|
||||
BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req),
|
||||
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req),
|
||||
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,38 +16,89 @@
|
||||
//!
|
||||
//! It updates the manifest and applies the changes to the region in background.
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
|
||||
use common_telemetry::{info, warn};
|
||||
use snafu::ensure;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::oneshot::Sender;
|
||||
|
||||
use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result};
|
||||
use crate::error::{InvalidRequestSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result};
|
||||
use crate::manifest::action::{
|
||||
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
|
||||
};
|
||||
use crate::region::{MitoRegionRef, RegionState};
|
||||
use crate::request::{
|
||||
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditResult, TruncateResult,
|
||||
WorkerRequest,
|
||||
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
|
||||
TruncateResult, WorkerRequest,
|
||||
};
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
|
||||
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
|
||||
|
||||
/// A queue for temporary store region edit requests, if the region is in the "Editing" state.
|
||||
/// When the current region edit request is completed, the next (if there exists) request in the
|
||||
/// queue will be processed.
|
||||
/// Everything is done in the region worker loop.
|
||||
pub(crate) struct RegionEditQueue {
|
||||
region_id: RegionId,
|
||||
requests: VecDeque<RegionEditRequest>,
|
||||
}
|
||||
|
||||
impl RegionEditQueue {
|
||||
const QUEUE_MAX_LEN: usize = 128;
|
||||
|
||||
fn new(region_id: RegionId) -> Self {
|
||||
Self {
|
||||
region_id,
|
||||
requests: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn enqueue(&mut self, request: RegionEditRequest) {
|
||||
if self.requests.len() > Self::QUEUE_MAX_LEN {
|
||||
let _ = request.tx.send(
|
||||
RegionBusySnafu {
|
||||
region_id: self.region_id,
|
||||
}
|
||||
.fail(),
|
||||
);
|
||||
return;
|
||||
};
|
||||
self.requests.push_back(request);
|
||||
}
|
||||
|
||||
fn dequeue(&mut self) -> Option<RegionEditRequest> {
|
||||
self.requests.pop_front()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> RegionWorkerLoop<S> {
|
||||
/// Handles region edit request.
|
||||
pub(crate) async fn handle_region_edit(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
edit: RegionEdit,
|
||||
sender: Sender<Result<()>>,
|
||||
) {
|
||||
let region = match self.regions.writable_region(region_id) {
|
||||
Ok(region) => region,
|
||||
Err(e) => {
|
||||
let _ = sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
|
||||
let region_id = request.region_id;
|
||||
let Some(region) = self.regions.get_region(region_id) else {
|
||||
let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
|
||||
return;
|
||||
};
|
||||
|
||||
if !region.is_writable() {
|
||||
if region.state() == RegionState::Editing {
|
||||
self.region_edit_queues
|
||||
.entry(region_id)
|
||||
.or_insert_with(|| RegionEditQueue::new(region_id))
|
||||
.enqueue(request);
|
||||
} else {
|
||||
let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
let RegionEditRequest {
|
||||
region_id: _,
|
||||
edit,
|
||||
tx: sender,
|
||||
} = request;
|
||||
|
||||
// Marks the region as editing.
|
||||
if let Err(e) = region.set_editing() {
|
||||
let _ = sender.send(Err(e));
|
||||
@@ -79,7 +130,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
}
|
||||
|
||||
/// Handles region edit result.
|
||||
pub(crate) fn handle_region_edit_result(&self, edit_result: RegionEditResult) {
|
||||
pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
|
||||
let region = match self.regions.get_region(edit_result.region_id) {
|
||||
Some(region) => region,
|
||||
None => {
|
||||
@@ -104,6 +155,12 @@ impl<S> RegionWorkerLoop<S> {
|
||||
region.switch_state_to_writable(RegionState::Editing);
|
||||
|
||||
let _ = edit_result.sender.send(edit_result.result);
|
||||
|
||||
if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
|
||||
if let Some(request) = edit_queue.dequeue() {
|
||||
self.handle_region_edit(request).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Writes truncate action to the manifest and then applies it to the region in background.
|
||||
|
||||
Reference in New Issue
Block a user