fix: Adds FileHandle to ChunkStream (#1255)

* test: Add compaction test

* test: Test read during compaction

* test: Add s3 object store to test

* test: only run compact test

* feat: Hold file handle in chunk stream

* test: check files still exist after compact

* feat: Revert changes to develop.yaml

* test: Simplify MockPurgeHandler
Yingwen, 2023-03-28 16:22:07 +08:00, committed by GitHub
parent f491a040f5, commit e72ce5eaa9
12 changed files with 559 additions and 58 deletions
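In short, this change makes a ChunkStream keep a FileHandle to the SST file it reads, so a concurrent compaction cannot purge that file while the stream is still in use. A minimal sketch of the idea, using simplified toy types rather than the crate's actual definitions:

use std::sync::Arc;

struct FileHandleInner {
    name: String,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        // The real FileHandleInner only purges files marked as deleted; simplified
        // here: dropping the last handle is what schedules the file for deletion.
        println!("purging {}", self.name);
    }
}

#[derive(Clone)]
struct FileHandle {
    _inner: Arc<FileHandleInner>,
}

struct ChunkStream {
    // Holding a handle ties the file's lifetime to the stream's lifetime.
    _file_handle: FileHandle,
}

fn main() {
    let handle = FileHandle {
        _inner: Arc::new(FileHandleInner { name: "a.parquet".to_string() }),
    };
    let stream = ChunkStream { _file_handle: handle.clone() };
    drop(handle); // compaction drops its reference to the file
    println!("the stream can still read the file");
    drop(stream); // only now is the last handle dropped and the purge runs
}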

View File

@@ -18,6 +18,7 @@ use std::marker::PhantomData;
use store_api::storage::RegionId;
use crate::compaction::{CompactionTask, Picker, PickerContext};
use crate::error::Result;
use crate::scheduler::{Request, Scheduler};
pub struct NoopCompactionScheduler<R> {
@@ -48,11 +49,7 @@ impl Picker for NoopCompactionPicker {
type Request = NoopCompactionRequest;
type Task = NoopCompactionTask;
fn pick(
&self,
_ctx: &PickerContext,
_req: &Self::Request,
) -> crate::error::Result<Option<Self::Task>> {
fn pick(&self, _ctx: &PickerContext, _req: &Self::Request) -> Result<Option<Self::Task>> {
Ok(None)
}
}
@@ -62,7 +59,7 @@ pub struct NoopCompactionTask;
#[async_trait::async_trait]
impl CompactionTask for NoopCompactionTask {
async fn run(self) -> crate::error::Result<()> {
async fn run(self) -> Result<()> {
Ok(())
}
}
@@ -73,6 +70,8 @@ impl Request for NoopCompactionRequest {
fn key(&self) -> Self::Key {
0
}
fn complete(self, _result: Result<()>) {}
}
#[async_trait::async_trait]
@@ -82,11 +81,11 @@ where
{
type Request = R;
fn schedule(&self, _request: Self::Request) -> crate::error::Result<bool> {
fn schedule(&self, _request: Self::Request) -> Result<bool> {
Ok(true)
}
async fn stop(&self, _await_termination: bool) -> crate::error::Result<()> {
async fn stop(&self, _await_termination: bool) -> Result<()> {
Ok(())
}
}

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;
use tokio::sync::Notify;
use crate::compaction::picker::{Picker, PickerContext};
@@ -39,6 +40,13 @@ impl<S: LogStore> Request for CompactionRequestImpl<S> {
fn key(&self) -> RegionId {
self.region_id
}
fn complete(self, result: Result<()>) {
if let Some(sender) = self.sender {
// We don't care about the send result, as the receiver may already be dropped.
let _ = sender.send(result);
}
}
}
/// Region compaction request.
@@ -50,6 +58,8 @@ pub struct CompactionRequestImpl<S: LogStore> {
pub manifest: RegionManifest,
pub wal: Wal<S>,
pub ttl: Option<Duration>,
/// Compaction result sender.
pub sender: Option<Sender<Result<()>>>,
}
impl<S: LogStore> CompactionRequestImpl<S> {
@@ -90,6 +100,7 @@ where
let region_id = req.key();
let Some(task) = self.picker.pick(&PickerContext {}, &req)? else {
info!("No file needs compaction in region: {:?}", region_id);
req.complete(Ok(()));
return Ok(());
};
@@ -99,8 +110,12 @@ where
if let Err(e) = task.run().await {
// TODO(hl): maybe resubmit compaction task on failure?
error!(e; "Failed to compact region: {:?}", region_id);
req.complete(Err(e));
} else {
info!("Successfully compacted region: {:?}", region_id);
req.complete(Ok(()));
}
// releases rate limit token
token.try_release();

View File

@@ -443,6 +443,12 @@ pub enum Error {
#[snafu(display("Failed to create a checkpoint: {}", msg))]
ManifestCheckpoint { msg: String, backtrace: Backtrace },
#[snafu(display("The compaction task is cancelled, region_id: {}", region_id))]
CompactTaskCancel {
region_id: RegionId,
source: tokio::sync::oneshot::error::RecvError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -514,8 +520,9 @@ impl ErrorExt for Error {
ConvertChunk { source, .. } => source.status_code(),
MarkWalObsolete { source, .. } => source.status_code(),
DecodeParquetTimeRange { .. } => StatusCode::Unexpected,
RateLimited { .. } => StatusCode::Internal,
StopScheduler { .. } => StatusCode::Internal,
RateLimited { .. } | StopScheduler { .. } | CompactTaskCancel { .. } => {
StatusCode::Internal
}
DeleteSst { .. } => StatusCode::StorageUnavailable,
IllegalSchedulerState { .. } => StatusCode::Unexpected,
TtlCalculation { source, .. } => source.status_code(),

View File

@@ -18,6 +18,7 @@ use common_telemetry::{debug, error};
use store_api::storage::RegionId;
use tokio::sync::Notify;
use crate::error::Result;
use crate::scheduler::rate_limit::{BoxedRateLimitToken, RateLimitToken};
use crate::scheduler::{Handler, LocalScheduler, Request};
use crate::sst::{AccessLayerRef, FileId};
@@ -34,6 +35,8 @@ impl Request for FilePurgeRequest {
fn key(&self) -> Self::Key {
format!("{}/{}", self.region_id, self.file_id)
}
fn complete(self, _result: Result<()>) {}
}
pub struct FilePurgeHandler;
@@ -47,7 +50,7 @@ impl Handler for FilePurgeHandler {
req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> crate::error::Result<()> {
) -> Result<()> {
req.sst_layer.delete_sst(req.file_id).await.map_err(|e| {
error!(e; "Failed to delete SST file, file: {}, region: {}",
req.file_id.as_parquet(), req.region_id);
@@ -72,6 +75,7 @@ pub mod noop {
use tokio::sync::Notify;
use crate::error::Result;
use crate::file_purger::{FilePurgeRequest, FilePurgerRef};
use crate::scheduler::rate_limit::{BoxedRateLimitToken, RateLimitToken};
use crate::scheduler::{Handler, LocalScheduler, SchedulerConfig};
@@ -95,7 +99,7 @@ pub mod noop {
_req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> crate::error::Result<()> {
) -> Result<()> {
token.try_release();
finish_notifier.notify_one();
Ok(())

View File

@@ -161,6 +161,23 @@ pub struct StoreConfig<S: LogStore> {
pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRegionMetadata)>;
#[derive(Debug)]
pub struct CompactContext {
/// Whether to wait for the compaction result.
pub wait: bool,
/// Maximum number of SST files allowed in level 0 before compaction is triggered.
pub max_files_in_l0: usize,
}
impl Default for CompactContext {
fn default() -> CompactContext {
CompactContext {
wait: true,
max_files_in_l0: 1,
}
}
}
impl<S: LogStore> RegionImpl<S> {
/// Create a new region and also persist the region metadata to manifest.
///
@@ -471,6 +488,11 @@ impl<S: LogStore> RegionImpl<S> {
version
}
}
/// Compact the region manually.
pub async fn compact(&self, ctx: CompactContext) -> Result<()> {
self.inner.compact(ctx).await
}
}
// Private methods for tests.
@@ -623,4 +645,19 @@ impl<S: LogStore> RegionInner<S> {
};
self.writer.flush(writer_ctx, ctx).await
}
/// Compact the region manually.
async fn compact(&self, ctx: CompactContext) -> Result<()> {
let writer_ctx = WriterContext {
shared: &self.shared,
flush_strategy: &self.flush_strategy,
flush_scheduler: &self.flush_scheduler,
compaction_scheduler: &self.compaction_scheduler,
sst_layer: &self.sst_layer,
wal: &self.wal,
writer: &self.writer,
manifest: &self.manifest,
};
self.writer.compact(writer_ctx, ctx).await
}
}
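With these additions, manual compaction from caller code is roughly the following (a sketch only; `region` is assumed to be an open RegionImpl inside an async context):

// Wait for the compaction to finish, using the defaults
// (wait = true, max_files_in_l0 = 1).
region.compact(CompactContext::default()).await?;

// Fire-and-forget variant with a custom level 0 threshold.
region
    .compact(CompactContext {
        wait: false,
        max_files_in_l0: 4,
    })
    .await?;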

View File

@@ -17,6 +17,7 @@
mod alter;
mod basic;
mod close;
mod compact;
mod flush;
mod projection;
@@ -38,6 +39,7 @@ use store_api::storage::{
};
use super::*;
use crate::chunk::ChunkReaderImpl;
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::manifest::action::{RegionChange, RegionMetaActionList};
use crate::manifest::test_utils::*;
@@ -158,6 +160,28 @@ impl<S: LogStore> TesterBase<S> {
self.region.write(&self.write_ctx, batch).await.unwrap()
}
/// Returns a reader to scan all data.
pub async fn full_scan_reader(&self) -> ChunkReaderImpl {
let snapshot = self.region.snapshot(&self.read_ctx).unwrap();
let resp = snapshot
.scan(&self.read_ctx, ScanRequest::default())
.await
.unwrap();
resp.reader
}
/// Collect data from the reader.
pub async fn collect_reader(&self, mut reader: ChunkReaderImpl) -> Vec<(i64, Option<i64>)> {
let mut dst = Vec::new();
while let Some(chunk) = reader.next_chunk().await.unwrap() {
let chunk = reader.project_chunk(chunk);
append_chunk_to(&chunk, &mut dst);
}
dst
}
}
pub type FileTesterBase = TesterBase<RaftEngineLogStore>;

View File

@@ -0,0 +1,282 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Region compaction tests.
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use common_telemetry::logging;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use store_api::storage::{FlushContext, Region, WriteResponse};
use tokio::sync::Notify;
use crate::compaction::{CompactionHandler, SimplePicker};
use crate::config::EngineConfig;
use crate::error::Result;
use crate::file_purger::{FilePurgeHandler, FilePurgeRequest};
use crate::region::tests::{self, FileTesterBase};
use crate::region::{CompactContext, FlushStrategyRef, RegionImpl};
use crate::scheduler::rate_limit::BoxedRateLimitToken;
use crate::scheduler::{Handler, LocalScheduler, SchedulerConfig};
use crate::test_util::config_util;
use crate::test_util::flush_switch::FlushSwitch;
const REGION_NAME: &str = "region-compact-0";
fn new_object_store(store_dir: &str, s3_bucket: Option<String>) -> ObjectStore {
if let Some(bucket) = s3_bucket {
if !bucket.is_empty() {
logging::info!("Use S3 object store");
let root = uuid::Uuid::new_v4().to_string();
let mut builder = S3::default();
builder
.root(&root)
.access_key_id(&env::var("GT_S3_ACCESS_KEY_ID").unwrap())
.secret_access_key(&env::var("GT_S3_ACCESS_KEY").unwrap())
.bucket(&bucket);
return ObjectStore::new(builder).unwrap().finish();
}
}
logging::info!("Use local fs object store");
let mut builder = Fs::default();
builder.root(store_dir);
ObjectStore::new(builder).unwrap().finish()
}
/// Creates a new region for compaction tests.
async fn create_region_for_compaction<
H: Handler<Request = FilePurgeRequest> + Send + Sync + 'static,
>(
store_dir: &str,
enable_version_column: bool,
engine_config: EngineConfig,
purge_handler: H,
flush_strategy: FlushStrategyRef,
s3_bucket: Option<String>,
) -> (RegionImpl<RaftEngineLogStore>, ObjectStore) {
let metadata = tests::new_metadata(REGION_NAME, enable_version_column);
let object_store = new_object_store(store_dir, s3_bucket);
let mut store_config = config_util::new_store_config_with_object_store(
REGION_NAME,
store_dir,
object_store.clone(),
)
.await;
store_config.engine_config = Arc::new(engine_config);
store_config.flush_strategy = flush_strategy;
let picker = SimplePicker::default();
let handler = CompactionHandler::new(picker);
let config = SchedulerConfig::default();
// Overwrite the compaction scheduler and file purger for this test.
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));
store_config.file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig {
max_inflight_tasks: store_config.engine_config.max_purge_tasks,
},
purge_handler,
));
(
RegionImpl::create(metadata, store_config).await.unwrap(),
object_store,
)
}
#[derive(Debug, Default, Clone)]
struct MockFilePurgeHandler {
num_deleted: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl Handler for MockFilePurgeHandler {
type Request = FilePurgeRequest;
async fn handle_request(
&self,
req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> Result<()> {
logging::info!(
"Try to delete file: {:?}, num_deleted: {:?}",
req.file_id,
self.num_deleted
);
let handler = FilePurgeHandler;
handler
.handle_request(req, token, finish_notifier)
.await
.unwrap();
self.num_deleted.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
impl MockFilePurgeHandler {
fn num_deleted(&self) -> usize {
self.num_deleted.load(Ordering::Relaxed)
}
}
/// Tester for region compaction.
struct CompactionTester {
base: Option<FileTesterBase>,
purge_handler: MockFilePurgeHandler,
object_store: ObjectStore,
}
impl CompactionTester {
async fn new(
store_dir: &str,
engine_config: EngineConfig,
flush_strategy: FlushStrategyRef,
s3_bucket: Option<String>,
) -> CompactionTester {
let purge_handler = MockFilePurgeHandler::default();
let (region, object_store) = create_region_for_compaction(
store_dir,
false,
engine_config.clone(),
purge_handler.clone(),
flush_strategy,
s3_bucket,
)
.await;
CompactionTester {
base: Some(FileTesterBase::with_region(region)),
purge_handler,
object_store,
}
}
#[inline]
fn base(&self) -> &FileTesterBase {
self.base.as_ref().unwrap()
}
#[inline]
fn base_mut(&mut self) -> &mut FileTesterBase {
self.base.as_mut().unwrap()
}
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
self.base().put(data).await
}
async fn flush(&self, wait: Option<bool>) {
let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
self.base().region.flush(&ctx).await.unwrap();
}
async fn compact(&self) {
// Trigger compaction and wait until it is done.
self.base()
.region
.compact(CompactContext::default())
.await
.unwrap();
}
/// Close region and clean up files.
async fn clean_up(mut self) {
self.base = None;
self.object_store.remove_all("/").await.unwrap();
}
}
async fn compact_during_read(s3_bucket: Option<String>) {
let dir = create_temp_dir("compact_read");
let store_dir = dir.path().to_str().unwrap();
// Use a large max_files_in_l0 so compaction is not triggered automatically.
let mut tester = CompactionTester::new(
store_dir,
EngineConfig {
max_files_in_l0: 100,
..Default::default()
},
// Disable auto-flush.
Arc::new(FlushSwitch::default()),
s3_bucket,
)
.await;
let expect: Vec<_> = (0..200).map(|v| (v, Some(v))).collect();
// Put elements so we have content to flush (into SST1).
tester.put(&expect[0..100]).await;
// Flush content to SST1.
tester.flush(None).await;
// Put more elements (into SST2).
tester.put(&expect[100..200]).await;
// Flush content to SST2.
tester.flush(None).await;
tester.base_mut().read_ctx.batch_size = 1;
// Create a reader.
let reader = tester.base().full_scan_reader().await;
assert_eq!(0, tester.purge_handler.num_deleted());
// Trigger compaction.
tester.compact().await;
// The files are still referenced.
assert_eq!(0, tester.purge_handler.num_deleted());
// Read from the reader.
let output = tester.base().collect_reader(reader).await;
assert_eq!(expect.len(), output.len());
tester.clean_up().await;
}
#[tokio::test]
async fn test_compact_during_read_on_fs() {
common_telemetry::init_default_ut_logging();
compact_during_read(None).await;
}
#[tokio::test]
async fn test_compact_during_read_on_s3() {
common_telemetry::init_default_ut_logging();
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
compact_during_read(Some(bucket)).await;
}
}
}

View File

@@ -23,7 +23,7 @@ use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
use store_api::storage::{AlterRequest, FlushContext, SequenceNumber, WriteContext, WriteResponse};
use tokio::sync::Mutex;
use tokio::sync::{oneshot, Mutex};
use crate::background::JobHandle;
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
@@ -36,7 +36,9 @@ use crate::manifest::action::{
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::metadata::RegionMetadataRef;
use crate::proto::wal::WalHeader;
use crate::region::{RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef};
use crate::region::{
CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
};
use crate::schema::compat::CompatWrite;
use crate::sst::AccessLayerRef;
use crate::version::{VersionControl, VersionControlRef, VersionEdit, VersionRef};
@@ -286,6 +288,19 @@ impl RegionWriter {
Ok(())
}
/// Compact manually.
pub async fn compact<S: LogStore>(
&self,
writer_ctx: WriterContext<'_, S>,
ctx: CompactContext,
) -> Result<()> {
let mut inner = self.inner.lock().await;
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
inner.manual_compact(writer_ctx, ctx).await
}
/// Cancels the flush task, if any.
async fn cancel_flush(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
@@ -644,6 +659,61 @@ impl WriterInner {
Ok(())
}
async fn manual_compact<S: LogStore>(
&mut self,
writer_ctx: WriterContext<'_, S>,
compact_ctx: CompactContext,
) -> Result<()> {
let region_id = writer_ctx.shared.id();
let mut compaction_request = CompactionRequestImpl {
region_id,
sst_layer: writer_ctx.sst_layer.clone(),
writer: writer_ctx.writer.clone(),
shared: writer_ctx.shared.clone(),
manifest: writer_ctx.manifest.clone(),
wal: writer_ctx.wal.clone(),
ttl: self.ttl,
sender: None,
};
let compaction_scheduler = writer_ctx.compaction_scheduler.clone();
let shared_data = writer_ctx.shared.clone();
logging::info!(
"Manual compact, region_id: {}, compact_ctx: {:?}",
region_id,
compact_ctx
);
if compact_ctx.wait {
let (sender, receiver) = oneshot::channel();
compaction_request.sender = Some(sender);
if Self::schedule_compaction(
shared_data,
compaction_scheduler,
compaction_request,
compact_ctx.max_files_in_l0,
)
.await
{
receiver
.await
.context(error::CompactTaskCancelSnafu { region_id })??;
}
} else {
Self::schedule_compaction(
shared_data,
compaction_scheduler,
compaction_request,
compact_ctx.max_files_in_l0,
)
.await;
}
Ok(())
}
fn build_flush_callback<S: LogStore>(
version: &VersionRef,
ctx: &WriterContext<S>,
@@ -659,40 +729,63 @@ impl WriterInner {
manifest: ctx.manifest.clone(),
wal: ctx.wal.clone(),
ttl,
sender: None,
};
let compaction_scheduler = ctx.compaction_scheduler.clone();
let shared_data = ctx.shared.clone();
let max_files_in_l0 = config.max_files_in_l0;
let schedule_compaction_cb = Box::pin(async move {
let level0_file_num = shared_data
.version_control
.current()
.ssts()
.level(0)
.file_num();
if level0_file_num <= max_files_in_l0 {
info!(
"No enough SST files in level 0 (threshold: {}), skip compaction",
max_files_in_l0
);
return;
}
match compaction_scheduler.schedule(compaction_request) {
Ok(scheduled) => {
info!(
"Schedule region {} compaction request result: {}",
region_id, scheduled
)
}
Err(e) => {
error!(e;"Failed to schedule region compaction request {}", region_id);
}
}
let schedule_compaction_cb = Box::pin(async move {
Self::schedule_compaction(
shared_data,
compaction_scheduler,
compaction_request,
max_files_in_l0,
)
.await;
});
Some(schedule_compaction_cb)
}
/// Schedules a compaction task and returns whether the task was scheduled.
async fn schedule_compaction<S: LogStore>(
shared_data: SharedDataRef,
compaction_scheduler: CompactionSchedulerRef<S>,
compaction_request: CompactionRequestImpl<S>,
max_files_in_l0: usize,
) -> bool {
let region_id = shared_data.id();
let level0_file_num = shared_data
.version_control
.current()
.ssts()
.level(0)
.file_num();
if level0_file_num <= max_files_in_l0 {
info!(
"No enough SST files in level 0 (threshold: {}), skip compaction",
max_files_in_l0
);
return false;
}
match compaction_scheduler.schedule(compaction_request) {
Ok(scheduled) => {
info!(
"Schedule region {} compaction request result: {}",
region_id, scheduled
);
scheduled
}
Err(e) => {
error!(e;"Failed to schedule region compaction request {}", region_id);
false
}
}
}
async fn manual_flush<S: LogStore>(&mut self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
self.trigger_flush(&writer_ctx).await?;
Ok(())

View File

@@ -24,8 +24,7 @@ use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error;
use crate::error::{IllegalSchedulerStateSnafu, StopSchedulerSnafu};
use crate::error::{IllegalSchedulerStateSnafu, Result, StopSchedulerSnafu};
use crate::scheduler::dedup_deque::DedupDeque;
use crate::scheduler::rate_limit::{
BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter, RateLimiter,
@@ -40,7 +39,11 @@ pub trait Request: Send + Sync + 'static {
/// Type of request key.
type Key: Eq + Hash + Clone + Debug + Send + Sync;
/// Returns the request key.
fn key(&self) -> Self::Key;
/// Notifies the caller of the request's completion result.
fn complete(self, result: Result<()>);
}
#[async_trait::async_trait]
@@ -52,7 +55,7 @@ pub trait Handler {
req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> error::Result<()>;
) -> Result<()>;
}
/// [Scheduler] defines a set of API to schedule requests.
@@ -63,11 +66,11 @@ pub trait Scheduler: Debug {
/// Schedules a request.
/// Returns true if the request is scheduled, or false if the task queue already
/// contains a request with the same key.
fn schedule(&self, request: Self::Request) -> error::Result<bool>;
fn schedule(&self, request: Self::Request) -> Result<bool>;
/// Stops scheduler. If `await_termination` is set to true, the scheduler will
/// wait until all queued requests are processed.
async fn stop(&self, await_termination: bool) -> error::Result<()>;
async fn stop(&self, await_termination: bool) -> Result<()>;
}
/// Scheduler config.
@@ -118,7 +121,7 @@ where
{
type Request = R;
fn schedule(&self, request: Self::Request) -> error::Result<bool> {
fn schedule(&self, request: Self::Request) -> Result<bool> {
ensure!(self.running(), IllegalSchedulerStateSnafu);
debug!(
"Schedule request: {:?}, queue size: {}",
@@ -131,7 +134,7 @@ where
Ok(res)
}
async fn stop(&self, await_termination: bool) -> error::Result<()> {
async fn stop(&self, await_termination: bool) -> Result<()> {
let state = if await_termination {
STATE_AWAIT_TERMINATION
} else {
@@ -279,7 +282,7 @@ where
req: R,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> error::Result<()> {
) -> Result<()> {
self.request_handler
.handle_request(req, token, finish_notifier)
.await
@@ -397,7 +400,7 @@ mod tests {
_req: Self::Request,
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> error::Result<()> {
) -> Result<()> {
(self.cb)();
token.try_release();
finish_notifier.notify_one();
@@ -411,6 +414,8 @@ mod tests {
fn key(&self) -> Self::Key {
self.region_id
}
fn complete(self, _result: Result<()>) {}
}
#[tokio::test]

View File

@@ -256,7 +256,6 @@ impl FileHandle {
/// Actual data of [FileHandle].
///
/// Contains the file's metadata and other mutable info like metrics.
#[derive(Debug)]
struct FileHandleInner {
meta: FileMeta,
compacting: AtomicBool,
@@ -265,6 +264,16 @@ struct FileHandleInner {
file_purger: FilePurgerRef,
}
impl fmt::Debug for FileHandleInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileHandleInner")
.field("meta", &self.meta)
.field("compacting", &self.compacting)
.field("deleted", &self.deleted)
.finish()
}
}
impl Drop for FileHandleInner {
fn drop(&mut self) {
if self.deleted.load(Ordering::Relaxed) {
@@ -452,12 +461,19 @@ impl Source {
}
/// SST access layer.
#[derive(Debug)]
pub struct FsAccessLayer {
sst_dir: String,
object_store: ObjectStore,
}
impl fmt::Debug for FsAccessLayer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FsAccessLayer")
.field("sst_dir", &self.sst_dir)
.finish()
}
}
impl FsAccessLayer {
pub fn new(sst_dir: &str, object_store: ObjectStore) -> FsAccessLayer {
FsAccessLayer {

View File

@@ -312,7 +312,7 @@ impl ParquetReader {
}
});
ChunkStream::new(adapter, Box::pin(chunk_stream))
ChunkStream::new(self.file_handle.clone(), adapter, Box::pin(chunk_stream))
}
/// Builds time range row filter.
@@ -515,13 +515,23 @@ impl ArrowPredicate for PlainTimestampRowFilter {
pub type SendableChunkStream = Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>;
pub struct ChunkStream {
// Holds the file handle in the stream to prevent the purger from purging it.
_file_handle: FileHandle,
adapter: ReadAdapter,
stream: SendableChunkStream,
}
impl ChunkStream {
pub fn new(adapter: ReadAdapter, stream: SendableChunkStream) -> Result<Self> {
Ok(Self { adapter, stream })
pub fn new(
file_handle: FileHandle,
adapter: ReadAdapter,
stream: SendableChunkStream,
) -> Result<Self> {
Ok(Self {
_file_handle: file_handle,
adapter,
stream,
})
}
}

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use object_store::services::Fs as Builder;
use object_store::services::Fs;
use object_store::ObjectStore;
use crate::background::JobPoolImpl;
@@ -38,15 +38,24 @@ fn log_store_dir(store_dir: &str) -> String {
pub async fn new_store_config(
region_name: &str,
store_dir: &str,
) -> StoreConfig<RaftEngineLogStore> {
let mut builder = Fs::default();
builder.root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
new_store_config_with_object_store(region_name, store_dir, object_store).await
}
/// Creates a new StoreConfig with the given object store.
pub async fn new_store_config_with_object_store(
region_name: &str,
store_dir: &str,
object_store: ObjectStore,
) -> StoreConfig<RaftEngineLogStore> {
let parent_dir = "";
let sst_dir = engine::region_sst_dir(parent_dir, region_name);
let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
let mut builder = Builder::default();
builder.root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store);
let job_pool = Arc::new(JobPoolImpl {});