fix: remove start from LogStore; fix error message (#837)

(cherry picked from commit 627d444723)
This commit is contained in:
Lei, HUANG
2023-01-06 12:21:00 +08:00
parent 7bbc679c76
commit 7e3c59fb51
11 changed files with 56 additions and 72 deletions

View File

@@ -1,10 +1,10 @@
// Copyright 2022 Greptime Team
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -144,12 +144,6 @@ pub enum Error {
source: log_store::error::Error,
},
#[snafu(display("Failed to star log store gc task, source: {}", source))]
StartLogStore {
#[snafu(backtrace)]
source: log_store::error::Error,
},
#[snafu(display("Failed to storage engine, source: {}", source))]
OpenStorageEngine { source: StorageError },
@@ -367,7 +361,6 @@ impl ErrorExt for Error {
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::StartLogStore { source, .. } => source.status_code(),
}
}

View File

@@ -36,13 +36,12 @@ use servers::Mode;
use snafu::prelude::*;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result, StartLogStoreSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -62,7 +61,6 @@ pub struct Instance {
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
pub(crate) logstore: Arc<RaftEngineLogStore>,
}
pub type InstanceRef = Arc<Instance>;
@@ -160,12 +158,10 @@ impl Instance {
script_executor,
heartbeat_task,
table_id_provider,
logstore,
})
}
pub async fn start(&self) -> Result<()> {
self.logstore.start().await.context(StartLogStoreSnafu)?;
self.catalog_manager
.start()
.await
@@ -287,6 +283,8 @@ pub(crate) async fn create_log_store(path: impl AsRef<str>) -> Result<RaftEngine
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(log_config).context(OpenLogStoreSnafu)?;
let logstore = RaftEngineLogStore::try_new(log_config)
.await
.context(OpenLogStoreSnafu)?;
Ok(logstore)
}

View File

@@ -83,7 +83,6 @@ impl Instance {
script_executor,
table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
heartbeat_task: Some(heartbeat_task),
logstore,
})
}
}

View File

@@ -57,10 +57,6 @@ impl LogStore for NoopLogStore {
type Namespace = NamespaceImpl;
type Entry = EntryImpl;
async fn start(&self) -> Result<()> {
Ok(())
}
async fn stop(&self) -> Result<()> {
Ok(())
}
@@ -131,7 +127,6 @@ mod tests {
#[tokio::test]
async fn test_noop_logstore() {
let mut store = NoopLogStore::default();
store.start().await.unwrap();
let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
store.append(e.clone()).await.unwrap();
store

View File

@@ -47,7 +47,7 @@ pub struct RaftEngineLogStore {
}
impl RaftEngineLogStore {
pub fn try_new(config: LogConfig) -> Result<Self, Error> {
pub async fn try_new(config: LogConfig) -> Result<Self, Error> {
// TODO(hl): set according to available disk space
let raft_engine_config = Config {
dir: config.log_file_dir.clone(),
@@ -58,36 +58,22 @@ impl RaftEngineLogStore {
..Default::default()
};
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
Ok(Self {
let log_store = Self {
config,
engine,
cancel_token: Mutex::new(None),
gc_task_handle: Mutex::new(None),
started: AtomicBool::new(false),
})
};
log_store.start().await?;
Ok(log_store)
}
pub fn started(&self) -> bool {
self.started.load(Ordering::Relaxed)
}
}
impl Debug for RaftEngineLogStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RaftEngineLogsStore")
.field("config", &self.config)
.field("started", &self.started.load(Ordering::Relaxed))
.finish()
}
}
#[async_trait::async_trait]
impl LogStore for RaftEngineLogStore {
type Error = Error;
type Namespace = Namespace;
type Entry = Entry;
async fn start(&self) -> Result<(), Self::Error> {
async fn start(&self) -> Result<(), Error> {
let engine_clone = self.engine.clone();
let interval = self.config.gc_interval;
let token = CancellationToken::new();
@@ -123,6 +109,22 @@ impl LogStore for RaftEngineLogStore {
info!("RaftEngineLogStore started with config: {:?}", self.config);
Ok(())
}
}
impl Debug for RaftEngineLogStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RaftEngineLogsStore")
.field("config", &self.config)
.field("started", &self.started.load(Ordering::Relaxed))
.finish()
}
}
#[async_trait::async_trait]
impl LogStore for RaftEngineLogStore {
type Error = Error;
type Namespace = Namespace;
type Entry = Entry;
async fn stop(&self) -> Result<(), Self::Error> {
ensure!(
@@ -355,6 +357,7 @@ mod tests {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
let namespaces = logstore.list_namespaces().await.unwrap();
@@ -368,6 +371,7 @@ mod tests {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
assert!(logstore.list_namespaces().await.unwrap().is_empty());
@@ -394,6 +398,7 @@ mod tests {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
@@ -435,8 +440,8 @@ mod tests {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
logstore
.append(Entry::create(1, 1, "1".as_bytes().to_vec()))
.await
@@ -455,6 +460,7 @@ mod tests {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})
.await
.unwrap();
logstore.start().await.unwrap();
@@ -495,7 +501,7 @@ mod tests {
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(config).unwrap();
let logstore = RaftEngineLogStore::try_new(config).await.unwrap();
logstore.start().await.unwrap();
let namespace = Namespace::with_id(42);
for id in 0..4096 {
@@ -528,7 +534,7 @@ mod tests {
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(config).unwrap();
let logstore = RaftEngineLogStore::try_new(config).await.unwrap();
logstore.start().await.unwrap();
let namespace = Namespace::with_id(42);
for id in 0..1024 {

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::logstore::LogStore;
use tempdir::TempDir;
use crate::raft_engine::log_store::RaftEngineLogStore;
@@ -29,7 +28,6 @@ pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore,
..Default::default()
};
let logstore = RaftEngineLogStore::try_new(cfg).unwrap();
logstore.start().await.unwrap();
let logstore = RaftEngineLogStore::try_new(cfg).await.unwrap();
(logstore, dir)
}

View File

@@ -109,7 +109,6 @@ mod tests {
use mito::engine::MitoEngine;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::logstore::LogStore;
use tempdir::TempDir;
#[tokio::test]
@@ -124,9 +123,7 @@ mod tests {
..Default::default()
};
let log_store = RaftEngineLogStore::try_new(log_config).unwrap();
log_store.start().await.unwrap();
let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap();
let mock_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(

View File

@@ -205,11 +205,11 @@ pub enum Error {
},
#[snafu(display(
"Failed to mark WAL as stable, region id: {}, source: {}",
"Failed to mark WAL as obsolete, region id: {}, source: {}",
region_id,
source
))]
MarkWalStable {
MarkWalObsolete {
region_id: u64,
#[snafu(backtrace)]
source: BoxedError,
@@ -504,7 +504,7 @@ impl ErrorExt for Error {
PushBatch { source, .. } => source.status_code(),
CreateDefault { source, .. } => source.status_code(),
ConvertChunk { source, .. } => source.status_code(),
MarkWalStable { source, .. } => source.status_code(),
MarkWalObsolete { source, .. } => source.status_code(),
}
}

View File

@@ -18,7 +18,6 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use object_store::backend::fs::Builder;
use object_store::ObjectStore;
use store_api::logstore::LogStore;
use crate::background::JobPoolImpl;
use crate::engine;
@@ -51,8 +50,7 @@ pub async fn new_store_config(
log_file_dir: log_store_dir(store_dir),
..Default::default()
};
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).unwrap());
log_store.start().await.unwrap();
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
StoreConfig {
log_store,

View File

@@ -1,10 +1,10 @@
// Copyright 2022 Greptime Team
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,7 +24,10 @@ use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};
use crate::codec::{Decoder, Encoder};
use crate::error::{self, Error, MarkWalStableSnafu, Result};
use crate::error::{
DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalObsoleteSnafu, ReadWalSnafu, Result,
WalDataCorruptedSnafu, WriteWalSnafu,
};
use crate::proto::wal::{self, WalHeader};
use crate::write_batch::codec::{PayloadDecoder, PayloadEncoder};
use crate::write_batch::Payload;
@@ -65,7 +68,7 @@ impl<S: LogStore> Wal<S> {
.obsolete(self.namespace.clone(), seq)
.await
.map_err(BoxedError::new)
.context(MarkWalStableSnafu {
.context(MarkWalObsoleteSnafu {
region_id: self.region_id,
})
}
@@ -120,7 +123,7 @@ impl<S: LogStore> Wal<S> {
encoder
.encode(p, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu {
.context(WriteWalSnafu {
region_id: self.region_id(),
})?;
}
@@ -135,7 +138,7 @@ impl<S: LogStore> Wal<S> {
.read(&self.namespace, start_seq)
.await
.map_err(BoxedError::new)
.context(error::ReadWalSnafu {
.context(ReadWalSnafu {
region_id: self.region_id(),
})?
// Handle the error when reading from the stream.
@@ -161,7 +164,7 @@ impl<S: LogStore> Wal<S> {
.append(e)
.await
.map_err(BoxedError::new)
.context(error::WriteWalSnafu {
.context(WriteWalSnafu {
region_id: self.region_id(),
})?;
@@ -180,7 +183,7 @@ impl<S: LogStore> Wal<S> {
ensure!(
data_pos <= input.len(),
error::WalDataCorruptedSnafu {
WalDataCorruptedSnafu {
region_id: self.region_id(),
message: format!(
"Not enough input buffer, expected data position={}, actual buffer length={}",
@@ -198,7 +201,7 @@ impl<S: LogStore> Wal<S> {
let payload = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu {
.context(ReadWalSnafu {
region_id: self.region_id(),
})?;
@@ -215,7 +218,7 @@ impl Encoder for WalHeaderEncoder {
fn encode(&self, item: &WalHeader, dst: &mut Vec<u8>) -> Result<()> {
item.encode_length_delimited(dst)
.map_err(|err| err.into())
.context(error::EncodeWalHeaderSnafu)
.context(EncodeWalHeaderSnafu)
}
}
@@ -228,12 +231,12 @@ impl Decoder for WalHeaderDecoder {
fn decode(&self, src: &[u8]) -> Result<(usize, WalHeader)> {
let mut data_pos = prost::decode_length_delimiter(src)
.map_err(|err| err.into())
.context(error::DecodeWalHeaderSnafu)?;
.context(DecodeWalHeaderSnafu)?;
data_pos += prost::length_delimiter_len(data_pos);
let wal_header = WalHeader::decode_length_delimited(src)
.map_err(|err| err.into())
.context(error::DecodeWalHeaderSnafu)?;
.context(DecodeWalHeaderSnafu)?;
Ok((data_pos, wal_header))
}

View File

@@ -31,9 +31,6 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
type Namespace: Namespace;
type Entry: Entry;
/// Start the components and background tasks of logstore.
async fn start(&self) -> Result<(), Self::Error>;
/// Stop components of logstore.
async fn stop(&self) -> Result<(), Self::Error>;