feat: introduce the Noop WAL provider for datanode (#7105)

* feat: introduce noop log store

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update config example

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add noop wal tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-10-20 14:13:27 +08:00
committed by GitHub
parent 20b5b9bee4
commit 3119464ff9
14 changed files with 346 additions and 10 deletions


@@ -474,7 +474,7 @@
| `meta_client.metadata_cache_ttl` | String | `10m` | TTL of the metadata cache. |
| `meta_client.metadata_cache_tti` | String | `5m` | -- |
| `wal` | -- | -- | The WAL options. |
| `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka. |
| `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka.<br/>- `noop`: a no-op WAL provider that does not store any WAL data.<br/>**Note: any unflushed data will be lost when the datanode is shut down.** |
| `wal.dir` | String | Unset | The directory to store the WAL files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.file_size` | String | `128MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a purge.<br/>**It's only used when the provider is `raft_engine`**. |
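For illustration, a minimal datanode `[wal]` section opting into the new provider could look like the sketch below (only `provider` is shown; `noop` is only suitable when losing unflushed data on shutdown is acceptable, e.g. benchmarks or disposable test clusters):

```toml
# Hypothetical datanode config snippet: select the no-op WAL provider.
# Nothing is written to a WAL, so any data that has not been flushed
# yet is lost when the datanode shuts down.
[wal]
provider = "noop"
```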


@@ -118,6 +118,7 @@ metadata_cache_tti = "5m"
## The provider of the WAL.
## - `raft_engine`: the wal is stored in the local file system by raft-engine.
## - `kafka`: it's remote wal that data is stored in Kafka.
## - `noop`: a no-op WAL provider that does not store any WAL data.<br/>**Note: any unflushed data will be lost when the datanode is shut down.**
provider = "raft_engine"
## The directory to store the WAL files.


@@ -316,6 +316,13 @@ pub enum Error {
location: Location,
source: standalone::error::Error,
},
#[snafu(display("Invalid WAL provider"))]
InvalidWalProvider {
#[snafu(implicit)]
location: Location,
source: common_wal::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -373,6 +380,7 @@ impl ErrorExt for Error {
}
Error::MetaClientInit { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound,
Error::InvalidWalProvider { .. } => StatusCode::InvalidArguments,
}
}


@@ -476,7 +476,11 @@ impl StartCommand {
.step(10)
.build(),
);
let kafka_options = opts.wal.clone().into();
let kafka_options = opts
.wal
.clone()
.try_into()
.context(error::InvalidWalProviderSnafu)?;
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.context(error::BuildWalOptionsAllocatorSnafu)?;


@@ -25,6 +25,7 @@ use crate::config::kafka::common::{
};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
use crate::error::{Error, UnsupportedWalProviderSnafu};
/// Wal configurations for metasrv.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
@@ -43,6 +44,7 @@ pub enum MetasrvWalConfig {
pub enum DatanodeWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(DatanodeKafkaConfig),
Noop,
}
impl Default for DatanodeWalConfig {
@@ -51,11 +53,13 @@ impl Default for DatanodeWalConfig {
}
}
impl From<DatanodeWalConfig> for MetasrvWalConfig {
fn from(config: DatanodeWalConfig) -> Self {
impl TryFrom<DatanodeWalConfig> for MetasrvWalConfig {
type Error = Error;
fn try_from(config: DatanodeWalConfig) -> Result<Self, Self::Error> {
match config {
DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
DatanodeWalConfig::RaftEngine(_) => Ok(Self::RaftEngine),
DatanodeWalConfig::Kafka(config) => Ok(Self::Kafka(MetasrvKafkaConfig {
connection: config.connection,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
@@ -67,7 +71,11 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
// This field won't be used in standalone mode
checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
}),
})),
DatanodeWalConfig::Noop => UnsupportedWalProviderSnafu {
provider: "noop".to_string(),
}
.fail(),
}
}
}
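A rough sketch (not code from this commit; the `demo` function is hypothetical) of how the new fallible conversion behaves for callers such as the standalone start command: the raft-engine and Kafka configs still convert, while `Noop` is rejected with `UnsupportedWalProvider`, which is why call sites switched from `.into()` to `.try_into()`.

```rust
// Sketch only: the noop datanode WAL config has no metasrv counterpart.
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};

fn demo() {
    // The default (raft-engine) config converts as before.
    let ok: Result<MetasrvWalConfig, _> = DatanodeWalConfig::default().try_into();
    assert!(ok.is_ok());

    // The noop config surfaces `UnsupportedWalProvider { provider: "noop" }`.
    let err: Result<MetasrvWalConfig, _> = DatanodeWalConfig::Noop.try_into();
    assert!(err.is_err());
}
```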


@@ -92,6 +92,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported WAL provider: {}", provider))]
UnsupportedWalProvider {
provider: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;


@@ -34,6 +34,7 @@ use common_wal::config::raft_engine::RaftEngineConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{GlobalIndexCollector, default_index_file};
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
@@ -561,6 +562,27 @@ impl DatanodeBuilder {
self.extension_range_provider_factory.take(),
);
builder.try_build().await.context(BuildMitoEngineSnafu)?
}
DatanodeWalConfig::Noop => {
let log_store = Arc::new(NoopLogStore);
let builder = MitoEngineBuilder::new(
&opts.storage.data_home,
config,
log_store,
object_store_manager,
schema_metadata_manager,
file_ref_manager,
partition_expr_fetcher.clone(),
plugins,
);
#[cfg(feature = "enterprise")]
let builder = builder.with_extension_range_provider_factory(
self.extension_range_provider_factory.take(),
);
builder.try_build().await.context(BuildMitoEngineSnafu)?
}
};


@@ -18,5 +18,6 @@
pub mod error;
pub mod kafka;
pub mod metrics;
pub mod noop;
pub mod raft_engine;
pub mod test_util;

src/log-store/src/noop.rs Normal file

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod log_store;


@@ -0,0 +1,116 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use futures::stream;
use store_api::logstore::entry::{Entry, NaiveEntry};
use store_api::logstore::provider::Provider;
use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex};
use store_api::storage::RegionId;
use crate::error::{Error, Result};
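/// A [`LogStore`] that never persists anything: appends are acknowledged but
/// discarded, and reads always return an empty stream, so unflushed data
/// cannot be replayed after a restart.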
#[derive(Debug, Clone, Copy)]
pub struct NoopLogStore;
#[async_trait::async_trait]
impl LogStore for NoopLogStore {
type Error = Error;
async fn stop(&self) -> Result<()> {
Ok(())
}
async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse> {
let last_entry_ids = entries
.iter()
.map(|entry| (entry.region_id(), 0))
.collect::<HashMap<RegionId, EntryId>>();
Ok(AppendBatchResponse { last_entry_ids })
}
async fn read(
&self,
_provider: &Provider,
_entry_id: EntryId,
_index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
Ok(Box::pin(stream::empty()))
}
async fn create_namespace(&self, _ns: &Provider) -> Result<()> {
Ok(())
}
async fn delete_namespace(&self, _ns: &Provider) -> Result<()> {
Ok(())
}
async fn list_namespaces(&self) -> Result<Vec<Provider>> {
Ok(vec![])
}
fn entry(
&self,
data: Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,
) -> Result<Entry> {
Ok(Entry::Naive(NaiveEntry {
provider: provider.clone(),
region_id,
entry_id,
data,
}))
}
async fn obsolete(
&self,
_provider: &Provider,
_region_id: RegionId,
_entry_id: EntryId,
) -> Result<()> {
Ok(())
}
fn latest_entry_id(&self, _provider: &Provider) -> Result<EntryId> {
Ok(0)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_append_batch() {
let log_store = NoopLogStore;
let entries = vec![Entry::Naive(NaiveEntry {
provider: Provider::noop_provider(),
region_id: RegionId::new(1, 1),
entry_id: 1,
data: vec![1],
})];
let last_entry_ids = log_store
.append_batch(entries)
.await
.unwrap()
.last_entry_ids;
assert_eq!(last_entry_ids.len(), 1);
assert_eq!(last_entry_ids[&(RegionId::new(1, 1))], 0);
}
}
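As a sketch of what the read path looks like to callers (not part of the commit's tests; the `demo` function is hypothetical), replaying from any entry id yields an empty stream, which is exactly why unflushed regions come back empty after a restart:

```rust
// Sketch only: reading from the no-op store never yields entries.
use futures::TryStreamExt;
use log_store::noop::log_store::NoopLogStore;
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;

async fn demo() {
    let log_store = NoopLogStore;
    let mut stream = log_store
        .read(&Provider::noop_provider(), 0, None)
        .await
        .unwrap();
    // The stream is empty regardless of what was appended earlier.
    assert!(stream.try_next().await.unwrap().is_none());
}
```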


@@ -25,6 +25,7 @@ use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
@@ -367,7 +368,8 @@ impl RegionOpener {
match wal_options {
WalOptions::RaftEngine => {
ensure!(
TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
|| TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
error::IncompatibleWalProviderChangeSnafu {
global: "`kafka`",
region: "`raft_engine`",
@@ -377,7 +379,8 @@ impl RegionOpener {
}
WalOptions::Kafka(options) => {
ensure!(
TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
|| TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
error::IncompatibleWalProviderChangeSnafu {
global: "`raft_engine`",
region: "`kafka`",
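The relaxed check amounts to the type-level comparison sketched below (a hypothetical helper, not in the diff): the concrete log store must match the provider recorded in the region's WAL options, except that the no-op store is now accepted for either provider, so regions created under `raft_engine` or `kafka` can still be opened.

```rust
use std::any::TypeId;

use log_store::noop::log_store::NoopLogStore;

// Hypothetical helper mirroring the relaxed compatibility rule above:
// the log store type `S` must match the expected provider's store type,
// unless it is the no-op store, which stands in for any provider.
fn wal_provider_compatible<S: 'static>(expected: TypeId) -> bool {
    TypeId::of::<S>() == expected || TypeId::of::<S>() == TypeId::of::<NoopLogStore>()
}
```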


@@ -202,7 +202,7 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let kafka_options = opts.wal.clone().into();
let kafka_options = opts.wal.clone().try_into().unwrap();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.unwrap();


@@ -13,6 +13,7 @@
// limitations under the License.
mod instance_kafka_wal_test;
mod instance_noop_wal_test;
mod instance_test;
mod promql_test;
mod reconcile_table;


@@ -0,0 +1,150 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use client::OutputData;
use common_recordbatch::util::collect_batches;
use common_test_util::recordbatch::check_output_stream;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use crate::cluster::GreptimeDbClusterBuilder;
use crate::tests::test_util::{
MockInstance, MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
};
pub(crate) async fn distributed_with_noop_wal() -> TestContext {
common_telemetry::init_default_ut_logging();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbClusterBuilder::new(&test_name)
.await
.with_datanode_wal_config(DatanodeWalConfig::Noop)
.with_metasrv_wal_config(MetasrvWalConfig::RaftEngine);
TestContext::new(MockInstanceBuilder::Distributed(builder)).await
}
#[tokio::test]
async fn test_mito_engine() {
let mut test_context = distributed_with_noop_wal().await;
let frontend = test_context.frontend();
let sql = r#"create table demo(
host STRING,
cpu DOUBLE,
memory DOUBLE,
ts timestamp,
TIME INDEX(ts)
)"#;
let output = execute_sql(&frontend, sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
let output = execute_sql(
&frontend,
"insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 1024, 1655276557000)",
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(1)));
let output = execute_sql(&frontend, "select * from demo order by ts")
.await
.data;
let expected = r#"+-------+-----+--------+---------------------+
| host | cpu | memory | ts |
+-------+-----+--------+---------------------+
| host1 | 1.1 | 1024.0 | 2022-06-15T07:02:37 |
+-------+-----+--------+---------------------+"#;
check_output_stream(output, expected).await;
test_context.rebuild().await;
let frontend = test_context.frontend();
let output = execute_sql(&frontend, "select * from demo order by ts")
.await
.data;
// Unflushed data should be lost.
let expected = r#"++
++"#;
check_output_stream(output, expected).await;
let output = execute_sql(
&frontend,
"insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 1024, 1655276557000)",
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(1)));
execute_sql(&frontend, "admin flush_table('demo')").await;
test_context.rebuild().await;
let frontend = test_context.frontend();
let output = execute_sql(&frontend, "select * from demo order by ts")
.await
.data;
let expected = r#"+-------+-----+--------+---------------------+
| host | cpu | memory | ts |
+-------+-----+--------+---------------------+
| host1 | 1.1 | 1024.0 | 2022-06-15T07:02:37 |
+-------+-----+--------+---------------------+"#;
check_output_stream(output, expected).await;
}
#[tokio::test]
async fn test_metric_engine() {
let mut test_context = distributed_with_noop_wal().await;
let frontend = test_context.frontend();
let sql = r#"CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");"#;
let output = execute_sql(&frontend, sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
let sql = r#"CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");"#;
let output = execute_sql(&frontend, sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
// The logical table should be lost.
test_context.rebuild().await;
let frontend = test_context.frontend();
let output = execute_sql(&frontend, "select * from t1").await;
let err = unwrap_err(output.data).await;
// Should return a region-not-found error.
assert!(err.contains("not found"));
let sql = r#"CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");"#;
let output = execute_sql(&frontend, sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
execute_sql(
&frontend,
"INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);",
)
.await;
execute_sql(&frontend, "admin flush_table('phy')").await;
test_context.rebuild().await;
let frontend = test_context.frontend();
let output = execute_sql(&frontend, "select * from t2 order by job").await;
let expected = r#"+------+-------------------------+-----+
| job | ts | val |
+------+-------------------------+-----+
| job1 | 1970-01-01T00:00:00 | 0.0 |
| job2 | 1970-01-01T00:00:00.001 | 1.0 |
+------+-------------------------+-----+"#;
check_output_stream(output.data, expected).await;
}
async fn unwrap_err(output: OutputData) -> String {
let error = match output {
OutputData::Stream(stream) => collect_batches(stream).await.unwrap_err(),
_ => unreachable!(),
};
format!("{error:?}")
}