feat!: improve region manifest service (#1268)

* feat: try to use batch delete in ManifestLogStorage

* feat: clean temp dir when startup with file backend

* refactor: export region manifest checkpoint actions margin and refactor storage options

* feat: purge unused manifest and checkpoint files by repeat gc task

* chore: debug deleted logs

* feat: adds RepeatedTask and refactor all gc tasks

* chore: clean code

* feat: export gc_duration to manifest config

* test: assert gc works

* fix: typo

* Update src/common/runtime/src/error.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* Update src/common/runtime/src/repeated_task.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* Update src/common/runtime/src/repeated_task.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* fix: format

* Update src/common/runtime/src/repeated_task.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: by CR comments

* chore: by CR comments

* fix: serde default for StorageConfig

* chore: remove compaction config in StandaloneOptions

---------

Co-authored-by: LFC <bayinamine@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
dennis zhuang
2023-03-31 10:42:00 +08:00
committed by GitHub
parent b71bb4e5fa
commit 563adbabe9
31 changed files with 686 additions and 190 deletions

2
Cargo.lock generated
View File

@@ -1730,6 +1730,7 @@ dependencies = [
name = "common-runtime"
version = "0.1.1"
dependencies = [
"async-trait",
"common-error",
"common-telemetry",
"metrics",
@@ -1738,6 +1739,7 @@ dependencies = [
"snafu",
"tokio",
"tokio-test",
"tokio-util",
]
[[package]]

View File

@@ -37,11 +37,19 @@ type = "File"
data_dir = "/tmp/greptimedb/data/"
# Compaction options, see `standalone.example.toml`.
[compaction]
[storage.compaction]
max_inflight_tasks = 4
max_files_in_level0 = 8
max_purge_tasks = 32
# Storage manifest options
[storage.manifest]
# Region checkpoint actions margin.
# Create a checkpoint every <checkpoint_margin> actions.
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '30s'
# Procedure storage options, see `standalone.example.toml`.
# [procedure.store]
# type = "File"

View File

@@ -99,7 +99,7 @@ type = "File"
data_dir = "/tmp/greptimedb/data/"
# Compaction options.
[compaction]
[storage.compaction]
# Max task number that can concurrently run.
max_inflight_tasks = 4
# Max files in level 0 to trigger compaction.
@@ -107,6 +107,15 @@ max_files_in_level0 = 8
# Max task number for SST purge task after compaction.
max_purge_tasks = 32
# Storage manifest options
[storage.manifest]
# Region checkpoint actions margin.
# Create a checkpoint every <checkpoint_margin> actions.
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '30s'
# Procedure storage options.
# Uncomment to enable.
# [procedure.store]

View File

@@ -146,7 +146,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
}
if let Some(data_dir) = cmd.data_dir {
opts.storage = ObjectStoreConfig::File(FileConfig { data_dir });
opts.storage.store = ObjectStoreConfig::File(FileConfig { data_dir });
}
if let Some(wal_dir) = cmd.wal_dir {
@@ -167,7 +167,7 @@ mod tests {
use std::time::Duration;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::datanode::{CompactionConfig, ObjectStoreConfig};
use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig};
use servers::Mode;
use super::*;
@@ -203,10 +203,14 @@ mod tests {
type = "File"
data_dir = "/tmp/greptimedb/data/"
[compaction]
max_inflight_tasks = 4
max_files_in_level0 = 8
[storage.compaction]
max_inflight_tasks = 3
max_files_in_level0 = 7
max_purge_tasks = 32
[storage.manifest]
checkpoint_margin = 9
gc_duration = '7s'
"#;
write!(file, "{}", toml_str).unwrap();
@@ -237,9 +241,9 @@ mod tests {
assert_eq!(3000, timeout_millis);
assert!(tcp_nodelay);
match options.storage {
ObjectStoreConfig::File(FileConfig { data_dir }) => {
assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir)
match &options.storage.store {
ObjectStoreConfig::File(FileConfig { data_dir, .. }) => {
assert_eq!("/tmp/greptimedb/data/", data_dir)
}
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
@@ -247,11 +251,18 @@ mod tests {
assert_eq!(
CompactionConfig {
max_inflight_tasks: 4,
max_files_in_level0: 8,
max_inflight_tasks: 3,
max_files_in_level0: 7,
max_purge_tasks: 32,
},
options.compaction
options.storage.compaction,
);
assert_eq!(
RegionManifestConfig {
checkpoint_margin: Some(9),
gc_duration: Some(Duration::from_secs(7)),
},
options.storage.manifest,
);
}

View File

@@ -17,9 +17,7 @@ use std::sync::Arc;
use clap::Parser;
use common_base::Plugins;
use common_telemetry::info;
use datanode::datanode::{
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
};
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig, WalConfig};
use datanode::instance::InstanceRef;
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
@@ -82,8 +80,7 @@ pub struct StandaloneOptions {
pub prometheus_options: Option<PrometheusOptions>,
pub prom_options: Option<PromOptions>,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub storage: StorageConfig,
pub procedure: Option<ProcedureConfig>,
}
@@ -101,8 +98,7 @@ impl Default for StandaloneOptions {
prometheus_options: Some(PrometheusOptions::default()),
prom_options: Some(PromOptions::default()),
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
storage: StorageConfig::default(),
procedure: None,
}
}
@@ -129,7 +125,6 @@ impl StandaloneOptions {
enable_memory_catalog: self.enable_memory_catalog,
wal: self.wal,
storage: self.storage,
compaction: self.compaction,
procedure: self.procedure,
..Default::default()
}

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
metrics = "0.20"
@@ -12,6 +13,7 @@ once_cell = "1.12"
paste.workspace = true
snafu.workspace = true
tokio.workspace = true
tokio-util.workspace = true
[dev-dependencies]
tokio-test = "0.4"

View File

@@ -15,6 +15,7 @@
use std::any::Any;
use common_error::prelude::*;
use tokio::task::JoinError;
pub type Result<T> = std::result::Result<T, Error>;
@@ -26,6 +27,19 @@ pub enum Error {
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Repeated task {} not started yet", name))]
IllegalState { name: String, backtrace: Backtrace },
#[snafu(display(
"Failed to wait for repeated task {} to stop, source: {}",
name,
source
))]
WaitGcTaskStop {
name: String,
source: JoinError,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {

View File

@@ -15,6 +15,7 @@
pub mod error;
mod global;
pub mod metric;
mod repeated_task;
pub mod runtime;
pub use global::{
@@ -23,4 +24,5 @@ pub use global::{
spawn_read, spawn_write, write_runtime,
};
pub use crate::repeated_task::{RepeatedTask, TaskFunction, TaskFunctionRef};
pub use crate::runtime::{Builder, JoinError, JoinHandle, Runtime};

View File

@@ -0,0 +1,174 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_error::prelude::ErrorExt;
use common_telemetry::logging;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu};
use crate::Runtime;
/// A function that is invoked periodically by a [`RepeatedTask`].
#[async_trait::async_trait]
pub trait TaskFunction<E: ErrorExt> {
    /// Executes one tick of the task. Errors returned here are logged by the
    /// task loop and swallowed — they do not stop the repeated task.
    async fn call(&self) -> std::result::Result<(), E>;

    /// Name of the task, used in log and error messages.
    fn name(&self) -> &str;
}

/// Shared, thread-safe reference to a [`TaskFunction`].
pub type TaskFunctionRef<E> = Arc<dyn TaskFunction<E> + Send + Sync>;
/// A background task that repeatedly invokes its [`TaskFunction`] at a fixed
/// interval until it is stopped via `stop()`.
pub struct RepeatedTask<E> {
    // Cancellation token for the spawned loop; `Some` only while running.
    cancel_token: Mutex<Option<CancellationToken>>,
    // Join handle of the spawned loop, awaited on stop; `Some` only while running.
    task_handle: Mutex<Option<JoinHandle<()>>>,
    // Set to true by `start()`, flipped back to false by `stop()`.
    started: AtomicBool,
    // Sleep duration between consecutive invocations of `task_fn`.
    interval: Duration,
    // The user-provided function executed on every tick.
    task_fn: TaskFunctionRef<E>,
}
/// Human-readable form: `RepeatedTask(<task name>)`.
impl<E: ErrorExt> std::fmt::Display for RepeatedTask<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.task_fn.name();
        write!(f, "RepeatedTask({name})")
    }
}

/// Debug form mirrors `Display`, quoting the task name as a tuple field.
impl<E: ErrorExt> std::fmt::Debug for RepeatedTask<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_tuple("RepeatedTask");
        builder.field(&self.task_fn.name());
        builder.finish()
    }
}
impl<E: ErrorExt + 'static> RepeatedTask<E> {
    /// Creates a new, not-yet-started task that will run `task_fn` every `interval`.
    pub fn new(interval: Duration, task_fn: TaskFunctionRef<E>) -> Self {
        Self {
            cancel_token: Mutex::new(None),
            task_handle: Mutex::new(None),
            started: AtomicBool::new(false),
            interval,
            task_fn,
        }
    }

    /// Returns whether the task has been started and not yet stopped.
    pub fn started(&self) -> bool {
        self.started.load(Ordering::Relaxed)
    }

    /// Spawns the repeat loop onto `runtime` and marks the task as started.
    ///
    /// NOTE(review): there is no guard against calling `start` twice — a second
    /// call would spawn another loop and overwrite the stored token/handle,
    /// leaking the first loop. Confirm callers only start once.
    pub async fn start(&self, runtime: Runtime) -> Result<()> {
        let token = CancellationToken::new();
        let interval = self.interval;
        let child = token.child_token();
        let task_fn = self.task_fn.clone();
        // TODO(hl): Maybe spawn to a blocking runtime.
        let handle = runtime.spawn(async move {
            loop {
                // Sleep one interval; exit immediately if cancelled mid-sleep.
                tokio::select! {
                    _ = tokio::time::sleep(interval) => {}
                    _ = child.cancelled() => {
                        return;
                    }
                }
                // Errors are logged and swallowed so the loop keeps running.
                if let Err(e) = task_fn.call().await {
                    logging::error!(e; "Failed to run repeated task: {}", task_fn.name());
                }
            }
        });

        *self.cancel_token.lock().await = Some(token);
        *self.task_handle.lock().await = Some(handle);
        self.started.store(true, Ordering::Relaxed);

        logging::info!(
            "Repeated task {} started with interval: {:?}",
            self.task_fn.name(),
            self.interval
        );

        Ok(())
    }

    /// Cancels the loop and waits for the spawned task to finish.
    ///
    /// Returns an `IllegalState` error if the task was never started or was
    /// already stopped.
    pub async fn stop(&self) -> Result<()> {
        let name = self.task_fn.name();
        // Atomically flip started true -> false. This fails when not currently
        // started, which also makes concurrent stop() calls safe: only one wins.
        ensure!(
            self.started
                .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok(),
            IllegalStateSnafu { name }
        );
        let token = self
            .cancel_token
            .lock()
            .await
            .take()
            .context(IllegalStateSnafu { name })?;
        let handle = self
            .task_handle
            .lock()
            .await
            .take()
            .context(IllegalStateSnafu { name })?;

        // Signal cancellation, then wait for the loop task to exit.
        token.cancel();
        handle.await.context(WaitGcTaskStopSnafu { name })?;

        logging::info!("Repeated task {} stopped", name);

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicI32;

    use super::*;

    /// Task function that counts how many times it has been invoked.
    struct TickTask {
        n: AtomicI32,
    }

    #[async_trait::async_trait]
    impl TaskFunction<crate::error::Error> for TickTask {
        fn name(&self) -> &str {
            "test"
        }

        async fn call(&self) -> Result<()> {
            self.n.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_repeated_task() {
        common_telemetry::init_default_ut_logging();

        let task_fn = Arc::new(TickTask {
            n: AtomicI32::new(0),
        });
        let task = RepeatedTask::new(Duration::from_millis(100), task_fn.clone());

        task.start(crate::bg_runtime()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(550)).await;
        task.stop().await.unwrap();

        // With a 100ms interval over a 550ms window we expect ~5 ticks, but
        // asserting exactly 5 is flaky under scheduler load or CI jitter.
        // Accept a tolerant range instead.
        let n = task_fn.n.load(Ordering::Relaxed);
        assert!((3..=6).contains(&n), "unexpected tick count: {n}");
    }
}

View File

@@ -29,6 +29,7 @@ use crate::server::Services;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
@@ -37,6 +38,16 @@ pub enum ObjectStoreConfig {
Oss(OssConfig),
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct StorageConfig {
    // Flattened so the object-store fields (e.g. `type`, `data_dir`) stay at
    // the `[storage]` table level in the config file.
    #[serde(flatten)]
    pub store: ObjectStoreConfig,
    /// Table compaction options (`[storage.compaction]` in the config file).
    pub compaction: CompactionConfig,
    /// Region manifest options (`[storage.manifest]` in the config file).
    pub manifest: RegionManifestConfig,
}
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
#[serde(default)]
pub struct FileConfig {
@@ -107,6 +118,27 @@ impl Default for WalConfig {
}
}
/// Options for region manifest
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct RegionManifestConfig {
    /// Region manifest checkpoint actions margin.
    /// Manifest service create a checkpoint every [checkpoint_margin] actions.
    pub checkpoint_margin: Option<u16>,
    /// Region manifest logs and checkpoints gc task execution duration.
    // Serialized as a human-readable duration string, e.g. "30s".
    #[serde(with = "humantime_serde")]
    pub gc_duration: Option<Duration>,
}

impl Default for RegionManifestConfig {
    // Defaults: checkpoint every 10 actions, gc every 30 seconds.
    fn default() -> Self {
        Self {
            checkpoint_margin: Some(10u16),
            gc_duration: Some(Duration::from_secs(30)),
        }
    }
}
/// Options for table compaction
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
@@ -132,7 +164,7 @@ impl Default for CompactionConfig {
impl From<&DatanodeOptions> for SchedulerConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
max_inflight_tasks: value.compaction.max_inflight_tasks,
max_inflight_tasks: value.storage.compaction.max_inflight_tasks,
}
}
}
@@ -140,8 +172,10 @@ impl From<&DatanodeOptions> for SchedulerConfig {
impl From<&DatanodeOptions> for StorageEngineConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
max_files_in_l0: value.compaction.max_files_in_level0,
max_purge_tasks: value.compaction.max_purge_tasks,
manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin,
manifest_gc_duration: value.storage.manifest.gc_duration,
max_files_in_l0: value.storage.compaction.max_files_in_level0,
max_purge_tasks: value.storage.compaction.max_purge_tasks,
}
}
}
@@ -192,8 +226,7 @@ pub struct DatanodeOptions {
pub mysql_runtime_size: usize,
pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub storage: StorageConfig,
pub procedure: Option<ProcedureConfig>,
}
@@ -210,8 +243,7 @@ impl Default for DatanodeOptions {
mysql_runtime_size: 2,
meta_client_options: None,
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
storage: StorageConfig::default(),
procedure: None,
}
}

View File

@@ -181,6 +181,9 @@ pub enum Error {
#[snafu(display("Failed to create directory {}, source: {}", dir, source))]
CreateDir { dir: String, source: std::io::Error },
#[snafu(display("Failed to remove directory {}, source: {}", dir, source))]
RemoveDir { dir: String, source: std::io::Error },
#[snafu(display("Failed to open log store, source: {}", source))]
OpenLogStore {
#[snafu(backtrace)]
@@ -576,6 +579,7 @@ impl ErrorExt for Error {
| TcpBind { .. }
| StartGrpc { .. }
| CreateDir { .. }
| RemoveDir { .. }
| InsertSystemCatalog { .. }
| RenameTable { .. }
| Catalog { .. }

View File

@@ -104,7 +104,7 @@ impl Instance {
meta_client: Option<Arc<MetaClient>>,
compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>,
) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let object_store = new_object_store(&opts.storage.store).await?;
let log_store = Arc::new(create_log_store(&opts.wal).await?);
let table_engine = Arc::new(DefaultEngine::new(
@@ -430,15 +430,27 @@ pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Res
info!("The file storage directory is: {}", &data_dir);
let atomic_write_dir = format!("{data_dir}/.tmp/");
if path::Path::new(&atomic_write_dir).exists() {
info!(
"Begin to clean temp storage directory: {}",
&atomic_write_dir
);
fs::remove_dir_all(&atomic_write_dir).context(error::RemoveDirSnafu {
dir: &atomic_write_dir,
})?;
info!("Cleaned temp storage directory: {}", &atomic_write_dir);
}
let mut builder = FsBuilder::default();
builder.root(&data_dir).atomic_write_dir(&atomic_write_dir);
Ok(ObjectStore::new(builder)
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu {
config: store_config.clone(),
})?
.finish())
.finish();
Ok(object_store)
}
/// Create metasrv client instance and spawn heartbeat loop.

View File

@@ -32,7 +32,9 @@ use sql::statements::tql::Tql;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, TableOptions};
use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, WalConfig};
use crate::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig,
};
use crate::error::{CreateTableSnafu, Result};
use crate::instance::Instance;
use crate::sql::SqlHandler;
@@ -130,9 +132,12 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
storage: ObjectStoreConfig::File(FileConfig {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
storage: StorageConfig {
store: ObjectStoreConfig::File(FileConfig {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
..Default::default()
},
mode: Mode::Standalone,
..Default::default()
};

View File

@@ -24,7 +24,9 @@ use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, WalConfig};
use datanode::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, StorageConfig, WalConfig,
};
use datanode::instance::Instance as DatanodeInstance;
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::Peer;
@@ -98,9 +100,12 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
storage: ObjectStoreConfig::File(FileConfig {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
storage: StorageConfig {
store: ObjectStoreConfig::File(FileConfig {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
..Default::default()
},
mode: Mode::Standalone,
..Default::default()
};
@@ -185,9 +190,12 @@ async fn create_distributed_datanode(
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
storage: ObjectStoreConfig::File(FileConfig {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
storage: StorageConfig {
store: ObjectStoreConfig::File(FileConfig {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
..Default::default()
},
mode: Mode::Distributed,
..Default::default()
};

View File

@@ -15,16 +15,22 @@
use std::any::Any;
use common_error::prelude::{ErrorExt, Snafu};
use common_runtime::error::Error as RuntimeError;
use snafu::{Backtrace, ErrorCompat};
use tokio::task::JoinError;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to wait for gc task to stop, source: {}", source))]
WaitGcTaskStop {
source: JoinError,
backtrace: Backtrace,
#[snafu(display("Failed to start log store gc task, source: {}", source))]
StartGcTask {
#[snafu(backtrace)]
source: RuntimeError,
},
#[snafu(display("Failed to stop log store gc task, source: {}", source))]
StopGcTask {
#[snafu(backtrace)]
source: RuntimeError,
},
#[snafu(display("Failed to add entry to LogBatch, source: {}", source))]

View File

@@ -13,25 +13,22 @@
// limitations under the License.
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_stream::stream;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{error, info};
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::Id;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Namespace as NamespaceTrait;
use store_api::logstore::{AppendResponse, LogStore};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::config::LogConfig;
use crate::error::{
AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu,
RaftEngineSnafu, WaitGcTaskStopSnafu,
RaftEngineSnafu, StartGcTaskSnafu, StopGcTaskSnafu,
};
use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace};
@@ -41,9 +38,36 @@ const SYSTEM_NAMESPACE: u64 = 0;
pub struct RaftEngineLogStore {
config: LogConfig,
engine: Arc<Engine>,
cancel_token: Mutex<Option<CancellationToken>>,
gc_task_handle: Mutex<Option<JoinHandle<()>>>,
started: AtomicBool,
gc_task: RepeatedTask<Error>,
}
/// Gc task function that purges expired files from the raft engine.
pub struct PurgeExpiredFilesFunction {
    // Shared handle to the raft engine whose expired files are purged.
    engine: Arc<Engine>,
}

#[async_trait::async_trait]
impl TaskFunction<Error> for PurgeExpiredFilesFunction {
    fn name(&self) -> &str {
        "RaftEngineLogStore-gc-task"
    }

    /// Purges expired log files. Failures are logged and swallowed (always
    /// returns `Ok`) so the repeated gc task keeps running.
    async fn call(&self) -> Result<(), Error> {
        match self.engine.purge_expired_files().context(RaftEngineSnafu) {
            Ok(res) => {
                // TODO(hl): the retval of purge_expired_files indicates the namespaces need to be compact,
                // which is useful when monitoring regions failed to flush its memtable to SSTs.
                info!(
                    "Successfully purged logstore files, namespaces need compaction: {:?}",
                    res
                );
            }
            Err(e) => {
                error!(e; "Failed to purge files in logstore");
            }
        }
        Ok(())
    }
}
impl RaftEngineLogStore {
@@ -58,56 +82,31 @@ impl RaftEngineLogStore {
..Default::default()
};
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
let gc_task = RepeatedTask::new(
config.purge_interval,
Arc::new(PurgeExpiredFilesFunction {
engine: engine.clone(),
}),
);
let log_store = Self {
config,
engine,
cancel_token: Mutex::new(None),
gc_task_handle: Mutex::new(None),
started: AtomicBool::new(false),
gc_task,
};
log_store.start().await?;
Ok(log_store)
}
pub fn started(&self) -> bool {
self.started.load(Ordering::Relaxed)
self.gc_task.started()
}
async fn start(&self) -> Result<(), Error> {
let engine_clone = self.engine.clone();
let interval = self.config.purge_interval;
let token = CancellationToken::new();
let child = token.child_token();
// TODO(hl): Maybe spawn to a blocking runtime.
let handle = common_runtime::spawn_bg(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = child.cancelled() => {
info!("LogStore gc task has been cancelled");
return;
}
}
match engine_clone.purge_expired_files().context(RaftEngineSnafu) {
Ok(res) => {
// TODO(hl): the retval of purge_expired_files indicates the namespaces need to be compact,
// which is useful when monitoring regions failed to flush it's memtable to SSTs.
info!(
"Successfully purged logstore files, namespaces need compaction: {:?}",
res
);
}
Err(e) => {
error!(e; "Failed to purge files in logstore");
}
}
}
});
*self.cancel_token.lock().await = Some(token);
*self.gc_task_handle.lock().await = Some(handle);
self.started.store(true, Ordering::Relaxed);
info!("RaftEngineLogStore started with config: {:?}", self.config);
Ok(())
self.gc_task
.start(common_runtime::bg_runtime())
.await
.context(StartGcTaskSnafu)
}
}
@@ -115,7 +114,7 @@ impl Debug for RaftEngineLogStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RaftEngineLogsStore")
.field("config", &self.config)
.field("started", &self.started.load(Ordering::Relaxed))
.field("started", &self.gc_task.started())
.finish()
}
}
@@ -127,28 +126,7 @@ impl LogStore for RaftEngineLogStore {
type Entry = Entry;
async fn stop(&self) -> Result<(), Self::Error> {
ensure!(
self.started
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok(),
IllegalStateSnafu
);
let handle = self
.gc_task_handle
.lock()
.await
.take()
.context(IllegalStateSnafu)?;
let token = self
.cancel_token
.lock()
.await
.take()
.context(IllegalStateSnafu)?;
token.cancel();
handle.await.context(WaitGcTaskStopSnafu)?;
info!("RaftEngineLogStore stopped");
Ok(())
self.gc_task.stop().await.context(StopGcTaskSnafu)
}
/// Append an entry to logstore. Currently of existence of entry's namespace is not checked.

View File

@@ -80,7 +80,7 @@ mod tests {
async fn test_table_manifest() {
let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await;
let manifest = TableManifest::new("manifest/", object_store, None);
let manifest = TableManifest::create("manifest/", object_store);
let mut iter = manifest.scan(0, 100).await.unwrap();
assert!(iter.next_action().await.unwrap().is_none());

View File

@@ -471,7 +471,7 @@ impl<R: Region> MitoTable<R> {
regions: HashMap<RegionNumber, R>,
object_store: ObjectStore,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store, None);
let manifest = TableManifest::create(&table_manifest_dir(table_dir), object_store);
// TODO(dennis): save manifest version into catalog?
let _manifest_version = manifest
@@ -487,7 +487,7 @@ impl<R: Region> MitoTable<R> {
}
pub(crate) fn build_manifest(table_dir: &str, object_store: ObjectStore) -> TableManifest {
TableManifest::new(&table_manifest_dir(table_dir), object_store, None)
TableManifest::create(&table_manifest_dir(table_dir), object_store)
}
pub(crate) async fn recover_table_info(

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::raw::normalize_path as raw_normalize_path;
pub use opendal::raw::oio::Pager;
pub use opendal::{
layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,

View File

@@ -14,8 +14,12 @@
//! storage engine config
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct EngineConfig {
pub manifest_checkpoint_margin: Option<u16>,
pub manifest_gc_duration: Option<Duration>,
pub max_files_in_l0: usize,
pub max_purge_tasks: usize,
}
@@ -23,6 +27,8 @@ pub struct EngineConfig {
impl Default for EngineConfig {
fn default() -> Self {
Self {
manifest_checkpoint_margin: Some(10),
manifest_gc_duration: Some(Duration::from_secs(30)),
max_files_in_l0: 8,
max_purge_tasks: 32,
}

View File

@@ -21,6 +21,7 @@ use common_telemetry::logging::info;
use object_store::{util, ObjectStore};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::manifest::Manifest;
use store_api::storage::{
CreateOptions, EngineContext, OpenOptions, Region, RegionDescriptor, StorageEngine,
};
@@ -290,8 +291,15 @@ impl<S: LogStore> EngineInner<S> {
let mut guard = SlotGuard::new(name, &self.regions);
let store_config =
self.region_store_config(&opts.parent_dir, opts.write_buffer_size, name, opts.ttl);
let store_config = self
.region_store_config(
&opts.parent_dir,
opts.write_buffer_size,
name,
&self.config,
opts.ttl,
)
.await?;
let region = match RegionImpl::open(name.to_string(), store_config, opts).await? {
None => return Ok(None),
@@ -321,12 +329,15 @@ impl<S: LogStore> EngineInner<S> {
.context(error::InvalidRegionDescSnafu {
region: &region_name,
})?;
let store_config = self.region_store_config(
&opts.parent_dir,
opts.write_buffer_size,
&region_name,
opts.ttl,
);
let store_config = self
.region_store_config(
&opts.parent_dir,
opts.write_buffer_size,
&region_name,
&self.config,
opts.ttl,
)
.await?;
let region = RegionImpl::create(metadata, store_config).await?;
@@ -342,25 +353,32 @@ impl<S: LogStore> EngineInner<S> {
slot.get_ready_region()
}
fn region_store_config(
async fn region_store_config(
&self,
parent_dir: &str,
write_buffer_size: Option<usize>,
region_name: &str,
config: &EngineConfig,
ttl: Option<Duration>,
) -> StoreConfig<S> {
) -> Result<StoreConfig<S>> {
let parent_dir = util::normalize_dir(parent_dir);
let sst_dir = &region_sst_dir(&parent_dir, region_name);
let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone()));
let manifest_dir = region_manifest_dir(&parent_dir, region_name);
let manifest = RegionManifest::with_checkpointer(&manifest_dir, self.object_store.clone());
let manifest = RegionManifest::with_checkpointer(
&manifest_dir,
self.object_store.clone(),
config.manifest_checkpoint_margin,
config.manifest_gc_duration,
);
manifest.start().await?;
let flush_strategy = write_buffer_size
.map(|size| Arc::new(SizeBasedStrategy::new(size)) as Arc<_>)
.unwrap_or_else(|| self.flush_strategy.clone());
StoreConfig {
Ok(StoreConfig {
log_store: self.log_store.clone(),
sst_layer,
manifest,
@@ -371,7 +389,7 @@ impl<S: LogStore> EngineInner<S> {
engine_config: self.config.clone(),
file_purger: self.file_purger.clone(),
ttl,
}
})
}
}

View File

@@ -17,6 +17,7 @@ use std::io::Error as IoError;
use std::str::Utf8Error;
use common_error::prelude::*;
use common_runtime::error::Error as RuntimeError;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use serde_json::error::Error as JsonError;
@@ -423,6 +424,18 @@ pub enum Error {
#[snafu(display("Cannot schedule request, scheduler's already stopped"))]
IllegalSchedulerState { backtrace: Backtrace },
#[snafu(display("Failed to start manifest gc task: {}", source))]
StartManifestGcTask {
#[snafu(backtrace)]
source: RuntimeError,
},
#[snafu(display("Failed to stop manifest gc task: {}", source))]
StopManifestGcTask {
#[snafu(backtrace)]
source: RuntimeError,
},
#[snafu(display("Failed to stop scheduler, source: {}", source))]
StopScheduler {
source: JoinError,
@@ -524,7 +537,11 @@ impl ErrorExt for Error {
StatusCode::Internal
}
DeleteSst { .. } => StatusCode::StorageUnavailable,
IllegalSchedulerState { .. } => StatusCode::Unexpected,
StartManifestGcTask { .. }
| StopManifestGcTask { .. }
| IllegalSchedulerState { .. } => StatusCode::Unexpected,
TtlCalculation { source, .. } => source.status_code(),
}
}

View File

@@ -15,43 +15,72 @@
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{debug, logging, warn};
use object_store::ObjectStore;
use snafu::ensure;
use snafu::{ensure, ResultExt};
use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion};
use store_api::manifest::*;
use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result};
use crate::error::{
Error, ManifestProtocolForbidWriteSnafu, Result, StartManifestGcTaskSnafu,
StopManifestGcTaskSnafu,
};
use crate::manifest::action::RegionCheckpoint;
use crate::manifest::checkpoint::Checkpointer;
use crate::manifest::storage::{ManifestObjectStore, ObjectStoreLogIterator};
// TODO(dennis): export this option to table options or storage options.
const CHECKPOINT_ACTIONS_MARGIN: u64 = 10;
const CHECKPOINT_ACTIONS_MARGIN: u16 = 10;
const GC_DURATION_SECS: u64 = 30;
#[derive(Clone, Debug)]
pub struct ManifestImpl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> {
inner: Arc<ManifestImplInner<S, M>>,
checkpointer: Option<Arc<dyn Checkpointer<Checkpoint = S, MetaAction = M>>>,
last_checkpoint_version: Arc<AtomicU64>,
checkpoint_actions_margin: u16,
gc_task: Option<Arc<RepeatedTask<Error>>>,
}
impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> ManifestImpl<S, M> {
impl<S: 'static + Checkpoint<Error = Error>, M: 'static + MetaAction<Error = Error>>
ManifestImpl<S, M>
{
pub fn new(
manifest_dir: &str,
object_store: ObjectStore,
checkpoint_actions_margin: Option<u16>,
gc_duration: Option<Duration>,
checkpointer: Option<Arc<dyn Checkpointer<Checkpoint = S, MetaAction = M>>>,
) -> Self {
let inner = Arc::new(ManifestImplInner::new(manifest_dir, object_store));
let gc_task = if checkpointer.is_some() {
// only start gc task when checkpoint is enabled.
Some(Arc::new(RepeatedTask::new(
gc_duration.unwrap_or_else(|| Duration::from_secs(GC_DURATION_SECS)),
inner.clone() as _,
)))
} else {
None
};
ManifestImpl {
inner: Arc::new(ManifestImplInner::new(manifest_dir, object_store)),
inner,
checkpointer,
checkpoint_actions_margin: checkpoint_actions_margin
.unwrap_or(CHECKPOINT_ACTIONS_MARGIN),
last_checkpoint_version: Arc::new(AtomicU64::new(MIN_VERSION)),
gc_task,
}
}
pub fn create(manifest_dir: &str, object_store: ObjectStore) -> Self {
Self::new(manifest_dir, object_store, None, None, None)
}
pub fn checkpointer(&self) -> &Option<Arc<dyn Checkpointer<Checkpoint = S, MetaAction = M>>> {
&self.checkpointer
}
@@ -100,7 +129,7 @@ impl<S: 'static + Checkpoint<Error = Error>, M: 'static + MetaAction<Error = Err
async fn update(&self, action_list: M) -> Result<ManifestVersion> {
let version = self.inner.save(action_list).await?;
if version - self.last_checkpoint_version.load(Ordering::Relaxed)
>= CHECKPOINT_ACTIONS_MARGIN
>= self.checkpoint_actions_margin as u64
{
let s = self.do_checkpoint().await?;
debug!("Manifest checkpoint, checkpoint: {:#?}", s);
@@ -135,6 +164,24 @@ impl<S: 'static + Checkpoint<Error = Error>, M: 'static + MetaAction<Error = Err
fn last_version(&self) -> ManifestVersion {
self.inner.last_version()
}
async fn start(&self) -> Result<()> {
if let Some(task) = &self.gc_task {
task.start(common_runtime::bg_runtime())
.await
.context(StartManifestGcTaskSnafu)?;
}
Ok(())
}
async fn stop(&self) -> Result<()> {
if let Some(task) = &self.gc_task {
task.stop().await.context(StopManifestGcTaskSnafu)?;
}
Ok(())
}
}
#[derive(Debug)]
@@ -183,6 +230,29 @@ impl<M: MetaAction<Error = Error>> MetaActionIterator for MetaActionIteratorImpl
}
}
#[async_trait::async_trait]
impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> TaskFunction<Error>
for ManifestImplInner<S, M>
{
fn name(&self) -> &str {
"region-manifest-gc"
}
async fn call(&self) -> Result<()> {
if let Some((last_version, _)) = self.store.load_last_checkpoint().await? {
// Purge all manifest and checkpoint files before last_version.
let deleted = self.store.delete_until(last_version).await?;
debug!(
"Deleted {} logs from region manifest storage(path={}).",
deleted,
self.store.path()
);
}
Ok(())
}
}
impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> ManifestImplInner<S, M> {
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
let (reader_version, writer_version) = action::supported_protocol_version();
@@ -283,8 +353,8 @@ impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> ManifestImplInn
// It happens when saving checkpoint successfully, but failed at saving checkpoint metadata(the "__last_checkpoint" file).
// Then we try to use the old checkpoint and do the checkpoint next time.
// If the old checkpoint was deleted, it's fine that we return the latest checkpoint.
// the only side effect is leaving some unused checkpoint checkpoint files.
// TODO(dennis): delete unused checkpoint files
// the only side effect is leaving some unused checkpoint checkpoint files,
// and they will be purged by gc task.
warn!("The checkpoint manifest version {} in {} is greater than checkpoint metadata version {}.", self.store.path(), checkpoint.last_version(), version);
if let Some((_, bytes)) = self.store.load_checkpoint(version).await? {

View File

@@ -16,6 +16,7 @@
use std::any::Any;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use common_telemetry::info;
@@ -115,17 +116,10 @@ impl Checkpointer for RegionManifestCheckpointer {
};
manifest.save_checkpoint(&checkpoint).await?;
// TODO(dennis): background task to clean old manifest actions and checkpoints.
manifest
.manifest_store()
.delete(start_version, last_version + 1)
.await?;
if start_version > MIN_VERSION {
manifest
.manifest_store()
.delete_checkpoint(start_version - 1)
.await?
}
info!("Region manifest checkpoint, start_version: {}, last_version: {}, compacted actions: {}", start_version, last_version, compacted_actions);
@@ -138,10 +132,17 @@ impl Checkpointer for RegionManifestCheckpointer {
}
impl RegionManifest {
pub fn with_checkpointer(manifest_dir: &str, object_store: ObjectStore) -> Self {
pub fn with_checkpointer(
manifest_dir: &str,
object_store: ObjectStore,
checkpoint_actions_margin: Option<u16>,
gc_duration: Option<Duration>,
) -> Self {
Self::new(
manifest_dir,
object_store,
checkpoint_actions_margin,
gc_duration,
Some(Arc::new(RegionManifestCheckpointer {
flushed_manifest_version: AtomicU64::new(0),
})),
@@ -184,7 +185,8 @@ mod tests {
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store);
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store, None, None);
manifest.start().await.unwrap();
let region_meta = Arc::new(build_region_meta());
@@ -274,6 +276,8 @@ mod tests {
// Reach end
assert!(iter.next_action().await.unwrap().is_none());
manifest.stop().await.unwrap();
}
async fn assert_scan(manifest: &RegionManifest, start_version: ManifestVersion, expected: u64) {
@@ -294,7 +298,13 @@ mod tests {
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store);
let manifest = RegionManifest::with_checkpointer(
"/manifest/",
object_store,
None,
Some(Duration::from_millis(50)),
);
manifest.start().await.unwrap();
let region_meta = Arc::new(build_region_meta());
let new_region_meta = Arc::new(build_altered_region_meta());
@@ -325,12 +335,15 @@ mod tests {
// update flushed manifest version for doing checkpoint
manifest.set_flushed_manifest_version(2);
let mut checkpoint_versions = vec![];
// do a checkpoint
let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap();
let last_checkpoint = manifest.last_checkpoint().await.unwrap().unwrap();
assert_eq!(checkpoint, last_checkpoint);
assert_eq!(checkpoint.compacted_actions, 3);
assert_eq!(checkpoint.last_version, 2);
checkpoint_versions.push(2);
let alterd_raw_meta = RawRegionMetadata::from(new_region_meta.as_ref());
assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData {
committed_sequence: 99,
@@ -376,8 +389,8 @@ mod tests {
}
assert_scan(&manifest, 3, 2).await;
// do another checkpoints
// do another checkpoints
// compacted RegionChange
manifest.set_flushed_manifest_version(3);
let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap();
@@ -385,6 +398,7 @@ mod tests {
assert_eq!(checkpoint, last_checkpoint);
assert_eq!(checkpoint.compacted_actions, 1);
assert_eq!(checkpoint.last_version, 3);
checkpoint_versions.push(3);
assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData {
committed_sequence: 200,
metadata,
@@ -406,6 +420,7 @@ mod tests {
assert_eq!(checkpoint, last_checkpoint);
assert_eq!(checkpoint.compacted_actions, 1);
assert_eq!(checkpoint.last_version, 4);
checkpoint_versions.push(4);
assert!(matches!(&checkpoint.checkpoint, Some(RegionManifestData {
committed_sequence: 200,
metadata,
@@ -427,5 +442,26 @@ mod tests {
.await
.unwrap()
.is_none());
// wait for gc
tokio::time::sleep(Duration::from_millis(60)).await;
for v in checkpoint_versions {
if v < 4 {
// ensure old checkpoints were purged.
assert!(manifest
.manifest_store()
.load_checkpoint(v)
.await
.unwrap()
.is_none());
} else {
// the last checkpoints is still exists.
let last_checkpoint = manifest.last_checkpoint().await.unwrap().unwrap();
assert_eq!(checkpoint, last_checkpoint);
}
}
manifest.stop().await.unwrap();
}
}

View File

@@ -19,7 +19,7 @@ use async_trait::async_trait;
use common_telemetry::logging;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use object_store::{util, Entry, ErrorKind, ObjectStore};
use object_store::{raw_normalize_path, util, Entry, ErrorKind, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -31,7 +31,8 @@ use crate::error::{
};
lazy_static! {
static ref RE: Regex = Regex::new("^\\d+\\.json$").unwrap();
static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json$").unwrap();
static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
@@ -46,20 +47,24 @@ pub fn checkpoint_file(version: ManifestVersion) -> String {
format!("{version:020}.checkpoint")
}
/// Return's the delta file version from path
/// Return's the file manifest version from path
///
/// # Panics
/// Panics if the file path is not a valid delta file.
/// Panics if the file path is not a valid delta or checkpoint file.
#[inline]
pub fn delta_version(path: &str) -> ManifestVersion {
pub fn file_version(path: &str) -> ManifestVersion {
let s = path.split('.').next().unwrap();
s.parse()
.unwrap_or_else(|_| panic!("Invalid delta file: {path}"))
s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
}
#[inline]
pub fn is_delta_file(file_name: &str) -> bool {
RE.is_match(file_name)
DELTA_RE.is_match(file_name)
}
#[inline]
pub fn is_checkpoint_file(file_name: &str) -> bool {
CHECKPOINT_RE.is_match(file_name)
}
pub struct ObjectStoreLogIterator {
@@ -162,7 +167,7 @@ impl ManifestLogStorage for ManifestObjectStore {
.try_filter_map(|e| async move {
let file_name = e.name();
if is_delta_file(file_name) {
let version = delta_version(file_name);
let version = file_version(file_name);
if version >= start && version < end {
Ok(Some((version, e)))
} else {
@@ -184,6 +189,48 @@ impl ManifestLogStorage for ManifestObjectStore {
})
}
async fn delete_until(&self, end: ManifestVersion) -> Result<usize> {
let streamer = self
.object_store
.list(&self.path)
.await
.context(ListObjectsSnafu { path: &self.path })?;
let paths: Vec<_> = streamer
.try_filter_map(|e| async move {
let file_name = e.name();
if is_delta_file(file_name) || is_checkpoint_file(file_name) {
let version = file_version(file_name);
if version < end {
Ok(Some(e.path().to_string()))
} else {
Ok(None)
}
} else {
Ok(None)
}
})
.try_collect::<Vec<_>>()
.await
.context(ListObjectsSnafu { path: &self.path })?;
let ret = paths.len();
logging::debug!(
"Deleting {} logs from manifest storage path {}.",
ret,
self.path
);
self.object_store
.remove(paths)
.await
.with_context(|_| DeleteObjectSnafu {
path: self.path.clone(),
})?;
Ok(ret)
}
async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.delta_file_path(version);
self.object_store
@@ -193,14 +240,22 @@ impl ManifestLogStorage for ManifestObjectStore {
}
async fn delete(&self, start: ManifestVersion, end: ManifestVersion) -> Result<()> {
//TODO(dennis): delete in batch or concurrently?
for v in start..end {
let path = self.delta_file_path(v);
self.object_store
.delete(&path)
.await
.context(DeleteObjectSnafu { path })?;
}
let raw_paths = (start..end)
.map(|v| self.delta_file_path(v))
.collect::<Vec<_>>();
let paths = raw_paths
.iter()
.map(|p| raw_normalize_path(p))
.collect::<Vec<_>>();
self.object_store
.remove(paths)
.await
.with_context(|_| DeleteObjectSnafu {
path: raw_paths.join(","),
})?;
Ok(())
}
@@ -243,13 +298,11 @@ impl ManifestLogStorage for ManifestObjectStore {
version: ManifestVersion,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let path = self.checkpoint_file_path(version);
let checkpoint = self
.object_store
.read(&path)
.await
.context(ReadObjectSnafu { path })?;
Ok(Some((version, checkpoint)))
match self.object_store.read(&path).await {
Ok(checkpoint) => Ok(Some((version, checkpoint))),
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(e).context(ReadObjectSnafu { path }),
}
}
async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> {
@@ -351,5 +404,22 @@ mod tests {
let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
assert_eq!(checkpoint, "checkpoint".as_bytes());
assert_eq!(3, v);
//delete (,4) logs and checkpoints
log_store.delete_until(4).await.unwrap();
assert!(log_store.load_checkpoint(3).await.unwrap().is_none());
assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
let mut it = log_store.scan(0, 11).await.unwrap();
let (version, bytes) = it.next_log().await.unwrap().unwrap();
assert_eq!(4, version);
assert_eq!("hello, 4".as_bytes(), bytes);
assert!(it.next_log().await.unwrap().is_none());
// delete all logs and checkpoints
log_store.delete_until(11).await.unwrap();
assert!(log_store.load_checkpoint(3).await.unwrap().is_none());
assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
let mut it = log_store.scan(0, 11).await.unwrap();
assert!(it.next_log().await.unwrap().is_none());
}
}

View File

@@ -629,7 +629,8 @@ impl<S: LogStore> RegionInner<S> {
}
async fn close(&self) -> Result<()> {
self.writer.close().await
self.writer.close().await?;
self.manifest.stop().await
}
async fn flush(&self, ctx: &FlushContext) -> Result<()> {

View File

@@ -314,7 +314,8 @@ async fn test_recover_region_manifets() {
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store.clone());
let manifest =
RegionManifest::with_checkpointer("/manifest/", object_store.clone(), None, None);
let region_meta = Arc::new(build_region_meta());
let sst_layer = Arc::new(FsAccessLayer::new("sst", object_store)) as _;

View File

@@ -18,6 +18,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::manifest::Manifest;
use crate::background::JobPoolImpl;
use crate::compaction::noop::NoopCompactionScheduler;
@@ -57,7 +58,8 @@ pub async fn new_store_config_with_object_store(
let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store);
let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store, None, None);
manifest.start().await.unwrap();
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
let log_config = LogConfig {

View File

@@ -102,4 +102,13 @@ pub trait Manifest: Send + Sync + Clone + 'static {
/// Returns the last(or latest) manifest version.
fn last_version(&self) -> ManifestVersion;
/// Start the service
async fn start(&self) -> Result<(), Self::Error> {
Ok(())
}
/// Stop the service
async fn stop(&self) -> Result<(), Self::Error> {
Ok(())
}
}

View File

@@ -36,6 +36,10 @@ pub trait ManifestLogStorage {
end: ManifestVersion,
) -> Result<Self::Iter, Self::Error>;
/// Delete logs which version is less than specified version.
/// Returns the delete logs number.
async fn delete_until(&self, version: ManifestVersion) -> Result<usize, Self::Error>;
/// Save a log
async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<(), Self::Error>;

View File

@@ -24,7 +24,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, WalConfig,
DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, StorageConfig, WalConfig,
};
use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::{Instance, InstanceRef};
@@ -100,8 +100,7 @@ fn get_test_store_config(
access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap(),
bucket: env::var("GT_OSS_BUCKET").unwrap(),
endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
cache_path: None,
cache_capacity: None,
..Default::default()
};
let mut builder = Oss::default();
@@ -127,10 +126,7 @@ fn get_test_store_config(
access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap(),
secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap(),
bucket: env::var("GT_S3_BUCKET").unwrap(),
endpoint: None,
region: None,
cache_path: None,
cache_capacity: None,
..Default::default()
};
let mut builder = S3::default();
@@ -189,14 +185,17 @@ pub fn create_tmp_dir_and_datanode_opts(
) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let (storage, data_tmp_dir) = get_test_store_config(&store_type, name);
let (store, data_tmp_dir) = get_test_store_config(&store_type, name);
let opts = DatanodeOptions {
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
storage,
storage: StorageConfig {
store,
..Default::default()
},
mode: Mode::Standalone,
..Default::default()
};