mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-19 06:20:38 +00:00
feat(remote_wal): add skeleton for remote wal related to datanode (#2941)
* refactor: refactor wal config * test: update tests related to wal * feat: introduce kafka wal config * chore: augment proto with wal options * feat: augment region open request with wal options * feat: augment mito region with wal options * feat: augment region create request with wal options * refactor: refactor log store trait * feat: add skeleton for kafka log store * feat: generalize building log store when starting datanode * feat: integrate wal options to region write * chore: minor update * refactor: remove wal options from region create/open requests * fix: compliation issues * chore: insert wal options into region options upon initializing region server * chore: integrate wal options into region options * chore: fill in kafka wal config * chore: reuse namespaces while writing to wal * chore: minor update * chore: fetch wal options from region while handling truncate/flush * fix: region options test * fix: resolve some review conversations * refactor: serde with wal options * fix: resolve some review conversations
This commit is contained in:
48
Cargo.lock
generated
48
Cargo.lock
generated
@@ -1624,7 +1624,11 @@ version = "0.4.4"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"humantime-serde",
|
||||
"rskafka",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"toml 0.7.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1812,6 +1816,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"chrono",
|
||||
"common-catalog",
|
||||
"common-config",
|
||||
"common-error",
|
||||
"common-grpc-expr",
|
||||
"common-macro",
|
||||
@@ -2152,6 +2157,15 @@ version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
|
||||
|
||||
[[package]]
|
||||
name = "crc32c"
|
||||
version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74"
|
||||
dependencies = [
|
||||
"rustc_version",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crc32fast"
|
||||
version = "1.3.2"
|
||||
@@ -4083,6 +4097,12 @@ version = "3.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
|
||||
|
||||
[[package]]
|
||||
name = "integer-encoding"
|
||||
version = "4.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "924df4f0e24e2e7f9cdd90babb0b96f93b20f3ecfa949ea9e6613756b8c8e1bf"
|
||||
|
||||
[[package]]
|
||||
name = "inventory"
|
||||
version = "0.3.13"
|
||||
@@ -4900,6 +4920,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-config",
|
||||
"common-datasource",
|
||||
"common-decimal",
|
||||
"common-error",
|
||||
@@ -7382,6 +7403,30 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rskafka"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "132ecfa3cd9c3825208524a80881f115337762904ad3f0174e87975b2d79162c"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crc32c",
|
||||
"flate2",
|
||||
"futures",
|
||||
"integer-encoding 4.0.0",
|
||||
"lz4",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"snap",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"zstd 0.12.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rstest"
|
||||
version = "0.17.0"
|
||||
@@ -8976,6 +9021,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"common-base",
|
||||
"common-config",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-query",
|
||||
@@ -9502,7 +9548,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"integer-encoding",
|
||||
"integer-encoding 3.0.4",
|
||||
"ordered-float 2.10.1",
|
||||
]
|
||||
|
||||
|
||||
@@ -29,9 +29,15 @@ connect_timeout = "1s"
|
||||
# `TCP_NODELAY` option for accepted connections, true by default.
|
||||
tcp_nodelay = true
|
||||
|
||||
# WAL options, see `standalone.example.toml`.
|
||||
# WAL options.
|
||||
# Currently, users are expected to choose the wal through the provider field.
|
||||
# When a wal provider is chose, the user should comment out all other wal config
|
||||
# except those corresponding to the chosen one.
|
||||
[wal]
|
||||
# WAL data directory
|
||||
provider = "raft_engine"
|
||||
|
||||
# Raft-engine wal options, see `standalone.example.toml`
|
||||
# dir = "/tmp/greptimedb/wal"
|
||||
file_size = "256MB"
|
||||
purge_threshold = "4GB"
|
||||
@@ -39,6 +45,22 @@ purge_interval = "10m"
|
||||
read_batch_size = 128
|
||||
sync_write = false
|
||||
|
||||
# Kafka wal options.
|
||||
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
|
||||
# broker_endpoints = ["127.0.0.1:9090"]
|
||||
# Number of topics shall be created beforehand.
|
||||
# num_topics = 64
|
||||
# Topic name prefix.
|
||||
# topic_name_prefix = "greptimedb_wal_kafka_topic"
|
||||
# Number of partitions per topic.
|
||||
# num_partitions = 1
|
||||
# The maximum log size an rskafka batch producer could buffer.
|
||||
# max_batch_size = "4MB"
|
||||
# The linger duration of an rskafka batch producer.
|
||||
# linger = "200ms"
|
||||
# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
|
||||
# max_wait_time = "100ms"
|
||||
|
||||
# Storage options, see `standalone.example.toml`.
|
||||
[storage]
|
||||
# The working home directory.
|
||||
|
||||
@@ -82,6 +82,13 @@ enable = true
|
||||
|
||||
# WAL options.
|
||||
[wal]
|
||||
# Available wal providers:
|
||||
# - "RaftEngine" (default)
|
||||
# - "Kafka"
|
||||
provider = "raft_engine"
|
||||
|
||||
# There's no kafka wal config for standalone mode.
|
||||
|
||||
# WAL data directory
|
||||
# dir = "/tmp/greptimedb/wal"
|
||||
# WAL file size in bytes.
|
||||
|
||||
@@ -18,7 +18,8 @@ use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use catalog::kvbackend::MetaKvBackend;
|
||||
use clap::Parser;
|
||||
use common_telemetry::logging;
|
||||
use common_config::WalConfig;
|
||||
use common_telemetry::{info, logging};
|
||||
use datanode::config::DatanodeOptions;
|
||||
use datanode::datanode::{Datanode, DatanodeBuilder};
|
||||
use meta_client::MetaClientOptions;
|
||||
@@ -166,8 +167,18 @@ impl StartCommand {
|
||||
opts.storage.data_home = data_home.clone();
|
||||
}
|
||||
|
||||
if let Some(wal_dir) = &self.wal_dir {
|
||||
opts.wal.dir = Some(wal_dir.clone());
|
||||
// `wal_dir` only affects raft-engine config.
|
||||
if let Some(wal_dir) = &self.wal_dir
|
||||
&& let WalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
|
||||
{
|
||||
if raft_engine_config
|
||||
.dir
|
||||
.as_ref()
|
||||
.is_some_and(|original_dir| original_dir != wal_dir)
|
||||
{
|
||||
info!("The wal dir of raft-engine is altered to {wal_dir}");
|
||||
}
|
||||
raft_engine_config.dir.replace(wal_dir.clone());
|
||||
}
|
||||
|
||||
if let Some(http_addr) = &self.http_addr {
|
||||
@@ -256,6 +267,7 @@ mod tests {
|
||||
tcp_nodelay = true
|
||||
|
||||
[wal]
|
||||
provider = "raft_engine"
|
||||
dir = "/other/wal"
|
||||
file_size = "1GB"
|
||||
purge_threshold = "50GB"
|
||||
@@ -293,12 +305,18 @@ mod tests {
|
||||
|
||||
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
|
||||
assert_eq!(Some(42), options.node_id);
|
||||
assert_eq!("/other/wal", options.wal.dir.unwrap());
|
||||
|
||||
assert_eq!(Duration::from_secs(600), options.wal.purge_interval);
|
||||
assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0);
|
||||
assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0);
|
||||
assert!(!options.wal.sync_write);
|
||||
let WalConfig::RaftEngine(raft_engine_config) = options.wal else {
|
||||
unreachable!()
|
||||
};
|
||||
assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
|
||||
assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
|
||||
assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
|
||||
assert_eq!(
|
||||
1024 * 1024 * 1024 * 50,
|
||||
raft_engine_config.purge_threshold.0
|
||||
);
|
||||
assert!(!raft_engine_config.sync_write);
|
||||
|
||||
let HeartbeatOptions {
|
||||
interval: heart_beat_interval,
|
||||
@@ -412,9 +430,10 @@ mod tests {
|
||||
tcp_nodelay = true
|
||||
|
||||
[wal]
|
||||
provider = "raft_engine"
|
||||
file_size = "1GB"
|
||||
purge_threshold = "50GB"
|
||||
purge_interval = "10m"
|
||||
purge_interval = "5m"
|
||||
sync_write = false
|
||||
|
||||
[storage]
|
||||
@@ -475,7 +494,10 @@ mod tests {
|
||||
};
|
||||
|
||||
// Should be read from env, env > default values.
|
||||
assert_eq!(opts.wal.read_batch_size, 100,);
|
||||
let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
|
||||
unreachable!()
|
||||
};
|
||||
assert_eq!(raft_engine_config.read_batch_size, 100);
|
||||
assert_eq!(
|
||||
opts.meta_client.unwrap().metasrv_addrs,
|
||||
vec![
|
||||
@@ -486,10 +508,13 @@ mod tests {
|
||||
);
|
||||
|
||||
// Should be read from config file, config file > env > default values.
|
||||
assert_eq!(opts.wal.purge_interval, Duration::from_secs(60 * 10));
|
||||
assert_eq!(
|
||||
raft_engine_config.purge_interval,
|
||||
Duration::from_secs(60 * 5)
|
||||
);
|
||||
|
||||
// Should be read from cli, cli > config file > env > default values.
|
||||
assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir");
|
||||
assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");
|
||||
|
||||
// Should be default value.
|
||||
assert_eq!(opts.http.addr, DatanodeOptions::default().http.addr);
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#![feature(assert_matches)]
|
||||
#![feature(assert_matches, let_chains)]
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::arg;
|
||||
|
||||
@@ -171,6 +171,7 @@ impl Options {
|
||||
mod tests {
|
||||
use std::io::Write;
|
||||
|
||||
use common_config::WalConfig;
|
||||
use common_test_util::temp_dir::create_named_temp_file;
|
||||
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
|
||||
|
||||
@@ -194,6 +195,7 @@ mod tests {
|
||||
tcp_nodelay = true
|
||||
|
||||
[wal]
|
||||
provider = "raft_engine"
|
||||
dir = "/tmp/greptimedb/wal"
|
||||
file_size = "1GB"
|
||||
purge_threshold = "50GB"
|
||||
@@ -277,7 +279,10 @@ mod tests {
|
||||
);
|
||||
|
||||
// Should be the values from config file, not environment variables.
|
||||
assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal");
|
||||
let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
|
||||
unreachable!()
|
||||
};
|
||||
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");
|
||||
|
||||
// Should be default values.
|
||||
assert_eq!(opts.node_id, None);
|
||||
|
||||
@@ -500,6 +500,7 @@ mod tests {
|
||||
enable_memory_catalog = true
|
||||
|
||||
[wal]
|
||||
provider = "raft_engine"
|
||||
dir = "/tmp/greptimedb/test/wal"
|
||||
file_size = "1GB"
|
||||
purge_threshold = "50GB"
|
||||
@@ -562,7 +563,10 @@ mod tests {
|
||||
assert_eq!(None, fe_opts.mysql.reject_no_database);
|
||||
assert!(fe_opts.influxdb.enable);
|
||||
|
||||
assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());
|
||||
let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
|
||||
unreachable!()
|
||||
};
|
||||
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
|
||||
|
||||
assert!(matches!(
|
||||
&dn_opts.storage.store,
|
||||
|
||||
@@ -7,4 +7,8 @@ license.workspace = true
|
||||
[dependencies]
|
||||
common-base.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rskafka = "0.5"
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_with = "3"
|
||||
toml.workspace = true
|
||||
|
||||
@@ -12,41 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
pub mod wal;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct WalConfig {
|
||||
// wal directory
|
||||
pub dir: Option<String>,
|
||||
// wal file size in bytes
|
||||
pub file_size: ReadableSize,
|
||||
// wal purge threshold in bytes
|
||||
pub purge_threshold: ReadableSize,
|
||||
// purge interval in seconds
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub purge_interval: Duration,
|
||||
// read batch size
|
||||
pub read_batch_size: usize,
|
||||
// whether to sync log file after every write
|
||||
pub sync_write: bool,
|
||||
}
|
||||
|
||||
impl Default for WalConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
dir: None,
|
||||
file_size: ReadableSize::mb(256), // log file size 256MB
|
||||
purge_threshold: ReadableSize::gb(4), // purge threshold 4GB
|
||||
purge_interval: Duration::from_secs(600),
|
||||
read_batch_size: 128,
|
||||
sync_write: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
pub use crate::wal::{KafkaWalOptions, WalConfig, WalOptions, WAL_OPTIONS_KEY};
|
||||
|
||||
pub fn metadata_store_dir(store_dir: &str) -> String {
|
||||
format!("{store_dir}/metadata")
|
||||
|
||||
116
src/common/config/src/wal.rs
Normal file
116
src/common/config/src/wal.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod kafka;
|
||||
pub mod raft_engine;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::with_prefix;
|
||||
|
||||
pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
|
||||
pub use crate::wal::raft_engine::RaftEngineConfig;
|
||||
|
||||
/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
|
||||
/// and inserted into the options of a `RegionCreateRequest`.
|
||||
pub const WAL_OPTIONS_KEY: &str = "wal_options";
|
||||
|
||||
/// Wal config for datanode.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(tag = "provider")]
|
||||
pub enum WalConfig {
|
||||
#[serde(rename = "raft_engine")]
|
||||
RaftEngine(RaftEngineConfig),
|
||||
#[serde(rename = "kafka")]
|
||||
Kafka(KafkaConfig),
|
||||
}
|
||||
|
||||
impl Default for WalConfig {
|
||||
fn default() -> Self {
|
||||
WalConfig::RaftEngine(RaftEngineConfig::default())
|
||||
}
|
||||
}
|
||||
|
||||
/// Wal options allocated to a region.
|
||||
/// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded
|
||||
/// by datanode with `serde_json::from_str`.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(tag = "wal.provider")]
|
||||
pub enum WalOptions {
|
||||
#[default]
|
||||
#[serde(rename = "raft_engine")]
|
||||
RaftEngine,
|
||||
#[serde(rename = "kafka")]
|
||||
#[serde(with = "prefix_wal_kafka")]
|
||||
Kafka(KafkaWalOptions),
|
||||
}
|
||||
|
||||
with_prefix!(prefix_wal_kafka "wal.kafka.");
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use rskafka::client::partition::Compression as RsKafkaCompression;
|
||||
|
||||
use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions};
|
||||
|
||||
#[test]
|
||||
fn test_serde_kafka_config() {
|
||||
let toml_str = r#"
|
||||
broker_endpoints = ["127.0.0.1:9090"]
|
||||
num_topics = 32
|
||||
topic_name_prefix = "greptimedb_wal_kafka_topic"
|
||||
num_partitions = 1
|
||||
max_batch_size = "4MB"
|
||||
linger = "200ms"
|
||||
max_wait_time = "100ms"
|
||||
"#;
|
||||
let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
|
||||
let expected = KafkaConfig {
|
||||
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
|
||||
num_topics: 32,
|
||||
topic_name_prefix: "greptimedb_wal_kafka_topic".to_string(),
|
||||
num_partitions: 1,
|
||||
compression: RsKafkaCompression::default(),
|
||||
max_batch_size: ReadableSize::mb(4),
|
||||
linger: Duration::from_millis(200),
|
||||
max_wait_time: Duration::from_millis(100),
|
||||
};
|
||||
assert_eq!(decoded, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_serde_wal_options() {
|
||||
// Test serde raft-engine wal options.
|
||||
let wal_options = WalOptions::RaftEngine;
|
||||
let encoded = serde_json::to_string(&wal_options).unwrap();
|
||||
let expected = r#"{"wal.provider":"raft_engine"}"#;
|
||||
assert_eq!(&encoded, expected);
|
||||
|
||||
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
|
||||
assert_eq!(decoded, wal_options);
|
||||
|
||||
// Test serde kafka wal options.
|
||||
let wal_options = WalOptions::Kafka(KafkaWalOptions {
|
||||
topic: "test_topic".to_string(),
|
||||
});
|
||||
let encoded = serde_json::to_string(&wal_options).unwrap();
|
||||
let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#;
|
||||
assert_eq!(&encoded, expected);
|
||||
|
||||
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
|
||||
assert_eq!(decoded, wal_options);
|
||||
}
|
||||
}
|
||||
72
src/common/config/src/wal/kafka.rs
Normal file
72
src/common/config/src/wal/kafka.rs
Normal file
@@ -0,0 +1,72 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use rskafka::client::partition::Compression as RsKafkaCompression;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Topic name prefix.
|
||||
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_kafka_topic";
|
||||
/// Kafka wal topic.
|
||||
pub type Topic = String;
|
||||
|
||||
/// Configurations for kafka wal.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct KafkaConfig {
|
||||
/// The broker endpoints of the Kafka cluster.
|
||||
pub broker_endpoints: Vec<String>,
|
||||
/// Number of topics shall be created beforehand.
|
||||
pub num_topics: usize,
|
||||
/// Topic name prefix.
|
||||
pub topic_name_prefix: String,
|
||||
/// Number of partitions per topic.
|
||||
pub num_partitions: i32,
|
||||
/// The compression algorithm used to compress log entries.
|
||||
#[serde(skip)]
|
||||
#[serde(default)]
|
||||
pub compression: RsKafkaCompression,
|
||||
/// The maximum log size an rskakfa batch producer could buffer.
|
||||
pub max_batch_size: ReadableSize,
|
||||
/// The linger duration of an rskafka batch producer.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub linger: Duration,
|
||||
/// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub max_wait_time: Duration,
|
||||
}
|
||||
|
||||
impl Default for KafkaConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
|
||||
num_topics: 64,
|
||||
topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
|
||||
num_partitions: 1,
|
||||
compression: RsKafkaCompression::NoCompression,
|
||||
max_batch_size: ReadableSize::mb(4),
|
||||
linger: Duration::from_millis(200),
|
||||
max_wait_time: Duration::from_millis(100),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Kafka wal options allocated to a region.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct KafkaOptions {
|
||||
/// Kafka wal topic.
|
||||
pub topic: Topic,
|
||||
}
|
||||
50
src/common/config/src/wal/raft_engine.rs
Normal file
50
src/common/config/src/wal/raft_engine.rs
Normal file
@@ -0,0 +1,50 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Configurations for raft-engine wal.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct RaftEngineConfig {
|
||||
// wal directory
|
||||
pub dir: Option<String>,
|
||||
// wal file size in bytes
|
||||
pub file_size: ReadableSize,
|
||||
// wal purge threshold in bytes
|
||||
pub purge_threshold: ReadableSize,
|
||||
// purge interval in seconds
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub purge_interval: Duration,
|
||||
// read batch size
|
||||
pub read_batch_size: usize,
|
||||
// whether to sync log file after every write
|
||||
pub sync_write: bool,
|
||||
}
|
||||
|
||||
impl Default for RaftEngineConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
dir: None,
|
||||
file_size: ReadableSize::mb(256),
|
||||
purge_threshold: ReadableSize::gb(4),
|
||||
purge_interval: Duration::from_secs(600),
|
||||
read_batch_size: 128,
|
||||
sync_write: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,7 @@ async-trait.workspace = true
|
||||
base64.workspace = true
|
||||
bytes.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-config.workspace = true
|
||||
common-error.workspace = true
|
||||
common-grpc-expr.workspace = true
|
||||
common-macro.workspace = true
|
||||
|
||||
@@ -21,6 +21,7 @@ use api::v1::region::{
|
||||
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::METRIC_ENGINE;
|
||||
use common_config::WAL_OPTIONS_KEY;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_procedure::error::{
|
||||
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
|
||||
@@ -47,7 +48,6 @@ use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::rpc::router::{
|
||||
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
|
||||
};
|
||||
use crate::wal::WAL_OPTIONS_KEY;
|
||||
|
||||
pub struct CreateTableProcedure {
|
||||
pub context: DdlContext,
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::str::Utf8Error;
|
||||
|
||||
use common_config::wal::WalOptions;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
@@ -23,7 +24,6 @@ use store_api::storage::{RegionId, RegionNumber};
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::peer::Peer;
|
||||
use crate::wal::WalOptions;
|
||||
use crate::DatanodeId;
|
||||
|
||||
#[derive(Snafu)]
|
||||
|
||||
@@ -22,14 +22,10 @@ use serde_with::with_prefix;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::wal::kafka::KafkaConfig;
|
||||
pub use crate::wal::kafka::{KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
|
||||
pub use crate::wal::kafka::Topic as KafkaWalTopic;
|
||||
pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator};
|
||||
|
||||
/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
|
||||
/// and inserted into the options of a `RegionCreateRequest`.
|
||||
pub const WAL_OPTIONS_KEY: &str = "wal_options";
|
||||
|
||||
/// Wal configurations for bootstrapping metasrv.
|
||||
/// Wal config for metasrv.
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
|
||||
#[serde(tag = "provider")]
|
||||
pub enum WalConfig {
|
||||
@@ -40,23 +36,6 @@ pub enum WalConfig {
|
||||
Kafka(KafkaConfig),
|
||||
}
|
||||
|
||||
/// Wal options allocated to a region.
|
||||
/// A wal options is encoded by metasrv into a `String` with `serde_json::to_string`.
|
||||
/// It's then decoded by datanode to a `HashMap<String, String>` with `serde_json::from_str`.
|
||||
/// Such a encoding/decoding scheme is inspired by the encoding/decoding of `RegionOptions`.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
|
||||
#[serde(tag = "wal.provider")]
|
||||
pub enum WalOptions {
|
||||
#[default]
|
||||
#[serde(rename = "raft_engine")]
|
||||
RaftEngine,
|
||||
#[serde(rename = "kafka")]
|
||||
#[serde(with = "prefix_wal_kafka")]
|
||||
Kafka(KafkaWalOptions),
|
||||
}
|
||||
|
||||
with_prefix!(prefix_wal_kafka "wal.kafka.");
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -101,27 +80,4 @@ mod tests {
|
||||
};
|
||||
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_serde_wal_options() {
|
||||
// Test serde raft-engine wal options.
|
||||
let wal_options = WalOptions::RaftEngine;
|
||||
let encoded = serde_json::to_string(&wal_options).unwrap();
|
||||
let expected = r#"{"wal.provider":"raft_engine"}"#;
|
||||
assert_eq!(&encoded, expected);
|
||||
|
||||
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
|
||||
assert_eq!(decoded, wal_options);
|
||||
|
||||
// Test serde kafka wal options.
|
||||
let wal_options = WalOptions::Kafka(KafkaWalOptions {
|
||||
topic: "test_topic".to_string(),
|
||||
});
|
||||
let encoded = serde_json::to_string(&wal_options).unwrap();
|
||||
let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#;
|
||||
assert_eq!(&encoded, expected);
|
||||
|
||||
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
|
||||
assert_eq!(decoded, wal_options);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ pub use crate::wal::kafka::topic::Topic;
|
||||
pub use crate::wal::kafka::topic_manager::TopicManager;
|
||||
use crate::wal::kafka::topic_selector::SelectorType as TopicSelectorType;
|
||||
|
||||
/// Configurations for bootstrapping a kafka wal.
|
||||
/// Configurations for kafka wal.
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct KafkaConfig {
|
||||
/// The broker endpoints of the Kafka cluster.
|
||||
@@ -51,10 +51,3 @@ impl Default for KafkaConfig {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Kafka wal options allocated to a region.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct KafkaOptions {
|
||||
/// Kafka wal topic.
|
||||
pub topic: Topic,
|
||||
}
|
||||
|
||||
@@ -15,13 +15,14 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_config::{KafkaWalOptions, WalOptions};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionNumber;
|
||||
|
||||
use crate::error::{EncodeWalOptionsToJsonSnafu, Result};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::wal::kafka::{KafkaOptions, TopicManager as KafkaTopicManager};
|
||||
use crate::wal::{WalConfig, WalOptions};
|
||||
use crate::wal::kafka::TopicManager as KafkaTopicManager;
|
||||
use crate::wal::WalConfig;
|
||||
|
||||
/// Allocates wal options in region granularity.
|
||||
#[derive(Default)]
|
||||
@@ -55,7 +56,7 @@ impl WalOptionsAllocator {
|
||||
let topics = topic_manager.select_batch(num_regions);
|
||||
topics
|
||||
.into_iter()
|
||||
.map(|topic| WalOptions::Kafka(KafkaOptions { topic }))
|
||||
.map(|topic| WalOptions::Kafka(KafkaWalOptions { topic }))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,8 @@ use std::sync::Arc;
|
||||
|
||||
use catalog::memory::MemoryCatalogManager;
|
||||
use common_base::Plugins;
|
||||
use common_config::wal::{KafkaConfig, RaftEngineConfig};
|
||||
use common_config::{WalConfig, WAL_OPTIONS_KEY};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
use common_meta::key::datanode_table::DatanodeTableManager;
|
||||
@@ -32,9 +34,11 @@ use file_engine::engine::FileRegionEngine;
|
||||
use futures::future;
|
||||
use futures_util::future::try_join_all;
|
||||
use futures_util::StreamExt;
|
||||
use log_store::kafka::log_store::KafkaLogStore;
|
||||
use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use meta_client::client::MetaClient;
|
||||
use metric_engine::engine::MetricEngine;
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::engine::MitoEngine;
|
||||
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
|
||||
use object_store::util::normalize_dir;
|
||||
@@ -45,7 +49,6 @@ use servers::metrics_handler::MetricsHandler;
|
||||
use servers::server::{start_server, ServerHandler, ServerHandlers};
|
||||
use servers::Mode;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::path_utils::{region_dir, WAL_DIR};
|
||||
use store_api::region_engine::RegionEngineRef;
|
||||
use store_api::region_request::{RegionOpenRequest, RegionRequest};
|
||||
@@ -221,8 +224,6 @@ impl DatanodeBuilder {
|
||||
let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?;
|
||||
|
||||
// build and initialize region server
|
||||
let log_store = Self::build_log_store(&self.opts).await?;
|
||||
|
||||
let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
|
||||
let (tx, rx) = new_region_server_event_channel();
|
||||
(Box::new(tx) as _, Some(rx))
|
||||
@@ -230,9 +231,7 @@ impl DatanodeBuilder {
|
||||
(Box::new(NoopRegionServerEventListener) as _, None)
|
||||
};
|
||||
|
||||
let region_server = self
|
||||
.new_region_server(log_store, region_event_listener)
|
||||
.await?;
|
||||
let region_server = self.new_region_server(region_event_listener).await?;
|
||||
|
||||
self.initialize_region_server(®ion_server, kv_backend, !controlled_by_metasrv)
|
||||
.await?;
|
||||
@@ -347,11 +346,21 @@ impl DatanodeBuilder {
|
||||
while let Some(table_value) = table_values.next().await {
|
||||
let table_value = table_value.context(GetMetadataSnafu)?;
|
||||
for region_number in table_value.regions {
|
||||
// Augments region options with wal options if a wal options is provided.
|
||||
let mut region_options = table_value.region_info.region_options.clone();
|
||||
table_value
|
||||
.region_info
|
||||
.region_wal_options
|
||||
.get(®ion_number.to_string())
|
||||
.and_then(|wal_options| {
|
||||
region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
|
||||
});
|
||||
|
||||
regions.push((
|
||||
RegionId::new(table_value.table_id, region_number),
|
||||
table_value.region_info.engine.clone(),
|
||||
table_value.region_info.region_storage_path.clone(),
|
||||
table_value.region_info.region_options.clone(),
|
||||
region_options,
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -394,7 +403,6 @@ impl DatanodeBuilder {
|
||||
|
||||
async fn new_region_server(
|
||||
&self,
|
||||
log_store: Arc<RaftEngineLogStore>,
|
||||
event_listener: RegionServerEventListenerRef,
|
||||
) -> Result<RegionServer> {
|
||||
let opts = &self.opts;
|
||||
@@ -426,7 +434,7 @@ impl DatanodeBuilder {
|
||||
);
|
||||
|
||||
let object_store_manager = Self::build_object_store_manager(opts).await?;
|
||||
let engines = Self::build_store_engines(opts, log_store, object_store_manager).await?;
|
||||
let engines = Self::build_store_engines(opts, object_store_manager).await?;
|
||||
for engine in engines {
|
||||
region_server.register_engine(engine);
|
||||
}
|
||||
@@ -436,48 +444,19 @@ impl DatanodeBuilder {
|
||||
|
||||
// internal utils
|
||||
|
||||
/// Build [RaftEngineLogStore]
|
||||
async fn build_log_store(opts: &DatanodeOptions) -> Result<Arc<RaftEngineLogStore>> {
|
||||
let data_home = normalize_dir(&opts.storage.data_home);
|
||||
let wal_dir = match &opts.wal.dir {
|
||||
Some(dir) => dir.clone(),
|
||||
None => format!("{}{WAL_DIR}", data_home),
|
||||
};
|
||||
let wal_config = opts.wal.clone();
|
||||
|
||||
// create WAL directory
|
||||
fs::create_dir_all(Path::new(&wal_dir))
|
||||
.await
|
||||
.context(CreateDirSnafu { dir: &wal_dir })?;
|
||||
info!(
|
||||
"Creating logstore with config: {:?} and storage path: {}",
|
||||
wal_config, &wal_dir
|
||||
);
|
||||
let logstore = RaftEngineLogStore::try_new(wal_dir, wal_config)
|
||||
.await
|
||||
.map_err(Box::new)
|
||||
.context(OpenLogStoreSnafu)?;
|
||||
Ok(Arc::new(logstore))
|
||||
}
|
||||
|
||||
/// Build [RegionEngineRef] from `store_engine` section in `opts`
|
||||
async fn build_store_engines<S>(
|
||||
/// Builds [RegionEngineRef] from `store_engine` section in `opts`
|
||||
async fn build_store_engines(
|
||||
opts: &DatanodeOptions,
|
||||
log_store: Arc<S>,
|
||||
object_store_manager: ObjectStoreManagerRef,
|
||||
) -> Result<Vec<RegionEngineRef>>
|
||||
where
|
||||
S: LogStore,
|
||||
{
|
||||
) -> Result<Vec<RegionEngineRef>> {
|
||||
let mut engines = vec![];
|
||||
for engine in &opts.region_engine {
|
||||
match engine {
|
||||
RegionEngineConfig::Mito(config) => {
|
||||
let mito_engine: MitoEngine = MitoEngine::new(
|
||||
config.clone(),
|
||||
log_store.clone(),
|
||||
object_store_manager.clone(),
|
||||
);
|
||||
let mito_engine =
|
||||
Self::build_mito_engine(opts, object_store_manager.clone(), config.clone())
|
||||
.await?;
|
||||
|
||||
let metric_engine = MetricEngine::new(mito_engine.clone());
|
||||
engines.push(Arc::new(mito_engine) as _);
|
||||
engines.push(Arc::new(metric_engine) as _);
|
||||
@@ -494,6 +473,61 @@ impl DatanodeBuilder {
|
||||
Ok(engines)
|
||||
}
|
||||
|
||||
/// Builds [MitoEngine] according to options.
|
||||
async fn build_mito_engine(
|
||||
opts: &DatanodeOptions,
|
||||
object_store_manager: ObjectStoreManagerRef,
|
||||
config: MitoConfig,
|
||||
) -> Result<MitoEngine> {
|
||||
let mito_engine = match &opts.wal {
|
||||
WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
|
||||
config,
|
||||
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
|
||||
.await?,
|
||||
object_store_manager,
|
||||
),
|
||||
WalConfig::Kafka(kafka_config) => MitoEngine::new(
|
||||
config,
|
||||
Self::build_kafka_log_store(kafka_config).await?,
|
||||
object_store_manager,
|
||||
),
|
||||
};
|
||||
Ok(mito_engine)
|
||||
}
|
||||
|
||||
/// Builds [RaftEngineLogStore].
|
||||
async fn build_raft_engine_log_store(
|
||||
data_home: &str,
|
||||
config: &RaftEngineConfig,
|
||||
) -> Result<Arc<RaftEngineLogStore>> {
|
||||
let data_home = normalize_dir(data_home);
|
||||
let wal_dir = match &config.dir {
|
||||
Some(dir) => dir.clone(),
|
||||
None => format!("{}{WAL_DIR}", data_home),
|
||||
};
|
||||
|
||||
// create WAL directory
|
||||
fs::create_dir_all(Path::new(&wal_dir))
|
||||
.await
|
||||
.context(CreateDirSnafu { dir: &wal_dir })?;
|
||||
info!(
|
||||
"Creating raft-engine logstore with config: {:?} and storage path: {}",
|
||||
config, &wal_dir
|
||||
);
|
||||
let logstore = RaftEngineLogStore::try_new(wal_dir, config.clone())
|
||||
.await
|
||||
.map_err(Box::new)
|
||||
.context(OpenLogStoreSnafu)?;
|
||||
|
||||
Ok(Arc::new(logstore))
|
||||
}
|
||||
|
||||
/// Builds [KafkaLogStore].
|
||||
async fn build_kafka_log_store(config: &KafkaConfig) -> Result<Arc<KafkaLogStore>> {
|
||||
let _ = config;
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Builds [ObjectStoreManager]
|
||||
async fn build_object_store_manager(opts: &DatanodeOptions) -> Result<ObjectStoreManagerRef> {
|
||||
let object_store =
|
||||
|
||||
85
src/log-store/src/kafka.rs
Normal file
85
src/log-store/src/kafka.rs
Normal file
@@ -0,0 +1,85 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod log_store;
|
||||
|
||||
use common_meta::wal::KafkaWalTopic as Topic;
|
||||
use store_api::logstore::entry::{Entry, Id as EntryId};
|
||||
use store_api::logstore::namespace::Namespace;
|
||||
|
||||
use crate::error::Error;
|
||||
|
||||
/// Kafka Namespace implementation.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
||||
pub struct NamespaceImpl {
|
||||
region_id: u64,
|
||||
topic: Topic,
|
||||
}
|
||||
|
||||
impl NamespaceImpl {
|
||||
fn new(region_id: u64, topic: Topic) -> Self {
|
||||
Self { region_id, topic }
|
||||
}
|
||||
|
||||
fn region_id(&self) -> u64 {
|
||||
self.region_id
|
||||
}
|
||||
|
||||
fn topic(&self) -> &Topic {
|
||||
&self.topic
|
||||
}
|
||||
}
|
||||
|
||||
impl Namespace for NamespaceImpl {
|
||||
fn id(&self) -> u64 {
|
||||
self.region_id
|
||||
}
|
||||
}
|
||||
|
||||
/// Kafka Entry implementation.
|
||||
pub struct EntryImpl {
|
||||
/// Entry payload.
|
||||
data: Vec<u8>,
|
||||
/// The logical entry id.
|
||||
id: EntryId,
|
||||
/// The namespace used to identify and isolate log entries from different regions.
|
||||
ns: NamespaceImpl,
|
||||
}
|
||||
|
||||
impl EntryImpl {
|
||||
fn new(data: Vec<u8>, entry_id: EntryId, ns: NamespaceImpl) -> Self {
|
||||
Self {
|
||||
data,
|
||||
id: entry_id,
|
||||
ns,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Entry for EntryImpl {
|
||||
type Error = Error;
|
||||
type Namespace = NamespaceImpl;
|
||||
|
||||
fn data(&self) -> &[u8] {
|
||||
&self.data
|
||||
}
|
||||
|
||||
fn id(&self) -> EntryId {
|
||||
self.id
|
||||
}
|
||||
|
||||
fn namespace(&self) -> Self::Namespace {
|
||||
self.ns.clone()
|
||||
}
|
||||
}
|
||||
106
src/log-store/src/kafka/log_store.rs
Normal file
106
src/log-store/src/kafka/log_store.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_config::wal::{KafkaConfig, WalOptions};
|
||||
use store_api::logstore::entry::Id as EntryId;
|
||||
use store_api::logstore::entry_stream::SendableEntryStream;
|
||||
use store_api::logstore::namespace::Id as NamespaceId;
|
||||
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::kafka::{EntryImpl, NamespaceImpl};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct KafkaLogStore;
|
||||
|
||||
impl KafkaLogStore {
|
||||
pub async fn try_new(config: KafkaConfig) -> Result<Self> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl LogStore for KafkaLogStore {
|
||||
type Error = Error;
|
||||
type Entry = EntryImpl;
|
||||
type Namespace = NamespaceImpl;
|
||||
|
||||
/// Create an entry of the associate Entry type.
|
||||
fn entry<D: AsRef<[u8]>>(
|
||||
&self,
|
||||
data: D,
|
||||
entry_id: EntryId,
|
||||
ns: Self::Namespace,
|
||||
) -> Self::Entry {
|
||||
EntryImpl::new(data.as_ref().to_vec(), entry_id, ns)
|
||||
}
|
||||
|
||||
/// Append an `Entry` to WAL with given namespace and return append response containing
|
||||
/// the entry id.
|
||||
async fn append(&self, entry: Self::Entry) -> Result<AppendResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// For a batch of log entries belonging to multiple regions, each assigned to a specific topic,
|
||||
/// we need to determine the minimum log offset returned for each region in this batch.
|
||||
/// During replay, we use this offset to fetch log entries for a region from its assigned topic.
|
||||
/// After fetching, we filter the entries to obtain log entries relevant to that specific region.
|
||||
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Create a new `EntryStream` to asynchronously generates `Entry` with ids
|
||||
/// starting from `id`. The generated entries will be filtered by the namespace.
|
||||
async fn read(
|
||||
&self,
|
||||
ns: &Self::Namespace,
|
||||
entry_id: EntryId,
|
||||
) -> Result<SendableEntryStream<Self::Entry, Self::Error>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Create a namespace of the associate Namespace type
|
||||
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Create a new `Namespace`.
|
||||
async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete an existing `Namespace` with given ref.
|
||||
async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List all existing namespaces.
|
||||
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
/// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete
|
||||
/// the log files if all entries inside are obsolete. This method may not delete log
|
||||
/// files immediately.
|
||||
async fn obsolete(&self, _ns: Self::Namespace, _entry_id: EntryId) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stop components of logstore.
|
||||
async fn stop(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,8 @@
|
||||
#![feature(let_chains)]
|
||||
|
||||
pub mod error;
|
||||
#[allow(unused)]
|
||||
pub mod kafka;
|
||||
mod noop;
|
||||
pub mod raft_engine;
|
||||
pub mod test_util;
|
||||
|
||||
@@ -12,9 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use store_api::logstore::entry::{Entry, Id};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_config::wal::WalOptions;
|
||||
use store_api::logstore::entry::{Entry, Id as EntryId};
|
||||
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
|
||||
use store_api::logstore::{AppendResponse, LogStore};
|
||||
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
|
||||
@@ -42,7 +45,7 @@ impl Entry for EntryImpl {
|
||||
&[]
|
||||
}
|
||||
|
||||
fn id(&self) -> Id {
|
||||
fn id(&self) -> EntryId {
|
||||
0
|
||||
}
|
||||
|
||||
@@ -62,17 +65,22 @@ impl LogStore for NoopLogStore {
|
||||
}
|
||||
|
||||
async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
|
||||
Ok(AppendResponse { entry_id: 0 })
|
||||
Ok(AppendResponse {
|
||||
entry_id: 0,
|
||||
offset: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<()> {
|
||||
Ok(())
|
||||
async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
|
||||
Ok(AppendBatchResponse {
|
||||
offsets: HashMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn read(
|
||||
&self,
|
||||
_ns: &Self::Namespace,
|
||||
_id: Id,
|
||||
_entry_id: EntryId,
|
||||
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
|
||||
{
|
||||
Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
|
||||
@@ -92,25 +100,31 @@ impl LogStore for NoopLogStore {
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
|
||||
fn entry<D: AsRef<[u8]>>(
|
||||
&self,
|
||||
data: D,
|
||||
entry_id: EntryId,
|
||||
ns: Self::Namespace,
|
||||
) -> Self::Entry {
|
||||
let _ = data;
|
||||
let _ = id;
|
||||
let _ = entry_id;
|
||||
let _ = ns;
|
||||
EntryImpl
|
||||
}
|
||||
|
||||
fn namespace(&self, id: NamespaceId) -> Self::Namespace {
|
||||
let _ = id;
|
||||
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace {
|
||||
let _ = ns_id;
|
||||
let _ = wal_options;
|
||||
NamespaceImpl
|
||||
}
|
||||
|
||||
async fn obsolete(
|
||||
&self,
|
||||
namespace: Self::Namespace,
|
||||
id: Id,
|
||||
ns: Self::Namespace,
|
||||
entry_id: EntryId,
|
||||
) -> std::result::Result<(), Self::Error> {
|
||||
let _ = namespace;
|
||||
let _ = id;
|
||||
let _ = ns;
|
||||
let _ = entry_id;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -135,7 +149,7 @@ mod tests {
|
||||
store.create_namespace(&NamespaceImpl).await.unwrap();
|
||||
assert_eq!(0, store.list_namespaces().await.unwrap().len());
|
||||
store.delete_namespace(&NamespaceImpl).await.unwrap();
|
||||
assert_eq!(NamespaceImpl, store.namespace(0));
|
||||
assert_eq!(NamespaceImpl, store.namespace(0, &WalOptions::default()));
|
||||
store.obsolete(NamespaceImpl, 1).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,8 +14,8 @@
|
||||
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
use store_api::logstore::entry::{Entry, Id};
|
||||
use store_api::logstore::namespace::Namespace;
|
||||
use store_api::logstore::entry::{Entry, Id as EntryId};
|
||||
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
|
||||
@@ -42,7 +42,7 @@ impl EntryImpl {
|
||||
}
|
||||
|
||||
impl NamespaceImpl {
|
||||
pub fn with_id(id: Id) -> Self {
|
||||
pub fn with_id(id: NamespaceId) -> Self {
|
||||
Self {
|
||||
id,
|
||||
..Default::default()
|
||||
@@ -60,7 +60,7 @@ impl Hash for NamespaceImpl {
|
||||
impl Eq for NamespaceImpl {}
|
||||
|
||||
impl Namespace for NamespaceImpl {
|
||||
fn id(&self) -> store_api::logstore::namespace::Id {
|
||||
fn id(&self) -> NamespaceId {
|
||||
self.id
|
||||
}
|
||||
}
|
||||
@@ -73,7 +73,7 @@ impl Entry for EntryImpl {
|
||||
self.data.as_slice()
|
||||
}
|
||||
|
||||
fn id(&self) -> Id {
|
||||
fn id(&self) -> EntryId {
|
||||
self.id
|
||||
}
|
||||
|
||||
|
||||
@@ -16,15 +16,15 @@ use std::fmt::{Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::stream;
|
||||
use common_config::WalConfig;
|
||||
use common_config::wal::{RaftEngineConfig, WalOptions};
|
||||
use common_runtime::{RepeatedTask, TaskFunction};
|
||||
use common_telemetry::{error, info};
|
||||
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::logstore::entry::{Entry, Id};
|
||||
use store_api::logstore::entry::{Entry, Id as EntryId};
|
||||
use store_api::logstore::entry_stream::SendableEntryStream;
|
||||
use store_api::logstore::namespace::Namespace as NamespaceTrait;
|
||||
use store_api::logstore::{AppendResponse, LogStore};
|
||||
use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait};
|
||||
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{
|
||||
@@ -37,7 +37,7 @@ use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace
|
||||
const NAMESPACE_PREFIX: &str = "$sys/";
|
||||
|
||||
pub struct RaftEngineLogStore {
|
||||
config: WalConfig,
|
||||
config: RaftEngineConfig,
|
||||
engine: Arc<Engine>,
|
||||
gc_task: RepeatedTask<Error>,
|
||||
}
|
||||
@@ -72,7 +72,7 @@ impl TaskFunction<Error> for PurgeExpiredFilesFunction {
|
||||
}
|
||||
|
||||
impl RaftEngineLogStore {
|
||||
pub async fn try_new(dir: String, config: WalConfig) -> Result<Self> {
|
||||
pub async fn try_new(dir: String, config: RaftEngineConfig) -> Result<Self> {
|
||||
let raft_engine_config = Config {
|
||||
dir,
|
||||
purge_threshold: ReadableSize(config.purge_threshold.0),
|
||||
@@ -153,7 +153,7 @@ impl LogStore for RaftEngineLogStore {
|
||||
self.gc_task.stop().await.context(StopGcTaskSnafu)
|
||||
}
|
||||
|
||||
/// Append an entry to logstore. Currently of existence of entry's namespace is not checked.
|
||||
/// Appends an entry to logstore. Currently the existence of the entry's namespace is not checked.
|
||||
async fn append(&self, e: Self::Entry) -> Result<AppendResponse> {
|
||||
ensure!(self.started(), IllegalStateSnafu);
|
||||
let entry_id = e.id;
|
||||
@@ -178,15 +178,18 @@ impl LogStore for RaftEngineLogStore {
|
||||
.engine
|
||||
.write(&mut batch, self.config.sync_write)
|
||||
.context(RaftEngineSnafu)?;
|
||||
Ok(AppendResponse { entry_id })
|
||||
Ok(AppendResponse {
|
||||
entry_id,
|
||||
offset: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Append a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of
|
||||
/// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of
|
||||
/// batch append.
|
||||
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<()> {
|
||||
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
|
||||
ensure!(self.started(), IllegalStateSnafu);
|
||||
if entries.is_empty() {
|
||||
return Ok(());
|
||||
return Ok(AppendBatchResponse::default());
|
||||
}
|
||||
|
||||
let mut batch = LogBatch::with_capacity(entries.len());
|
||||
@@ -203,7 +206,9 @@ impl LogStore for RaftEngineLogStore {
|
||||
.engine
|
||||
.write(&mut batch, self.config.sync_write)
|
||||
.context(RaftEngineSnafu)?;
|
||||
Ok(())
|
||||
|
||||
// The user of raft-engine log store does not care about the response.
|
||||
Ok(AppendBatchResponse::default())
|
||||
}
|
||||
|
||||
/// Create a stream of entries from logstore in the given namespace. The end of stream is
|
||||
@@ -211,18 +216,18 @@ impl LogStore for RaftEngineLogStore {
|
||||
async fn read(
|
||||
&self,
|
||||
ns: &Self::Namespace,
|
||||
id: Id,
|
||||
entry_id: EntryId,
|
||||
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
|
||||
ensure!(self.started(), IllegalStateSnafu);
|
||||
let engine = self.engine.clone();
|
||||
|
||||
let last_index = engine.last_index(ns.id).unwrap_or(0);
|
||||
let mut start_index = id.max(engine.first_index(ns.id).unwrap_or(last_index + 1));
|
||||
let last_index = engine.last_index(ns.id()).unwrap_or(0);
|
||||
let mut start_index = entry_id.max(engine.first_index(ns.id()).unwrap_or(last_index + 1));
|
||||
|
||||
info!(
|
||||
"Read logstore, namespace: {}, start: {}, span: {:?}",
|
||||
ns.id(),
|
||||
id,
|
||||
entry_id,
|
||||
self.span(ns)
|
||||
);
|
||||
let max_batch_size = self.config.read_batch_size;
|
||||
@@ -322,31 +327,37 @@ impl LogStore for RaftEngineLogStore {
|
||||
Ok(namespaces)
|
||||
}
|
||||
|
||||
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry {
|
||||
fn entry<D: AsRef<[u8]>>(
|
||||
&self,
|
||||
data: D,
|
||||
entry_id: EntryId,
|
||||
ns: Self::Namespace,
|
||||
) -> Self::Entry {
|
||||
EntryImpl {
|
||||
id,
|
||||
id: entry_id,
|
||||
data: data.as_ref().to_vec(),
|
||||
namespace_id: ns.id(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn namespace(&self, id: store_api::logstore::namespace::Id) -> Self::Namespace {
|
||||
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace {
|
||||
let _ = wal_options;
|
||||
Namespace {
|
||||
id,
|
||||
id: ns_id,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<()> {
|
||||
async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<()> {
|
||||
ensure!(self.started(), IllegalStateSnafu);
|
||||
let obsoleted = self.engine.compact_to(namespace.id(), id + 1);
|
||||
let obsoleted = self.engine.compact_to(ns.id(), entry_id + 1);
|
||||
info!(
|
||||
"Namespace {} obsoleted {} entries, compacted index: {}, span: {:?}",
|
||||
namespace.id(),
|
||||
ns.id(),
|
||||
obsoleted,
|
||||
id,
|
||||
self.span(&namespace)
|
||||
entry_id,
|
||||
self.span(&ns)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -386,7 +397,7 @@ mod tests {
|
||||
let dir = create_temp_dir("raft-engine-logstore-test");
|
||||
let logstore = RaftEngineLogStore::try_new(
|
||||
dir.path().to_str().unwrap().to_string(),
|
||||
WalConfig::default(),
|
||||
RaftEngineConfig::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -399,7 +410,7 @@ mod tests {
|
||||
let dir = create_temp_dir("raft-engine-logstore-test");
|
||||
let logstore = RaftEngineLogStore::try_new(
|
||||
dir.path().to_str().unwrap().to_string(),
|
||||
WalConfig::default(),
|
||||
RaftEngineConfig::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -425,7 +436,7 @@ mod tests {
|
||||
let dir = create_temp_dir("raft-engine-logstore-test");
|
||||
let logstore = RaftEngineLogStore::try_new(
|
||||
dir.path().to_str().unwrap().to_string(),
|
||||
WalConfig::default(),
|
||||
RaftEngineConfig::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -466,7 +477,7 @@ mod tests {
|
||||
{
|
||||
let logstore = RaftEngineLogStore::try_new(
|
||||
dir.path().to_str().unwrap().to_string(),
|
||||
WalConfig::default(),
|
||||
RaftEngineConfig::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -486,7 +497,7 @@ mod tests {
|
||||
|
||||
let logstore = RaftEngineLogStore::try_new(
|
||||
dir.path().to_str().unwrap().to_string(),
|
||||
WalConfig::default(),
|
||||
RaftEngineConfig::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -521,7 +532,7 @@ mod tests {
|
||||
let dir = create_temp_dir("raft-engine-logstore-test");
|
||||
let path = dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let config = WalConfig {
|
||||
let config = RaftEngineConfig {
|
||||
file_size: ReadableSize::mb(2),
|
||||
purge_threshold: ReadableSize::mb(4),
|
||||
purge_interval: Duration::from_secs(5),
|
||||
@@ -553,7 +564,7 @@ mod tests {
|
||||
let dir = create_temp_dir("raft-engine-logstore-test");
|
||||
let path = dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let config = WalConfig {
|
||||
let config = RaftEngineConfig {
|
||||
file_size: ReadableSize::mb(2),
|
||||
purge_threshold: ReadableSize::mb(4),
|
||||
purge_interval: Duration::from_secs(5),
|
||||
@@ -582,7 +593,7 @@ mod tests {
|
||||
let dir = create_temp_dir("logstore-append-batch-test");
|
||||
let path = dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let config = WalConfig {
|
||||
let config = RaftEngineConfig {
|
||||
file_size: ReadableSize::mb(2),
|
||||
purge_threshold: ReadableSize::mb(4),
|
||||
purge_interval: Duration::from_secs(5),
|
||||
@@ -613,7 +624,7 @@ mod tests {
|
||||
let dir = create_temp_dir("logstore-append-batch-test");
|
||||
|
||||
let path = dir.path().to_str().unwrap().to_string();
|
||||
let config = WalConfig {
|
||||
let config = RaftEngineConfig {
|
||||
file_size: ReadableSize::mb(2),
|
||||
purge_threshold: ReadableSize::mb(4),
|
||||
purge_interval: Duration::from_secs(5),
|
||||
|
||||
@@ -15,14 +15,14 @@
|
||||
use std::path::Path;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_config::WalConfig;
|
||||
use common_config::wal::RaftEngineConfig;
|
||||
|
||||
use crate::raft_engine::log_store::RaftEngineLogStore;
|
||||
|
||||
/// Create a write log for the provided path, used for test.
|
||||
pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEngineLogStore {
|
||||
let path = path.as_ref().display().to_string();
|
||||
let cfg = WalConfig {
|
||||
let cfg = RaftEngineConfig {
|
||||
file_size: ReadableSize::kb(128),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -20,6 +20,7 @@ bytes.workspace = true
|
||||
chrono.workspace = true
|
||||
common-base.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-config.workspace = true
|
||||
common-datasource.workspace = true
|
||||
common-decimal.workspace = true
|
||||
common-error.workspace = true
|
||||
|
||||
@@ -22,6 +22,7 @@ use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use common_config::wal::WalOptions;
|
||||
use common_telemetry::info;
|
||||
use common_time::util::current_time_millis;
|
||||
use snafu::{ensure, OptionExt};
|
||||
@@ -74,6 +75,8 @@ pub(crate) struct MitoRegion {
|
||||
pub(crate) manifest_manager: RegionManifestManager,
|
||||
/// SST file purger.
|
||||
pub(crate) file_purger: FilePurgerRef,
|
||||
/// Wal options of this region.
|
||||
pub(crate) wal_options: WalOptions,
|
||||
/// Last flush time in millis.
|
||||
last_flush_millis: AtomicI64,
|
||||
/// Whether the region is writable.
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_config::wal::WalOptions;
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use common_time::util::current_time_millis;
|
||||
use futures::StreamExt;
|
||||
@@ -145,6 +146,8 @@ impl RegionOpener {
|
||||
}
|
||||
}
|
||||
let options = RegionOptions::try_from(&self.options)?;
|
||||
let wal_options = options.wal_options.clone();
|
||||
|
||||
let object_store = self.object_store(&options.storage)?.clone();
|
||||
|
||||
// Create a manifest manager for this region and writes regions to the manifest file.
|
||||
@@ -171,6 +174,7 @@ impl RegionOpener {
|
||||
access_layer,
|
||||
self.cache_manager,
|
||||
)),
|
||||
wal_options,
|
||||
last_flush_millis: AtomicI64::new(current_time_millis()),
|
||||
// Region is writable after it is created.
|
||||
writable: AtomicBool::new(true),
|
||||
@@ -215,6 +219,8 @@ impl RegionOpener {
|
||||
wal: &Wal<S>,
|
||||
) -> Result<Option<MitoRegion>> {
|
||||
let region_options = RegionOptions::try_from(&self.options)?;
|
||||
let wal_options = region_options.wal_options.clone();
|
||||
|
||||
let region_manifest_options = self.manifest_options(config, ®ion_options)?;
|
||||
let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await?
|
||||
else {
|
||||
@@ -244,7 +250,14 @@ impl RegionOpener {
|
||||
let flushed_entry_id = version.flushed_entry_id;
|
||||
let version_control = Arc::new(VersionControl::new(version));
|
||||
if !self.skip_wal_replay {
|
||||
replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?;
|
||||
replay_memtable(
|
||||
wal,
|
||||
&wal_options,
|
||||
region_id,
|
||||
flushed_entry_id,
|
||||
&version_control,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
info!("Skip the WAL replay for region: {}", region_id);
|
||||
}
|
||||
@@ -255,6 +268,7 @@ impl RegionOpener {
|
||||
access_layer,
|
||||
manifest_manager,
|
||||
file_purger,
|
||||
wal_options,
|
||||
last_flush_millis: AtomicI64::new(current_time_millis()),
|
||||
// Region is always opened in read only mode.
|
||||
writable: AtomicBool::new(false),
|
||||
@@ -346,6 +360,7 @@ pub(crate) fn check_recovered_region(
|
||||
/// Replays the mutations from WAL and inserts mutations to memtable of given region.
|
||||
async fn replay_memtable<S: LogStore>(
|
||||
wal: &Wal<S>,
|
||||
wal_options: &WalOptions,
|
||||
region_id: RegionId,
|
||||
flushed_entry_id: EntryId,
|
||||
version_control: &VersionControlRef,
|
||||
@@ -354,8 +369,8 @@ async fn replay_memtable<S: LogStore>(
|
||||
// Last entry id should start from flushed entry id since there might be no
|
||||
// data in the WAL.
|
||||
let mut last_entry_id = flushed_entry_id;
|
||||
let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control);
|
||||
let mut wal_stream = wal.scan(region_id, flushed_entry_id)?;
|
||||
let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone());
|
||||
let mut wal_stream = wal.scan(region_id, flushed_entry_id, wal_options)?;
|
||||
while let Some(res) = wal_stream.next().await {
|
||||
let (entry_id, entry) = res?;
|
||||
last_entry_id = last_entry_id.max(entry_id);
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_config::wal::WalOptions;
|
||||
use common_config::WAL_OPTIONS_KEY;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
use serde_with::{serde_as, with_prefix, DisplayFromStr};
|
||||
@@ -37,6 +39,8 @@ pub struct RegionOptions {
|
||||
pub compaction: CompactionOptions,
|
||||
/// Custom storage.
|
||||
pub storage: Option<String>,
|
||||
/// Wal options.
|
||||
pub wal_options: WalOptions,
|
||||
}
|
||||
|
||||
impl TryFrom<&HashMap<String, String>> for RegionOptions {
|
||||
@@ -53,10 +57,19 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
|
||||
serde_json::from_str(&json).context(JsonOptionsSnafu)?;
|
||||
let compaction: CompactionOptions = serde_json::from_str(&json).unwrap_or_default();
|
||||
|
||||
// Tries to decode the wal options from the map or sets to the default if there's none wal options in the map.
|
||||
let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
|
||||
|| Ok(WalOptions::default()),
|
||||
|encoded_wal_options| {
|
||||
serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(RegionOptions {
|
||||
ttl: options.ttl,
|
||||
compaction,
|
||||
storage: options.storage,
|
||||
wal_options,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -161,6 +174,9 @@ fn options_map_to_value(options: &HashMap<String, String>) -> Value {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_config::wal::KafkaWalOptions;
|
||||
use common_config::WAL_OPTIONS_KEY;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
|
||||
@@ -232,8 +248,34 @@ mod tests {
|
||||
assert_eq!(expect, options);
|
||||
}
|
||||
|
||||
fn test_with_wal_options(wal_options: &WalOptions) -> bool {
|
||||
let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
|
||||
let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
|
||||
let got = RegionOptions::try_from(&map).unwrap();
|
||||
let expect = RegionOptions {
|
||||
wal_options: wal_options.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
expect == got
|
||||
}
|
||||
|
||||
// No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
|
||||
#[test]
|
||||
fn test_with_any_wal_options() {
|
||||
let all_wal_options = vec![
|
||||
WalOptions::RaftEngine,
|
||||
WalOptions::Kafka(KafkaWalOptions {
|
||||
topic: "test_topic".to_string(),
|
||||
}),
|
||||
];
|
||||
all_wal_options.iter().all(test_with_wal_options);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_all() {
|
||||
let wal_options = WalOptions::Kafka(KafkaWalOptions {
|
||||
topic: "test_topic".to_string(),
|
||||
});
|
||||
let map = make_map(&[
|
||||
("ttl", "7d"),
|
||||
("compaction.twcs.max_active_window_files", "8"),
|
||||
@@ -241,6 +283,10 @@ mod tests {
|
||||
("compaction.twcs.time_window", "2h"),
|
||||
("compaction.type", "twcs"),
|
||||
("storage", "S3"),
|
||||
(
|
||||
WAL_OPTIONS_KEY,
|
||||
&serde_json::to_string(&wal_options).unwrap(),
|
||||
),
|
||||
]);
|
||||
let options = RegionOptions::try_from(&map).unwrap();
|
||||
let expect = RegionOptions {
|
||||
@@ -251,6 +297,7 @@ mod tests {
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
}),
|
||||
storage: Some("s3".to_string()),
|
||||
wal_options,
|
||||
};
|
||||
assert_eq!(expect, options);
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::mem;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::{Mutation, OpType, Rows, WalEntry};
|
||||
use common_config::wal::WalOptions;
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::storage::{RegionId, SequenceNumber};
|
||||
@@ -86,6 +87,8 @@ pub(crate) struct RegionWriteCtx {
|
||||
/// We keep [WalEntry] instead of mutations to avoid taking mutations
|
||||
/// out of the context to construct the wal entry when we write to the wal.
|
||||
wal_entry: WalEntry,
|
||||
/// Wal options of the region being written to.
|
||||
wal_options: WalOptions,
|
||||
/// Notifiers to send write results to waiters.
|
||||
///
|
||||
/// The i-th notify is for i-th mutation.
|
||||
@@ -102,7 +105,11 @@ pub(crate) struct RegionWriteCtx {
|
||||
|
||||
impl RegionWriteCtx {
|
||||
/// Returns an empty context.
|
||||
pub(crate) fn new(region_id: RegionId, version_control: &VersionControlRef) -> RegionWriteCtx {
|
||||
pub(crate) fn new(
|
||||
region_id: RegionId,
|
||||
version_control: &VersionControlRef,
|
||||
wal_options: WalOptions,
|
||||
) -> RegionWriteCtx {
|
||||
let VersionControlData {
|
||||
version,
|
||||
committed_sequence,
|
||||
@@ -117,6 +124,7 @@ impl RegionWriteCtx {
|
||||
next_sequence: committed_sequence + 1,
|
||||
next_entry_id: last_entry_id + 1,
|
||||
wal_entry: WalEntry::default(),
|
||||
wal_options,
|
||||
notifiers: Vec::new(),
|
||||
failed: false,
|
||||
put_num: 0,
|
||||
@@ -153,7 +161,12 @@ impl RegionWriteCtx {
|
||||
&mut self,
|
||||
wal_writer: &mut WalWriter<S>,
|
||||
) -> Result<()> {
|
||||
wal_writer.add_entry(self.region_id, self.next_entry_id, &self.wal_entry)?;
|
||||
wal_writer.add_entry(
|
||||
self.region_id,
|
||||
self.next_entry_id,
|
||||
&self.wal_entry,
|
||||
&self.wal_options,
|
||||
)?;
|
||||
// We only call this method one time, but we still bump next entry id for consistency.
|
||||
self.next_entry_id += 1;
|
||||
Ok(())
|
||||
|
||||
@@ -14,11 +14,13 @@
|
||||
|
||||
//! Write ahead log of the engine.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::WalEntry;
|
||||
use async_stream::try_stream;
|
||||
use common_config::wal::WalOptions;
|
||||
use common_error::ext::BoxedError;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::StreamExt;
|
||||
@@ -60,13 +62,19 @@ impl<S: LogStore> Wal<S> {
|
||||
store: self.store.clone(),
|
||||
entries: Vec::new(),
|
||||
entry_encode_buf: Vec::new(),
|
||||
namespaces: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Scan entries of specific region starting from `start_id` (inclusive).
|
||||
pub fn scan(&self, region_id: RegionId, start_id: EntryId) -> Result<WalEntryStream> {
|
||||
pub fn scan<'a>(
|
||||
&'a self,
|
||||
region_id: RegionId,
|
||||
start_id: EntryId,
|
||||
wal_options: &'a WalOptions,
|
||||
) -> Result<WalEntryStream> {
|
||||
let stream = try_stream!({
|
||||
let namespace = self.store.namespace(region_id.into());
|
||||
let namespace = self.store.namespace(region_id.into(), wal_options);
|
||||
let mut stream = self
|
||||
.store
|
||||
.read(&namespace, start_id)
|
||||
@@ -89,8 +97,13 @@ impl<S: LogStore> Wal<S> {
|
||||
}
|
||||
|
||||
/// Mark entries whose ids `<= last_id` as deleted.
|
||||
pub async fn obsolete(&self, region_id: RegionId, last_id: EntryId) -> Result<()> {
|
||||
let namespace = self.store.namespace(region_id.into());
|
||||
pub async fn obsolete(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
last_id: EntryId,
|
||||
wal_options: &WalOptions,
|
||||
) -> Result<()> {
|
||||
let namespace = self.store.namespace(region_id.into(), wal_options);
|
||||
self.store
|
||||
.obsolete(namespace, last_id)
|
||||
.await
|
||||
@@ -117,6 +130,8 @@ pub struct WalWriter<S: LogStore> {
|
||||
entries: Vec<S::Entry>,
|
||||
/// Buffer to encode WAL entry.
|
||||
entry_encode_buf: Vec<u8>,
|
||||
/// Namespaces of regions being written into.
|
||||
namespaces: HashMap<RegionId, S::Namespace>,
|
||||
}
|
||||
|
||||
impl<S: LogStore> WalWriter<S> {
|
||||
@@ -126,8 +141,15 @@ impl<S: LogStore> WalWriter<S> {
|
||||
region_id: RegionId,
|
||||
entry_id: EntryId,
|
||||
wal_entry: &WalEntry,
|
||||
wal_options: &WalOptions,
|
||||
) -> Result<()> {
|
||||
let namespace = self.store.namespace(region_id.into());
|
||||
// Gets or inserts with a newly built namespace.
|
||||
let namespace = self
|
||||
.namespaces
|
||||
.entry(region_id)
|
||||
.or_insert_with(|| self.store.namespace(region_id.into(), wal_options))
|
||||
.clone();
|
||||
|
||||
// Encode wal entry to log store entry.
|
||||
self.entry_encode_buf.clear();
|
||||
wal_entry
|
||||
@@ -143,6 +165,7 @@ impl<S: LogStore> WalWriter<S> {
|
||||
}
|
||||
|
||||
/// Write all buffered entries to the WAL.
|
||||
// TODO(niebayes): returns an `AppendBatchResponse` and handle it properly.
|
||||
pub async fn write_to_wal(&mut self) -> Result<()> {
|
||||
// TODO(yingwen): metrics.
|
||||
|
||||
@@ -152,6 +175,7 @@ impl<S: LogStore> WalWriter<S> {
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(WriteWalSnafu)
|
||||
.map(|_| ())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -235,6 +259,7 @@ mod tests {
|
||||
async fn test_write_wal() {
|
||||
let env = WalEnv::new().await;
|
||||
let wal = env.new_wal();
|
||||
let wal_options = WalOptions::default();
|
||||
|
||||
let entry = WalEntry {
|
||||
mutations: vec![
|
||||
@@ -244,11 +269,17 @@ mod tests {
|
||||
};
|
||||
let mut writer = wal.writer();
|
||||
// Region 1 entry 1.
|
||||
writer.add_entry(RegionId::new(1, 1), 1, &entry).unwrap();
|
||||
writer
|
||||
.add_entry(RegionId::new(1, 1), 1, &entry, &wal_options)
|
||||
.unwrap();
|
||||
// Region 2 entry 1.
|
||||
writer.add_entry(RegionId::new(1, 2), 1, &entry).unwrap();
|
||||
writer
|
||||
.add_entry(RegionId::new(1, 2), 1, &entry, &wal_options)
|
||||
.unwrap();
|
||||
// Region 1 entry 2.
|
||||
writer.add_entry(RegionId::new(1, 1), 2, &entry).unwrap();
|
||||
writer
|
||||
.add_entry(RegionId::new(1, 1), 2, &entry, &wal_options)
|
||||
.unwrap();
|
||||
|
||||
// Test writing multiple region to wal.
|
||||
writer.write_to_wal().await.unwrap();
|
||||
@@ -295,31 +326,32 @@ mod tests {
|
||||
async fn test_scan_wal() {
|
||||
let env = WalEnv::new().await;
|
||||
let wal = env.new_wal();
|
||||
let wal_options = WalOptions::default();
|
||||
|
||||
let entries = sample_entries();
|
||||
let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
|
||||
let mut writer = wal.writer();
|
||||
writer.add_entry(id1, 1, &entries[0]).unwrap();
|
||||
writer.add_entry(id1, 1, &entries[0], &wal_options).unwrap();
|
||||
// Insert one entry into region2. Scan should not return this entry.
|
||||
writer.add_entry(id2, 1, &entries[0]).unwrap();
|
||||
writer.add_entry(id1, 2, &entries[1]).unwrap();
|
||||
writer.add_entry(id1, 3, &entries[2]).unwrap();
|
||||
writer.add_entry(id1, 4, &entries[3]).unwrap();
|
||||
writer.add_entry(id2, 1, &entries[0], &wal_options).unwrap();
|
||||
writer.add_entry(id1, 2, &entries[1], &wal_options).unwrap();
|
||||
writer.add_entry(id1, 3, &entries[2], &wal_options).unwrap();
|
||||
writer.add_entry(id1, 4, &entries[3], &wal_options).unwrap();
|
||||
|
||||
writer.write_to_wal().await.unwrap();
|
||||
|
||||
// Scan all contents region1
|
||||
let stream = wal.scan(id1, 1).unwrap();
|
||||
let stream = wal.scan(id1, 1, &wal_options).unwrap();
|
||||
let actual: Vec<_> = stream.try_collect().await.unwrap();
|
||||
check_entries(&entries, 1, &actual);
|
||||
|
||||
// Scan parts of contents
|
||||
let stream = wal.scan(id1, 2).unwrap();
|
||||
let stream = wal.scan(id1, 2, &wal_options).unwrap();
|
||||
let actual: Vec<_> = stream.try_collect().await.unwrap();
|
||||
check_entries(&entries[1..], 2, &actual);
|
||||
|
||||
// Scan out of range
|
||||
let stream = wal.scan(id1, 5).unwrap();
|
||||
let stream = wal.scan(id1, 5, &wal_options).unwrap();
|
||||
let actual: Vec<_> = stream.try_collect().await.unwrap();
|
||||
assert!(actual.is_empty());
|
||||
}
|
||||
@@ -328,26 +360,35 @@ mod tests {
|
||||
async fn test_obsolete_wal() {
|
||||
let env = WalEnv::new().await;
|
||||
let wal = env.new_wal();
|
||||
let wal_options = WalOptions::default();
|
||||
|
||||
let entries = sample_entries();
|
||||
let mut writer = wal.writer();
|
||||
let region_id = RegionId::new(1, 1);
|
||||
writer.add_entry(region_id, 1, &entries[0]).unwrap();
|
||||
writer.add_entry(region_id, 2, &entries[1]).unwrap();
|
||||
writer.add_entry(region_id, 3, &entries[2]).unwrap();
|
||||
writer
|
||||
.add_entry(region_id, 1, &entries[0], &wal_options)
|
||||
.unwrap();
|
||||
writer
|
||||
.add_entry(region_id, 2, &entries[1], &wal_options)
|
||||
.unwrap();
|
||||
writer
|
||||
.add_entry(region_id, 3, &entries[2], &wal_options)
|
||||
.unwrap();
|
||||
|
||||
writer.write_to_wal().await.unwrap();
|
||||
|
||||
// Delete 1, 2.
|
||||
wal.obsolete(region_id, 2).await.unwrap();
|
||||
wal.obsolete(region_id, 2, &wal_options).await.unwrap();
|
||||
|
||||
// Put 4.
|
||||
let mut writer = wal.writer();
|
||||
writer.add_entry(region_id, 4, &entries[3]).unwrap();
|
||||
writer
|
||||
.add_entry(region_id, 4, &entries[3], &wal_options)
|
||||
.unwrap();
|
||||
writer.write_to_wal().await.unwrap();
|
||||
|
||||
// Scan all
|
||||
let stream = wal.scan(region_id, 1).unwrap();
|
||||
let stream = wal.scan(region_id, 1, &wal_options).unwrap();
|
||||
let actual: Vec<_> = stream.try_collect().await.unwrap();
|
||||
check_entries(&entries[2..], 3, &actual);
|
||||
}
|
||||
|
||||
@@ -201,7 +201,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
"Region {} flush finished, tries to bump wal to {}",
|
||||
region_id, request.flushed_entry_id
|
||||
);
|
||||
if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await {
|
||||
if let Err(e) = self
|
||||
.wal
|
||||
.obsolete(region_id, request.flushed_entry_id, ®ion.wal_options)
|
||||
.await
|
||||
{
|
||||
error!(e; "Failed to write wal, region: {}", region_id);
|
||||
request.on_failure(e);
|
||||
return;
|
||||
|
||||
@@ -59,7 +59,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
);
|
||||
|
||||
// Make all data obsolete.
|
||||
self.wal.obsolete(region_id, truncated_entry_id).await?;
|
||||
self.wal
|
||||
.obsolete(region_id, truncated_entry_id, ®ion.wal_options)
|
||||
.await?;
|
||||
info!(
|
||||
"Complete truncating region: {}, entry id: {} and sequence: {}.",
|
||||
region_id, truncated_entry_id, truncated_sequence
|
||||
|
||||
@@ -133,7 +133,11 @@ impl<S> RegionWorkerLoop<S> {
|
||||
continue;
|
||||
};
|
||||
|
||||
let region_ctx = RegionWriteCtx::new(region.region_id, ®ion.version_control);
|
||||
let region_ctx = RegionWriteCtx::new(
|
||||
region.region_id,
|
||||
®ion.version_control,
|
||||
region.wal_options.clone(),
|
||||
);
|
||||
|
||||
e.insert(region_ctx);
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ aquamarine.workspace = true
|
||||
async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
common-base.workspace = true
|
||||
common-config.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-query.workspace = true
|
||||
|
||||
@@ -14,11 +14,14 @@
|
||||
|
||||
//! LogStore APIs.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_config::wal::WalOptions;
|
||||
use common_error::ext::ErrorExt;
|
||||
|
||||
use crate::logstore::entry::{Entry, Id};
|
||||
use crate::logstore::entry::{Entry, Id as EntryId, Offset as EntryOffset};
|
||||
use crate::logstore::entry_stream::SendableEntryStream;
|
||||
use crate::logstore::namespace::Namespace;
|
||||
use crate::logstore::namespace::{Id as NamespaceId, Namespace};
|
||||
|
||||
pub mod entry;
|
||||
pub mod entry_stream;
|
||||
@@ -36,17 +39,21 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
|
||||
|
||||
/// Append an `Entry` to WAL with given namespace and return append response containing
|
||||
/// the entry id.
|
||||
async fn append(&self, e: Self::Entry) -> Result<AppendResponse, Self::Error>;
|
||||
async fn append(&self, entry: Self::Entry) -> Result<AppendResponse, Self::Error>;
|
||||
|
||||
/// Append a batch of entries atomically and return the offset of first entry.
|
||||
async fn append_batch(&self, e: Vec<Self::Entry>) -> Result<(), Self::Error>;
|
||||
/// Append a batch of entries and return an append batch response containing the start entry ids of
|
||||
/// log entries written to each region.
|
||||
async fn append_batch(
|
||||
&self,
|
||||
entries: Vec<Self::Entry>,
|
||||
) -> Result<AppendBatchResponse, Self::Error>;
|
||||
|
||||
/// Create a new `EntryStream` to asynchronously generates `Entry` with ids
|
||||
/// starting from `id`.
|
||||
async fn read(
|
||||
&self,
|
||||
ns: &Self::Namespace,
|
||||
id: Id,
|
||||
id: EntryId,
|
||||
) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error>;
|
||||
|
||||
/// Create a new `Namespace`.
|
||||
@@ -59,19 +66,33 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
|
||||
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
|
||||
|
||||
/// Create an entry of the associate Entry type
|
||||
fn entry<D: AsRef<[u8]>>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry;
|
||||
fn entry<D: AsRef<[u8]>>(&self, data: D, entry_id: EntryId, ns: Self::Namespace)
|
||||
-> Self::Entry;
|
||||
|
||||
/// Create a namespace of the associate Namespace type
|
||||
// TODO(sunng87): confusion with `create_namespace`
|
||||
fn namespace(&self, id: namespace::Id) -> Self::Namespace;
|
||||
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace;
|
||||
|
||||
/// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete
|
||||
/// the log files if all entries inside are obsolete. This method may not delete log
|
||||
/// files immediately.
|
||||
async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error>;
|
||||
async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// The response of an `append` operation.
|
||||
#[derive(Debug)]
|
||||
pub struct AppendResponse {
|
||||
pub entry_id: Id,
|
||||
/// The entry id of the appended log entry.
|
||||
pub entry_id: EntryId,
|
||||
/// The start entry offset of the appended log entry.
|
||||
/// Depends on the `LogStore` implementation, the entry offset may be missing.
|
||||
pub offset: Option<EntryOffset>,
|
||||
}
|
||||
|
||||
/// The response of an `append_batch` operation.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct AppendBatchResponse {
|
||||
/// Key: region id (as u64). Value: the known minimum start offset of the appended log entries belonging to the region.
|
||||
/// Depends on the `LogStore` implementation, the entry offsets may be missing.
|
||||
pub offsets: HashMap<u64, EntryOffset>,
|
||||
}
|
||||
|
||||
@@ -16,9 +16,10 @@ use common_error::ext::ErrorExt;
|
||||
|
||||
use crate::logstore::namespace::Namespace;
|
||||
|
||||
pub type Offset = usize;
|
||||
pub type Epoch = u64;
|
||||
/// An entry's logical id, allocated by log store users.
|
||||
pub type Id = u64;
|
||||
/// An entry's physical offset in the underlying log store.
|
||||
pub type Offset = usize;
|
||||
|
||||
/// Entry is the minimal data storage unit in `LogStore`.
|
||||
pub trait Entry: Send + Sync {
|
||||
|
||||
@@ -773,6 +773,7 @@ timeout = "30s"
|
||||
body_limit = "64MiB"
|
||||
|
||||
[datanode.wal]
|
||||
provider = "raft_engine"
|
||||
file_size = "256MiB"
|
||||
purge_threshold = "4GiB"
|
||||
purge_interval = "10m"
|
||||
|
||||
Reference in New Issue
Block a user