From 627d4447236344f7483eb4e5c38b24ce4c5b92f3 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 6 Jan 2023 12:21:00 +0800 Subject: [PATCH] fix: remove start from LogStore; fix error message (#837) --- src/datanode/src/error.rs | 7 --- src/datanode/src/instance.rs | 10 ++-- src/datanode/src/mock.rs | 1 - src/log-store/src/noop.rs | 5 -- src/log-store/src/raft_engine/log_store.rs | 52 +++++++++++-------- src/log-store/src/test_util/log_store_util.rs | 4 +- src/script/src/manager.rs | 5 +- src/storage/src/error.rs | 6 +-- src/storage/src/test_util/config_util.rs | 4 +- src/storage/src/wal.rs | 4 +- src/store-api/src/logstore.rs | 3 -- 11 files changed, 41 insertions(+), 60 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index f0c889106c..7d14402c57 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -141,12 +141,6 @@ pub enum Error { source: log_store::error::Error, }, - #[snafu(display("Failed to star log store gc task, source: {}", source))] - StartLogStore { - #[snafu(backtrace)] - source: log_store::error::Error, - }, - #[snafu(display("Failed to storage engine, source: {}", source))] OpenStorageEngine { source: StorageError }, @@ -382,7 +376,6 @@ impl ErrorExt for Error { Error::BumpTableId { source, .. } => source.status_code(), Error::MissingNodeId { .. } => StatusCode::InvalidArguments, Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments, - Error::StartLogStore { source, .. } => source.status_code(), Error::PollRecordbatchStream { source } => source.status_code(), } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 9489bb656b..8a41520d3f 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -36,13 +36,12 @@ use servers::Mode; use snafu::prelude::*; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; -use store_api::logstore::LogStore; use table::table::TableIdProviderRef; use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, - NewCatalogSnafu, OpenLogStoreSnafu, Result, StartLogStoreSnafu, + NewCatalogSnafu, OpenLogStoreSnafu, Result, }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; @@ -63,7 +62,6 @@ pub struct Instance { pub(crate) script_executor: ScriptExecutor, pub(crate) table_id_provider: Option, pub(crate) heartbeat_task: Option, - pub(crate) logstore: Arc, } pub type InstanceRef = Arc; @@ -161,12 +159,10 @@ impl Instance { script_executor, heartbeat_task, table_id_provider, - logstore, }) } pub async fn start(&self) -> Result<()> { - self.logstore.start().await.context(StartLogStoreSnafu)?; self.catalog_manager .start() .await @@ -305,6 +301,8 @@ pub(crate) async fn create_log_store(path: impl AsRef) -> Result Result<()> { - Ok(()) - } - async fn stop(&self) -> Result<()> { Ok(()) } @@ -131,7 +127,6 @@ mod tests { #[tokio::test] async fn test_noop_logstore() { let mut store = NoopLogStore::default(); - store.start().await.unwrap(); let e = store.entry("".as_bytes(), 1, NamespaceImpl::default()); store.append(e.clone()).await.unwrap(); store diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 4231767f6d..8d03594c55 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -47,7 +47,7 @@ pub struct RaftEngineLogStore { } impl RaftEngineLogStore { - pub fn try_new(config: LogConfig) -> Result { + pub async fn try_new(config: LogConfig) -> Result { // TODO(hl): set according to available disk space let raft_engine_config = Config { dir: config.log_file_dir.clone(), @@ -58,36 +58,22 @@ impl RaftEngineLogStore { ..Default::default() }; let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?); - Ok(Self { + let log_store = Self { config, engine, cancel_token: Mutex::new(None), gc_task_handle: Mutex::new(None), started: AtomicBool::new(false), - }) + }; + log_store.start().await?; + Ok(log_store) } pub fn started(&self) -> bool { self.started.load(Ordering::Relaxed) } -} -impl Debug for RaftEngineLogStore { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RaftEngineLogsStore") - .field("config", &self.config) - .field("started", &self.started.load(Ordering::Relaxed)) - .finish() - } -} - -#[async_trait::async_trait] -impl LogStore for RaftEngineLogStore { - type Error = Error; - type Namespace = Namespace; - type Entry = Entry; - - async fn start(&self) -> Result<(), Self::Error> { + async fn start(&self) -> Result<(), Error> { let engine_clone = self.engine.clone(); let interval = self.config.gc_interval; let token = CancellationToken::new(); @@ -123,6 +109,22 @@ impl LogStore for RaftEngineLogStore { info!("RaftEngineLogStore started with config: {:?}", self.config); Ok(()) } +} + +impl Debug for RaftEngineLogStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RaftEngineLogsStore") + .field("config", &self.config) + .field("started", &self.started.load(Ordering::Relaxed)) + .finish() + } +} + +#[async_trait::async_trait] +impl LogStore for RaftEngineLogStore { + type Error = Error; + type Namespace = Namespace; + type Entry = Entry; async fn stop(&self) -> Result<(), Self::Error> { ensure!( @@ -355,6 +357,7 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); logstore.start().await.unwrap(); let namespaces = logstore.list_namespaces().await.unwrap(); @@ -368,6 +371,7 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); logstore.start().await.unwrap(); assert!(logstore.list_namespaces().await.unwrap().is_empty()); @@ -394,6 +398,7 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); logstore.start().await.unwrap(); @@ -435,8 +440,8 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); - logstore.start().await.unwrap(); logstore .append(Entry::create(1, 1, "1".as_bytes().to_vec())) .await @@ -455,6 +460,7 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); logstore.start().await.unwrap(); @@ -495,7 +501,7 @@ mod tests { ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(config).unwrap(); + let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); logstore.start().await.unwrap(); let namespace = Namespace::with_id(42); for id in 0..4096 { @@ -528,7 +534,7 @@ mod tests { ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(config).unwrap(); + let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); logstore.start().await.unwrap(); let namespace = Namespace::with_id(42); for id in 0..1024 { diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index 413427a20b..af20be8b76 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use store_api::logstore::LogStore; use tempdir::TempDir; use crate::raft_engine::log_store::RaftEngineLogStore; @@ -29,7 +28,6 @@ pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore, ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(cfg).unwrap(); - logstore.start().await.unwrap(); + let logstore = RaftEngineLogStore::try_new(cfg).await.unwrap(); (logstore, dir) } diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 12e9d80526..01d4d56c53 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -107,7 +107,6 @@ mod tests { use mito::engine::MitoEngine; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; - use store_api::logstore::LogStore; use tempdir::TempDir; #[tokio::test] @@ -122,9 +121,7 @@ mod tests { ..Default::default() }; - let log_store = RaftEngineLogStore::try_new(log_config).unwrap(); - log_store.start().await.unwrap(); - + let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap(); let mock_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 197d5fc1d9..2219c0bf00 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -205,11 +205,11 @@ pub enum Error { }, #[snafu(display( - "Failed to mark WAL as stable, region id: {}, source: {}", + "Failed to mark WAL as obsolete, region id: {}, source: {}", region_id, source ))] - MarkWalStable { + MarkWalObsolete { region_id: u64, #[snafu(backtrace)] source: BoxedError, @@ -508,7 +508,7 @@ impl ErrorExt for Error { PushBatch { source, .. } => source.status_code(), CreateDefault { source, .. } => source.status_code(), ConvertChunk { source, .. } => source.status_code(), - MarkWalStable { source, .. } => source.status_code(), + MarkWalObsolete { source, .. } => source.status_code(), } } diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 84b87c4195..70ebdcf897 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -18,7 +18,6 @@ use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; use object_store::backend::fs::Builder; use object_store::ObjectStore; -use store_api::logstore::LogStore; use crate::background::JobPoolImpl; use crate::engine; @@ -51,8 +50,7 @@ pub async fn new_store_config( log_file_dir: log_store_dir(store_dir), ..Default::default() }; - let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).unwrap()); - log_store.start().await.unwrap(); + let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap()); StoreConfig { log_store, diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index a0a2920200..91920ec270 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -25,7 +25,7 @@ use store_api::storage::{RegionId, SequenceNumber}; use crate::codec::{Decoder, Encoder}; use crate::error::{ - DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalStableSnafu, ReadWalSnafu, Result, + DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalObsoleteSnafu, ReadWalSnafu, Result, WalDataCorruptedSnafu, WriteWalSnafu, }; use crate::proto::wal::{self, WalHeader}; @@ -68,7 +68,7 @@ impl Wal { .obsolete(self.namespace.clone(), seq) .await .map_err(BoxedError::new) - .context(MarkWalStableSnafu { + .context(MarkWalObsoleteSnafu { region_id: self.region_id, }) } diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 087c37f2ea..1cd4453122 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -31,9 +31,6 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { type Namespace: Namespace; type Entry: Entry; - /// Start the components and background tasks of logstore. - async fn start(&self) -> Result<(), Self::Error>; - /// Stop components of logstore. async fn stop(&self) -> Result<(), Self::Error>;