mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 17:30:41 +00:00
fix: remove start from LogStore; fix error message (#837)
This commit is contained in:
@@ -141,12 +141,6 @@ pub enum Error {
|
||||
source: log_store::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to star log store gc task, source: {}", source))]
|
||||
StartLogStore {
|
||||
#[snafu(backtrace)]
|
||||
source: log_store::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to storage engine, source: {}", source))]
|
||||
OpenStorageEngine { source: StorageError },
|
||||
|
||||
@@ -382,7 +376,6 @@ impl ErrorExt for Error {
|
||||
Error::BumpTableId { source, .. } => source.status_code(),
|
||||
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
|
||||
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
|
||||
Error::StartLogStore { source, .. } => source.status_code(),
|
||||
Error::PollRecordbatchStream { source } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,13 +36,12 @@ use servers::Mode;
|
||||
use snafu::prelude::*;
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use store_api::logstore::LogStore;
|
||||
use table::table::TableIdProviderRef;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use crate::error::{
|
||||
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
|
||||
NewCatalogSnafu, OpenLogStoreSnafu, Result, StartLogStoreSnafu,
|
||||
NewCatalogSnafu, OpenLogStoreSnafu, Result,
|
||||
};
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::script::ScriptExecutor;
|
||||
@@ -63,7 +62,6 @@ pub struct Instance {
|
||||
pub(crate) script_executor: ScriptExecutor,
|
||||
pub(crate) table_id_provider: Option<TableIdProviderRef>,
|
||||
pub(crate) heartbeat_task: Option<HeartbeatTask>,
|
||||
pub(crate) logstore: Arc<RaftEngineLogStore>,
|
||||
}
|
||||
|
||||
pub type InstanceRef = Arc<Instance>;
|
||||
@@ -161,12 +159,10 @@ impl Instance {
|
||||
script_executor,
|
||||
heartbeat_task,
|
||||
table_id_provider,
|
||||
logstore,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
self.logstore.start().await.context(StartLogStoreSnafu)?;
|
||||
self.catalog_manager
|
||||
.start()
|
||||
.await
|
||||
@@ -305,6 +301,8 @@ pub(crate) async fn create_log_store(path: impl AsRef<str>) -> Result<RaftEngine
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let logstore = RaftEngineLogStore::try_new(log_config).context(OpenLogStoreSnafu)?;
|
||||
let logstore = RaftEngineLogStore::try_new(log_config)
|
||||
.await
|
||||
.context(OpenLogStoreSnafu)?;
|
||||
Ok(logstore)
|
||||
}
|
||||
|
||||
@@ -83,7 +83,6 @@ impl Instance {
|
||||
script_executor,
|
||||
table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
|
||||
heartbeat_task: Some(heartbeat_task),
|
||||
logstore,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,10 +57,6 @@ impl LogStore for NoopLogStore {
|
||||
type Namespace = NamespaceImpl;
|
||||
type Entry = EntryImpl;
|
||||
|
||||
async fn start(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
@@ -131,7 +127,6 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_noop_logstore() {
|
||||
let mut store = NoopLogStore::default();
|
||||
store.start().await.unwrap();
|
||||
let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
|
||||
store.append(e.clone()).await.unwrap();
|
||||
store
|
||||
|
||||
@@ -47,7 +47,7 @@ pub struct RaftEngineLogStore {
|
||||
}
|
||||
|
||||
impl RaftEngineLogStore {
|
||||
pub fn try_new(config: LogConfig) -> Result<Self, Error> {
|
||||
pub async fn try_new(config: LogConfig) -> Result<Self, Error> {
|
||||
// TODO(hl): set according to available disk space
|
||||
let raft_engine_config = Config {
|
||||
dir: config.log_file_dir.clone(),
|
||||
@@ -58,36 +58,22 @@ impl RaftEngineLogStore {
|
||||
..Default::default()
|
||||
};
|
||||
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
|
||||
Ok(Self {
|
||||
let log_store = Self {
|
||||
config,
|
||||
engine,
|
||||
cancel_token: Mutex::new(None),
|
||||
gc_task_handle: Mutex::new(None),
|
||||
started: AtomicBool::new(false),
|
||||
})
|
||||
};
|
||||
log_store.start().await?;
|
||||
Ok(log_store)
|
||||
}
|
||||
|
||||
pub fn started(&self) -> bool {
|
||||
self.started.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for RaftEngineLogStore {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RaftEngineLogsStore")
|
||||
.field("config", &self.config)
|
||||
.field("started", &self.started.load(Ordering::Relaxed))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl LogStore for RaftEngineLogStore {
|
||||
type Error = Error;
|
||||
type Namespace = Namespace;
|
||||
type Entry = Entry;
|
||||
|
||||
async fn start(&self) -> Result<(), Self::Error> {
|
||||
async fn start(&self) -> Result<(), Error> {
|
||||
let engine_clone = self.engine.clone();
|
||||
let interval = self.config.gc_interval;
|
||||
let token = CancellationToken::new();
|
||||
@@ -123,6 +109,22 @@ impl LogStore for RaftEngineLogStore {
|
||||
info!("RaftEngineLogStore started with config: {:?}", self.config);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for RaftEngineLogStore {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RaftEngineLogsStore")
|
||||
.field("config", &self.config)
|
||||
.field("started", &self.started.load(Ordering::Relaxed))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl LogStore for RaftEngineLogStore {
|
||||
type Error = Error;
|
||||
type Namespace = Namespace;
|
||||
type Entry = Entry;
|
||||
|
||||
async fn stop(&self) -> Result<(), Self::Error> {
|
||||
ensure!(
|
||||
@@ -355,6 +357,7 @@ mod tests {
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
logstore.start().await.unwrap();
|
||||
let namespaces = logstore.list_namespaces().await.unwrap();
|
||||
@@ -368,6 +371,7 @@ mod tests {
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
logstore.start().await.unwrap();
|
||||
assert!(logstore.list_namespaces().await.unwrap().is_empty());
|
||||
@@ -394,6 +398,7 @@ mod tests {
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
logstore.start().await.unwrap();
|
||||
|
||||
@@ -435,8 +440,8 @@ mod tests {
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
logstore.start().await.unwrap();
|
||||
logstore
|
||||
.append(Entry::create(1, 1, "1".as_bytes().to_vec()))
|
||||
.await
|
||||
@@ -455,6 +460,7 @@ mod tests {
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
logstore.start().await.unwrap();
|
||||
|
||||
@@ -495,7 +501,7 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let logstore = RaftEngineLogStore::try_new(config).unwrap();
|
||||
let logstore = RaftEngineLogStore::try_new(config).await.unwrap();
|
||||
logstore.start().await.unwrap();
|
||||
let namespace = Namespace::with_id(42);
|
||||
for id in 0..4096 {
|
||||
@@ -528,7 +534,7 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let logstore = RaftEngineLogStore::try_new(config).unwrap();
|
||||
let logstore = RaftEngineLogStore::try_new(config).await.unwrap();
|
||||
logstore.start().await.unwrap();
|
||||
let namespace = Namespace::with_id(42);
|
||||
for id in 0..1024 {
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use store_api::logstore::LogStore;
|
||||
use tempdir::TempDir;
|
||||
|
||||
use crate::raft_engine::log_store::RaftEngineLogStore;
|
||||
@@ -29,7 +28,6 @@ pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let logstore = RaftEngineLogStore::try_new(cfg).unwrap();
|
||||
logstore.start().await.unwrap();
|
||||
let logstore = RaftEngineLogStore::try_new(cfg).await.unwrap();
|
||||
(logstore, dir)
|
||||
}
|
||||
|
||||
@@ -107,7 +107,6 @@ mod tests {
|
||||
use mito::engine::MitoEngine;
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use store_api::logstore::LogStore;
|
||||
use tempdir::TempDir;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -122,9 +121,7 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let log_store = RaftEngineLogStore::try_new(log_config).unwrap();
|
||||
log_store.start().await.unwrap();
|
||||
|
||||
let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap();
|
||||
let mock_engine = Arc::new(DefaultEngine::new(
|
||||
TableEngineConfig::default(),
|
||||
EngineImpl::new(
|
||||
|
||||
@@ -205,11 +205,11 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to mark WAL as stable, region id: {}, source: {}",
|
||||
"Failed to mark WAL as obsolete, region id: {}, source: {}",
|
||||
region_id,
|
||||
source
|
||||
))]
|
||||
MarkWalStable {
|
||||
MarkWalObsolete {
|
||||
region_id: u64,
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
@@ -508,7 +508,7 @@ impl ErrorExt for Error {
|
||||
PushBatch { source, .. } => source.status_code(),
|
||||
CreateDefault { source, .. } => source.status_code(),
|
||||
ConvertChunk { source, .. } => source.status_code(),
|
||||
MarkWalStable { source, .. } => source.status_code(),
|
||||
MarkWalObsolete { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use log_store::LogConfig;
|
||||
use object_store::backend::fs::Builder;
|
||||
use object_store::ObjectStore;
|
||||
use store_api::logstore::LogStore;
|
||||
|
||||
use crate::background::JobPoolImpl;
|
||||
use crate::engine;
|
||||
@@ -51,8 +50,7 @@ pub async fn new_store_config(
|
||||
log_file_dir: log_store_dir(store_dir),
|
||||
..Default::default()
|
||||
};
|
||||
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).unwrap());
|
||||
log_store.start().await.unwrap();
|
||||
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
|
||||
|
||||
StoreConfig {
|
||||
log_store,
|
||||
|
||||
@@ -25,7 +25,7 @@ use store_api::storage::{RegionId, SequenceNumber};
|
||||
|
||||
use crate::codec::{Decoder, Encoder};
|
||||
use crate::error::{
|
||||
DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalStableSnafu, ReadWalSnafu, Result,
|
||||
DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalObsoleteSnafu, ReadWalSnafu, Result,
|
||||
WalDataCorruptedSnafu, WriteWalSnafu,
|
||||
};
|
||||
use crate::proto::wal::{self, WalHeader};
|
||||
@@ -68,7 +68,7 @@ impl<S: LogStore> Wal<S> {
|
||||
.obsolete(self.namespace.clone(), seq)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(MarkWalStableSnafu {
|
||||
.context(MarkWalObsoleteSnafu {
|
||||
region_id: self.region_id,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -31,9 +31,6 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
|
||||
type Namespace: Namespace;
|
||||
type Entry: Entry;
|
||||
|
||||
/// Start the components and background tasks of logstore.
|
||||
async fn start(&self) -> Result<(), Self::Error>;
|
||||
|
||||
/// Stop components of logstore.
|
||||
async fn stop(&self) -> Result<(), Self::Error>;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user