feat(log_store): introduce the CollectionTask (#4530)

* feat: introduce the `CollectionTask`

* feat: add config of index collector

* chore: remove unused code

* feat: truncate indexes

* chore: apply suggestions from CR

* chore: update config examples

* refactor: retrieve latest offset while dumping indexes

* chore: print warn

Author: Weny Xu
Date: 2024-08-19 11:48:35 +08:00
Committed by: GitHub
Parent: 2a73e0937f
Commit: 76dc906574
24 changed files with 578 additions and 102 deletions

Cargo.lock generated

@@ -3170,6 +3170,15 @@ dependencies = [
"uuid",
]
[[package]]
name = "delta-encoding"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f8513a5eeb3d7b9149563409dc4ab6fd9de5767fd285af5b4d0ee1b778fbce0"
dependencies = [
"num-traits",
]
[[package]]
name = "der"
version = "0.5.1"
@@ -5806,10 +5815,12 @@ dependencies = [
"common-test-util",
"common-time",
"common-wal",
"delta-encoding",
"futures",
"futures-util",
"itertools 0.10.5",
"lazy_static",
"object-store",
"pin-project",
"prometheus",
"protobuf",


@@ -374,6 +374,8 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |


@@ -187,6 +187,14 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"
## Whether to enable WAL index creation.
## **It's only used when the provider is `kafka`**.
create_index = true
## The interval for dumping WAL indexes.
## **It's only used when the provider is `kafka`**.
dump_index_interval = "60s"
# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:


@@ -30,6 +30,7 @@ pub enum MetasrvWalConfig {
Kafka(MetasrvKafkaConfig),
}
#[allow(clippy::large_enum_variant)]
/// Wal configurations for datanode.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
@@ -223,6 +224,7 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
..Default::default()
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
}


@@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig {
/// The kafka topic config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
}
impl Default for DatanodeKafkaConfig {
@@ -51,6 +54,8 @@ impl Default for DatanodeKafkaConfig {
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
kafka_topic: KafkaTopicConfig::default(),
create_index: true,
dump_index_interval: Duration::from_secs(60),
}
}
}
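Since `dump_index_interval` is tagged with `humantime_serde`, duration strings from the config file ("60s", "5mins", ...) deserialize straight into a `Duration`. A minimal, self-contained sketch of that round trip; the struct name and the `toml`/`serde`/`humantime_serde` dependencies are illustrative stand-ins, not the project's actual wiring:

```rust
use std::time::Duration;

use serde::Deserialize;

/// Trimmed-down stand-in for the two new WAL index fields on `DatanodeKafkaConfig`.
#[derive(Debug, Deserialize)]
struct KafkaIndexOptions {
    create_index: bool,
    /// `humantime_serde` parses strings such as "60s" into a `Duration`.
    #[serde(with = "humantime_serde")]
    dump_index_interval: Duration,
}

fn main() {
    let snippet = r#"
        create_index = true
        dump_index_interval = "60s"
    "#;
    let opts: KafkaIndexOptions = toml::from_str(snippet).unwrap();
    assert!(opts.create_index);
    assert_eq!(opts.dump_index_interval, Duration::from_secs(60));
}
```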


@@ -16,6 +16,7 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
@@ -32,6 +33,7 @@ use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
@@ -64,7 +66,7 @@ use crate::event_listener::{
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store;
use crate::store::{self, new_object_store_without_cache};
/// Datanode service.
pub struct Datanode {
@@ -398,15 +400,37 @@ impl DatanodeBuilder {
)
.await
.context(BuildMitoEngineSnafu)?,
DatanodeWalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?,
DatanodeWalConfig::Kafka(kafka_config) => {
if kafka_config.create_index && opts.node_id.is_none() {
warn!("The WAL index creation only available in distributed mode.")
}
let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
{
let operator = new_object_store_without_cache(
&opts.storage.store,
&opts.storage.data_home,
)
.await?;
let path = default_index_file(opts.node_id.unwrap());
Some(Self::build_global_index_collector(
kafka_config.dump_index_interval,
operator,
path,
))
} else {
None
};
MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?
}
};
Ok(mito_engine)
}
@@ -438,14 +462,26 @@ impl DatanodeBuilder {
Ok(Arc::new(logstore))
}
/// Builds [KafkaLogStore].
async fn build_kafka_log_store(config: &DatanodeKafkaConfig) -> Result<Arc<KafkaLogStore>> {
KafkaLogStore::try_new(config)
/// Builds [`KafkaLogStore`].
async fn build_kafka_log_store(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Arc<KafkaLogStore>> {
KafkaLogStore::try_new(config, global_index_collector)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)
.map(Arc::new)
}
/// Builds [`GlobalIndexCollector`]
fn build_global_index_collector(
dump_index_interval: Duration,
operator: object_store::ObjectStore,
path: String,
) -> GlobalIndexCollector {
GlobalIndexCollector::new(dump_index_interval, operator, path)
}
}
/// Open all regions belong to this datanode.


@@ -29,18 +29,18 @@ use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match &store {
let object_store = match store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
@@ -51,27 +51,61 @@ pub(crate) async fn new_object_store(
}
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
}?;
Ok(object_store)
}
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer(
RetryLayer::new()
.with_jitter()
.with_notify(PrintDetailedError),
)
}
pub(crate) async fn new_object_store_without_cache(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = create_object_store_with_cache(object_store, &store).await?;
object_store.layer(
RetryLayer::new()
.with_jitter()
.with_notify(PrintDetailedError),
)
// Adds retry layer
with_retry_layers(object_store)
} else {
object_store
};
let store = with_instrument_layers(object_store, true);
Ok(store)
let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}
async fn create_object_store_with_cache(
object_store: ObjectStore,
store_config: &ObjectStoreConfig,
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
object_store
};
// Adds retry layer
with_retry_layers(object_store)
} else {
object_store
};
let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}
async fn build_cache_layer(
store_config: &ObjectStoreConfig,
) -> Result<Option<LruCacheLayer<impl Access>>> {
let (cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache.cache_path.as_ref();
@@ -127,9 +161,9 @@ async fn create_object_store_with_cache(
path, cache_capacity
);
Ok(object_store.layer(cache_layer))
Ok(Some(cache_layer))
} else {
Ok(object_store)
Ok(None)
}
}
@@ -175,7 +209,6 @@ pub(crate) fn build_http_client() -> Result<HttpClient> {
HttpClient::build(http_builder).context(error::InitBackendSnafu)
}
struct PrintDetailedError;
// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
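The refactor above only splits the layer stacking into reusable helpers (`new_raw_object_store`, `build_cache_layer`, `with_retry_layers`); the order stays raw store → optional cache → retry → instrumentation. A rough opendal-only sketch of that stacking, using an in-memory backend and just the retry layer for brevity (exact builder APIs vary between opendal versions):

```rust
use opendal::layers::RetryLayer;
use opendal::services::Memory;
use opendal::{Operator, Result};

fn main() -> Result<()> {
    // Build the raw operator first (an in-memory service stands in for Fs/S3/...).
    let op = Operator::new(Memory::default())?.finish();

    // Each `layer` call wraps the previous operator; retry with jitter goes on top.
    let op = op.layer(RetryLayer::new().with_jitter());

    // Cache and instrumentation layers would be stacked the same way before use.
    let _ = op;
    Ok(())
}
```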


@@ -25,10 +25,12 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
delta-encoding = "0.4"
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static.workspace = true
object-store.workspace = true
pin-project.workspace = true
prometheus.workspace = true
protobuf = { version = "2", features = ["bytes"] }


@@ -272,7 +272,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to send produce request"))]
#[snafu(display("Failed to wait for ProduceResultReceiver"))]
WaitProduceResultReceiver {
#[snafu(implicit)]
location: Location,
@@ -280,6 +280,30 @@ pub enum Error {
error: tokio::sync::oneshot::error::RecvError,
},
#[snafu(display("Failed to wait for result of DumpIndex"))]
WaitDumpIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},
#[snafu(display("Failed to create writer"))]
CreateWriter {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
},
#[snafu(display("Failed to write index"))]
WriteIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
},
#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,


@@ -13,18 +13,19 @@
// limitations under the License.
pub(crate) mod client_manager;
// TODO(weny): remove it
#[allow(dead_code)]
pub(crate) mod consumer;
#[allow(unused)]
/// TODO(weny): remove it.
#[allow(dead_code)]
#[allow(unused_imports)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
// TODO(weny): remove it
/// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod worker;
pub use index::{default_index_file, GlobalIndexCollector};
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::Id as EntryId;


@@ -68,7 +68,10 @@ pub(crate) struct ClientManager {
impl ClientManager {
/// Tries to create a ClientManager.
pub(crate) async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
pub(crate) async fn try_new(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let backoff_config = BackoffConfig {
init_backoff: config.backoff.init,
@@ -97,7 +100,7 @@ impl ClientManager {
instances: RwLock::new(HashMap::new()),
flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
compression: Compression::Lz4,
global_index_collector: None,
global_index_collector,
})
}
@@ -148,7 +151,9 @@ impl ClientManager {
let (tx, rx) = OrderedBatchProducer::channel();
let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() {
global_collector.provider_level_index_collector(provider.clone(), tx.clone())
global_collector
.provider_level_index_collector(provider.clone(), tx.clone())
.await
} else {
Box::new(NoopCollector)
};
@@ -163,6 +168,10 @@ impl ClientManager {
Ok(Client { client, producer })
}
pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
self.global_index_collector.as_ref()
}
}
#[cfg(test)]
@@ -219,7 +228,7 @@ mod tests {
},
..Default::default()
};
let manager = ClientManager::try_new(&config).await.unwrap();
let manager = ClientManager::try_new(&config, None).await.unwrap();
(manager, topics)
}


@@ -13,12 +13,17 @@
// limitations under the License.
mod collector;
mod encoder;
mod iterator;
pub(crate) use collector::{
GlobalIndexCollector, IndexCollector, IndexEncoder, NoopCollector, ProviderLevelIndexCollector,
};
pub use collector::GlobalIndexCollector;
pub(crate) use collector::{IndexCollector, NoopCollector};
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,
};
pub fn default_index_file(datanode_id: u64) -> String {
format!("__datanode/{datanode_id}/index.json")
}
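For reference, the helper above only formats a per-datanode object-store key; copied standalone to show the resulting path:

```rust
fn default_index_file(datanode_id: u64) -> String {
    format!("__datanode/{datanode_id}/index.json")
}

fn main() {
    // Datanode 7 dumps its WAL indexes to this key in the configured object store.
    assert_eq!(default_index_file(7), "__datanode/7/index.json");
}
```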


@@ -13,13 +13,11 @@
// limitations under the License.
use std::collections::{BTreeSet, HashMap};
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use bytes::buf::Writer;
use bytes::{BufMut, Bytes, BytesMut};
use common_telemetry::tracing::error;
use common_telemetry::{error, info};
use futures::future::try_join_all;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -31,13 +29,9 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;
use crate::error::{self, Result};
use crate::kafka::worker::{DumpIndexRequest, WorkerRequest};
pub trait IndexEncoder: Send + Sync {
fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
fn finish(&self) -> Result<Vec<u8>>;
}
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};
/// The [`IndexCollector`] trait defines the operations for managing and collecting index entries.
pub trait IndexCollector: Send + Sync {
@@ -58,23 +52,148 @@ pub trait IndexCollector: Send + Sync {
/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
/// across multiple providers.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct GlobalIndexCollector {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
task: CollectionTask,
}
#[derive(Debug, Clone)]
pub struct CollectionTask {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
dump_index_interval: Duration,
operator: object_store::ObjectStore,
path: String,
running: Arc<AtomicBool>,
}
impl CollectionTask {
async fn dump_index(&self) -> Result<()> {
let encoder = Arc::new(JsonIndexEncoder::default());
let receivers = {
let providers = self.providers.lock().await;
let mut receivers = Vec::with_capacity(providers.len());
for (provider, sender) in providers.iter() {
let (req, rx) = DumpIndexRequest::new(encoder.clone());
receivers.push(rx);
if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() {
error!(
"BackgroundProducerWorker is stopped, topic: {}",
provider.topic
)
}
}
receivers
};
try_join_all(receivers)
.await
.context(error::WaitDumpIndexSnafu)?;
let bytes = encoder.finish()?;
let mut writer = self
.operator
.writer(&self.path)
.await
.context(error::CreateWriterSnafu)?;
writer.write(bytes).await.context(error::WriteIndexSnafu)?;
writer.close().await.context(error::WriteIndexSnafu)?;
Ok(())
}
/// The background task persists the WAL index to the specified `path` at every
/// `dump_index_interval`, refreshing the latest entry id of each WAL provider as part of each dump.
fn run(&self) {
let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
let running = self.running.clone();
let moved_self = self.clone();
common_runtime::spawn_global(async move {
loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown the index collection task");
break;
}
select! {
_ = dump_index_interval.tick() => {
if let Err(err) = moved_self.dump_index().await {
error!(err; "Failed to persist the WAL index");
}
},
}
}
});
}
}
impl Drop for CollectionTask {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
}
}
impl GlobalIndexCollector {
/// Constructs a [`GlobalIndexCollector`].
///
/// This method initializes a `GlobalIndexCollector` instance and starts a background task
/// for managing WAL (Write-Ahead Logging) indexes.
///
/// The background task persists the WAL index to the specified `path` at every `dump_index_interval`.
pub fn new(
dump_index_interval: Duration,
operator: object_store::ObjectStore,
path: String,
) -> Self {
let providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>> =
Arc::new(Default::default());
let task = CollectionTask {
providers: providers.clone(),
dump_index_interval,
operator,
path,
running: Arc::new(AtomicBool::new(true)),
};
task.run();
Self { providers, task }
}
}
impl GlobalIndexCollector {
/// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
pub fn provider_level_index_collector(
pub(crate) async fn provider_level_index_collector(
&self,
provider: Arc<KafkaProvider>,
sender: Sender<WorkerRequest>,
) -> Box<dyn IndexCollector> {
self.providers.lock().await.insert(provider.clone(), sender);
Box::new(ProviderLevelIndexCollector {
indexes: Default::default(),
provider,
})
}
/// Truncates the index for a specific region up to a given [`EntryId`].
///
/// It removes all [`EntryId`]s smaller than `entry_id`.
pub(crate) async fn truncate(
&self,
provider: &Arc<KafkaProvider>,
region_id: RegionId,
entry_id: EntryId,
) -> Result<()> {
if let Some(sender) = self.providers.lock().await.get(provider).cloned() {
if sender
.send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new(
region_id, entry_id,
)))
.await
.is_err()
{
return error::OrderedBatchProducerStoppedSnafu {}.fail();
}
}
Ok(())
}
}
/// The [`RegionIndexes`] struct maintains indexes for a collection of regions.
@@ -83,8 +202,8 @@ impl GlobalIndexCollector {
/// latest [`EntryId`] across all regions.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RegionIndexes {
regions: HashMap<RegionId, BTreeSet<EntryId>>,
latest_entry_id: EntryId,
pub(crate) regions: HashMap<RegionId, BTreeSet<EntryId>>,
pub(crate) latest_entry_id: EntryId,
}
impl RegionIndexes {
@@ -145,5 +264,5 @@ impl IndexCollector for NoopCollector {
fn set_latest_entry_id(&mut self, _entry_id: EntryId) {}
fn dump(&mut self, encoder: &dyn IndexEncoder) {}
fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
}
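`CollectionTask::run` spawns a periodic loop that re-checks a shared `AtomicBool` on every iteration, and `Drop` flips that flag so the loop winds down once the collector goes away. A stripped-down sketch of just that lifecycle, using plain `tokio::spawn` instead of the project's `common_runtime` (names here are illustrative):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Minimal stand-in for the CollectionTask shutdown mechanics.
struct PeriodicTask {
    running: Arc<AtomicBool>,
    interval: Duration,
}

impl PeriodicTask {
    fn run(&self) {
        let running = self.running.clone();
        let mut ticker = tokio::time::interval(self.interval);
        tokio::spawn(async move {
            loop {
                // Checked on every iteration, mirroring CollectionTask::run.
                if !running.load(Ordering::Relaxed) {
                    println!("shutdown the periodic task");
                    break;
                }
                ticker.tick().await;
                // ... the periodic work (e.g. dumping indexes) would happen here ...
            }
        });
    }
}

impl Drop for PeriodicTask {
    fn drop(&mut self) {
        // Dropping the handle asks the background loop to exit.
        self.running.store(false, Ordering::Relaxed);
    }
}

#[tokio::main]
async fn main() {
    let task = PeriodicTask {
        running: Arc::new(AtomicBool::new(true)),
        interval: Duration::from_millis(50),
    };
    task.run();
    tokio::time::sleep(Duration::from_millis(120)).await;
    drop(task); // the loop observes the flag and breaks on its next iteration
    tokio::time::sleep(Duration::from_millis(100)).await;
}
```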


@@ -0,0 +1,182 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeSet, HashMap};
use std::sync::Mutex;
use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::kafka::index::collector::RegionIndexes;
/// Converts a [`RegionIndexes`] instance into a [`DeltaEncodedRegionIndexes`].
///
/// This conversion encodes the index values using delta encoding to reduce storage space.
impl From<&RegionIndexes> for DeltaEncodedRegionIndexes {
fn from(value: &RegionIndexes) -> Self {
let mut regions = HashMap::with_capacity(value.regions.len());
for (region_id, indexes) in value.regions.iter() {
let indexes = indexes.iter().copied().deltas().collect();
regions.insert(*region_id, indexes);
}
Self {
regions,
last_index: value.latest_entry_id,
}
}
}
/// Represents the delta-encoded version of region indexes for efficient storage.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct DeltaEncodedRegionIndexes {
regions: HashMap<RegionId, Vec<u64>>,
last_index: u64,
}
impl DeltaEncodedRegionIndexes {
/// Retrieves the original (decoded) index values for a given region.
fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
let decoded = self
.regions
.get(&region_id)
.map(|delta| delta.iter().copied().original().collect::<BTreeSet<_>>());
decoded
}
/// Retrieves the last index.
fn last_index(&self) -> u64 {
self.last_index
}
}
pub trait IndexEncoder: Send + Sync {
fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
fn finish(&self) -> Result<Vec<u8>>;
}
/// [`DatanodeWalIndexes`] structure holds the WAL indexes for a datanode.
#[derive(Debug, Default, Serialize, Deserialize)]
pub(crate) struct DatanodeWalIndexes(HashMap<String, DeltaEncodedRegionIndexes>);
impl DatanodeWalIndexes {
fn insert(&mut self, topic: String, region_index: &RegionIndexes) {
self.0.insert(topic, region_index.into());
}
fn encode(&mut self) -> Result<Vec<u8>> {
let value = serde_json::to_vec(&self.0).context(error::EncodeJsonSnafu);
self.0.clear();
value
}
fn decode(byte: &[u8]) -> Result<Self> {
serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
}
/// Retrieves the delta encoded region indexes for a given `provider`.
pub(crate) fn provider(&self, provider: &KafkaProvider) -> Option<&DeltaEncodedRegionIndexes> {
self.0.get(&provider.topic)
}
}
/// [`JsonIndexEncoder`] encodes the [`RegionIndexes`]s into JSON format.
#[derive(Debug, Default)]
pub(crate) struct JsonIndexEncoder {
buf: Mutex<DatanodeWalIndexes>,
}
impl IndexEncoder for JsonIndexEncoder {
fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) {
self.buf
.lock()
.unwrap()
.insert(provider.topic.to_string(), region_index);
}
fn finish(&self) -> Result<Vec<u8>> {
let mut buf = self.buf.lock().unwrap();
buf.encode()
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashMap, HashSet};
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use super::{DatanodeWalIndexes, IndexEncoder, JsonIndexEncoder};
use crate::kafka::index::collector::RegionIndexes;
#[test]
fn test_json_index_encoder() {
let encoder = JsonIndexEncoder::default();
let topic_1 = KafkaProvider::new("my_topic_1".to_string());
let region_1_indexes = BTreeSet::from([1u64, 2, 4, 5, 20]);
let region_2_indexes = BTreeSet::from([4u64, 12, 43, 54, 75]);
encoder.encode(
&topic_1,
&RegionIndexes {
regions: HashMap::from([
(RegionId::new(1, 1), region_1_indexes.clone()),
(RegionId::new(1, 2), region_2_indexes.clone()),
]),
latest_entry_id: 1024,
},
);
let topic_2 = KafkaProvider::new("my_topic_2".to_string());
encoder.encode(
&topic_2,
&RegionIndexes {
regions: HashMap::from([
(
RegionId::new(1, 1),
BTreeSet::from([1024u64, 1025, 1026, 1028, 2048]),
),
(RegionId::new(1, 2), BTreeSet::from([1512])),
]),
latest_entry_id: 2048,
},
);
let bytes = encoder.finish().unwrap();
let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap();
assert_eq!(
datanode_index
.provider(&topic_1)
.unwrap()
.region(RegionId::new(1, 1))
.unwrap(),
region_1_indexes,
);
assert_eq!(
datanode_index
.provider(&topic_1)
.unwrap()
.region(RegionId::new(1, 2))
.unwrap(),
region_2_indexes,
);
assert!(datanode_index
.provider(&KafkaProvider::new("my_topic_3".to_string()))
.is_none());
}
}
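The point of `DeltaEncodedRegionIndexes` is that entry ids within a region are mostly dense, so storing consecutive differences keeps the serialized JSON small. A quick sketch of what the `delta-encoding` crate's `deltas()`/`original()` adapters do to one of the id sets from the test above (assuming, per the crate's default, that the first value is diffed against zero):

```rust
use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};

fn main() {
    // One of the region index sets used in test_json_index_encoder.
    let entry_ids = vec![1u64, 2, 4, 5, 20];

    // Encode: each value becomes the difference from its predecessor.
    let deltas: Vec<u64> = entry_ids.iter().copied().deltas().collect();
    assert_eq!(deltas, vec![1, 1, 2, 1, 15]);

    // Decode: running sums restore the original ids, as `region()` does above.
    let restored: Vec<u64> = deltas.iter().copied().original().collect();
    assert_eq!(restored, entry_ids);
}
```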


@@ -12,14 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::{max, min};
use std::cmp::min;
use std::collections::VecDeque;
use std::iter::Peekable;
use std::marker::PhantomData;
use std::ops::{Add, Mul, Range, Sub};
use std::ops::Range;
use chrono::format::Item;
use itertools::Itertools;
use store_api::logstore::EntryId;
use crate::kafka::util::range::{ConvertIndexToRange, MergeRange};
@@ -197,7 +193,7 @@ mod tests {
#[test]
fn test_region_wal_range() {
let mut range = RegionWalRange::new(0..1024, 1024);
let range = RegionWalRange::new(0..1024, 1024);
assert_eq!(
range.next_batch_hint(10),
Some(NextBatchHint {


@@ -32,6 +32,7 @@ use store_api::storage::RegionId;
use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::index::GlobalIndexCollector;
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
@@ -51,8 +52,12 @@ pub struct KafkaLogStore {
impl KafkaLogStore {
/// Tries to create a Kafka log store.
pub async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
let client_manager = Arc::new(ClientManager::try_new(config).await?);
pub async fn try_new(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Self> {
let client_manager =
Arc::new(ClientManager::try_new(config, global_index_collector).await?);
Ok(Self {
client_manager,
@@ -329,7 +334,21 @@ impl LogStore for KafkaLogStore {
/// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
/// so that the log store can safely delete those entries. This method does not guarantee
/// that the obsolete entries are deleted immediately.
async fn obsolete(&self, _provider: &Provider, _entry_id: EntryId) -> Result<()> {
async fn obsolete(
&self,
provider: &Provider,
region_id: RegionId,
entry_id: EntryId,
) -> Result<()> {
if let Some(collector) = self.client_manager.global_index_collector() {
let provider = provider
.as_kafka_provider()
.with_context(|| InvalidProviderSnafu {
expected: KafkaProvider::type_name(),
actual: provider.type_name(),
})?;
collector.truncate(provider, region_id, entry_id).await?;
}
Ok(())
}
@@ -468,7 +487,7 @@ mod tests {
max_batch_bytes: ReadableSize::kb(32),
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();
let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
let topic_name = uuid::Uuid::new_v4().to_string();
let provider = Provider::kafka_provider(topic_name);
let region_entries = (0..5)
@@ -540,7 +559,7 @@ mod tests {
max_batch_bytes: ReadableSize::kb(8),
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();
let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
let topic_name = uuid::Uuid::new_v4().to_string();
let provider = Provider::kafka_provider(topic_name);
let region_entries = (0..5)


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod checkpoint;
pub(crate) mod dump_index;
pub(crate) mod flush;
pub(crate) mod produce;
@@ -29,14 +29,12 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::{self};
use super::index::IndexEncoder;
use crate::error::{self, NoMaxValueSnafu, Result};
use crate::kafka::index::IndexCollector;
use crate::kafka::index::{IndexCollector, IndexEncoder};
use crate::kafka::producer::ProducerClient;
pub(crate) enum WorkerRequest {
Produce(ProduceRequest),
Checkpoint,
TruncateIndex(TruncateIndexRequest),
DumpIndex(DumpIndexRequest),
}
@@ -82,6 +80,15 @@ pub(crate) struct TruncateIndexRequest {
entry_id: EntryId,
}
impl TruncateIndexRequest {
pub fn new(region_id: RegionId, entry_id: EntryId) -> Self {
Self {
region_id,
entry_id,
}
}
}
pub(crate) struct ProduceRequest {
region_id: RegionId,
batch: Vec<Record>,
@@ -179,27 +186,18 @@ impl BackgroundProducerWorker {
async fn handle_requests(&mut self, buffer: &mut Vec<WorkerRequest>) {
let mut produce_requests = Vec::with_capacity(buffer.len());
let mut do_checkpoint = false;
for req in buffer.drain(..) {
match req {
WorkerRequest::Produce(req) => produce_requests.push(req),
WorkerRequest::Checkpoint => do_checkpoint = true,
WorkerRequest::TruncateIndex(TruncateIndexRequest {
region_id,
entry_id,
}) => self.index_collector.truncate(region_id, entry_id),
WorkerRequest::DumpIndex(req) => {
self.index_collector.dump(req.encoder.as_ref());
let _ = req.sender.send(());
}
WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
}
}
let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes);
self.try_flush_pending_requests(pending_requests).await;
if do_checkpoint {
self.do_checkpoint().await;
}
}
}
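`DumpIndexRequest` follows the usual mpsc-plus-oneshot shape: the caller enqueues a request carrying a oneshot sender, and the worker acknowledges by sending on it once the dump is handled, which is exactly what `CollectionTask::dump_index` awaits via `try_join_all`. A self-contained sketch of that request/ack flow, with hypothetical names:

```rust
use tokio::sync::{mpsc, oneshot};

// Hypothetical single-variant request, mirroring WorkerRequest::DumpIndex.
enum Request {
    Dump { done: oneshot::Sender<()> },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(8);

    // Worker side: handle each request, then acknowledge it.
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            match req {
                Request::Dump { done } => {
                    // ... encode and persist the collected indexes here ...
                    let _ = done.send(());
                }
            }
        }
    });

    // Caller side: send the request and wait for the acknowledgement.
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(Request::Dump { done: done_tx }).await.unwrap();
    done_rx.await.unwrap();
    println!("dump acknowledged");
}
```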


@@ -16,11 +16,12 @@ use common_telemetry::error;
use rskafka::client::partition::OffsetAt;
use snafu::ResultExt;
use super::DumpIndexRequest;
use crate::error;
use crate::kafka::worker::BackgroundProducerWorker;
impl BackgroundProducerWorker {
pub(crate) async fn do_checkpoint(&mut self) {
pub(crate) async fn dump_index(&mut self, req: DumpIndexRequest) {
match self
.client
.get_offset(OffsetAt::Latest)
@@ -28,7 +29,11 @@ impl BackgroundProducerWorker {
.context(error::GetOffsetSnafu {
topic: &self.provider.topic,
}) {
Ok(offset) => self.index_collector.set_latest_entry_id(offset as u64),
Ok(offset) => {
self.index_collector.set_latest_entry_id(offset as u64);
self.index_collector.dump(req.encoder.as_ref());
let _ = req.sender.send(());
}
Err(err) => error!(err; "Failed to do checkpoint"),
}
}


@@ -417,7 +417,12 @@ impl LogStore for RaftEngineLogStore {
}))
}
async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<()> {
async fn obsolete(
&self,
provider: &Provider,
_region_id: RegionId,
entry_id: EntryId,
) -> Result<()> {
let ns = provider
.as_raft_engine_provider()
.with_context(|| InvalidProviderSnafu {
@@ -637,7 +642,8 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = new_test_log_store(&dir).await;
let namespace_id = 42;
let region_id = RegionId::new(1, 1);
let namespace_id = region_id.as_u64();
let namespace = Provider::raft_engine_provider(namespace_id);
for id in 0..4096 {
let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
@@ -645,7 +651,10 @@ mod tests {
}
let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
logstore.obsolete(&namespace, 4000).await.unwrap();
logstore
.obsolete(&namespace, region_id, 4000)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(6)).await;
let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
@@ -662,14 +671,15 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = new_test_log_store(&dir).await;
let namespace_id = 42;
let region_id = RegionId::new(1, 1);
let namespace_id = region_id.as_u64();
let namespace = Provider::raft_engine_provider(namespace_id);
for id in 0..1024 {
let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
let _ = logstore.append(entry).await.unwrap();
}
logstore.obsolete(&namespace, 100).await.unwrap();
logstore.obsolete(&namespace, region_id, 100).await.unwrap();
assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());
let res = logstore.read(&namespace, 100).await.unwrap();


@@ -34,13 +34,16 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEng
/// Create a [KafkaLogStore].
pub async fn create_kafka_log_store(broker_endpoints: Vec<String>) -> KafkaLogStore {
KafkaLogStore::try_new(&DatanodeKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
KafkaLogStore::try_new(
&DatanodeKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
..Default::default()
},
..Default::default()
})
None,
)
.await
.unwrap()
}


@@ -89,7 +89,7 @@ impl<S: LogStore> Wal<S> {
move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
store
.obsolete(provider, last_entry_id)
.obsolete(provider, region_id, last_entry_id)
.await
.map_err(BoxedError::new)
.context(DeleteWalSnafu { region_id })
@@ -142,7 +142,7 @@ impl<S: LogStore> Wal<S> {
provider: &Provider,
) -> Result<()> {
self.store
.obsolete(provider, last_id)
.obsolete(provider, region_id, last_id)
.await
.map_err(BoxedError::new)
.context(DeleteWalSnafu { region_id })


@@ -168,6 +168,7 @@ mod tests {
async fn obsolete(
&self,
_provider: &Provider,
_region_id: RegionId,
_entry_id: EntryId,
) -> Result<(), Self::Error> {
unreachable!()


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient};
pub use opendal::raw::{normalize_path as raw_normalize_path, Access, HttpClient};
pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader,

View File

@@ -62,7 +62,12 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
/// so that the log store can safely delete those entries. This method does not guarantee
/// that the obsolete entries are deleted immediately.
async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<(), Self::Error>;
async fn obsolete(
&self,
provider: &Provider,
region_id: RegionId,
entry_id: EntryId,
) -> Result<(), Self::Error>;
/// Makes an entry instance of the associated Entry type
fn entry(