feat(mito): Add WriteCache struct and write SSTs to write cache (#2999)

* docs: remove todo

* feat: add upload cache

* feat: add cache to sst write path

* feat: add storage to part

* feat: add dir to part

* feat: revert storage name

* feat: flush use upload part writer

* feat: use upload part writer in compaction task

* refactor: upload part writer builds parquet writer

* chore: suppress warnings

* refactor: rename UploadCache to WriteCache

* refactor: move source to write_all()

* chore: typos

* chore: remove output mod

* feat: changes upload to async method

* docs: update cache

* chore: fix compiler errors

* docs: remove comment

* chore: simplify upload part

* refactor: remove option from cache manager param to access layer

* feat: remove cache home from file cache

* feat: write cache holds file cache

* feat: add recover and pub some methods

* feat: remove usages of UploadPartWriter

* refactor: move sst_file_path to sst mod

* refactor: use write cache in access layer

* refactor: remove upload

* style: fix clippy

* refactor: pub write cache method/structs
Yingwen
2024-01-04 18:53:43 +08:00
committed by GitHub
parent f1a4750576
commit 96b6235f25
15 changed files with 309 additions and 192 deletions
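At a high level, flush and compaction now describe an SST write as an `SstWriteRequest` and hand it to `AccessLayer::write_sst`, which routes through the `WriteCache` when one is configured. A minimal caller-side sketch with placeholder values for the source, metadata, and options (type and field names follow the diffs below):

```rust
// Sketch only: how a flush or compaction job drives the new write path.
let write_opts = WriteOptions {
    write_buffer_size: ReadableSize::mb(8), // placeholder buffer size
    ..Default::default()
};
let request = SstWriteRequest {
    file_id: FileId::random(),
    metadata: region_metadata.clone(),
    source: Source::Iter(memtable_iter),     // Source::Reader(..) for compaction
    cache_manager: cache_manager.clone(),
    storage: region_options.storage.clone(), // None means the default storage
};
// Returns Ok(None) when the source yields no rows.
let sst_info = access_layer.write_sst(request, &write_opts).await?;
```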

View File

@@ -18,18 +18,22 @@ use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::error::{DeleteSstSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
pub type AccessLayerRef = Arc<AccessLayer>;
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
region_dir: String,
/// Target object store.
object_store: ObjectStore,
}
@@ -74,15 +78,44 @@ impl AccessLayer {
ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
}
/// Returns a new parquet writer to write the SST for specific `file_id`.
// TODO(hl): maybe rename to [sst_writer].
pub(crate) fn write_sst(
/// Writes a SST with specific `file_id` and `metadata` to the layer.
///
/// Returns the info of the SST. If no data written, returns None.
pub(crate) async fn write_sst(
&self,
file_id: FileId,
metadata: RegionMetadataRef,
source: Source,
) -> ParquetWriter {
let path = location::sst_file_path(&self.region_dir, file_id);
ParquetWriter::new(path, metadata, source, self.object_store.clone())
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let path = location::sst_file_path(&self.region_dir, request.file_id);
if let Some(write_cache) = request.cache_manager.write_cache() {
// Write to the write cache.
return write_cache
.write_and_upload_sst(
SstUploadRequest {
file_id: request.file_id,
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: path,
remote_store: self.object_store.clone(),
},
write_opts,
)
.await;
}
// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
writer.write_all(request.source, write_opts).await
}
}
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) file_id: FileId,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
pub(crate) storage: Option<String>,
}

View File

@@ -20,6 +20,8 @@ mod cache_size;
pub(crate) mod file_cache;
#[cfg(test)]
pub(crate) mod test_util;
#[allow(unused)]
pub(crate) mod write_cache;
use std::mem;
use std::sync::Arc;
@@ -32,6 +34,7 @@ use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;
use crate::cache::cache_size::parquet_meta_size;
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;
@@ -44,6 +47,8 @@ const PAGE_TYPE: &str = "page";
// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
// TODO(yingwen): Builder for cache manager.
/// Manages cached data for the engine.
pub struct CacheManager {
/// Cache for SST metadata.
@@ -52,6 +57,10 @@ pub struct CacheManager {
vector_cache: Option<VectorCache>,
/// Cache for SST pages.
page_cache: Option<PageCache>,
/// A cache for writing files to object stores.
// TODO(yingwen): Remove this once the cache is ready.
#[allow(unused)]
write_cache: Option<WriteCacheRef>,
}
pub type CacheManagerRef = Arc<CacheManager>;
@@ -111,6 +120,7 @@ impl CacheManager {
sst_meta_cache,
vector_cache,
page_cache,
write_cache: None,
}
}
@@ -184,6 +194,11 @@ impl CacheManager {
cache.insert(page_key, pages);
}
}
/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
}
}
fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {

View File

@@ -14,6 +14,7 @@
//! A cache for files.
use std::sync::Arc;
use std::time::Instant;
use common_base::readable_size::ReadableSize;
@@ -21,7 +22,7 @@ use common_telemetry::{info, warn};
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use object_store::util::{join_dir, join_path};
use object_store::util::join_path;
use object_store::{ErrorKind, Metakey, ObjectStore, Reader};
use snafu::ResultExt;
use store_api::storage::RegionId;
@@ -32,7 +33,7 @@ use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;
/// Subdirectory of cached files.
const FILE_DIR: &str = "files";
const FILE_DIR: &str = "files/";
/// A file cache manages files on local store and evict files based
/// on size.
@@ -40,25 +41,18 @@ const FILE_DIR: &str = "files";
pub(crate) struct FileCache {
/// Local store to cache files.
local_store: ObjectStore,
/// Cached file directory under cache home.
file_dir: String,
/// Index to track cached files.
///
/// File id is enough to identity a file uniquely.
memory_index: Cache<IndexKey, IndexValue>,
}
pub(crate) type FileCacheRef = Arc<FileCache>;
impl FileCache {
/// Creates a new file cache.
pub(crate) fn new(
local_store: ObjectStore,
cache_home: String,
capacity: ReadableSize,
) -> FileCache {
// Stores files under `cache_home/{FILE_DIR}`.
let file_dir = cache_file_dir(&cache_home);
pub(crate) fn new(local_store: ObjectStore, capacity: ReadableSize) -> FileCache {
let cache_store = local_store.clone();
let cache_file_dir = file_dir.clone();
let memory_index = Cache::builder()
.weigher(|_key, value: &IndexValue| -> u32 {
// We only measure space on local store.
@@ -67,7 +61,8 @@ impl FileCache {
.max_capacity(capacity.as_bytes())
.async_eviction_listener(move |key, value, cause| {
let store = cache_store.clone();
let file_path = cache_file_path(&cache_file_dir, *key);
// Stores files under FILE_DIR.
let file_path = cache_file_path(FILE_DIR, *key);
async move {
if let RemovalCause::Replaced = cause {
// The cache is replaced by another file. This is unexpected, we don't remove the same
@@ -91,7 +86,6 @@ impl FileCache {
.build();
FileCache {
local_store,
file_dir,
memory_index,
}
}
@@ -145,7 +139,7 @@ impl FileCache {
let mut lister = self
.local_store
.lister_with(&self.file_dir)
.lister_with(FILE_DIR)
.metakey(Metakey::ContentLength)
.await
.context(OpenDalSnafu)?;
@@ -182,7 +176,7 @@ impl FileCache {
/// Returns the cache file path for the key.
pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
cache_file_path(&self.file_dir, key)
cache_file_path(FILE_DIR, key)
}
/// Returns the local store of the file cache.
@@ -203,11 +197,6 @@ pub(crate) struct IndexValue {
file_size: u32,
}
/// Returns the directory to store files.
fn cache_file_dir(cache_home: &str) -> String {
join_dir(cache_home, FILE_DIR)
}
/// Generates the path to the cached file.
///
/// The file name format is `{region_id}.{file_id}`
@@ -245,13 +234,8 @@ mod tests {
async fn test_file_cache_basic() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();
let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
@@ -291,13 +275,8 @@ mod tests {
async fn test_file_cache_file_removed() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();
let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
@@ -326,12 +305,7 @@ mod tests {
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();
let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
// Write N files.
@@ -354,11 +328,7 @@ mod tests {
}
// Recover the cache.
let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
// No entry before recovery.
assert!(cache.reader((region_id, file_ids[0])).await.is_none());
cache.recover().await.unwrap();

src/mito2/src/cache/write_cache.rs (new file, 86 lines added)
View File

@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! A write-through cache for remote object stores.
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;
use crate::cache::file_cache::{FileCache, FileCacheRef};
use crate::error::Result;
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
/// A cache for uploading files to remote object stores.
///
/// It keeps files in local disk and then sends files to object stores.
pub struct WriteCache {
/// Local file cache.
file_cache: FileCacheRef,
/// Object store manager.
object_store_manager: ObjectStoreManagerRef,
}
pub type WriteCacheRef = Arc<WriteCache>;
impl WriteCache {
/// Create the cache with a `local_store` to cache files and a
/// `object_store_manager` for all object stores.
pub fn new(
local_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
) -> Self {
Self {
file_cache: Arc::new(FileCache::new(local_store, cache_capacity)),
object_store_manager,
}
}
/// Recovers the write cache from local store.
pub async fn recover(&self) -> Result<()> {
self.file_cache.recover().await
}
/// Writes SST to the cache and then uploads it to the remote object store.
pub async fn write_and_upload_sst(
&self,
request: SstUploadRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
// TODO(yingwen): Write to the local store and then upload.
// Now we write to the remote and ignore local cache.
let mut writer =
ParquetWriter::new(request.upload_path, request.metadata, request.remote_store);
writer.write_all(request.source, write_opts).await
}
}
/// Request to write and upload a SST.
pub struct SstUploadRequest {
pub file_id: FileId,
pub metadata: RegionMetadataRef,
pub source: Source,
pub storage: Option<String>,
/// Path to upload the file.
pub upload_path: String,
/// Remote object store to upload.
pub remote_store: ObjectStore,
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod output;
mod picker;
#[cfg(test)]
mod test_util;
@@ -30,6 +29,7 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::config::MitoConfig;
use crate::error::{
@@ -55,6 +55,7 @@ pub struct CompactionRequest {
pub(crate) start_time: Instant,
/// Buffering threshold while writing SST files.
pub(crate) sst_write_buffer_size: ReadableSize,
pub(crate) cache_manager: CacheManagerRef,
}
impl CompactionRequest {
@@ -88,14 +89,20 @@ pub(crate) struct CompactionScheduler {
region_status: HashMap<RegionId, CompactionStatus>,
/// Request sender of the worker that this scheduler belongs to.
request_sender: Sender<WorkerRequest>,
cache_manager: CacheManagerRef,
}
impl CompactionScheduler {
pub(crate) fn new(scheduler: SchedulerRef, request_sender: Sender<WorkerRequest>) -> Self {
pub(crate) fn new(
scheduler: SchedulerRef,
request_sender: Sender<WorkerRequest>,
cache_manager: CacheManagerRef,
) -> Self {
Self {
scheduler,
region_status: HashMap::new(),
request_sender,
cache_manager,
}
}
@@ -122,8 +129,12 @@ impl CompactionScheduler {
access_layer.clone(),
file_purger.clone(),
);
let request =
status.new_compaction_request(self.request_sender.clone(), waiter, engine_config);
let request = status.new_compaction_request(
self.request_sender.clone(),
waiter,
engine_config,
self.cache_manager.clone(),
);
self.region_status.insert(region_id, status);
self.schedule_compaction_request(request)
}
@@ -142,6 +153,7 @@ impl CompactionScheduler {
self.request_sender.clone(),
OptionOutputTx::none(),
engine_config,
self.cache_manager.clone(),
);
// Try to schedule next compaction task for this region.
if let Err(e) = self.schedule_compaction_request(request) {
@@ -314,6 +326,7 @@ impl CompactionStatus {
request_sender: Sender<WorkerRequest>,
waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
cache_manager: CacheManagerRef,
) -> CompactionRequest {
let current_version = self.version_control.current().version;
let start_time = Instant::now();
@@ -325,6 +338,7 @@ impl CompactionStatus {
file_purger: self.file_purger.clone(),
start_time,
sst_write_buffer_size: engine_config.sst_write_buffer_size,
cache_manager,
};
if let Some(pending) = self.pending_compaction.take() {

View File

@@ -1,86 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::readable_size::ReadableSize;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::access_layer::AccessLayerRef;
use crate::error;
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, Source};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::parquet::{SstInfo, WriteOptions};
#[derive(Debug)]
pub(crate) struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
}
impl CompactionOutput {
pub(crate) async fn build(
&self,
region_id: RegionId,
schema: RegionMetadataRef,
sst_layer: AccessLayerRef,
sst_write_buffer_size: ReadableSize,
) -> error::Result<Option<FileMeta>> {
let reader = build_sst_reader(schema.clone(), sst_layer.clone(), &self.inputs).await?;
let opts = WriteOptions {
write_buffer_size: sst_write_buffer_size,
..Default::default()
};
// TODO(hl): measure merge elapsed time.
let mut writer = sst_layer.write_sst(self.output_file_id, schema, Source::Reader(reader));
let meta = writer.write_all(&opts).await?.map(
|SstInfo {
time_range,
file_size,
..
}| {
FileMeta {
region_id,
file_id: self.output_file_id,
time_range,
level: self.output_level,
file_size,
}
},
);
Ok(meta)
}
}
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(
schema: RegionMetadataRef,
sst_layer: AccessLayerRef,
inputs: &[FileHandle],
) -> error::Result<BoxedBatchReader> {
SeqScan::new(sst_layer, ProjectionMapper::all(&schema)?)
.with_files(inputs.to_vec())
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.build_reader()
.await
}

View File

@@ -27,18 +27,21 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::access_layer::AccessLayerRef;
use crate::compaction::output::CompactionOutput;
use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::error::{self, CompactRegionSnafu};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, Source};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::sst::version::LevelMeta;
const MAX_PARALLEL_COMPACTION: usize = 8;
@@ -126,6 +129,7 @@ impl Picker for TwcsPicker {
file_purger,
start_time,
sst_write_buffer_size,
cache_manager,
} = req;
let region_metadata = current_version.metadata.clone();
@@ -169,7 +173,7 @@ impl Picker for TwcsPicker {
}
let task = TwcsCompactionTask {
region_id,
schema: region_metadata,
metadata: region_metadata,
sst_layer: access_layer,
outputs,
expired_ssts,
@@ -179,6 +183,8 @@ impl Picker for TwcsPicker {
waiters,
file_purger,
start_time,
cache_manager,
storage: current_version.options.storage.clone(),
};
Some(Box::new(task))
}
@@ -228,7 +234,7 @@ fn find_latest_window_in_seconds<'a>(
pub(crate) struct TwcsCompactionTask {
pub region_id: RegionId,
pub schema: RegionMetadataRef,
pub metadata: RegionMetadataRef,
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
@@ -241,6 +247,9 @@ pub(crate) struct TwcsCompactionTask {
pub waiters: Vec<OutputTx>,
/// Start time of compaction task
pub start_time: Instant,
pub(crate) cache_manager: CacheManagerRef,
/// Target storage of the region.
pub(crate) storage: Option<String>,
}
impl Debug for TwcsCompactionTask {
@@ -274,11 +283,8 @@ impl TwcsCompactionTask {
let mut futs = Vec::with_capacity(self.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum());
let region_id = self.region_id;
for output in self.outputs.drain(..) {
let schema = self.schema.clone();
let sst_layer = self.sst_layer.clone();
let sst_write_buffer_size = self.sst_write_buffer_size;
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
info!(
@@ -293,15 +299,42 @@ impl TwcsCompactionTask {
output.output_file_id
);
// TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
let write_opts = WriteOptions {
write_buffer_size: self.sst_write_buffer_size,
..Default::default()
};
let metadata = self.metadata.clone();
let sst_layer = self.sst_layer.clone();
let region_id = self.region_id;
let cache_manager = self.cache_manager.clone();
let storage = self.storage.clone();
futs.push(async move {
output
.build(region_id, schema, sst_layer, sst_write_buffer_size)
.await
let reader =
build_sst_reader(metadata.clone(), sst_layer.clone(), &output.inputs).await?;
let file_meta_opt = sst_layer
.write_sst(
SstWriteRequest {
file_id: output.output_file_id,
metadata,
source: Source::Reader(reader),
cache_manager,
storage,
},
&write_opts,
)
.await?
.map(|sst_info| FileMeta {
region_id,
file_id: output.output_file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
});
Ok(file_meta_opt)
});
}
let mut outputs = Vec::with_capacity(futs.len());
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION);
for _ in 0..MAX_PARALLEL_COMPACTION {
@@ -314,11 +347,11 @@ impl TwcsCompactionTask {
.context(error::JoinSnafu)?
.into_iter()
.collect::<error::Result<Vec<_>>>()?;
outputs.extend(metas.into_iter().flatten());
output_files.extend(metas.into_iter().flatten());
}
let inputs = compacted_inputs.into_iter().collect();
Ok((outputs, inputs))
Ok((output_files, inputs))
}
async fn handle_compaction(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
@@ -485,6 +518,29 @@ fn get_expired_ssts(
.collect()
}
#[derive(Debug)]
pub(crate) struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
}
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
inputs: &[FileHandle],
) -> error::Result<BoxedBatchReader> {
SeqScan::new(sst_layer, ProjectionMapper::all(&metadata)?)
.with_files(inputs.to_vec())
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.build_reader()
.await
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;

View File

@@ -24,7 +24,8 @@ use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::mpsc;
use crate::access_layer::AccessLayerRef;
use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
@@ -200,6 +201,7 @@ pub(crate) struct RegionFlushTask {
pub(crate) listener: WorkerListener,
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) row_group_size: Option<usize>,
pub(crate) cache_manager: CacheManagerRef,
}
impl RegionFlushTask {
@@ -243,6 +245,7 @@ impl RegionFlushTask {
async fn do_flush(&mut self, version_data: VersionControlData) {
let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
self.listener.on_flush_begin(self.region_id).await;
let worker_request = match self.flush_memtables(&version_data.version).await {
Ok(file_metas) => {
let memtables_to_remove = version_data
@@ -252,6 +255,7 @@ impl RegionFlushTask {
.iter()
.map(|m| m.id())
.collect();
let flush_finished = FlushFinished {
region_id: self.region_id,
file_metas,
@@ -297,10 +301,10 @@ impl RegionFlushTask {
if let Some(row_group_size) = self.row_group_size {
write_opts.row_group_size = row_group_size;
}
let memtables = version.memtables.immutables();
let mut file_metas = Vec::with_capacity(memtables.len());
let mut flushed_bytes = 0;
for mem in memtables {
if mem.is_empty() {
// Skip empty memtables.
@@ -310,22 +314,32 @@ impl RegionFlushTask {
let file_id = FileId::random();
let iter = mem.iter(None, None);
let source = Source::Iter(iter);
let mut writer = self
// Flush to level 0.
let write_request = SstWriteRequest {
file_id,
metadata: version.metadata.clone(),
source,
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
};
let Some(sst_info) = self
.access_layer
.write_sst(file_id, version.metadata.clone(), source);
let Some(sst_info) = writer.write_all(&write_opts).await? else {
.write_sst(write_request, &write_opts)
.await?
else {
// No data written.
continue;
};
flushed_bytes += sst_info.file_size;
file_metas.push(FileMeta {
region_id: version.metadata.region_id,
let file_meta = FileMeta {
region_id: self.region_id,
file_id,
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
});
};
file_metas.push(file_meta);
}
if !file_metas.is_empty() {
@@ -334,8 +348,8 @@ impl RegionFlushTask {
let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
info!(
"Successfully flush memtables, region: {}, reason: {}, files: {:?}, cost: {:?}",
version.metadata.region_id,
"Successfully flush memtables, region: {}, reason: {}, files: {:?}, cost: {:?}s",
self.region_id,
self.reason.as_str(),
file_ids,
timer.stop_and_record(),
@@ -652,6 +666,7 @@ mod tests {
use tokio::sync::oneshot;
use super::*;
use crate::cache::CacheManager;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::version_util::VersionControlBuilder;
@@ -728,6 +743,7 @@ mod tests {
listener: WorkerListener::default(),
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
cache_manager: Arc::new(CacheManager::new(0, 0, 0)),
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler

View File

@@ -37,7 +37,7 @@ pub struct RegionOptions {
pub ttl: Option<Duration>,
/// Compaction options.
pub compaction: CompactionOptions,
/// Custom storage.
/// Custom storage. Uses default storage if it is `None`.
pub storage: Option<String>,
/// Wal options.
pub wal_options: WalOptions,

View File

@@ -27,8 +27,6 @@ use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
use crate::wal::{EntryId, WalWriter};
/// Context to keep region metadata and buffer write requests.
/// Notifier to notify write result on drop.
struct WriteNotify {
/// Error to send to the waiter.

View File

@@ -114,8 +114,12 @@ mod tests {
..Default::default()
};
let mut writer = ParquetWriter::new(file_path, metadata, source, object_store.clone());
let info = writer.write_all(&write_opts).await.unwrap().unwrap();
let mut writer = ParquetWriter::new(file_path, metadata, object_store.clone());
let info = writer
.write_all(source, &write_opts)
.await
.unwrap()
.unwrap();
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert_eq!(
@@ -159,9 +163,12 @@ mod tests {
..Default::default()
};
// Prepare data.
let mut writer =
ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone());
writer.write_all(&write_opts).await.unwrap().unwrap();
let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone());
writer
.write_all(source, &write_opts)
.await
.unwrap()
.unwrap();
let cache = Some(Arc::new(CacheManager::new(0, 0, 64 * 1024 * 1024)));
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
@@ -220,10 +227,9 @@ mod tests {
// write the sst file and get sst info
// sst info contains the parquet metadata, which is converted from FileMetaData
let mut writer =
ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone());
let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone());
let sst_info = writer
.write_all(&write_opts)
.write_all(source, &write_opts)
.await
.unwrap()
.expect("write_all should return sst info");

View File

@@ -36,8 +36,6 @@ use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
pub struct ParquetWriter {
/// SST output file path.
file_path: String,
/// Input data source.
source: Source,
/// Region metadata of the source and the target SST.
metadata: RegionMetadataRef,
object_store: ObjectStore,
@@ -48,12 +46,10 @@ impl ParquetWriter {
pub fn new(
file_path: String,
metadata: RegionMetadataRef,
source: Source,
object_store: ObjectStore,
) -> ParquetWriter {
ParquetWriter {
file_path,
source,
metadata,
object_store,
}
@@ -62,7 +58,11 @@ impl ParquetWriter {
/// Iterates source and writes all rows to Parquet file.
///
/// Returns the [SstInfo] if the SST is written.
pub async fn write_all(&mut self, opts: &WriteOptions) -> Result<Option<SstInfo>> {
pub async fn write_all(
&mut self,
mut source: Source,
opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
@@ -88,7 +88,7 @@ impl ParquetWriter {
.context(WriteBufferSnafu)?;
let mut stats = SourceStats::default();
while let Some(batch) = self.source.next_batch().await? {
while let Some(batch) = source.next_batch().await? {
stats.update(&batch);
let arrow_batch = write_format.convert_batch(&batch)?;
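After this refactor the writer no longer owns its input: `ParquetWriter::new` takes only the path, metadata, and object store, and the `Source` is consumed by `write_all` together with the options, so `AccessLayer` and `WriteCache` can build the writer themselves. A minimal usage sketch (the `source` value is assumed to come from a memtable iterator or an SST reader):

```rust
// Sketch only: construct the writer without a data source, then hand the
// source to write_all() together with the write options.
let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone());
let sst_info = writer.write_all(source, &write_opts).await?; // Option<SstInfo>
```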

View File

@@ -22,6 +22,7 @@ use object_store::ObjectStore;
use tokio::sync::mpsc::Sender;
use crate::access_layer::{AccessLayer, AccessLayerRef};
use crate::cache::CacheManager;
use crate::compaction::CompactionScheduler;
use crate::flush::FlushScheduler;
use crate::request::WorkerRequest;
@@ -65,7 +66,11 @@ impl SchedulerEnv {
) -> CompactionScheduler {
let scheduler = self.get_scheduler();
CompactionScheduler::new(scheduler, request_sender)
CompactionScheduler::new(
scheduler,
request_sender,
Arc::new(CacheManager::new(0, 0, 0)),
)
}
/// Creates a new flush scheduler.

View File

@@ -286,7 +286,11 @@ impl<S: LogStore> WorkerStarter<S> {
scheduler: self.scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
compaction_scheduler: CompactionScheduler::new(self.scheduler, sender.clone()),
compaction_scheduler: CompactionScheduler::new(
self.scheduler,
sender.clone(),
self.cache_manager.clone(),
),
stalled_requests: StalledRequests::default(),
listener: self.listener,
cache_manager: self.cache_manager,

View File

@@ -137,7 +137,6 @@ impl<S> RegionWorkerLoop<S> {
row_group_size: Option<usize>,
engine_config: Arc<MitoConfig>,
) -> RegionFlushTask {
// TODO(yingwen): metrics for flush requested.
RegionFlushTask {
region_id: region.region_id,
reason,
@@ -149,6 +148,7 @@ impl<S> RegionWorkerLoop<S> {
listener: self.listener.clone(),
engine_config,
row_group_size,
cache_manager: self.cache_manager.clone(),
}
}
}