feat(mito): Flush region (#2291)

* chore: call handle_flush_request

* feat: alias SchedulerRef and clean scheduler on drop

* feat: add scheduler to workers

* feat: remove RegionMemtableStats

* feat: pick regions to flush

* feat: add more fields to region flush task

* feat: smallvec workspace dep

* feat: Use list to hold immutable memtables

* feat: flush job wip

* feat: use access layer to read write sst

* feat: flush memtables to l0

* feat: write manifest

* feat: schedule next flush on success

* feat: schedule flush on success and failure

* feat: add purger to region

* feat: apply edit after flush

* feat: collect stats for SSTs

* feat: manual flush

* test: test flush and fix manifest test

* feat: remove flush scheduler job limit

* fix: typo

* style: clippy

* feat: clean flushed files on failure

* chore: address CR comment

* refactor: Use put_rows

* feat: Clean flush scheduler on drop

* feat: remove region flush status on drop and close

* chore: address CR comment
This commit is contained in:
Yingwen
2023-09-02 16:55:31 +08:00
committed by Ruihang Xia
parent fa5e3b94d3
commit 648b2ae293
25 changed files with 842 additions and 261 deletions

1
Cargo.lock generated
View File

@@ -5467,6 +5467,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"smallvec",
"snafu",
"storage",
"store-api",

View File

@@ -91,6 +91,7 @@ rand = "0.8"
regex = "1.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
smallvec = "1"
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4", features = [
"visitor",

View File

@@ -16,7 +16,7 @@ humantime-serde.workspace = true
object-store = { workspace = true }
serde.workspace = true
serde_json = "1.0"
smallvec = "1"
smallvec.workspace = true
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true

View File

@@ -45,7 +45,8 @@ paste.workspace = true
prost.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_json.workspace = true
smallvec.workspace = true
snafu.workspace = true
storage = { workspace = true }
store-api = { workspace = true }

View File

@@ -14,6 +14,8 @@
//! Configurations.
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_datasource::compression::CompressionType;
use common_telemetry::warn;
@@ -44,8 +46,16 @@ pub struct MitoConfig {
pub manifest_compress_type: CompressionType,
// Background job configs:
/// Max number of running background jobs.
/// Max number of running background jobs (default 4).
pub max_background_jobs: usize,
// Flush configs:
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
pub auto_flush_interval: Duration,
/// Global write buffer size threshold to trigger flush (default 512M).
pub global_write_buffer_size: ReadableSize,
/// Global write buffer size threshold to reject write requests (default 2G).
pub global_write_buffer_reject_size: ReadableSize,
}
impl Default for MitoConfig {
@@ -57,6 +67,9 @@ impl Default for MitoConfig {
manifest_checkpoint_distance: 10,
manifest_compress_type: CompressionType::Uncompressed,
max_background_jobs: DEFAULT_MAX_BG_JOB,
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::mb(512),
global_write_buffer_reject_size: ReadableSize::gb(2),
}
}
}

View File

@@ -22,7 +22,8 @@ use api::v1::{ColumnSchema, Row, Rows, SemanticType};
use common_recordbatch::RecordBatches;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{
RegionCreateRequest, RegionDeleteRequest, RegionOpenRequest, RegionPutRequest,
RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionOpenRequest,
RegionPutRequest,
};
use store_api::storage::RegionId;
@@ -408,3 +409,52 @@ async fn test_put_overwrite() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
/// Writes three rows into a freshly created region, manually flushes it and
/// checks that a scan reads the rows back with exactly one file to scan.
#[tokio::test]
async fn test_manual_flush() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
// Keep the column schemas of the create request so the put request below
// can reuse them.
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// A flush request is expected to answer with `Output::AffectedRows`.
let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
.unwrap()
else {
unreachable!()
};
// The flush itself reports zero affected rows.
assert_eq!(0, rows);
let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
// After the flush the scanner must have exactly one file to read.
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -395,6 +395,30 @@ pub enum Error {
source: object_store::Error,
location: Location,
},
#[snafu(display(
"Failed to flush region {}, location: {}, source: {}",
region_id,
location,
source
))]
FlushRegion {
region_id: RegionId,
source: Arc<Error>,
location: Location,
},
#[snafu(display("Region {} is dropped, location: {}", region_id, location))]
RegionDropped {
region_id: RegionId,
location: Location,
},
#[snafu(display("Region {} is closed, location: {}", region_id, location))]
RegionClosed {
region_id: RegionId,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -457,6 +481,9 @@ impl ErrorExt for Error {
StopScheduler { .. } => StatusCode::Internal,
BuildPredicate { source, .. } => source.status_code(),
DeleteSst { .. } => StatusCode::StorageUnavailable,
FlushRegion { source, .. } => source.status_code(),
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,
}
}

View File

@@ -17,13 +17,25 @@
use std::collections::HashMap;
use std::sync::Arc;
use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;
use common_query::Output;
use common_telemetry::{error, info};
use snafu::ResultExt;
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{mpsc, oneshot};
use crate::error::Result;
use crate::access_layer::AccessLayerRef;
use crate::error::{Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, Result};
use crate::memtable::MemtableBuilderRef;
use crate::read::Source;
use crate::region::version::{VersionControlData, VersionRef};
use crate::region::MitoRegionRef;
use crate::request::{SenderDdlRequest, SenderWriteRequest};
use crate::schedule::scheduler::SchedulerRef;
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
/// Global write buffer (memtable) manager.
///
@@ -32,9 +44,6 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
/// Returns whether to trigger the engine.
fn should_flush_engine(&self) -> bool;
/// Returns whether the mutable memtable of this region needs to flush.
fn should_flush_region(&self, stats: RegionMemtableStats) -> bool;
/// Reserves `mem` bytes.
fn reserve_mem(&self, mem: usize);
@@ -53,15 +62,6 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
/// Statistics of a region's memtable.
#[derive(Debug)]
pub struct RegionMemtableStats {
/// Size of the mutable memtable.
pub bytes_mutable: usize,
/// Write buffer size of the region.
pub write_buffer_size: usize,
}
// TODO(yingwen): Implements the manager.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {}
@@ -71,10 +71,6 @@ impl WriteBufferManager for WriteBufferManagerImpl {
false
}
fn should_flush_region(&self, _stats: RegionMemtableStats) -> bool {
false
}
fn reserve_mem(&self, _mem: usize) {}
fn schedule_free_mem(&self, _mem: usize) {}
@@ -90,11 +86,11 @@ impl WriteBufferManager for WriteBufferManagerImpl {
pub enum FlushReason {
/// Other reasons.
Others,
/// Memtable is full.
MemtableFull,
/// Engine reaches flush threshold.
EngineFull,
// TODO(yingwen): Alter, manually.
/// Manual flush.
Manual,
// TODO(yingwen): Alter.
}
/// Task to flush a region.
@@ -103,17 +99,141 @@ pub(crate) struct RegionFlushTask {
pub(crate) region_id: RegionId,
/// Reason to flush.
pub(crate) reason: FlushReason,
/// Flush result sender.
pub(crate) sender: Option<Sender<Result<()>>>,
/// Flush result senders.
pub(crate) senders: Vec<oneshot::Sender<Result<Output>>>,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) access_layer: AccessLayerRef,
pub(crate) memtable_builder: MemtableBuilderRef,
pub(crate) file_purger: FilePurgerRef,
}
impl RegionFlushTask {
/// Consumes the task and notify the sender the job is success.
fn on_success(self) {
if let Some(sender) = self.sender {
let _ = sender.send(Ok(()));
for sender in self.senders {
let _ = sender.send(Ok(Output::AffectedRows(0)));
}
}
/// Send flush error to waiter.
fn on_failure(&mut self, err: Arc<Error>) {
for sender in self.senders.drain(..) {
// Ignore send result.
let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Turns this task into a background job that runs the flush.
fn into_flush_job(mut self, region: &MitoRegionRef) -> Job {
    // Snapshot the version data up front, before the job is created, so the
    // job always works on a consistent memtable list.
    let snapshot = region.version_control.current();
    let job = async move { self.do_flush(snapshot).await };
    Box::pin(job)
}
/// Executes the flush and reports the outcome to the worker.
///
/// On success a `FlushFinished` notification is built from the flushed file
/// metas and the ids of the immutable memtables in `version_data`; the
/// waiting senders are handed over to that notification. On failure the
/// waiters are notified right away and a `FlushFailed` notification is sent.
async fn do_flush(&mut self, version_data: VersionControlData) {
    let region_id = self.region_id;
    let notify = match self.flush_memtables(&version_data.version).await {
        Err(e) => {
            error!(e; "Failed to flush region {}", self.region_id);
            let err = Arc::new(e);
            self.on_failure(err.clone());
            BackgroundNotify::FlushFailed(FlushFailed { err })
        }
        Ok(file_metas) => {
            let immutables = version_data.version.memtables.immutables();
            BackgroundNotify::FlushFinished(FlushFinished {
                region_id,
                file_metas,
                // The last entry has been flushed.
                flushed_entry_id: version_data.last_entry_id,
                memtables_to_remove: immutables.iter().map(|mem| mem.id()).collect(),
                senders: std::mem::take(&mut self.senders),
                file_purger: self.file_purger.clone(),
            })
        }
    };

    self.send_worker_request(WorkerRequest::Background { region_id, notify })
        .await;
}
/// Writes every non-empty immutable memtable of `version` to a level 0 SST.
///
/// Returns the metadata of the files that were actually written. Memtables
/// that are empty, or for which the writer reports no data, produce no file.
/// The first write error aborts the flush and is returned.
async fn flush_memtables(&self, version: &VersionRef) -> Result<Vec<FileMeta>> {
    // TODO(yingwen): Make it configurable.
    let write_opts = WriteOptions::default();
    let immutables = version.memtables.immutables();
    let mut file_metas = Vec::with_capacity(immutables.len());

    // Empty memtables are skipped.
    for mem in immutables.iter().filter(|mem| !mem.is_empty()) {
        let file_id = FileId::random();
        let source = Source::Iter(mem.iter(ScanRequest::default()));
        let mut writer = self
            .access_layer
            .write_sst(file_id, version.metadata.clone(), source);

        // `None` means no data was written, so there is no file to record.
        if let Some(sst_info) = writer.write_all(&write_opts).await? {
            file_metas.push(FileMeta {
                region_id: version.metadata.region_id,
                file_id,
                time_range: sst_info.time_range,
                level: 0,
                file_size: sst_info.file_size,
            });
        }
    }

    let file_ids = file_metas
        .iter()
        .map(|meta| meta.file_id)
        .collect::<Vec<_>>();
    info!(
        "Successfully flush memtables, region: {}, files: {:?}",
        version.metadata.region_id, file_ids
    );

    Ok(file_metas)
}
/// Sends `request` to the worker to report the flush job status.
///
/// A send failure is only logged, together with the request that could not
/// be delivered.
async fn send_worker_request(&self, request: WorkerRequest) {
    let Err(e) = self.request_sender.send(request).await else {
        return;
    };
    error!(
        "Failed to notify flush job status for region {}, request: {:?}",
        self.region_id, e.0
    );
}
/// Absorbs `other` into this task.
///
/// Panics if the two tasks belong to different regions.
fn merge(&mut self, other: RegionFlushTask) {
    assert_eq!(self.region_id, other.region_id);
    // Now we only merge senders. They share the same flush reason.
    self.senders.extend(other.senders);
}
}
/// Manages background flushes of a worker.
@@ -133,25 +253,25 @@ impl FlushScheduler {
}
}
/// Returns true if the region is stalling.
pub(crate) fn is_stalling(&self, region_id: RegionId) -> bool {
if let Some(status) = self.region_status.get(&region_id) {
return status.stalling;
}
false
/// Returns true if the region already requested flush.
pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
self.region_status.contains_key(&region_id)
}
/// Schedules a flush `task` for specific `region`.
pub(crate) fn schedule_flush(&mut self, region: &MitoRegionRef, task: RegionFlushTask) {
pub(crate) fn schedule_flush(
&mut self,
region: &MitoRegionRef,
task: RegionFlushTask,
) -> Result<()> {
debug_assert_eq!(region.region_id, task.region_id);
let version = region.version_control.current().version;
if version.memtables.mutable.is_empty() && version.memtables.immutable.is_none() {
if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() {
debug_assert!(!self.region_status.contains_key(&region.region_id));
// The region has nothing to flush.
task.on_success();
return;
return Ok(());
}
// Add this region to status map.
@@ -160,64 +280,157 @@ impl FlushScheduler {
.entry(region.region_id)
.or_insert_with(|| FlushStatus::new(region.clone()));
// Checks whether we can flush the region now.
if flush_status.flushing_task.is_some() {
if flush_status.flushing {
// There is already a flush job running.
flush_status.stalling = true;
return;
flush_status.push_task(task);
return Ok(());
}
todo!()
// If there are pending tasks, then we should push it to pending list.
if flush_status.pending_task.is_some() {
flush_status.push_task(task);
return Ok(());
}
// Now we can flush the region directly.
region
.version_control
.freeze_mutable(&task.memtable_builder);
// Submit a flush job.
let job = task.into_flush_job(region);
if let Err(e) = self.scheduler.schedule(job) {
// If scheduler returns error, senders in the job will be dropped and waiters
// can get recv errors.
error!(e; "Failed to schedule flush job for region {}", region.region_id);
// Remove from region status if we can't submit the task.
self.region_status.remove(&region.region_id);
return Err(e);
}
flush_status.flushing = true;
Ok(())
}
/// Add write `request` to pending queue.
/// Notifies the scheduler that the flush job is finished.
///
/// Returns error if region is not stalling.
pub(crate) fn add_write_request_to_pending(
/// Returns all pending requests if the region doesn't need to flush again.
pub(crate) fn on_flush_success(
&mut self,
request: SenderWriteRequest,
) -> Result<(), SenderWriteRequest> {
if let Some(status) = self.region_status.get_mut(&request.request.region_id) {
if status.stalling {
status.pending_writes.push(request);
return Ok(());
}
region_id: RegionId,
) -> Option<Vec<SenderDdlRequest>> {
let Some(flush_status) = self.region_status.get_mut(&region_id) else {
return None;
};
// This region doesn't have running flush job.
flush_status.flushing = false;
let pending_ddls = if flush_status.pending_task.is_none() {
// The region doesn't have any pending flush task.
// Safety: The flush status exists.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some(flush_status.pending_ddls)
} else {
None
};
// Schedule next flush job.
if let Err(e) = self.schedule_next_flush() {
error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
}
Err(request)
pending_ddls
}
/// Notifies the scheduler that the flush job of `region_id` has failed.
///
/// The region is removed from the status map and everything still pending
/// for it is failed with `err`. Afterwards the scheduler tries to schedule
/// the next flush. Nothing more happens if the region is not tracked.
pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
    error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

    match self.region_status.remove(&region_id) {
        None => return,
        // Fast fail: cancels all pending tasks and sends error to their waiters.
        Some(status) => status.on_failure(err),
    }

    // Still tries to schedule a new flush.
    if let Err(e) = self.schedule_next_flush() {
        error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
    }
}
/// Notifies the scheduler that the region is dropped.
///
/// Removes the region from the status map and fails everything pending for
/// it with a `RegionDropped` error. Does nothing if the region is not tracked.
pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
    if let Some(status) = self.region_status.remove(&region_id) {
        let dropped = RegionDroppedSnafu { region_id }.build();
        status.on_failure(Arc::new(dropped));
    }
}
/// Notifies the scheduler that the region is closed.
///
/// Removes the region from the status map and fails everything pending for
/// it with a `RegionClosed` error. Does nothing if the region is not tracked.
pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
    if let Some(status) = self.region_status.remove(&region_id) {
        let closed = RegionClosedSnafu { region_id }.build();
        status.on_failure(Arc::new(closed));
    }
}
/// Add ddl request to pending queue.
///
/// Returns error if region is not stalling.
/// Returns error if region doesn't request flush.
pub(crate) fn add_ddl_request_to_pending(
&mut self,
request: SenderDdlRequest,
) -> Result<(), SenderDdlRequest> {
if let Some(status) = self.region_status.get_mut(&request.region_id) {
if status.stalling {
status.pending_ddls.push(request);
return Ok(());
}
status.pending_ddls.push(request);
return Ok(());
}
Err(request)
}
/// Schedules a new flush task when the scheduler can submit next task.
///
/// Picks the first region in the status map that has a pending task and no
/// running flush job, takes that task and submits it via `schedule_flush`.
/// Returns `Ok(())` without doing anything if no region is eligible, and
/// propagates the error of `schedule_flush` otherwise.
pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
    // A tracked region is either running a flush job or waiting with a
    // pending task. Other regions may legitimately still be flushing when
    // this is called, so only entries that are neither are unexpected.
    debug_assert!(self
        .region_status
        .values()
        .all(|status| status.flushing || status.pending_task.is_some()));

    // Get the first idle region that has a task waiting. Regions that are
    // still flushing are skipped: their pending task would only be pushed
    // back by `schedule_flush` and no other region would get scheduled.
    let Some(flush_status) = self
        .region_status
        .values_mut()
        .find(|status| !status.flushing && status.pending_task.is_some())
    else {
        return Ok(());
    };

    // The `find` predicate guarantees the pending task exists.
    let Some(task) = flush_status.pending_task.take() else {
        return Ok(());
    };
    let region = flush_status.region.clone();

    self.schedule_flush(&region, task)
}
}
/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flusht tasks and all pending requests of a region.
/// Tracks running and pending flush tasks and all pending requests of a region.
struct FlushStatus {
/// Current region.
region: MitoRegionRef,
/// Current running flush task.
flushing_task: Option<RegionFlushTask>,
/// The number of flush requests waiting in queue.
num_queueing: usize,
/// The region is stalling.
stalling: bool,
/// Pending write requests.
pending_writes: Vec<SenderWriteRequest>,
/// There is a flush task running.
flushing: bool,
/// Task waiting for next flush.
pending_task: Option<RegionFlushTask>,
/// Pending ddl requests.
pending_ddls: Vec<SenderDdlRequest>,
}
@@ -226,11 +439,30 @@ impl FlushStatus {
fn new(region: MitoRegionRef) -> FlushStatus {
FlushStatus {
region,
flushing_task: None,
num_queueing: 0,
stalling: false,
pending_writes: Vec::new(),
flushing: false,
pending_task: None,
pending_ddls: Vec::new(),
}
}
/// Queues `task` as the pending task of this region.
///
/// If a task is already pending, `task` is merged into it instead.
fn push_task(&mut self, task: RegionFlushTask) {
    match self.pending_task.as_mut() {
        Some(pending) => pending.merge(task),
        None => self.pending_task = Some(task),
    }
}
fn on_failure(self, err: Arc<Error>) {
if let Some(mut task) = self.pending_task {
task.on_failure(err.clone());
}
for ddl in self.pending_ddls {
if let Some(sender) = ddl.sender {
let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}));
}
}
}
}

View File

@@ -21,10 +21,11 @@ use snafu::{OptionExt, ResultExt};
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, SequenceNumber};
use store_api::storage::RegionId;
use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
use crate::sst::file::{FileId, FileMeta};
use crate::wal::EntryId;
/// Actions that can be applied to region manifest.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -50,7 +51,7 @@ pub struct RegionEdit {
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
pub flushed_sequence: Option<SequenceNumber>,
pub flushed_entry_id: Option<EntryId>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -65,6 +66,8 @@ pub struct RegionManifest {
pub metadata: RegionMetadataRef,
/// SST files.
pub files: HashMap<FileId, FileMeta>,
/// Last WAL entry id of flushed data.
pub flushed_entry_id: EntryId,
/// Current manifest version.
pub manifest_version: ManifestVersion,
}
@@ -73,6 +76,7 @@ pub struct RegionManifest {
pub struct RegionManifestBuilder {
metadata: Option<RegionMetadataRef>,
files: HashMap<FileId, FileMeta>,
flushed_entry_id: EntryId,
manifest_version: ManifestVersion,
}
@@ -83,6 +87,7 @@ impl RegionManifestBuilder {
Self {
metadata: Some(s.metadata),
files: s.files,
flushed_entry_id: s.flushed_entry_id,
manifest_version: s.manifest_version,
}
} else {
@@ -103,6 +108,9 @@ impl RegionManifestBuilder {
for file in edit.files_to_remove {
self.files.remove(&file.file_id);
}
if let Some(flushed_entry_id) = edit.flushed_entry_id {
self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
}
}
/// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata).
@@ -115,6 +123,7 @@ impl RegionManifestBuilder {
Ok(RegionManifest {
metadata,
files: self.files,
flushed_entry_id: self.flushed_entry_id,
manifest_version: self.manifest_version,
})
}
@@ -217,10 +226,38 @@ mod tests {
// modification to manifest-related structs is compatible with older manifests.
#[test]
fn test_region_manifest_compatibility() {
let region_edit = r#"{"region_version":0,"flushed_sequence":null,"files_to_add":[{"region_id":4402341478400,"file_name":"4b220a70-2b03-4641-9687-b65d94641208.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1}],"files_to_remove":[{"region_id":4402341478400,"file_name":"34b6ebb9-b8a5-4a4b-b744-56f67defad02.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0}]}"#;
let region_edit = r#"{
"flushed_entry_id":null,
"compaction_time_window":null,
"files_to_add":[
{"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
],
"files_to_remove":[
{"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
]
}"#;
let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
let region_change = r#" {"committed_sequence":42,"metadata":{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"version":9,"primary_key":[1],"region_id":5299989648942}}"#;
let region_edit = r#"{
"flushed_entry_id":10,
"compaction_time_window":null,
"files_to_add":[
{"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
],
"files_to_remove":[
{"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
]
}"#;
let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
let region_change = r#" {
"metadata":{
"column_metadatas":[
{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
],
"primary_key":[1],
"region_id":5299989648942}
}"#;
let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
let region_remove = r#"{"region_id":42}"#;

View File

@@ -57,7 +57,7 @@ fn nop_action() -> RegionMetaActionList {
files_to_add: vec![],
files_to_remove: vec![],
compaction_time_window: None,
flushed_sequence: None,
flushed_entry_id: None,
})])
}
@@ -149,7 +149,7 @@ async fn manager_with_checkpoint_distance_1() {
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json = "{\"size\":729,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
let expected_json = "{\"size\":750,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);
// reopen the manager
@@ -175,7 +175,7 @@ async fn checkpoint_with_different_compression_types() {
files_to_add: vec![file_meta],
files_to_remove: vec![],
compaction_time_window: None,
flushed_sequence: None,
flushed_entry_id: None,
})]);
actions.push(action);
}

View File

@@ -16,6 +16,8 @@
use std::sync::Arc;
use smallvec::SmallVec;
use crate::memtable::MemtableRef;
/// A version of current memtables in a region.
@@ -23,8 +25,12 @@ use crate::memtable::MemtableRef;
pub(crate) struct MemtableVersion {
/// Mutable memtable.
pub(crate) mutable: MemtableRef,
/// Immutable memtable.
pub(crate) immutable: Option<MemtableRef>,
/// Immutable memtables.
///
/// We only allow one flush job per region but if a flush job failed, then we
/// might need to store more than one immutable memtable on the next time we
/// flush the region.
immutables: SmallVec<[MemtableRef; 2]>,
}
pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
@@ -34,38 +40,54 @@ impl MemtableVersion {
pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion {
MemtableVersion {
mutable,
immutable: None,
immutables: SmallVec::new(),
}
}
/// Returns the immutable memtables as a slice.
pub(crate) fn immutables(&self) -> &[MemtableRef] {
    &self.immutables[..]
}
/// Lists mutable and immutable memtables.
pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
if let Some(immutable) = &self.immutable {
vec![self.mutable.clone(), immutable.clone()]
} else {
vec![self.mutable.clone()]
}
let mut mems = Vec::with_capacity(self.immutables.len() + 1);
mems.push(self.mutable.clone());
mems.extend_from_slice(&self.immutables);
mems
}
/// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable
/// memtable.
///
/// Returns `None` if immutable memtable is `Some`.
/// Returns `None` if the mutable memtable is empty.
#[must_use]
pub(crate) fn freeze_mutable(&self, mutable: MemtableRef) -> Option<MemtableVersion> {
debug_assert!(self.mutable.is_empty());
if self.immutable.is_some() {
// There is already an immutable memtable.
debug_assert!(mutable.is_empty());
if self.mutable.is_empty() {
// No need to freeze the mutable memtable.
return None;
}
// Marks the mutable memtable as immutable so it can free the memory usage from our
// soft limit.
self.mutable.mark_immutable();
// Pushes the mutable memtable to immutable list.
let immutables = self
.immutables
.iter()
.cloned()
.chain([self.mutable.clone()])
.collect();
Some(MemtableVersion {
mutable,
immutable: Some(self.mutable.clone()),
immutables,
})
}
/// Returns the memory usage of the mutable memtable.
///
/// Not implemented yet: this currently always returns 0.
pub(crate) fn mutable_bytes_usage(&self) -> usize {
// TODO(yingwen): Get memtable usage.
0
}
}

View File

@@ -541,21 +541,6 @@ impl BatchBuilder {
}
}
/// Collected [Source] statistics.
#[derive(Debug, Clone)]
pub struct SourceStats {
/// Number of rows fetched.
pub num_rows: usize,
/// Min timestamp from fetched batches.
///
/// If no rows fetched, the value of the timestamp is i64::MIN.
pub min_timestamp: Timestamp,
/// Max timestamp from fetched batches.
///
/// If no rows fetched, the value of the timestamp is i64::MAX.
pub max_timestamp: Timestamp,
}
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers.
@@ -574,12 +559,6 @@ impl Source {
Source::Iter(iter) => iter.next().transpose(),
}
}
// TODO(yingwen): Remove this method once we support collecting stats in the writer.
/// Returns statisics of fetched batches.
pub(crate) fn stats(&self) -> SourceStats {
unimplemented!()
}
}
/// Async batch reader.

View File

@@ -44,6 +44,16 @@ impl Scanner {
}
}
#[cfg(test)]
impl Scanner {
    /// Returns the number of files the underlying scanner is going to scan.
    pub(crate) fn num_files(&self) -> usize {
        // `Seq` is the only variant handled here, so the pattern is irrefutable.
        let Scanner::Seq(seq_scan) = self;
        seq_scan.num_files()
    }
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scans a region by [ScanRequest].
///

View File

@@ -137,3 +137,11 @@ impl SeqScan {
Ok(stream)
}
}
// Test-only helpers, compiled only when `cfg(test)` is set.
#[cfg(test)]
impl SeqScan {
/// Returns number of SST files to scan.
///
/// This is the length of the `files` field.
pub(crate) fn num_files(&self) -> usize {
self.files.len()
}
}

View File

@@ -18,10 +18,11 @@ pub(crate) mod opener;
pub(crate) mod version;
use std::collections::HashMap;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, RwLock};
use common_telemetry::info;
use common_time::util::current_time_millis;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
@@ -62,11 +63,14 @@ pub(crate) struct MitoRegion {
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
impl MitoRegion {
/// Stop background tasks for this region.
/// Stop background managers for this region.
pub(crate) async fn stop(&self) -> Result<()> {
self.manifest_manager.stop().await?;
info!("Stopped region, region_id: {}", self.region_id);
info!(
"Stopped region manifest manager, region_id: {}",
self.region_id
);
Ok(())
}
@@ -82,6 +86,17 @@ impl MitoRegion {
let version_data = self.version_control.current();
version_data.version
}
/// Returns last flush timestamp in millis.
///
/// The value is read from the `last_flush_millis` atomic with relaxed ordering.
pub(crate) fn last_flush_millis(&self) -> i64 {
self.last_flush_millis.load(Ordering::Relaxed)
}
/// Records the current time (in millis) as the last flush time.
pub(crate) fn update_flush_millis(&self) {
    self.last_flush_millis
        .store(current_time_millis(), Ordering::Relaxed);
}
}
/// Regions indexed by ids.

View File

@@ -28,9 +28,10 @@ use std::sync::{Arc, RwLock};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use crate::flush::RegionMemtableStats;
use crate::manifest::action::RegionEdit;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
use crate::memtable::{MemtableBuilderRef, MemtableRef};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::{SstVersion, SstVersionRef};
use crate::wal::EntryId;
@@ -68,16 +69,13 @@ impl VersionControl {
data.last_entry_id = entry_id;
}
/// Freezes the mutable memtable and returns the id of the frozen memtable.
///
/// If the mutable memtable is empty or there is already an immutable memtable, returns `None`.
pub(crate) fn freeze_mutable(&self, builder: &MemtableBuilderRef) -> Option<MemtableId> {
/// Freezes the mutable memtable if it is not empty.
pub(crate) fn freeze_mutable(&self, builder: &MemtableBuilderRef) {
let version = self.current().version;
if version.memtables.mutable.is_empty() || version.memtables.immutable.is_some() {
return None;
if version.memtables.mutable.is_empty() {
return;
}
let new_mutable = builder.build(&version.metadata);
let mutable_id = version.memtables.mutable.id();
// Safety: Immutable memtable is None.
let new_memtables = version.memtables.freeze_mutable(new_mutable).unwrap();
// Create a new version with memtable switched.
@@ -89,7 +87,19 @@ impl VersionControl {
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
Some(mutable_id)
}
/// Applies `edit` on top of the current version and installs the result as the new
/// current version.
///
/// `purger` is forwarded to the version builder, which uses it when adding the
/// files listed in the edit.
pub(crate) fn apply_edit(&self, edit: RegionEdit, purger: FilePurgerRef) {
    let current = self.current().version;
    let builder = VersionBuilder::from_version(current).apply_edit(edit, purger);
    let updated = Arc::new(builder.build());

    // Swap in the new version under the write lock.
    self.data.write().unwrap().version = updated;
}
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
@@ -136,17 +146,6 @@ pub(crate) struct Version {
pub(crate) type VersionRef = Arc<Version>;
impl Version {
/// Returns statistics of the mutable memtable.
pub(crate) fn mutable_stats(&self) -> RegionMemtableStats {
// TODO(yingwen): Get from memtable.
RegionMemtableStats {
bytes_mutable: 0,
write_buffer_size: 0,
}
}
}
/// Version builder.
pub(crate) struct VersionBuilder {
metadata: RegionMetadataRef,
@@ -182,6 +181,25 @@ impl VersionBuilder {
self
}
/// Apply edit to the builder.
pub(crate) fn apply_edit(
mut self,
edit: RegionEdit,
file_purger: FilePurgerRef,
) -> VersionBuilder {
if let Some(flushed_entry_id) = edit.flushed_entry_id {
self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
}
if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
let mut ssts = (*self.ssts).clone();
ssts.add_files(file_purger, edit.files_to_add.into_iter());
ssts.remove_files(edit.files_to_remove.into_iter());
self.ssts = Arc::new(ssts);
}
self
}
/// Builds a new [Version] from the builder.
pub(crate) fn build(self) -> Version {
Version {

View File

@@ -15,6 +15,7 @@
//! Worker requests.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::helper::{
@@ -25,6 +26,7 @@ use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_base::readable_size::ReadableSize;
use common_query::Output;
use datatypes::prelude::DataType;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_request::{
@@ -35,8 +37,13 @@ use store_api::storage::{CompactionStrategy, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::{CreateDefaultSnafu, Error, FillDefaultSnafu, InvalidRequestSnafu, Result};
use crate::error::{
CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, InvalidRequestSnafu, Result,
};
use crate::memtable::MemtableId;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::wal::EntryId;
/// Options that affect the entire region.
///
@@ -374,6 +381,7 @@ pub(crate) struct SenderWriteRequest {
}
/// Request sent to a worker
#[derive(Debug)]
pub(crate) enum WorkerRequest {
/// Write to a region.
Write(SenderWriteRequest),
@@ -491,15 +499,51 @@ pub(crate) enum BackgroundNotify {
/// Notifies a flush job is finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
/// Meta of the flushed SST.
pub(crate) file_meta: FileMeta,
/// Region id.
pub(crate) region_id: RegionId,
/// Meta of the flushed SSTs.
pub(crate) file_metas: Vec<FileMeta>,
/// Entry id of flushed data.
pub(crate) flushed_entry_id: EntryId,
/// Id of memtables to remove.
pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
/// Flush result senders.
pub(crate) senders: Vec<oneshot::Sender<Result<Output>>>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
}
impl FlushFinished {
pub(crate) fn on_failure(self, err: Error) {
let err = Arc::new(err);
for sender in self.senders {
// Ignore send result.
let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region_id,
}));
}
// Clean flushed files.
for file in self.file_metas {
self.file_purger.send_request(PurgeRequest {
region_id: file.region_id,
file_id: file.file_id,
});
}
}
pub(crate) fn on_success(self) {
for sender in self.senders {
// Ignore send result.
let _ = sender.send(Ok(Output::AffectedRows(0)));
}
}
}
/// Notifies a flush job is failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
/// The reason of a failed flush job.
pub(crate) error: Error,
/// The error source of the failure.
pub(crate) err: Arc<Error>,
}
#[cfg(test)]

View File

@@ -24,6 +24,8 @@ use crate::sst::file::FileTimeRange;
/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
const DEFAULT_ROW_GROUP_SIZE: usize = 100000;
/// Parquet write options.
#[derive(Debug)]
@@ -34,6 +36,15 @@ pub struct WriteOptions {
pub row_group_size: usize,
}
impl Default for WriteOptions {
    /// Builds options from the module defaults: `DEFAULT_WRITE_BUFFER_SIZE`
    /// (`ReadableSize::mb(8)`) and `DEFAULT_ROW_GROUP_SIZE` (100000).
    fn default() -> Self {
        Self {
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
        }
    }
}
/// Parquet SST info returned by the writer.
pub struct SstInfo {
/// Time range of the SST.

View File

@@ -15,6 +15,7 @@
//! Parquet writer.
use common_telemetry::debug;
use common_time::Timestamp;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
@@ -25,7 +26,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use crate::error::{InvalidMetadataSnafu, Result};
use crate::read::Source;
use crate::read::{Batch, Source};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::stream_writer::BufferedWriter;
@@ -95,13 +96,13 @@ impl ParquetWriter {
)
.await?;
let mut stats = SourceStats::default();
while let Some(batch) = self.source.next_batch().await? {
stats.update(&batch);
let arrow_batch = write_format.convert_batch(&batch)?;
buffered_writer.write(&arrow_batch).await?;
}
// Get stats from the source.
let stats = self.source.stats();
if stats.num_rows == 0 {
debug!(
@@ -114,7 +115,8 @@ impl ParquetWriter {
}
let (_file_meta, file_size) = buffered_writer.close().await?;
let time_range = (stats.min_timestamp, stats.max_timestamp);
// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();
// object_store.write will make sure all bytes are written or an error is raised.
Ok(Some(SstInfo {
@@ -125,4 +127,33 @@ impl ParquetWriter {
}
}
/// Statistics accumulated over the batches fetched from a source.
#[derive(Default)]
struct SourceStats {
    /// Total number of rows in the non-empty batches seen so far.
    num_rows: usize,
    /// `(min, max)` timestamps over the batches seen so far; `None` until the first
    /// non-empty batch is recorded.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
    /// Folds `batch` into the statistics. Empty batches are ignored.
    ///
    /// The batch's first and last timestamps are taken as its minimum and maximum.
    // NOTE(review): this assumes timestamps inside a batch are in ascending order — confirm.
    fn update(&mut self, batch: &Batch) {
        if !batch.is_empty() {
            self.num_rows += batch.num_rows();

            // Safety: a non-empty batch has both a first and a last timestamp.
            let first = batch.first_timestamp().unwrap();
            let last = batch.last_timestamp().unwrap();

            let merged = match self.time_range.take() {
                Some((lo, hi)) => (lo.min(first), hi.max(last)),
                None => (first, last),
            };
            self.time_range = Some(merged);
        }
    }
}
// TODO(yingwen): Port tests.

View File

@@ -17,10 +17,11 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use crate::sst::file::{FileHandle, FileId, Level, MAX_LEVEL};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL};
use crate::sst::file_purger::FilePurgerRef;
/// A version of all SSTs in a region.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct SstVersion {
/// SST metadata organized by levels.
levels: LevelMetaArray,
@@ -41,6 +42,37 @@ impl SstVersion {
&self.levels
}
/// Registers `files_to_add` in this version, each under the level recorded in its
/// [FileMeta]. Every file gets a [FileHandle] that shares `file_purger`.
///
/// # Panics
/// - Panics if the level of a [FileMeta] is not less than [MAX_LEVEL] (the level
///   array has exactly [MAX_LEVEL] slots).
/// - Panics if a file with the same id already exists in that level.
pub(crate) fn add_files(
    &mut self,
    file_purger: FilePurgerRef,
    files_to_add: impl Iterator<Item = FileMeta>,
) {
    files_to_add.for_each(|meta| {
        let level_idx = meta.level as usize;
        let handle = FileHandle::new(meta, file_purger.clone());
        let file_id = handle.file_id();

        let previous = self.levels[level_idx].files.insert(file_id, handle);
        assert!(previous.is_none(), "Adds an existing file: {file_id}");
    });
}
/// Drops `files_to_remove` from this version and marks the handle of every file
/// that was actually present as deleted. Files not found in their level are skipped.
///
/// # Panics
/// Panics if the level of a [FileMeta] is not less than [MAX_LEVEL] (the level
/// array has exactly [MAX_LEVEL] slots).
pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator<Item = FileMeta>) {
    files_to_remove
        .filter_map(|meta| {
            self.levels[meta.level as usize]
                .files
                .remove(&meta.file_id)
        })
        .for_each(|removed| removed.mark_deleted());
}
/// Mark all SSTs in this version as deleted.
pub(crate) fn mark_all_deleted(&self) {
for level_meta in self.levels.iter() {
@@ -56,6 +88,7 @@ impl SstVersion {
type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
/// Metadata of files in the same SST level.
#[derive(Clone)]
pub struct LevelMeta {
/// Level number.
pub level: Level,

View File

@@ -193,6 +193,7 @@ impl<S: LogStore> WorkerStarter<S> {
config: self.config,
regions: regions.clone(),
dropping_regions: Arc::new(RegionMap::default()),
sender: sender.clone(),
receiver,
wal: Wal::new(self.log_store),
object_store: self.object_store,
@@ -308,6 +309,8 @@ struct RegionWorkerLoop<S> {
regions: RegionMapRef,
/// Regions that are not yet fully dropped.
dropping_regions: RegionMapRef,
/// Request sender.
sender: Sender<WorkerRequest>,
/// Request receiver.
receiver: Receiver<WorkerRequest>,
/// WAL of the engine.
@@ -404,10 +407,15 @@ impl<S: LogStore> RegionWorkerLoop<S> {
for ddl in ddl_requests {
let res = match ddl.request {
DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await,
DdlRequest::Open(req) => self.handle_open_request(ddl.region_id, req).await,
DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await,
DdlRequest::Alter(_) | DdlRequest::Flush(_) | DdlRequest::Compact(_) => todo!(),
DdlRequest::Alter(_) => todo!(),
DdlRequest::Flush(_) => {
self.handle_flush_request(ddl.region_id, ddl.sender).await;
continue;
}
DdlRequest::Compact(_) => todo!(),
};
if let Some(sender) = ddl.sender {
@@ -416,9 +424,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
}
}
impl<S> RegionWorkerLoop<S> {
/// Handles region background request
async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
match notify {
@@ -428,7 +434,9 @@ impl<S> RegionWorkerLoop<S> {
BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
}
}
}
impl<S> RegionWorkerLoop<S> {
// Clean up the worker.
async fn clean(&self) {
// Closes remaining regions.

View File

@@ -31,8 +31,8 @@ impl<S> RegionWorkerLoop<S> {
region.stop().await?;
self.regions.remove_region(region_id);
// TODO(yingwen): Clean flush status.
// Clean flush status.
self.flush_scheduler.on_region_closed(region_id);
info!("Region {} closed", region_id);

View File

@@ -40,7 +40,6 @@ impl<S> RegionWorkerLoop<S> {
};
info!("Try to drop region: {}", region_id);
region.stop().await?;
// write dropping marker
let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE);
@@ -49,9 +48,12 @@ impl<S> RegionWorkerLoop<S> {
.await
.context(OpenDalSnafu)?;
region.stop().await?;
// remove this region from region map to prevent other requests from accessing this region
self.regions.remove_region(region_id);
self.dropping_regions.insert_region(region.clone());
// Notifies flush scheduler.
self.flush_scheduler.on_region_dropped(region_id);
// mark region version as dropped
region.version_control.mark_dropped();

View File

@@ -14,81 +14,168 @@
//! Handling flush related requests.
use store_api::region_request::RegionFlushRequest;
use common_query::Output;
use common_telemetry::{error, info};
use common_time::util::current_time_millis;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished};
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
/// Handles manual flush request.
pub(crate) async fn handle_flush(
&mut self,
_region_id: RegionId,
_request: RegionFlushRequest,
) {
// TODO(yingwen): schedule flush.
unimplemented!()
}
impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job finished.
///
/// Steps, in order:
/// 1. Looks up the region; if it is not in the region map, fails the request with a
///    region-not-found error.
/// 2. Builds a [RegionEdit] from the flushed file metas and the flushed entry id and
///    writes it to the region's manifest.
/// 3. Applies the same edit to the region's version.
/// 4. Marks WAL entries up to the flushed entry id as obsolete.
/// 5. Lets the flush scheduler know the flush succeeded and handles any DDL requests
///    it hands back.
/// 6. Notifies the waiters of success.
///
/// A failure in step 2 or step 4 is logged, reported to the waiters through
/// `request.on_failure` and ends the handler early.
pub(crate) async fn handle_flush_finished(
&mut self,
// NOTE(review): the two underscore-prefixed parameters below look like leftovers of the
// previous (stub) signature — confirm and drop them.
_region_id: RegionId,
_request: FlushFinished,
region_id: RegionId,
mut request: FlushFinished,
) {
// NOTE(review): this TODO list and the `unimplemented!()` below look like leftovers of the
// previous stub body — confirm and drop them.
// TODO(yingwen):
// 1. check region existence
// 2. write manifest
// 3. update region metadata.
// 4. handle all pending requests.
// 5. remove flushed files if the region is dropped.
unimplemented!()
let Some(region) = self.regions.get_region(region_id) else {
// The region may have been dropped or closed in the meantime.
request.on_failure(RegionNotFoundSnafu { region_id }.build());
return;
};
// Write region edit to manifest.
// NOTE(review): `std::mem::take` empties `request.file_metas`, so the `on_failure` calls
// below have no file metas left to send to the purger. That is what we want once the edit
// has been applied, but it also means files are not purged when the manifest update fails —
// confirm whether that is intended.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.file_metas),
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: Some(request.flushed_entry_id),
};
// The edit is cloned because it is applied to the in-memory version after the manifest write.
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to write manifest, region: {}", region_id);
request.on_failure(e);
return;
}
// Apply edit to region's version.
region
.version_control
.apply_edit(edit, region.file_purger.clone());
// Delete wal.
info!(
"Region {} flush finished, tries to bump wal to {}",
region_id, request.flushed_entry_id
);
// NOTE(review): on this failure path the edit is already in the manifest and the version,
// and we return before `on_flush_success` is called — verify that the scheduler state and
// pending requests of the region are still handled correctly in that case.
if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await {
error!(e; "Failed to write wal, region: {}", region_id);
request.on_failure(e);
return;
}
// Handle pending requests of the region.
if let Some(ddl_requests) = self.flush_scheduler.on_flush_success(region_id) {
self.handle_ddl_requests(ddl_requests).await;
}
// Notifies waiters.
request.on_success();
}
}
impl<S> RegionWorkerLoop<S> {
/// Handles a manual flush request for `region_id`.
///
/// If the region is not found, `sender` (when present) receives a region-not-found
/// error. Otherwise a manual flush task is created, `sender` is attached to it so the
/// caller is notified of the outcome, and the task is handed to the flush scheduler.
/// A scheduling failure is only logged.
pub(crate) async fn handle_flush_request(
    &mut self,
    region_id: RegionId,
    sender: Option<oneshot::Sender<Result<Output>>>,
) {
    let region = match self.regions.get_region(region_id) {
        Some(region) => region,
        None => {
            if let Some(waiter) = sender {
                // The send result is deliberately ignored.
                let _ = waiter.send(RegionNotFoundSnafu { region_id }.fail());
            }
            return;
        }
    };

    let mut task = self.new_flush_task(&region, FlushReason::Manual);
    // Attaches the caller's sender, if any.
    task.senders.extend(sender);

    if let Err(e) = self.flush_scheduler.schedule_flush(&region, task) {
        error!(e; "Failed to schedule flush task for region {}", region.region_id);
    }
}
/// On region flush job failed.
pub(crate) async fn handle_flush_failed(
&mut self,
_region_id: RegionId,
_request: FlushFailed,
) {
// TODO(yingwen): fail all pending requests.
unimplemented!()
pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
self.flush_scheduler.on_flush_failed(region_id, request.err);
}
/// Checks whether the engine reaches flush threshold. If so, finds regions in this
/// worker to flush.
///
/// The threshold check is delegated to `write_buffer_manager.should_flush_engine()`.
/// An error from scheduling the flushes is logged and not propagated to the caller.
// NOTE(review): the `&self` signature line and the `find_regions_to_flush()` call below look
// like leftovers of the previous revision of this function — confirm and drop them.
pub(crate) fn maybe_flush_worker(&self) {
pub(crate) fn maybe_flush_worker(&mut self) {
if !self.write_buffer_manager.should_flush_engine() {
// No need to flush worker.
return;
}
// If the engine needs flush, each worker will find some regions to flush. We might
// flush more memory than expected but it should be acceptable.
self.find_regions_to_flush();
if let Err(e) = self.flush_regions_on_engine_full() {
error!(e; "Failed to flush worker");
}
}
/// Find some regions to flush to reduce write buffer usage.
pub(crate) fn find_regions_to_flush(&self) {
unimplemented!()
/// Schedules flushes for regions of this worker to reduce write buffer usage.
///
/// Regions that already have a flush requested are skipped. Among the rest:
/// - every region whose last flush is older than `config.auto_flush_interval` is
///   scheduled for flush;
/// - the region with the largest non-zero mutable memtable usage is scheduled as well,
///   unless a flush has been requested for it in the meantime.
///
/// # Errors
/// Returns the first error reported by the flush scheduler; regions not yet visited
/// at that point are not scheduled.
fn flush_regions_on_engine_full(&mut self) -> Result<()> {
    let regions = self.regions.list_regions();
    let now = current_time_millis();
    // Regions last flushed before this instant are flushed regardless of their size.
    let stale_before = now - self.config.auto_flush_interval.as_millis() as i64;

    // Region holding the largest mutable memtable seen so far, and its size.
    let mut largest_region = None;
    let mut largest_bytes = 0;

    for region in &regions {
        // Regions that are already flushing are not candidates.
        if !self.flush_scheduler.is_flush_requested(region.region_id) {
            let mutable_bytes = region.version().memtables.mutable_bytes_usage();
            if mutable_bytes > largest_bytes {
                largest_region = Some(region);
                largest_bytes = mutable_bytes;
            }

            if region.last_flush_millis() < stale_before {
                let task = self.new_flush_task(region, FlushReason::EngineFull);
                self.flush_scheduler.schedule_flush(region, task)?;
            }
        }
    }

    // Flush the region with the largest mutable memtable, unless the loop above has
    // already scheduled it.
    // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
    let pending = largest_region
        .filter(|region| !self.flush_scheduler.is_flush_requested(region.region_id));
    if let Some(region) = pending {
        let task = self.new_flush_task(region, FlushReason::EngineFull);
        self.flush_scheduler.schedule_flush(region, task)?;
    }

    Ok(())
}
/// Flush a region if it meets flush requirements.
pub(crate) fn flush_region_if_full(&mut self, region: &MitoRegionRef) {
let version_data = region.version_control.current();
if self
.write_buffer_manager
.should_flush_region(version_data.version.mutable_stats())
{
// We need to flush this region.
let task = RegionFlushTask {
region_id: region.region_id,
reason: FlushReason::MemtableFull,
sender: None,
};
self.flush_scheduler.schedule_flush(region, task);
/// Creates a flush task for `region` with the given `reason`.
///
/// The task starts without result senders. It carries a clone of this worker's request
/// sender and memtable builder, and clones of the region's access layer and file purger.
fn new_flush_task(&self, region: &MitoRegionRef, reason: FlushReason) -> RegionFlushTask {
    // TODO(yingwen): metrics for flush requested.
    let region_id = region.region_id;
    let request_sender = self.sender.clone();
    let access_layer = region.access_layer.clone();
    let memtable_builder = self.memtable_builder.clone();
    let file_purger = region.file_purger.clone();

    RegionFlushTask {
        region_id,
        reason,
        senders: Vec::new(),
        request_sender,
        access_layer,
        memtable_builder,
        file_purger,
    }
}
}

View File

@@ -48,21 +48,21 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Write to WAL.
let mut wal_writer = self.wal.writer();
for region_ctx in region_ctxs.values_mut().filter_map(|v| v.write_ctx_mut()) {
for region_ctx in region_ctxs.values_mut() {
if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
region_ctx.set_error(e);
}
}
if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) {
// Failed to write wal.
for mut region_ctx in region_ctxs.into_values().filter_map(|v| v.into_write_ctx()) {
for mut region_ctx in region_ctxs.into_values() {
region_ctx.set_error(e.clone());
}
return;
}
// Write to memtables.
for mut region_ctx in region_ctxs.into_values().filter_map(|v| v.into_write_ctx()) {
for mut region_ctx in region_ctxs.into_values() {
region_ctx.write_memtable();
}
}
@@ -73,7 +73,7 @@ impl<S> RegionWorkerLoop<S> {
fn prepare_region_write_ctx(
&mut self,
write_requests: Vec<SenderWriteRequest>,
) -> HashMap<RegionId, MaybeStalling> {
) -> HashMap<RegionId, RegionWriteCtx> {
// Initialize region write context map.
let mut region_ctxs = HashMap::new();
for mut sender_req in write_requests {
@@ -88,36 +88,13 @@ impl<S> RegionWorkerLoop<S> {
continue;
};
// A new region to write, checks whether we need to flush this region.
self.flush_region_if_full(&region);
// Checks whether the region is stalling.
let maybe_stalling = if self.flush_scheduler.is_stalling(region_id) {
// Region is stalling so there is no write context for it.
MaybeStalling::Stalling
} else {
// Initialize the context.
MaybeStalling::Writable(RegionWriteCtx::new(
region.region_id,
&region.version_control,
))
};
let region_ctx = RegionWriteCtx::new(region.region_id, &region.version_control);
e.insert(maybe_stalling);
e.insert(region_ctx);
}
// Safety: Now we ensure the region exists.
let maybe_stalling = region_ctxs.get_mut(&region_id).unwrap();
// Get stalling status of a region.
let MaybeStalling::Writable(region_ctx) = maybe_stalling else {
// If this region is stalling, we need to add requests to pending queue
// and write to the region later.
// Safety: We have checked the region is stalling.
self.flush_scheduler
.add_write_request_to_pending(sender_req)
.unwrap();
continue;
};
let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
// Checks whether request schema is compatible with region schema.
if let Err(e) =
@@ -147,32 +124,6 @@ impl<S> RegionWorkerLoop<S> {
}
}
/// An entry to store the write context or stalling flag.
enum MaybeStalling {
/// The region is writable.
Writable(RegionWriteCtx),
/// The region is stalling and we should not write to it.
Stalling,
}
impl MaybeStalling {
/// Converts itself to a [RegionWriteCtx] if it is writable.
fn into_write_ctx(self) -> Option<RegionWriteCtx> {
match self {
MaybeStalling::Writable(v) => Some(v),
MaybeStalling::Stalling => None,
}
}
/// Gets a mutable reference of [RegionWriteCtx] if it is writable.
fn write_ctx_mut(&mut self) -> Option<&mut RegionWriteCtx> {
match self {
MaybeStalling::Writable(v) => Some(v),
MaybeStalling::Stalling => None,
}
}
}
/// Send rejected error to all `write_requests`.
fn reject_write_requests(_write_requests: Vec<SenderWriteRequest>) {
unimplemented!()