diff --git a/Cargo.lock b/Cargo.lock index bdd59547a9..0d93a4a45e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3170,6 +3170,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "delta-encoding" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f8513a5eeb3d7b9149563409dc4ab6fd9de5767fd285af5b4d0ee1b778fbce0" +dependencies = [ + "num-traits", +] + [[package]] name = "der" version = "0.5.1" @@ -5806,10 +5815,12 @@ dependencies = [ "common-test-util", "common-time", "common-wal", + "delta-encoding", "futures", "futures-util", "itertools 0.10.5", "lazy_static", + "object-store", "pin-project", "prometheus", "protobuf", diff --git a/config/config.md b/config/config.md index cf622daf9f..295eacf4f1 100644 --- a/config/config.md +++ b/config/config.md @@ -374,6 +374,8 @@ | `wal.backoff_max` | String | `10s` | The maximum backoff delay.
**It's only used when the provider is `kafka`**. | | `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.
**It's only used when the provider is `kafka`**. | | `wal.backoff_deadline` | String | `5mins` | The deadline of retries.
**It's only used when the provider is `kafka`**. | +| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.
**It's only used when the provider is `kafka`**. | +| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.
**It's only used when the provider is `kafka`**. | | `storage` | -- | -- | The data storage options. | | `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. | | `storage.type` | String | `File` | The storage type used to store the data.
- `File`: the data is stored in the local file system.
- `S3`: the data is stored in the S3 object storage.
- `Gcs`: the data is stored in the Google Cloud Storage.
- `Azblob`: the data is stored in the Azure Blob Storage.
- `Oss`: the data is stored in the Aliyun OSS. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index c5cc04ebca..caaff9a3fb 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -187,6 +187,14 @@ backoff_base = 2 ## **It's only used when the provider is `kafka`**. backoff_deadline = "5mins" +## Whether to enable WAL index creation. +## **It's only used when the provider is `kafka`**. +create_index = true + +## The interval for dumping WAL indexes. +## **It's only used when the provider is `kafka`**. +dump_index_interval = "60s" + # The Kafka SASL configuration. # **It's only used when the provider is `kafka`**. # Available SASL mechanisms: diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 9bf3280c5a..90f3e44f9c 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -30,6 +30,7 @@ pub enum MetasrvWalConfig { Kafka(MetasrvKafkaConfig), } +#[allow(clippy::large_enum_variant)] /// Wal configurations for datanode. #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] #[serde(tag = "provider", rename_all = "snake_case")] @@ -223,6 +224,7 @@ mod tests { replication_factor: 1, create_topic_timeout: Duration::from_secs(30), }, + ..Default::default() }; assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); } diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index a1260c05ef..84e9da6bcc 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig { /// The kafka topic config. #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, + pub create_index: bool, + #[serde(with = "humantime_serde")] + pub dump_index_interval: Duration, } impl Default for DatanodeKafkaConfig { @@ -51,6 +54,8 @@ impl Default for DatanodeKafkaConfig { consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), kafka_topic: KafkaTopicConfig::default(), + create_index: true, + dump_index_interval: Duration::from_secs(60), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index ceb40081d1..eca551a4a0 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -16,6 +16,7 @@ use std::path::Path; use std::sync::Arc; +use std::time::Duration; use catalog::memory::MemoryCatalogManager; use common_base::Plugins; @@ -32,6 +33,7 @@ use common_wal::config::DatanodeWalConfig; use file_engine::engine::FileRegionEngine; use futures_util::TryStreamExt; use log_store::kafka::log_store::KafkaLogStore; +use log_store::kafka::{default_index_file, GlobalIndexCollector}; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::MetaClientRef; use metric_engine::engine::MetricEngine; @@ -64,7 +66,7 @@ use crate::event_listener::{ use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::HeartbeatTask; use crate::region_server::{DummyTableProviderFactory, RegionServer}; -use crate::store; +use crate::store::{self, new_object_store_without_cache}; /// Datanode service. 
 pub struct Datanode {
@@ -398,15 +400,37 @@ impl DatanodeBuilder {
             )
             .await
             .context(BuildMitoEngineSnafu)?,
-            DatanodeWalConfig::Kafka(kafka_config) => MitoEngine::new(
-                &opts.storage.data_home,
-                config,
-                Self::build_kafka_log_store(kafka_config).await?,
-                object_store_manager,
-                plugins,
-            )
-            .await
-            .context(BuildMitoEngineSnafu)?,
+            DatanodeWalConfig::Kafka(kafka_config) => {
+                if kafka_config.create_index && opts.node_id.is_none() {
+                    warn!("The WAL index creation is only available in distributed mode.")
+                }
+                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
+                {
+                    let operator = new_object_store_without_cache(
+                        &opts.storage.store,
+                        &opts.storage.data_home,
+                    )
+                    .await?;
+                    let path = default_index_file(opts.node_id.unwrap());
+                    Some(Self::build_global_index_collector(
+                        kafka_config.dump_index_interval,
+                        operator,
+                        path,
+                    ))
+                } else {
+                    None
+                };
+
+                MitoEngine::new(
+                    &opts.storage.data_home,
+                    config,
+                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
+                    object_store_manager,
+                    plugins,
+                )
+                .await
+                .context(BuildMitoEngineSnafu)?
+            }
         };
         Ok(mito_engine)
     }
@@ -438,14 +462,26 @@ impl DatanodeBuilder {
         Ok(Arc::new(logstore))
     }

-    /// Builds [KafkaLogStore].
-    async fn build_kafka_log_store(config: &DatanodeKafkaConfig) -> Result<Arc<KafkaLogStore>> {
-        KafkaLogStore::try_new(config)
+    /// Builds [`KafkaLogStore`].
+    async fn build_kafka_log_store(
+        config: &DatanodeKafkaConfig,
+        global_index_collector: Option<GlobalIndexCollector>,
+    ) -> Result<Arc<KafkaLogStore>> {
+        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }
+
+    /// Builds [`GlobalIndexCollector`].
+    fn build_global_index_collector(
+        dump_index_interval: Duration,
+        operator: object_store::ObjectStore,
+        path: String,
+    ) -> GlobalIndexCollector {
+        GlobalIndexCollector::new(dump_index_interval, operator, path)
+    }
 }

 /// Open all regions belong to this datanode.
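Editor note: the datanode wiring above boils down to a small recipe — derive the per-datanode index path with `default_index_file` and hand an object-store operator plus the dump interval to `GlobalIndexCollector::new`. The sketch below shows that flow in isolation; the `start_index_collector` helper and the caller-supplied operator are assumptions for the example only (the datanode builds the real operator via `new_object_store_without_cache` and reads the interval from `wal.dump_index_interval`).

```rust
use std::time::Duration;

use log_store::kafka::{default_index_file, GlobalIndexCollector};
use object_store::ObjectStore;

/// Hypothetical helper mirroring the datanode wiring: starts the background
/// task that periodically dumps the collected WAL indexes to object storage.
fn start_index_collector(node_id: u64, operator: ObjectStore) -> GlobalIndexCollector {
    // `default_index_file(42)` resolves to "__datanode/42/index.json".
    let path = default_index_file(node_id);
    // Persist the WAL index to `path` every 60 seconds (the config default).
    GlobalIndexCollector::new(Duration::from_secs(60), operator, path)
}
```

Passing the resulting collector as `Some(...)` into `KafkaLogStore::try_new` (or `None` to disable indexing, as the standalone path does) is the only integration point the rest of the datanode needs.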
diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 877f044974..16b6e0bc8b 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -29,18 +29,18 @@ use common_telemetry::{info, warn}; use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{Error, HttpClient, ObjectStore, ObjectStoreBuilder}; +use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; use crate::error::{self, Result}; -pub(crate) async fn new_object_store( - store: ObjectStoreConfig, +pub(crate) async fn new_raw_object_store( + store: &ObjectStoreConfig, data_home: &str, ) -> Result { let data_home = normalize_dir(data_home); - let object_store = match &store { + let object_store = match store { ObjectStoreConfig::File(file_config) => { fs::new_fs_object_store(&data_home, file_config).await } @@ -51,27 +51,61 @@ pub(crate) async fn new_object_store( } ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await, }?; + Ok(object_store) +} +fn with_retry_layers(object_store: ObjectStore) -> ObjectStore { + object_store.layer( + RetryLayer::new() + .with_jitter() + .with_notify(PrintDetailedError), + ) +} + +pub(crate) async fn new_object_store_without_cache( + store: &ObjectStoreConfig, + data_home: &str, +) -> Result { + let object_store = new_raw_object_store(store, data_home).await?; // Enable retry layer and cache layer for non-fs object storages let object_store = if !matches!(store, ObjectStoreConfig::File(..)) { - let object_store = create_object_store_with_cache(object_store, &store).await?; - object_store.layer( - RetryLayer::new() - .with_jitter() - .with_notify(PrintDetailedError), - ) + // Adds retry layer + with_retry_layers(object_store) } else { object_store }; - let store = with_instrument_layers(object_store, true); - Ok(store) + let object_store = with_instrument_layers(object_store, true); + Ok(object_store) } -async fn create_object_store_with_cache( - object_store: ObjectStore, - store_config: &ObjectStoreConfig, +pub(crate) async fn new_object_store( + store: ObjectStoreConfig, + data_home: &str, ) -> Result { + let object_store = new_raw_object_store(&store, data_home).await?; + // Enable retry layer and cache layer for non-fs object storages + let object_store = if !matches!(store, ObjectStoreConfig::File(..)) { + let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? 
{ + // Adds cache layer + object_store.layer(cache_layer) + } else { + object_store + }; + + // Adds retry layer + with_retry_layers(object_store) + } else { + object_store + }; + + let object_store = with_instrument_layers(object_store, true); + Ok(object_store) +} + +async fn build_cache_layer( + store_config: &ObjectStoreConfig, +) -> Result>> { let (cache_path, cache_capacity) = match store_config { ObjectStoreConfig::S3(s3_config) => { let path = s3_config.cache.cache_path.as_ref(); @@ -127,9 +161,9 @@ async fn create_object_store_with_cache( path, cache_capacity ); - Ok(object_store.layer(cache_layer)) + Ok(Some(cache_layer)) } else { - Ok(object_store) + Ok(None) } } @@ -175,7 +209,6 @@ pub(crate) fn build_http_client() -> Result { HttpClient::build(http_builder).context(error::InitBackendSnafu) } - struct PrintDetailedError; // PrintDetailedError is a retry interceptor that prints error in Debug format in retrying. diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 6a84965974..b841bb3505 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -25,10 +25,12 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true +delta-encoding = "0.4" futures.workspace = true futures-util.workspace = true itertools.workspace = true lazy_static.workspace = true +object-store.workspace = true pin-project.workspace = true prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 5572a05ddd..14648a8e92 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -272,7 +272,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to send produce request"))] + #[snafu(display("Failed to wait for ProduceResultReceiver"))] WaitProduceResultReceiver { #[snafu(implicit)] location: Location, @@ -280,6 +280,30 @@ pub enum Error { error: tokio::sync::oneshot::error::RecvError, }, + #[snafu(display("Failed to wait for result of DumpIndex"))] + WaitDumpIndex { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::oneshot::error::RecvError, + }, + + #[snafu(display("Failed to create writer"))] + CreateWriter { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: object_store::Error, + }, + + #[snafu(display("Failed to write index"))] + WriteIndex { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: object_store::Error, + }, + #[snafu(display( "The length of meta if exceeded the limit: {}, actual: {}", limit, diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index dfbf2f36d2..ad73b77039 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -13,18 +13,19 @@ // limitations under the License. pub(crate) mod client_manager; -// TODO(weny): remove it -#[allow(dead_code)] pub(crate) mod consumer; -#[allow(unused)] +/// TODO(weny): remove it. +#[allow(dead_code)] +#[allow(unused_imports)] pub(crate) mod index; pub mod log_store; pub(crate) mod producer; pub(crate) mod util; -// TODO(weny): remove it +/// TODO(weny): remove it. 
#[allow(dead_code)] pub(crate) mod worker; +pub use index::{default_index_file, GlobalIndexCollector}; use serde::{Deserialize, Serialize}; use store_api::logstore::entry::Id as EntryId; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 3f4b4aecf7..6337683c93 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -68,7 +68,10 @@ pub(crate) struct ClientManager { impl ClientManager { /// Tries to create a ClientManager. - pub(crate) async fn try_new(config: &DatanodeKafkaConfig) -> Result { + pub(crate) async fn try_new( + config: &DatanodeKafkaConfig, + global_index_collector: Option, + ) -> Result { // Sets backoff config for the top-level kafka client and all clients constructed by it. let backoff_config = BackoffConfig { init_backoff: config.backoff.init, @@ -97,7 +100,7 @@ impl ClientManager { instances: RwLock::new(HashMap::new()), flush_batch_size: config.max_batch_bytes.as_bytes() as usize, compression: Compression::Lz4, - global_index_collector: None, + global_index_collector, }) } @@ -148,7 +151,9 @@ impl ClientManager { let (tx, rx) = OrderedBatchProducer::channel(); let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() { - global_collector.provider_level_index_collector(provider.clone(), tx.clone()) + global_collector + .provider_level_index_collector(provider.clone(), tx.clone()) + .await } else { Box::new(NoopCollector) }; @@ -163,6 +168,10 @@ impl ClientManager { Ok(Client { client, producer }) } + + pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> { + self.global_index_collector.as_ref() + } } #[cfg(test)] @@ -219,7 +228,7 @@ mod tests { }, ..Default::default() }; - let manager = ClientManager::try_new(&config).await.unwrap(); + let manager = ClientManager::try_new(&config, None).await.unwrap(); (manager, topics) } diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs index 1c64637616..1bd0f3e621 100644 --- a/src/log-store/src/kafka/index.rs +++ b/src/log-store/src/kafka/index.rs @@ -13,12 +13,17 @@ // limitations under the License. mod collector; +mod encoder; mod iterator; -pub(crate) use collector::{ - GlobalIndexCollector, IndexCollector, IndexEncoder, NoopCollector, ProviderLevelIndexCollector, -}; +pub use collector::GlobalIndexCollector; +pub(crate) use collector::{IndexCollector, NoopCollector}; +pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder}; pub(crate) use iterator::{ MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange, RegionWalVecIndex, }; + +pub fn default_index_file(datanode_id: u64) -> String { + format!("__datanode/{datanode_id}/index.json") +} diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs index a8cb2546b6..1812223046 100644 --- a/src/log-store/src/kafka/index/collector.rs +++ b/src/log-store/src/kafka/index/collector.rs @@ -13,13 +13,11 @@ // limitations under the License. 
use std::collections::{BTreeSet, HashMap}; -use std::io::Write; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; -use bytes::buf::Writer; -use bytes::{BufMut, Bytes, BytesMut}; -use common_telemetry::tracing::error; +use common_telemetry::{error, info}; use futures::future::try_join_all; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -31,13 +29,9 @@ use tokio::sync::mpsc::Sender; use tokio::sync::Mutex as TokioMutex; use crate::error::{self, Result}; -use crate::kafka::worker::{DumpIndexRequest, WorkerRequest}; - -pub trait IndexEncoder: Send + Sync { - fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes); - - fn finish(&self) -> Result>; -} +use crate::kafka::index::encoder::IndexEncoder; +use crate::kafka::index::JsonIndexEncoder; +use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest}; /// The [`IndexCollector`] trait defines the operations for managing and collecting index entries. pub trait IndexCollector: Send + Sync { @@ -58,23 +52,148 @@ pub trait IndexCollector: Send + Sync { /// The [`GlobalIndexCollector`] struct is responsible for managing index entries /// across multiple providers. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct GlobalIndexCollector { providers: Arc, Sender>>>, + task: CollectionTask, +} + +#[derive(Debug, Clone)] +pub struct CollectionTask { + providers: Arc, Sender>>>, + dump_index_interval: Duration, + operator: object_store::ObjectStore, + path: String, + running: Arc, +} + +impl CollectionTask { + async fn dump_index(&self) -> Result<()> { + let encoder = Arc::new(JsonIndexEncoder::default()); + let receivers = { + let providers = self.providers.lock().await; + let mut receivers = Vec::with_capacity(providers.len()); + for (provider, sender) in providers.iter() { + let (req, rx) = DumpIndexRequest::new(encoder.clone()); + receivers.push(rx); + if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() { + error!( + "BackgroundProducerWorker is stopped, topic: {}", + provider.topic + ) + } + } + receivers + }; + try_join_all(receivers) + .await + .context(error::WaitDumpIndexSnafu)?; + let bytes = encoder.finish()?; + let mut writer = self + .operator + .writer(&self.path) + .await + .context(error::CreateWriterSnafu)?; + writer.write(bytes).await.context(error::WriteIndexSnafu)?; + writer.close().await.context(error::WriteIndexSnafu)?; + + Ok(()) + } + + /// The background task performs two main operations: + /// - Persists the WAL index to the specified `path` at every `dump_index_interval`. + /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`. + fn run(&self) { + let mut dump_index_interval = tokio::time::interval(self.dump_index_interval); + let running = self.running.clone(); + let moved_self = self.clone(); + common_runtime::spawn_global(async move { + loop { + if !running.load(Ordering::Relaxed) { + info!("shutdown the index collection task"); + break; + } + select! { + _ = dump_index_interval.tick() => { + if let Err(err) = moved_self.dump_index().await { + error!(err; "Failed to persist the WAL index"); + } + }, + } + } + }); + } +} + +impl Drop for CollectionTask { + fn drop(&mut self) { + self.running.store(false, Ordering::Relaxed); + } +} + +impl GlobalIndexCollector { + /// Constructs a [`GlobalIndexCollector`]. 
+ /// + /// This method initializes a `GlobalIndexCollector` instance and starts a background task + /// for managing WAL (Write-Ahead Logging) indexes. + /// + /// The background task persists the WAL index to the specified `path` at every `dump_index_interval`. + pub fn new( + dump_index_interval: Duration, + operator: object_store::ObjectStore, + path: String, + ) -> Self { + let providers: Arc, Sender>>> = + Arc::new(Default::default()); + let task = CollectionTask { + providers: providers.clone(), + dump_index_interval, + operator, + path, + running: Arc::new(AtomicBool::new(true)), + }; + task.run(); + Self { providers, task } + } } impl GlobalIndexCollector { /// Creates a new [`ProviderLevelIndexCollector`] for a specified provider. - pub fn provider_level_index_collector( + pub(crate) async fn provider_level_index_collector( &self, provider: Arc, sender: Sender, ) -> Box { + self.providers.lock().await.insert(provider.clone(), sender); Box::new(ProviderLevelIndexCollector { indexes: Default::default(), provider, }) } + + /// Truncates the index for a specific region up to a given [`EntryId`]. + /// + /// It removes all [`EntryId`]s smaller than `entry_id`. + pub(crate) async fn truncate( + &self, + provider: &Arc, + region_id: RegionId, + entry_id: EntryId, + ) -> Result<()> { + if let Some(sender) = self.providers.lock().await.get(provider).cloned() { + if sender + .send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new( + region_id, entry_id, + ))) + .await + .is_err() + { + return error::OrderedBatchProducerStoppedSnafu {}.fail(); + } + } + + Ok(()) + } } /// The [`RegionIndexes`] struct maintains indexes for a collection of regions. @@ -83,8 +202,8 @@ impl GlobalIndexCollector { /// latest [`EntryId`] across all regions. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct RegionIndexes { - regions: HashMap>, - latest_entry_id: EntryId, + pub(crate) regions: HashMap>, + pub(crate) latest_entry_id: EntryId, } impl RegionIndexes { @@ -145,5 +264,5 @@ impl IndexCollector for NoopCollector { fn set_latest_entry_id(&mut self, _entry_id: EntryId) {} - fn dump(&mut self, encoder: &dyn IndexEncoder) {} + fn dump(&mut self, _encoder: &dyn IndexEncoder) {} } diff --git a/src/log-store/src/kafka/index/encoder.rs b/src/log-store/src/kafka/index/encoder.rs new file mode 100644 index 0000000000..bfd11a982c --- /dev/null +++ b/src/log-store/src/kafka/index/encoder.rs @@ -0,0 +1,182 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeSet, HashMap}; +use std::sync::Mutex; + +use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::logstore::provider::KafkaProvider; +use store_api::storage::RegionId; + +use crate::error::{self, Result}; +use crate::kafka::index::collector::RegionIndexes; + +/// Converts a [`RegionIndexes`] instance into a [`DeltaEncodedRegionIndexes`]. 
+///
+/// This conversion encodes the index values using delta encoding to reduce storage space.
+impl From<&RegionIndexes> for DeltaEncodedRegionIndexes {
+    fn from(value: &RegionIndexes) -> Self {
+        let mut regions = HashMap::with_capacity(value.regions.len());
+        for (region_id, indexes) in value.regions.iter() {
+            let indexes = indexes.iter().copied().deltas().collect();
+            regions.insert(*region_id, indexes);
+        }
+        Self {
+            regions,
+            last_index: value.latest_entry_id,
+        }
+    }
+}
+
+/// Represents the delta-encoded version of region indexes for efficient storage.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct DeltaEncodedRegionIndexes {
+    regions: HashMap<RegionId, Vec<u64>>,
+    last_index: u64,
+}
+
+impl DeltaEncodedRegionIndexes {
+    /// Retrieves the original (decoded) index values for a given region.
+    fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
+        let decoded = self
+            .regions
+            .get(&region_id)
+            .map(|delta| delta.iter().copied().original().collect::<BTreeSet<_>>());
+
+        decoded
+    }
+
+    /// Retrieves the last index.
+    fn last_index(&self) -> u64 {
+        self.last_index
+    }
+}
+
+pub trait IndexEncoder: Send + Sync {
+    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
+
+    fn finish(&self) -> Result<Vec<u8>>;
+}
+
+/// The [`DatanodeWalIndexes`] structure holds the WAL indexes for a datanode.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub(crate) struct DatanodeWalIndexes(HashMap<String, DeltaEncodedRegionIndexes>);
+
+impl DatanodeWalIndexes {
+    fn insert(&mut self, topic: String, region_index: &RegionIndexes) {
+        self.0.insert(topic, region_index.into());
+    }
+
+    fn encode(&mut self) -> Result<Vec<u8>> {
+        let value = serde_json::to_vec(&self.0).context(error::EncodeJsonSnafu);
+        self.0.clear();
+        value
+    }
+
+    fn decode(byte: &[u8]) -> Result<Self> {
+        serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
+    }
+
+    /// Retrieves the delta-encoded region indexes for a given `provider`.
+    pub(crate) fn provider(&self, provider: &KafkaProvider) -> Option<&DeltaEncodedRegionIndexes> {
+        self.0.get(&provider.topic)
+    }
+}
+
+/// The [`JsonIndexEncoder`] encodes [`RegionIndexes`] into JSON format.
+#[derive(Debug, Default)] +pub(crate) struct JsonIndexEncoder { + buf: Mutex, +} + +impl IndexEncoder for JsonIndexEncoder { + fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) { + self.buf + .lock() + .unwrap() + .insert(provider.topic.to_string(), region_index); + } + + fn finish(&self) -> Result> { + let mut buf = self.buf.lock().unwrap(); + buf.encode() + } +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeSet, HashMap, HashSet}; + + use store_api::logstore::provider::KafkaProvider; + use store_api::storage::RegionId; + + use super::{DatanodeWalIndexes, IndexEncoder, JsonIndexEncoder}; + use crate::kafka::index::collector::RegionIndexes; + + #[test] + fn test_json_index_encoder() { + let encoder = JsonIndexEncoder::default(); + let topic_1 = KafkaProvider::new("my_topic_1".to_string()); + let region_1_indexes = BTreeSet::from([1u64, 2, 4, 5, 20]); + let region_2_indexes = BTreeSet::from([4u64, 12, 43, 54, 75]); + encoder.encode( + &topic_1, + &RegionIndexes { + regions: HashMap::from([ + (RegionId::new(1, 1), region_1_indexes.clone()), + (RegionId::new(1, 2), region_2_indexes.clone()), + ]), + latest_entry_id: 1024, + }, + ); + let topic_2 = KafkaProvider::new("my_topic_2".to_string()); + encoder.encode( + &topic_2, + &RegionIndexes { + regions: HashMap::from([ + ( + RegionId::new(1, 1), + BTreeSet::from([1024u64, 1025, 1026, 1028, 2048]), + ), + (RegionId::new(1, 2), BTreeSet::from([1512])), + ]), + latest_entry_id: 2048, + }, + ); + + let bytes = encoder.finish().unwrap(); + let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap(); + assert_eq!( + datanode_index + .provider(&topic_1) + .unwrap() + .region(RegionId::new(1, 1)) + .unwrap(), + region_1_indexes, + ); + assert_eq!( + datanode_index + .provider(&topic_1) + .unwrap() + .region(RegionId::new(1, 2)) + .unwrap(), + region_2_indexes, + ); + assert!(datanode_index + .provider(&KafkaProvider::new("my_topic_3".to_string())) + .is_none()); + } +} diff --git a/src/log-store/src/kafka/index/iterator.rs b/src/log-store/src/kafka/index/iterator.rs index 8a33cf1d9a..7df2518752 100644 --- a/src/log-store/src/kafka/index/iterator.rs +++ b/src/log-store/src/kafka/index/iterator.rs @@ -12,14 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::cmp::{max, min}; +use std::cmp::min; use std::collections::VecDeque; -use std::iter::Peekable; -use std::marker::PhantomData; -use std::ops::{Add, Mul, Range, Sub}; +use std::ops::Range; -use chrono::format::Item; -use itertools::Itertools; use store_api::logstore::EntryId; use crate::kafka::util::range::{ConvertIndexToRange, MergeRange}; @@ -197,7 +193,7 @@ mod tests { #[test] fn test_region_wal_range() { - let mut range = RegionWalRange::new(0..1024, 1024); + let range = RegionWalRange::new(0..1024, 1024); assert_eq!( range.next_batch_hint(10), Some(NextBatchHint { diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 28f6e247d4..69afa2e6b7 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -32,6 +32,7 @@ use store_api::storage::RegionId; use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result}; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; +use crate::kafka::index::GlobalIndexCollector; use crate::kafka::producer::OrderedBatchProducerRef; use crate::kafka::util::record::{ convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE, @@ -51,8 +52,12 @@ pub struct KafkaLogStore { impl KafkaLogStore { /// Tries to create a Kafka log store. - pub async fn try_new(config: &DatanodeKafkaConfig) -> Result { - let client_manager = Arc::new(ClientManager::try_new(config).await?); + pub async fn try_new( + config: &DatanodeKafkaConfig, + global_index_collector: Option, + ) -> Result { + let client_manager = + Arc::new(ClientManager::try_new(config, global_index_collector).await?); Ok(Self { client_manager, @@ -329,7 +334,21 @@ impl LogStore for KafkaLogStore { /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete, /// so that the log store can safely delete those entries. This method does not guarantee /// that the obsolete entries are deleted immediately. 
- async fn obsolete(&self, _provider: &Provider, _entry_id: EntryId) -> Result<()> { + async fn obsolete( + &self, + provider: &Provider, + region_id: RegionId, + entry_id: EntryId, + ) -> Result<()> { + if let Some(collector) = self.client_manager.global_index_collector() { + let provider = provider + .as_kafka_provider() + .with_context(|| InvalidProviderSnafu { + expected: KafkaProvider::type_name(), + actual: provider.type_name(), + })?; + collector.truncate(provider, region_id, entry_id).await?; + } Ok(()) } @@ -468,7 +487,7 @@ mod tests { max_batch_bytes: ReadableSize::kb(32), ..Default::default() }; - let logstore = KafkaLogStore::try_new(&config).await.unwrap(); + let logstore = KafkaLogStore::try_new(&config, None).await.unwrap(); let topic_name = uuid::Uuid::new_v4().to_string(); let provider = Provider::kafka_provider(topic_name); let region_entries = (0..5) @@ -540,7 +559,7 @@ mod tests { max_batch_bytes: ReadableSize::kb(8), ..Default::default() }; - let logstore = KafkaLogStore::try_new(&config).await.unwrap(); + let logstore = KafkaLogStore::try_new(&config, None).await.unwrap(); let topic_name = uuid::Uuid::new_v4().to_string(); let provider = Provider::kafka_provider(topic_name); let region_entries = (0..5) diff --git a/src/log-store/src/kafka/worker.rs b/src/log-store/src/kafka/worker.rs index 972d56d6f1..b05351d172 100644 --- a/src/log-store/src/kafka/worker.rs +++ b/src/log-store/src/kafka/worker.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod checkpoint; +pub(crate) mod dump_index; pub(crate) mod flush; pub(crate) mod produce; @@ -29,14 +29,12 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::Receiver; use tokio::sync::oneshot::{self}; -use super::index::IndexEncoder; use crate::error::{self, NoMaxValueSnafu, Result}; -use crate::kafka::index::IndexCollector; +use crate::kafka::index::{IndexCollector, IndexEncoder}; use crate::kafka::producer::ProducerClient; pub(crate) enum WorkerRequest { Produce(ProduceRequest), - Checkpoint, TruncateIndex(TruncateIndexRequest), DumpIndex(DumpIndexRequest), } @@ -82,6 +80,15 @@ pub(crate) struct TruncateIndexRequest { entry_id: EntryId, } +impl TruncateIndexRequest { + pub fn new(region_id: RegionId, entry_id: EntryId) -> Self { + Self { + region_id, + entry_id, + } + } +} + pub(crate) struct ProduceRequest { region_id: RegionId, batch: Vec, @@ -179,27 +186,18 @@ impl BackgroundProducerWorker { async fn handle_requests(&mut self, buffer: &mut Vec) { let mut produce_requests = Vec::with_capacity(buffer.len()); - let mut do_checkpoint = false; for req in buffer.drain(..) 
{ match req { WorkerRequest::Produce(req) => produce_requests.push(req), - WorkerRequest::Checkpoint => do_checkpoint = true, WorkerRequest::TruncateIndex(TruncateIndexRequest { region_id, entry_id, }) => self.index_collector.truncate(region_id, entry_id), - WorkerRequest::DumpIndex(req) => { - self.index_collector.dump(req.encoder.as_ref()); - let _ = req.sender.send(()); - } + WorkerRequest::DumpIndex(req) => self.dump_index(req).await, } } let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes); self.try_flush_pending_requests(pending_requests).await; - - if do_checkpoint { - self.do_checkpoint().await; - } } } diff --git a/src/log-store/src/kafka/worker/checkpoint.rs b/src/log-store/src/kafka/worker/dump_index.rs similarity index 76% rename from src/log-store/src/kafka/worker/checkpoint.rs rename to src/log-store/src/kafka/worker/dump_index.rs index 0ef6cb8627..92b1d34650 100644 --- a/src/log-store/src/kafka/worker/checkpoint.rs +++ b/src/log-store/src/kafka/worker/dump_index.rs @@ -16,11 +16,12 @@ use common_telemetry::error; use rskafka::client::partition::OffsetAt; use snafu::ResultExt; +use super::DumpIndexRequest; use crate::error; use crate::kafka::worker::BackgroundProducerWorker; impl BackgroundProducerWorker { - pub(crate) async fn do_checkpoint(&mut self) { + pub(crate) async fn dump_index(&mut self, req: DumpIndexRequest) { match self .client .get_offset(OffsetAt::Latest) @@ -28,7 +29,11 @@ impl BackgroundProducerWorker { .context(error::GetOffsetSnafu { topic: &self.provider.topic, }) { - Ok(offset) => self.index_collector.set_latest_entry_id(offset as u64), + Ok(offset) => { + self.index_collector.set_latest_entry_id(offset as u64); + self.index_collector.dump(req.encoder.as_ref()); + let _ = req.sender.send(()); + } Err(err) => error!(err; "Failed to do checkpoint"), } } diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index e6486da460..08f4019e0c 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -417,7 +417,12 @@ impl LogStore for RaftEngineLogStore { })) } - async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<()> { + async fn obsolete( + &self, + provider: &Provider, + _region_id: RegionId, + entry_id: EntryId, + ) -> Result<()> { let ns = provider .as_raft_engine_provider() .with_context(|| InvalidProviderSnafu { @@ -637,7 +642,8 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = new_test_log_store(&dir).await; - let namespace_id = 42; + let region_id = RegionId::new(1, 1); + let namespace_id = region_id.as_u64(); let namespace = Provider::raft_engine_provider(namespace_id); for id in 0..4096 { let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into(); @@ -645,7 +651,10 @@ mod tests { } let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await; - logstore.obsolete(&namespace, 4000).await.unwrap(); + logstore + .obsolete(&namespace, region_id, 4000) + .await + .unwrap(); tokio::time::sleep(Duration::from_secs(6)).await; let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await; @@ -662,14 +671,15 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = new_test_log_store(&dir).await; - let namespace_id = 42; + let region_id = RegionId::new(1, 1); + let namespace_id = region_id.as_u64(); let namespace = Provider::raft_engine_provider(namespace_id); for id in 0..1024 { let entry = EntryImpl::create(id, 
namespace_id, [b'x'; 4096].to_vec()).into(); let _ = logstore.append(entry).await.unwrap(); } - logstore.obsolete(&namespace, 100).await.unwrap(); + logstore.obsolete(&namespace, region_id, 100).await.unwrap(); assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap()); let res = logstore.read(&namespace, 100).await.unwrap(); diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index f78b5a965d..b1fd183fba 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -34,13 +34,16 @@ pub async fn create_tmp_local_file_log_store>(path: P) -> RaftEng /// Create a [KafkaLogStore]. pub async fn create_kafka_log_store(broker_endpoints: Vec) -> KafkaLogStore { - KafkaLogStore::try_new(&DatanodeKafkaConfig { - connection: KafkaConnectionConfig { - broker_endpoints, + KafkaLogStore::try_new( + &DatanodeKafkaConfig { + connection: KafkaConnectionConfig { + broker_endpoints, + ..Default::default() + }, ..Default::default() }, - ..Default::default() - }) + None, + ) .await .unwrap() } diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index de6ad67b32..7413f52b2c 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -89,7 +89,7 @@ impl Wal { move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> { Box::pin(async move { store - .obsolete(provider, last_entry_id) + .obsolete(provider, region_id, last_entry_id) .await .map_err(BoxedError::new) .context(DeleteWalSnafu { region_id }) @@ -142,7 +142,7 @@ impl Wal { provider: &Provider, ) -> Result<()> { self.store - .obsolete(provider, last_id) + .obsolete(provider, region_id, last_id) .await .map_err(BoxedError::new) .context(DeleteWalSnafu { region_id }) diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index 6dd11c2c8f..7436dec06a 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -168,6 +168,7 @@ mod tests { async fn obsolete( &self, _provider: &Provider, + _region_id: RegionId, _entry_id: EntryId, ) -> Result<(), Self::Error> { unreachable!() diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index ae561d4bbc..d94194ffdb 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient}; +pub use opendal::raw::{normalize_path as raw_normalize_path, Access, HttpClient}; pub use opendal::{ services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader, diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 3476439827..32ab95f5c8 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -62,7 +62,12 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete, /// so that the log store can safely delete those entries. This method does not guarantee /// that the obsolete entries are deleted immediately. 
- async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<(), Self::Error>; + async fn obsolete( + &self, + provider: &Provider, + region_id: RegionId, + entry_id: EntryId, + ) -> Result<(), Self::Error>; /// Makes an entry instance of the associated Entry type fn entry(