feat(mito2): add file purger and cooperate with scheduler to purge sst files (#2251)

* feat: add file purger and use scheduler

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* feat: print some information when handling error messages

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* fix: resolve conversation

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: resolve conversation

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* fix: resolve conflicting files

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

* chore: code format

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>

---------

Signed-off-by: ZhuZiyi <zyzhu2001@gmail.com>
This commit is contained in:
Bamboo1
2023-08-29 15:55:03 +08:00
committed by GitHub
parent 805f254d15
commit 68600a2cf9
6 changed files with 191 additions and 13 deletions

View File

@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use object_store::{util, ObjectStore};
use snafu::ResultExt;
use crate::error::{DeleteSstSnafu, Result};
use crate::sst::file::FileId;
pub type AccessLayerRef = Arc<AccessLayer>;
/// Sst access layer: resolves SST file names against a fixed directory and
/// performs storage operations on them through an [ObjectStore].
pub struct AccessLayer {
    /// Directory holding the SST files; joined with file names to build paths.
    sst_dir: String,
    /// Backing object store used for all file operations (e.g. deletion).
    object_store: ObjectStore,
}
impl std::fmt::Debug for AccessLayer {
    // Manual Debug impl: only `sst_dir` is printed; `object_store` is
    // intentionally omitted from the output (presumably because it is not
    // usefully Debug-printable — confirm if it ever derives Debug).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AccessLayer")
            .field("sst_dir", &self.sst_dir)
            .finish()
    }
}
impl AccessLayer {
    /// Creates an access layer rooted at `sst_dir` and backed by `object_store`.
    pub fn new(sst_dir: &str, object_store: ObjectStore) -> AccessLayer {
        let sst_dir = sst_dir.to_owned();
        AccessLayer {
            sst_dir,
            object_store,
        }
    }

    /// Resolves `file_name` to its full path inside the SST directory.
    fn sst_file_path(&self, file_name: &str) -> String {
        util::join_path(&self.sst_dir, file_name)
    }

    /// Deletes an SST file with the given file id.
    ///
    /// # Errors
    /// Returns [DeleteSstSnafu]-wrapped errors when the object store fails to
    /// delete the file.
    pub async fn delete_sst(&self, file_id: FileId) -> Result<()> {
        let file_path = self.sst_file_path(&file_id.as_parquet());
        self.object_store
            .delete(&file_path)
            .await
            .context(DeleteSstSnafu { file_id })
    }
}

View File

@@ -26,6 +26,7 @@ use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
use crate::sst::file::FileId;
use crate::worker::WorkerId;
#[derive(Debug, Snafu)]
@@ -387,6 +388,13 @@ pub enum Error {
source: table::error::Error,
location: Location,
},
#[snafu(display("Failed to delete SST file, file id: {}, source: {}", file_id, source))]
DeleteSst {
file_id: FileId,
source: object_store::Error,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -448,6 +456,7 @@ impl ErrorExt for Error {
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
StopScheduler { .. } => StatusCode::Internal,
BuildPredicate { source, .. } => source.status_code(),
DeleteSst { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -20,6 +20,8 @@
pub mod test_util;
// TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito.
#[allow(dead_code)]
mod access_layer;
pub mod config;
#[allow(dead_code)]
pub mod engine;

View File

@@ -38,7 +38,7 @@ const STATE_AWAIT_TERMINATION: u8 = 2;
#[async_trait::async_trait]
pub trait Scheduler {
/// Schedules a Job
fn schedule(&self, req: Job) -> Result<()>;
fn schedule(&self, job: Job) -> Result<()>;
/// Stops scheduler. If `await_termination` is set to true, the scheduler will wait until all tasks are processed.
async fn stop(&self, await_termination: bool) -> Result<()>;
@@ -77,8 +77,8 @@ impl LocalScheduler {
break;
}
req_opt = receiver.recv_async() =>{
if let Ok(req) = req_opt {
req.await;
if let Ok(job) = req_opt {
job.await;
}
}
}
@@ -86,8 +86,8 @@ impl LocalScheduler {
// When task scheduler is cancelled, we will wait all task finished
if state_clone.load(Ordering::Relaxed) == STATE_AWAIT_TERMINATION {
// recv_async waits until all sender's been dropped.
while let Ok(req) = receiver.recv_async().await {
req.await;
while let Ok(job) = receiver.recv_async().await {
job.await;
}
state_clone.store(STATE_STOP, Ordering::Relaxed);
}
@@ -111,7 +111,7 @@ impl LocalScheduler {
#[async_trait::async_trait]
impl Scheduler for LocalScheduler {
fn schedule(&self, req: Job) -> Result<()> {
fn schedule(&self, job: Job) -> Result<()> {
ensure!(
self.state.load(Ordering::Relaxed) == STATE_RUNNING,
InvalidSchedulerStateSnafu
@@ -121,7 +121,7 @@ impl Scheduler for LocalScheduler {
.unwrap()
.as_ref()
.context(InvalidSchedulerStateSnafu)?
.send(req)
.send(job)
.map_err(|_| InvalidFlumeSenderSnafu {}.build())
}

View File

@@ -113,6 +113,11 @@ impl fmt::Debug for FileHandle {
}
impl FileHandle {
    /// Creates a handle over `meta`, wiring in the `file_purger` that will be
    /// notified when the underlying file should be removed.
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }
/// Returns the file id.
pub fn file_id(&self) -> FileId {
self.inner.meta.file_id
@@ -127,6 +132,12 @@ impl FileHandle {
pub fn time_range(&self) -> FileTimeRange {
self.inner.meta.time_range
}
    /// Marks the file as deleted; the underlying SST file will be deleted
    /// asynchronously when the handle is dropped.
    #[inline]
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }
}
/// Inner data of [FileHandle].
@@ -150,6 +161,17 @@ impl Drop for FileHandleInner {
}
}
impl FileHandleInner {
    /// Creates the shared inner state of a [FileHandle].
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        FileHandleInner {
            meta,
            // Both flags start cleared; `deleted` is later set through
            // `FileHandle::mark_deleted` to request purging on drop.
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            file_purger,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -14,8 +14,11 @@
use std::sync::Arc;
use common_telemetry::{error, info};
use store_api::storage::RegionId;
use crate::access_layer::AccessLayerRef;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::FileId;
/// Request to remove a file.
@@ -35,11 +38,94 @@ pub trait FilePurger: Send + Sync {
pub type FilePurgerRef = Arc<dyn FilePurger>;
// TODO(yingwen): Remove this once we implement the real purger.
/// A purger that does nothing.
#[derive(Debug)]
struct NoopPurger {}
pub struct LocalFilePurger {
scheduler: Arc<LocalScheduler>,
impl FilePurger for NoopPurger {
fn send_request(&self, _request: PurgeRequest) {}
sst_layer: AccessLayerRef,
}
impl LocalFilePurger {
    /// Creates a purger that deletes SST files from `sst_layer`, running each
    /// deletion as an async job on `scheduler`.
    pub fn new(scheduler: Arc<LocalScheduler>, sst_layer: AccessLayerRef) -> Self {
        Self {
            scheduler,
            sst_layer,
        }
    }
}
impl FilePurger for LocalFilePurger {
    /// Schedules deletion of the SST file named in `request`.
    ///
    /// Best-effort: failures — of the deletion itself or of scheduling the
    /// job — are logged and not propagated to the caller.
    fn send_request(&self, request: PurgeRequest) {
        // Copy the ids and clone the access layer so the spawned future owns
        // everything it needs ('static job).
        let file_id = request.file_id;
        let region_id = request.region_id;
        let sst_layer = self.sst_layer.clone();
        if let Err(e) = self.scheduler.schedule(Box::pin(async move {
            if let Err(e) = sst_layer.delete_sst(file_id).await {
                error!(e; "Failed to delete SST file, file: {}, region: {}",
                    file_id.as_parquet(), region_id);
            } else {
                info!(
                    "Successfully deleted SST file: {}, region: {}",
                    file_id.as_parquet(),
                    region_id
                );
            }
        })) {
            error!(e; "Failed to schedule the file purge request");
        }
    }
}
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::{util, ObjectStore};

    use super::*;
    use crate::access_layer::AccessLayer;
    use crate::schedule::scheduler::LocalScheduler;
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange};

    /// End-to-end check of the purge path: marking a handle deleted and
    /// dropping it must remove the underlying SST file from the object store.
    #[tokio::test]
    async fn test_file_purge() {
        common_telemetry::init_default_ut_logging();
        // Back the object store with a temp dir on the local filesystem.
        let dir = create_temp_dir("file-purge");
        let mut builder = Fs::default();
        builder.root(dir.path().to_str().unwrap());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        let sst_file_id = FileId::random();
        let sst_dir = "table1";
        // Write a dummy 4 KiB SST file that the purger is expected to delete.
        let path = util::join_path(sst_dir, &sst_file_id.as_parquet());
        object_store.write(&path, vec![0; 4096]).await.unwrap();
        let scheduler = Arc::new(LocalScheduler::new(3));
        let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone()));
        let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer));
        {
            let handle = FileHandle::new(
                FileMeta {
                    region_id: 0.into(),
                    file_id: sst_file_id,
                    time_range: FileTimeRange::default(),
                    level: 0,
                    file_size: 4096,
                },
                file_purger,
            );
            // mark file as deleted and drop the handle, we expect the file is deleted.
            handle.mark_deleted();
        }
        // `stop(true)` waits until all queued jobs are processed, so the purge
        // job has finished before we assert.
        scheduler.stop(true).await.unwrap();
        assert!(!object_store
            .is_exist(&format!("{}/{}", sst_dir, sst_file_id.as_parquet()))
            .await
            .unwrap());
    }
}