feat: use raft-engine crate to reimplement logstore (#799)

* chore: remove useless method in Entry trait, add proto definition for entry and namespace

* feat: add proto definition for raft-engine based logstore

* feat: introduce RaftEngineLogstore

* feat: impl read for raft engine log store

* feat: impl raft engine logstore

* feat: raft engine logstore start and stop

* feat: add purge bg task

* fix: license header

* fix: clippy

* fix: toml files

* feat: add some test cases

* fix: CR comments

* fix: CR comments

* fix: check namespace validity and state of logstore

* fix: CR comments; add config item to control sync/async flush per write

* fix: remove unused error variants

* fix: unit tests

* fix: use compare and exchange to stop logstore

* fix: CR comments
This commit is contained in:
Lei, HUANG
2023-01-05 17:18:51 +08:00
committed by GitHub
parent afd9866709
commit 8f5ecefc90
43 changed files with 1120 additions and 2977 deletions

View File

@@ -21,8 +21,8 @@ use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
use log_store::fs::config::LogConfig;
use log_store::fs::log::LocalFileLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
@@ -42,7 +42,7 @@ use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, Result, StartLogStoreSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result, StartLogStoreSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -53,7 +53,7 @@ mod grpc;
mod script;
mod sql;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;
// An abstraction to read/write services.
pub struct Instance {
@@ -63,7 +63,7 @@ pub struct Instance {
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
pub(crate) logstore: Arc<LocalFileLogStore>,
pub(crate) logstore: Arc<RaftEngineLogStore>,
}
pub type InstanceRef = Arc<Instance>;
@@ -71,7 +71,7 @@ pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
let meta_client = match opts.mode {
Mode::Standalone => None,
@@ -166,11 +166,11 @@ impl Instance {
}
pub async fn start(&self) -> Result<()> {
self.logstore.start().await.context(StartLogStoreSnafu)?;
self.catalog_manager
.start()
.await
.context(NewCatalogSnafu)?;
self.logstore.start().await.context(StartLogStoreSnafu)?;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
@@ -293,9 +293,7 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul
Ok(meta_client)
}
pub(crate) async fn create_local_file_log_store(
path: impl AsRef<str>,
) -> Result<LocalFileLogStore> {
pub(crate) async fn create_log_store(path: impl AsRef<str>) -> Result<RaftEngineLogStore> {
let path = path.as_ref();
// create WAL directory
fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?;
@@ -307,9 +305,6 @@ pub(crate) async fn create_local_file_log_store(
..Default::default()
};
let log_store = LocalFileLogStore::open(&log_config)
.await
.context(error::OpenLogStoreSnafu)?;
Ok(log_store)
let logstore = RaftEngineLogStore::try_new(log_config).context(OpenLogStoreSnafu)?;
Ok(logstore)
}

View File

@@ -29,7 +29,7 @@ use table::table::TableIdProvider;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance};
use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance};
use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;
@@ -41,7 +41,7 @@ impl Instance {
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_local_file_log_store(&opts.wal_dir).await?);
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),

View File

@@ -125,7 +125,7 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use log_store::fs::noop::NoopLogStore;
use log_store::NoopLogStore;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::services::fs::Builder;