From 7e3c59fb517657bc84a605e4fdaaf0278a4c5650 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Fri, 6 Jan 2023 12:21:00 +0800 Subject: [PATCH] fix: remove start from LogStore; fix error message (#837) (cherry picked from commit 627d4447236344f7483eb4e5c38b24ce4c5b92f3) --- src/datanode/src/error.rs | 11 +--- src/datanode/src/instance.rs | 10 ++-- src/datanode/src/mock.rs | 1 - src/log-store/src/noop.rs | 5 -- src/log-store/src/raft_engine/log_store.rs | 52 +++++++++++-------- src/log-store/src/test_util/log_store_util.rs | 4 +- src/script/src/manager.rs | 5 +- src/storage/src/error.rs | 6 +-- src/storage/src/test_util/config_util.rs | 4 +- src/storage/src/wal.rs | 27 +++++----- src/store-api/src/logstore.rs | 3 -- 11 files changed, 56 insertions(+), 72 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6115bfa60a..469ec760fb 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,10 +1,10 @@ -// Copyright 2022 Greptime Team +// Copyright 2023 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -144,12 +144,6 @@ pub enum Error { source: log_store::error::Error, }, - #[snafu(display("Failed to star log store gc task, source: {}", source))] - StartLogStore { - #[snafu(backtrace)] - source: log_store::error::Error, - }, - #[snafu(display("Failed to storage engine, source: {}", source))] OpenStorageEngine { source: StorageError }, @@ -367,7 +361,6 @@ impl ErrorExt for Error { Error::BumpTableId { source, .. } => source.status_code(), Error::MissingNodeId { .. } => StatusCode::InvalidArguments, Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments, - Error::StartLogStore { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index fa19cecf77..fc4ff605fe 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -36,13 +36,12 @@ use servers::Mode; use snafu::prelude::*; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; -use store_api::logstore::LogStore; use table::table::TableIdProviderRef; use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, - NewCatalogSnafu, OpenLogStoreSnafu, Result, StartLogStoreSnafu, + NewCatalogSnafu, OpenLogStoreSnafu, Result, }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; @@ -62,7 +61,6 @@ pub struct Instance { pub(crate) script_executor: ScriptExecutor, pub(crate) table_id_provider: Option, pub(crate) heartbeat_task: Option, - pub(crate) logstore: Arc, } pub type InstanceRef = Arc; @@ -160,12 +158,10 @@ impl Instance { script_executor, heartbeat_task, table_id_provider, - logstore, }) } pub async fn start(&self) -> Result<()> { - self.logstore.start().await.context(StartLogStoreSnafu)?; self.catalog_manager .start() .await @@ -287,6 +283,8 @@ pub(crate) async fn create_log_store(path: impl AsRef) -> Result Result<()> { - Ok(()) - } - async fn stop(&self) -> Result<()> { Ok(()) } @@ -131,7 +127,6 @@ mod tests { #[tokio::test] async fn test_noop_logstore() { let mut store = NoopLogStore::default(); - store.start().await.unwrap(); let e = store.entry("".as_bytes(), 1, NamespaceImpl::default()); store.append(e.clone()).await.unwrap(); store diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 4231767f6d..8d03594c55 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -47,7 +47,7 @@ pub struct RaftEngineLogStore { } impl RaftEngineLogStore { - pub fn try_new(config: LogConfig) -> Result { + pub async fn try_new(config: LogConfig) -> Result { // TODO(hl): set according to available disk space let raft_engine_config = Config { dir: config.log_file_dir.clone(), @@ -58,36 +58,22 @@ impl RaftEngineLogStore { ..Default::default() }; let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?); - Ok(Self { + let log_store = Self { config, engine, cancel_token: Mutex::new(None), gc_task_handle: Mutex::new(None), started: AtomicBool::new(false), - }) + }; + log_store.start().await?; + Ok(log_store) } pub fn started(&self) -> bool { self.started.load(Ordering::Relaxed) } -} -impl Debug for RaftEngineLogStore { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RaftEngineLogsStore") - .field("config", &self.config) - .field("started", &self.started.load(Ordering::Relaxed)) - .finish() - } -} - -#[async_trait::async_trait] -impl LogStore for RaftEngineLogStore { - type Error = Error; - type Namespace = Namespace; - type Entry = Entry; - - async fn start(&self) -> Result<(), Self::Error> { + async fn start(&self) -> Result<(), Error> { let engine_clone = self.engine.clone(); let interval = self.config.gc_interval; let token = CancellationToken::new(); @@ -123,6 +109,22 @@ impl LogStore for RaftEngineLogStore { info!("RaftEngineLogStore started with config: {:?}", self.config); Ok(()) } +} + +impl Debug for RaftEngineLogStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RaftEngineLogsStore") + .field("config", &self.config) + .field("started", &self.started.load(Ordering::Relaxed)) + .finish() + } +} + +#[async_trait::async_trait] +impl LogStore for RaftEngineLogStore { + type Error = Error; + type Namespace = Namespace; + type Entry = Entry; async fn stop(&self) -> Result<(), Self::Error> { ensure!( @@ -355,6 +357,7 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); logstore.start().await.unwrap(); let namespaces = logstore.list_namespaces().await.unwrap(); @@ -368,6 +371,7 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); logstore.start().await.unwrap(); assert!(logstore.list_namespaces().await.unwrap().is_empty()); @@ -394,6 +398,7 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); logstore.start().await.unwrap(); @@ -435,8 +440,8 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); - logstore.start().await.unwrap(); logstore .append(Entry::create(1, 1, "1".as_bytes().to_vec())) .await @@ -455,6 +460,7 @@ mod tests { log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }) + .await .unwrap(); logstore.start().await.unwrap(); @@ -495,7 +501,7 @@ mod tests { ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(config).unwrap(); + let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); logstore.start().await.unwrap(); let namespace = Namespace::with_id(42); for id in 0..4096 { @@ -528,7 +534,7 @@ mod tests { ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(config).unwrap(); + let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); logstore.start().await.unwrap(); let namespace = Namespace::with_id(42); for id in 0..1024 { diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index 06143712e6..81a999a713 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use store_api::logstore::LogStore; use tempdir::TempDir; use crate::raft_engine::log_store::RaftEngineLogStore; @@ -29,7 +28,6 @@ pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore, ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(cfg).unwrap(); - logstore.start().await.unwrap(); + let logstore = RaftEngineLogStore::try_new(cfg).await.unwrap(); (logstore, dir) } diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index d01d1f961f..a7f11a813b 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -109,7 +109,6 @@ mod tests { use mito::engine::MitoEngine; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; - use store_api::logstore::LogStore; use tempdir::TempDir; #[tokio::test] @@ -124,9 +123,7 @@ mod tests { ..Default::default() }; - let log_store = RaftEngineLogStore::try_new(log_config).unwrap(); - log_store.start().await.unwrap(); - + let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap(); let mock_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index e26f42db2c..d0b204fe63 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -205,11 +205,11 @@ pub enum Error { }, #[snafu(display( - "Failed to mark WAL as stable, region id: {}, source: {}", + "Failed to mark WAL as obsolete, region id: {}, source: {}", region_id, source ))] - MarkWalStable { + MarkWalObsolete { region_id: u64, #[snafu(backtrace)] source: BoxedError, @@ -504,7 +504,7 @@ impl ErrorExt for Error { PushBatch { source, .. } => source.status_code(), CreateDefault { source, .. } => source.status_code(), ConvertChunk { source, .. } => source.status_code(), - MarkWalStable { source, .. } => source.status_code(), + MarkWalObsolete { source, .. } => source.status_code(), } } diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 38730bba5d..c0e7bd85e4 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -18,7 +18,6 @@ use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; use object_store::backend::fs::Builder; use object_store::ObjectStore; -use store_api::logstore::LogStore; use crate::background::JobPoolImpl; use crate::engine; @@ -51,8 +50,7 @@ pub async fn new_store_config( log_file_dir: log_store_dir(store_dir), ..Default::default() }; - let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).unwrap()); - log_store.start().await.unwrap(); + let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap()); StoreConfig { log_store, diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 5ecf3a1a37..91920ec270 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -1,10 +1,10 @@ -// Copyright 2022 Greptime Team +// Copyright 2023 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -24,7 +24,10 @@ use store_api::logstore::LogStore; use store_api::storage::{RegionId, SequenceNumber}; use crate::codec::{Decoder, Encoder}; -use crate::error::{self, Error, MarkWalStableSnafu, Result}; +use crate::error::{ + DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalObsoleteSnafu, ReadWalSnafu, Result, + WalDataCorruptedSnafu, WriteWalSnafu, +}; use crate::proto::wal::{self, WalHeader}; use crate::write_batch::codec::{PayloadDecoder, PayloadEncoder}; use crate::write_batch::Payload; @@ -65,7 +68,7 @@ impl Wal { .obsolete(self.namespace.clone(), seq) .await .map_err(BoxedError::new) - .context(MarkWalStableSnafu { + .context(MarkWalObsoleteSnafu { region_id: self.region_id, }) } @@ -120,7 +123,7 @@ impl Wal { encoder .encode(p, &mut buf) .map_err(BoxedError::new) - .context(error::WriteWalSnafu { + .context(WriteWalSnafu { region_id: self.region_id(), })?; } @@ -135,7 +138,7 @@ impl Wal { .read(&self.namespace, start_seq) .await .map_err(BoxedError::new) - .context(error::ReadWalSnafu { + .context(ReadWalSnafu { region_id: self.region_id(), })? // Handle the error when reading from the stream. @@ -161,7 +164,7 @@ impl Wal { .append(e) .await .map_err(BoxedError::new) - .context(error::WriteWalSnafu { + .context(WriteWalSnafu { region_id: self.region_id(), })?; @@ -180,7 +183,7 @@ impl Wal { ensure!( data_pos <= input.len(), - error::WalDataCorruptedSnafu { + WalDataCorruptedSnafu { region_id: self.region_id(), message: format!( "Not enough input buffer, expected data position={}, actual buffer length={}", @@ -198,7 +201,7 @@ impl Wal { let payload = decoder .decode(&input[data_pos..]) .map_err(BoxedError::new) - .context(error::ReadWalSnafu { + .context(ReadWalSnafu { region_id: self.region_id(), })?; @@ -215,7 +218,7 @@ impl Encoder for WalHeaderEncoder { fn encode(&self, item: &WalHeader, dst: &mut Vec) -> Result<()> { item.encode_length_delimited(dst) .map_err(|err| err.into()) - .context(error::EncodeWalHeaderSnafu) + .context(EncodeWalHeaderSnafu) } } @@ -228,12 +231,12 @@ impl Decoder for WalHeaderDecoder { fn decode(&self, src: &[u8]) -> Result<(usize, WalHeader)> { let mut data_pos = prost::decode_length_delimiter(src) .map_err(|err| err.into()) - .context(error::DecodeWalHeaderSnafu)?; + .context(DecodeWalHeaderSnafu)?; data_pos += prost::length_delimiter_len(data_pos); let wal_header = WalHeader::decode_length_delimited(src) .map_err(|err| err.into()) - .context(error::DecodeWalHeaderSnafu)?; + .context(DecodeWalHeaderSnafu)?; Ok((data_pos, wal_header)) } diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 60e58e8fd1..7ab6844d56 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -31,9 +31,6 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { type Namespace: Namespace; type Entry: Entry; - /// Start the components and background tasks of logstore. - async fn start(&self) -> Result<(), Self::Error>; - /// Stop components of logstore. async fn stop(&self) -> Result<(), Self::Error>;