feat(log_store): introduce the IndexCollector (#4461)

* feat: introduce the IndexCollector

* refactor: separate BackgroundProducerWorker code into files

* feat: introduce index related operations

* feat: introduce the `GlobalIndexCollector`

* refactor: move collector to index mod

* refactor: refactor `GlobalIndexCollector`

* chore: remove unused collector.rs

* chore: add comments

* chore: add comments

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-08-13 14:15:24 +08:00
committed by GitHub
parent 2e2eacf3b2
commit c821d21111
13 changed files with 696 additions and 268 deletions

1
Cargo.lock generated
View File

@@ -5789,6 +5789,7 @@ version = "0.9.1"
dependencies = [
"async-stream",
"async-trait",
"bytes",
"chrono",
"common-base",
"common-error",

View File

@@ -15,6 +15,7 @@ workspace = true
[dependencies]
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-error.workspace = true

View File

@@ -21,8 +21,6 @@ use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use crate::kafka::producer::ProduceRequest;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
@@ -268,12 +266,10 @@ pub enum Error {
attempt_index: u64,
},
#[snafu(display("Failed to send produce request"))]
SendProduceRequest {
#[snafu(display("OrderedBatchProducer is stopped",))]
OrderedBatchProducerStopped {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::mpsc::error::SendError<ProduceRequest>,
},
#[snafu(display("Failed to send produce request"))]

View File

@@ -13,12 +13,17 @@
// limitations under the License.
pub(crate) mod client_manager;
// TODO(weny): remove it
#[allow(dead_code)]
pub(crate) mod consumer;
#[allow(unused)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
// TODO(weny): remove it
#[allow(dead_code)]
pub(crate) mod worker;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::Id as EntryId;

View File

@@ -23,19 +23,16 @@ use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};
use super::producer::OrderedBatchProducer;
use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};
// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
// Max batch size for a `OrderedBatchProducer` to handle requests.
const REQUEST_BATCH_SIZE: usize = 64;
/// Arc wrapper of ClientManager.
pub(crate) type ClientManagerRef = Arc<ClientManager>;
@@ -63,9 +60,8 @@ pub(crate) struct ClientManager {
/// Used to initialize a new [Client].
mutex: Mutex<()>,
instances: RwLock<HashMap<Arc<KafkaProvider>, Client>>,
global_index_collector: Option<GlobalIndexCollector>,
producer_channel_size: usize,
producer_request_batch_size: usize,
flush_batch_size: usize,
compression: Compression,
}
@@ -99,10 +95,9 @@ impl ClientManager {
client,
mutex: Mutex::new(()),
instances: RwLock::new(HashMap::new()),
producer_channel_size: REQUEST_BATCH_SIZE * 2,
producer_request_batch_size: REQUEST_BATCH_SIZE,
flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
compression: Compression::Lz4,
global_index_collector: None,
})
}
@@ -151,12 +146,19 @@ impl ClientManager {
})
.map(Arc::new)?;
let (tx, rx) = OrderedBatchProducer::channel();
let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() {
global_collector.provider_level_index_collector(provider.clone(), tx.clone())
} else {
Box::new(NoopCollector)
};
let producer = Arc::new(OrderedBatchProducer::new(
(tx, rx),
provider.clone(),
client.clone(),
self.compression,
self.producer_channel_size,
self.producer_request_batch_size,
self.flush_batch_size,
index_collector,
));
Ok(Client { client, producer })

View File

@@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod collector;
mod iterator;
pub(crate) use collector::{
GlobalIndexCollector, IndexCollector, IndexEncoder, NoopCollector, ProviderLevelIndexCollector,
};
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,

View File

@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeSet, HashMap};
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use bytes::buf::Writer;
use bytes::{BufMut, Bytes, BytesMut};
use common_telemetry::tracing::error;
use futures::future::try_join_all;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;
use crate::error::{self, Result};
use crate::kafka::worker::{DumpIndexRequest, WorkerRequest};
pub trait IndexEncoder: Send + Sync {
fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
fn finish(&self) -> Result<Vec<u8>>;
}
/// The [`IndexCollector`] trait defines the operations for managing and collecting index entries.
pub trait IndexCollector: Send + Sync {
/// Appends an [`EntryId`] for a specific region.
fn append(&mut self, region_id: RegionId, entry_id: EntryId);
/// Truncates the index for a specific region up to a given [`EntryId`].
///
/// It removes all [`EntryId`]s smaller than `entry_id`.
fn truncate(&mut self, region_id: RegionId, entry_id: EntryId);
/// Sets the latest [`EntryId`].
fn set_latest_entry_id(&mut self, entry_id: EntryId);
/// Dumps the index.
fn dump(&mut self, encoder: &dyn IndexEncoder);
}
/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
/// across multiple providers.
#[derive(Debug, Clone, Default)]
pub struct GlobalIndexCollector {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
}
impl GlobalIndexCollector {
/// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
pub fn provider_level_index_collector(
&self,
provider: Arc<KafkaProvider>,
sender: Sender<WorkerRequest>,
) -> Box<dyn IndexCollector> {
Box::new(ProviderLevelIndexCollector {
indexes: Default::default(),
provider,
})
}
}
/// The [`RegionIndexes`] struct maintains indexes for a collection of regions.
/// Each region is identified by a `RegionId` and maps to a set of [`EntryId`]s,
/// representing the entries within that region. It also keeps track of the
/// latest [`EntryId`] across all regions.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RegionIndexes {
regions: HashMap<RegionId, BTreeSet<EntryId>>,
latest_entry_id: EntryId,
}
impl RegionIndexes {
fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
self.regions.entry(region_id).or_default().insert(entry_id);
self.latest_entry_id = self.latest_entry_id.max(entry_id);
}
fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
if let Some(entry_ids) = self.regions.get_mut(&region_id) {
*entry_ids = entry_ids.split_off(&entry_id);
// The `RegionIndexes` can be empty, keeps to track the latest entry id.
self.latest_entry_id = self.latest_entry_id.max(entry_id);
}
}
fn set_latest_entry_id(&mut self, entry_id: EntryId) {
self.latest_entry_id = entry_id;
}
}
/// The [`ProviderLevelIndexCollector`] struct is responsible for managing index entries
/// specific to a particular provider.
#[derive(Debug, Clone)]
pub struct ProviderLevelIndexCollector {
indexes: RegionIndexes,
provider: Arc<KafkaProvider>,
}
impl IndexCollector for ProviderLevelIndexCollector {
fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
self.indexes.append(region_id, entry_id)
}
fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
self.indexes.truncate(region_id, entry_id)
}
fn set_latest_entry_id(&mut self, entry_id: EntryId) {
self.indexes.set_latest_entry_id(entry_id);
}
fn dump(&mut self, encoder: &dyn IndexEncoder) {
encoder.encode(&self.provider, &self.indexes)
}
}
/// The [`NoopCollector`] struct implements the [`IndexCollector`] trait with no-op methods.
///
/// This collector effectively ignores all operations, making it suitable for cases
/// where index collection is not required or should be disabled.
pub struct NoopCollector;
impl IndexCollector for NoopCollector {
fn append(&mut self, _region_id: RegionId, _entry_id: EntryId) {}
fn truncate(&mut self, _region_id: RegionId, _entry_id: EntryId) {}
fn set_latest_entry_id(&mut self, _entry_id: EntryId) {}
fn dump(&mut self, encoder: &dyn IndexEncoder) {}
}

View File

@@ -178,7 +178,8 @@ impl LogStore for KafkaLogStore {
for (region_id, (producer, records)) in region_grouped_records {
// Safety: `KafkaLogStore::entry` will ensure that the
// `Record`'s `approximate_size` must be less or equal to `max_batch_bytes`.
region_grouped_result_receivers.push((region_id, producer.produce(records).await?))
region_grouped_result_receivers
.push((region_id, producer.produce(region_id, records).await?))
}
let region_grouped_max_offset =

View File

@@ -12,230 +12,58 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_telemetry::{debug, warn};
use futures::future::try_join_all;
use rskafka::client::partition::Compression;
use rskafka::client::producer::ProducerClient;
use common_telemetry::warn;
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient};
use rskafka::record::Record;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;
use crate::error::{self, NoMaxValueSnafu, Result};
pub struct ProduceRequest {
batch: Vec<Record>,
sender: oneshot::Sender<ProduceResultReceiver>,
}
#[derive(Default)]
struct ProduceResultReceiver {
receivers: Vec<oneshot::Receiver<Result<Vec<i64>>>>,
}
impl ProduceResultReceiver {
fn add_receiver(&mut self, receiver: oneshot::Receiver<Result<Vec<i64>>>) {
self.receivers.push(receiver)
}
async fn wait(self) -> Result<u64> {
Ok(try_join_all(self.receivers)
.await
.into_iter()
.flatten()
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.max()
.context(NoMaxValueSnafu)? as u64)
}
}
struct BackgroundProducerWorker {
/// The [`ProducerClient`].
client: Arc<dyn ProducerClient>,
// The compression configuration.
compression: Compression,
// The running flag.
running: Arc<AtomicBool>,
/// Receiver of [ProduceRequest].
receiver: Receiver<ProduceRequest>,
/// Max batch size for a worker to handle requests.
request_batch_size: usize,
/// Max bytes size for a single flush.
max_batch_bytes: usize,
/// The [PendingRequest]s.
pending_requests: Vec<PendingRequest>,
}
struct PendingRequest {
batch: Vec<Record>,
size: usize,
sender: oneshot::Sender<Result<Vec<i64>>>,
}
/// ## Panic
/// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`.
fn handle_produce_requests(
requests: &mut Vec<ProduceRequest>,
max_batch_bytes: usize,
) -> Vec<PendingRequest> {
let mut records_buffer = vec![];
let mut batch_size = 0;
let mut pending_requests = Vec::with_capacity(requests.len());
for ProduceRequest { batch, sender } in requests.drain(..) {
let mut receiver = ProduceResultReceiver::default();
for record in batch {
assert!(record.approximate_size() <= max_batch_bytes);
// Yields the `PendingRequest` if buffer is full.
if batch_size + record.approximate_size() > max_batch_bytes {
let (tx, rx) = oneshot::channel();
pending_requests.push(PendingRequest {
batch: std::mem::take(&mut records_buffer),
size: batch_size,
sender: tx,
});
batch_size = 0;
receiver.add_receiver(rx);
}
batch_size += record.approximate_size();
records_buffer.push(record);
}
// The remaining records.
if batch_size > 0 {
// Yields `PendingRequest`
let (tx, rx) = oneshot::channel();
pending_requests.push(PendingRequest {
batch: std::mem::take(&mut records_buffer),
size: batch_size,
sender: tx,
});
batch_size = 0;
receiver.add_receiver(rx);
}
let _ = sender.send(receiver);
}
pending_requests
}
async fn do_flush(
client: &Arc<dyn ProducerClient>,
PendingRequest {
batch,
sender,
size: _size,
}: PendingRequest,
compression: Compression,
) {
let result = client
.produce(batch, compression)
.await
.context(error::BatchProduceSnafu);
if let Err(err) = sender.send(result) {
warn!(err; "BatchFlushState Receiver is dropped");
}
}
impl BackgroundProducerWorker {
async fn run(&mut self) {
let mut buffer = Vec::with_capacity(self.request_batch_size);
while self.running.load(Ordering::Relaxed) {
// Processes pending requests first.
if !self.pending_requests.is_empty() {
// TODO(weny): Considering merge `PendingRequest`s.
for req in self.pending_requests.drain(..) {
do_flush(&self.client, req, self.compression).await
}
}
match self.receiver.recv().await {
Some(req) => {
buffer.clear();
buffer.push(req);
for _ in 1..self.request_batch_size {
match self.receiver.try_recv() {
Ok(req) => buffer.push(req),
Err(_) => break,
}
}
self.pending_requests =
handle_produce_requests(&mut buffer, self.max_batch_bytes);
}
None => {
debug!("The sender is dropped, BackgroundProducerWorker exited");
// Exits the loop if the `sender` is dropped.
break;
}
}
}
}
}
use crate::error::{self, Result};
use crate::kafka::index::IndexCollector;
use crate::kafka::worker::{BackgroundProducerWorker, ProduceResultHandle, WorkerRequest};
pub type OrderedBatchProducerRef = Arc<OrderedBatchProducer>;
// Max batch size for a `OrderedBatchProducer` to handle requests.
const REQUEST_BATCH_SIZE: usize = 64;
// Producer channel size
const PRODUCER_CHANNEL_SIZE: usize = REQUEST_BATCH_SIZE * 2;
/// [`OrderedBatchProducer`] attempts to aggregate multiple produce requests together
#[derive(Debug)]
pub(crate) struct OrderedBatchProducer {
sender: Sender<ProduceRequest>,
/// Used to control the [`BackgroundProducerWorker`].
running: Arc<AtomicBool>,
}
impl Drop for OrderedBatchProducer {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
}
}
/// Receives the committed offsets when data has been committed to Kafka
/// or an unrecoverable error has been encountered.
pub(crate) struct ProduceResultHandle {
receiver: oneshot::Receiver<ProduceResultReceiver>,
}
impl ProduceResultHandle {
/// Waits for the data has been committed to Kafka.
/// Returns the **max** committed offsets.
pub(crate) async fn wait(self) -> Result<u64> {
self.receiver
.await
.context(error::WaitProduceResultReceiverSnafu)?
.wait()
.await
}
pub(crate) sender: Sender<WorkerRequest>,
}
impl OrderedBatchProducer {
pub(crate) fn channel() -> (Sender<WorkerRequest>, Receiver<WorkerRequest>) {
mpsc::channel(PRODUCER_CHANNEL_SIZE)
}
/// Constructs a new [`OrderedBatchProducer`].
pub(crate) fn new(
(tx, rx): (Sender<WorkerRequest>, Receiver<WorkerRequest>),
provider: Arc<KafkaProvider>,
client: Arc<dyn ProducerClient>,
compression: Compression,
channel_size: usize,
request_batch_size: usize,
max_batch_bytes: usize,
index_collector: Box<dyn IndexCollector>,
) -> Self {
let (tx, rx) = mpsc::channel(channel_size);
let running = Arc::new(AtomicBool::new(true));
let mut worker = BackgroundProducerWorker {
provider,
client,
compression,
running: running.clone(),
receiver: rx,
request_batch_size,
request_batch_size: REQUEST_BATCH_SIZE,
max_batch_bytes,
pending_requests: vec![],
index_collector,
};
tokio::spawn(async move { worker.run().await });
Self {
sender: tx,
running,
}
Self { sender: tx }
}
/// Writes `data` to the [`OrderedBatchProducer`].
@@ -245,17 +73,44 @@ impl OrderedBatchProducer {
///
/// ## Panic
/// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`.
pub(crate) async fn produce(&self, batch: Vec<Record>) -> Result<ProduceResultHandle> {
let receiver = {
let (tx, rx) = oneshot::channel();
self.sender
.send(ProduceRequest { batch, sender: tx })
.await
.context(error::SendProduceRequestSnafu)?;
rx
};
pub(crate) async fn produce(
&self,
region_id: RegionId,
batch: Vec<Record>,
) -> Result<ProduceResultHandle> {
let (req, handle) = WorkerRequest::new_produce_request(region_id, batch);
if self.sender.send(req).await.is_err() {
warn!("OrderedBatchProducer is already exited");
return error::OrderedBatchProducerStoppedSnafu {}.fail();
}
Ok(ProduceResultHandle { receiver })
Ok(handle)
}
}
#[async_trait::async_trait]
pub trait ProducerClient: std::fmt::Debug + Send + Sync {
async fn produce(
&self,
records: Vec<Record>,
compression: Compression,
) -> rskafka::client::error::Result<Vec<i64>>;
async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result<i64>;
}
#[async_trait::async_trait]
impl ProducerClient for PartitionClient {
async fn produce(
&self,
records: Vec<Record>,
compression: Compression,
) -> rskafka::client::error::Result<Vec<i64>> {
self.produce(records, compression).await
}
async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result<i64> {
self.get_offset(at).await
}
}
@@ -267,15 +122,16 @@ mod tests {
use chrono::{TimeZone, Utc};
use common_base::readable_size::ReadableSize;
use common_telemetry::debug;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use rskafka::client::error::{Error as ClientError, RequestContext};
use rskafka::client::partition::Compression;
use rskafka::client::producer::ProducerClient;
use rskafka::protocol::error::Error as ProtocolError;
use rskafka::record::Record;
use store_api::storage::RegionId;
use super::*;
use crate::kafka::index::NoopCollector;
use crate::kafka::producer::OrderedBatchProducer;
#[derive(Debug)]
@@ -286,38 +142,41 @@ mod tests {
batch_sizes: Mutex<Vec<usize>>,
}
#[async_trait::async_trait]
impl ProducerClient for MockClient {
fn produce(
async fn produce(
&self,
records: Vec<Record>,
_compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
Box::pin(async move {
tokio::time::sleep(self.delay).await;
) -> rskafka::client::error::Result<Vec<i64>> {
tokio::time::sleep(self.delay).await;
if let Some(e) = self.error {
return Err(ClientError::ServerError {
protocol_error: e,
error_message: None,
request: RequestContext::Partition("foo".into(), 1),
response: None,
is_virtual: false,
});
}
if let Some(e) = self.error {
return Err(ClientError::ServerError {
protocol_error: e,
error_message: None,
request: RequestContext::Partition("foo".into(), 1),
response: None,
is_virtual: false,
});
}
if let Some(p) = self.panic.as_ref() {
panic!("{}", p);
}
if let Some(p) = self.panic.as_ref() {
panic!("{}", p);
}
let mut batch_sizes = self.batch_sizes.lock().unwrap();
let offset_base = batch_sizes.iter().sum::<usize>();
let offsets = (0..records.len())
.map(|x| (x + offset_base) as i64)
.collect();
batch_sizes.push(records.len());
debug!("Return offsets: {offsets:?}");
Ok(offsets)
})
let mut batch_sizes = self.batch_sizes.lock().unwrap();
let offset_base = batch_sizes.iter().sum::<usize>();
let offsets = (0..records.len())
.map(|x| (x + offset_base) as i64)
.collect();
batch_sizes.push(records.len());
debug!("Return offsets: {offsets:?}");
Ok(offsets)
}
async fn get_offset(&self, _at: OffsetAt) -> rskafka::client::error::Result<i64> {
todo!()
}
}
@@ -341,18 +200,23 @@ mod tests {
delay,
batch_sizes: Default::default(),
});
let provider = Arc::new(KafkaProvider::new(String::new()));
let producer = OrderedBatchProducer::new(
OrderedBatchProducer::channel(),
provider,
client.clone(),
Compression::NoCompression,
128,
64,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
Box::new(NoopCollector),
);
let region_id = RegionId::new(1, 1);
// Produces 3 records
let handle = producer
.produce(vec![record.clone(), record.clone(), record.clone()])
.produce(
region_id,
vec![record.clone(), record.clone(), record.clone()],
)
.await
.unwrap();
assert_eq!(handle.wait().await.unwrap(), 2);
@@ -360,14 +224,17 @@ mod tests {
// Produces 2 records
let handle = producer
.produce(vec![record.clone(), record.clone()])
.produce(region_id, vec![record.clone(), record.clone()])
.await
.unwrap();
assert_eq!(handle.wait().await.unwrap(), 4);
assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1, 2]);
// Produces 1 records
let handle = producer.produce(vec![record.clone()]).await.unwrap();
let handle = producer
.produce(region_id, vec![record.clone()])
.await
.unwrap();
assert_eq!(handle.wait().await.unwrap(), 5);
assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1, 2, 1]);
}
@@ -381,31 +248,42 @@ mod tests {
delay: Duration::from_millis(1),
batch_sizes: Default::default(),
});
let provider = Arc::new(KafkaProvider::new(String::new()));
let producer = OrderedBatchProducer::new(
OrderedBatchProducer::channel(),
provider,
client.clone(),
Compression::NoCompression,
128,
64,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
Box::new(NoopCollector),
);
let region_id = RegionId::new(1, 1);
let mut futures = FuturesUnordered::new();
futures.push(
producer
.produce(vec![record.clone(), record.clone(), record.clone()])
.produce(
region_id,
vec![record.clone(), record.clone(), record.clone()],
)
.await
.unwrap()
.wait(),
);
futures.push(
producer
.produce(vec![record.clone(), record.clone()])
.produce(region_id, vec![record.clone(), record.clone()])
.await
.unwrap()
.wait(),
);
futures.push(
producer
.produce(region_id, vec![record.clone()])
.await
.unwrap()
.wait(),
);
futures.push(producer.produce(vec![record.clone()]).await.unwrap().wait());
futures.next().await.unwrap().unwrap_err();
futures.next().await.unwrap().unwrap_err();
@@ -422,22 +300,33 @@ mod tests {
batch_sizes: Default::default(),
});
let provider = Arc::new(KafkaProvider::new(String::new()));
let producer = OrderedBatchProducer::new(
OrderedBatchProducer::channel(),
provider,
client.clone(),
Compression::NoCompression,
128,
64,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
Box::new(NoopCollector),
);
let region_id = RegionId::new(1, 1);
let a = producer
.produce(vec![record.clone(), record.clone(), record.clone()])
.produce(
region_id,
vec![record.clone(), record.clone(), record.clone()],
)
.await
.unwrap()
.wait()
.fuse();
let b = producer.produce(vec![record]).await.unwrap().wait().fuse();
let b = producer
.produce(region_id, vec![record])
.await
.unwrap()
.wait()
.fuse();
let mut b = Box::pin(b);

View File

@@ -0,0 +1,205 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod checkpoint;
pub(crate) mod flush;
pub(crate) mod produce;
use std::sync::Arc;
use common_telemetry::debug;
use futures::future::try_join_all;
use rskafka::client::partition::Compression;
use rskafka::record::Record;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::provider::KafkaProvider;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::{self};
use super::index::IndexEncoder;
use crate::error::{self, NoMaxValueSnafu, Result};
use crate::kafka::index::IndexCollector;
use crate::kafka::producer::ProducerClient;
pub(crate) enum WorkerRequest {
Produce(ProduceRequest),
Checkpoint,
TruncateIndex(TruncateIndexRequest),
DumpIndex(DumpIndexRequest),
}
impl WorkerRequest {
pub(crate) fn new_produce_request(
region_id: RegionId,
batch: Vec<Record>,
) -> (WorkerRequest, ProduceResultHandle) {
let (tx, rx) = oneshot::channel();
(
WorkerRequest::Produce(ProduceRequest {
region_id,
batch,
sender: tx,
}),
ProduceResultHandle { receiver: rx },
)
}
}
pub(crate) struct DumpIndexRequest {
encoder: Arc<dyn IndexEncoder>,
sender: oneshot::Sender<()>,
}
impl DumpIndexRequest {
pub fn new(encoder: Arc<dyn IndexEncoder>) -> (DumpIndexRequest, oneshot::Receiver<()>) {
let (tx, rx) = oneshot::channel();
(
DumpIndexRequest {
encoder,
sender: tx,
},
rx,
)
}
}
pub(crate) struct TruncateIndexRequest {
region_id: RegionId,
entry_id: EntryId,
}
pub(crate) struct ProduceRequest {
region_id: RegionId,
batch: Vec<Record>,
sender: oneshot::Sender<ProduceResultReceiver>,
}
/// Receives the committed offsets when data has been committed to Kafka
/// or an unrecoverable error has been encountered.
pub(crate) struct ProduceResultHandle {
receiver: oneshot::Receiver<ProduceResultReceiver>,
}
impl ProduceResultHandle {
/// Waits for the data has been committed to Kafka.
/// Returns the **max** committed offsets.
pub(crate) async fn wait(self) -> Result<u64> {
self.receiver
.await
.context(error::WaitProduceResultReceiverSnafu)?
.wait()
.await
}
}
#[derive(Default)]
pub(crate) struct ProduceResultReceiver {
receivers: Vec<oneshot::Receiver<Result<Vec<i64>>>>,
}
impl ProduceResultReceiver {
fn add_receiver(&mut self, receiver: oneshot::Receiver<Result<Vec<i64>>>) {
self.receivers.push(receiver)
}
async fn wait(self) -> Result<u64> {
Ok(try_join_all(self.receivers)
.await
.into_iter()
.flatten()
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.max()
.context(NoMaxValueSnafu)? as u64)
}
}
pub(crate) struct PendingRequest {
batch: Vec<Record>,
region_ids: Vec<RegionId>,
size: usize,
sender: oneshot::Sender<Result<Vec<i64>>>,
}
pub(crate) struct BackgroundProducerWorker {
pub(crate) provider: Arc<KafkaProvider>,
/// The [`ProducerClient`].
pub(crate) client: Arc<dyn ProducerClient>,
// The compression configuration.
pub(crate) compression: Compression,
/// Receiver of [ProduceRequest].
pub(crate) receiver: Receiver<WorkerRequest>,
/// Max batch size for a worker to handle requests.
pub(crate) request_batch_size: usize,
/// Max bytes size for a single flush.
pub(crate) max_batch_bytes: usize,
/// Collecting ids of WAL entries.
pub(crate) index_collector: Box<dyn IndexCollector>,
}
impl BackgroundProducerWorker {
pub(crate) async fn run(&mut self) {
let mut buffer = Vec::with_capacity(self.request_batch_size);
loop {
match self.receiver.recv().await {
Some(req) => {
buffer.clear();
buffer.push(req);
for _ in 1..self.request_batch_size {
match self.receiver.try_recv() {
Ok(req) => buffer.push(req),
Err(_) => break,
}
}
self.handle_requests(&mut buffer).await;
}
None => {
debug!("The sender is dropped, BackgroundProducerWorker exited");
// Exits the loop if the `sender` is dropped.
break;
}
}
}
}
async fn handle_requests(&mut self, buffer: &mut Vec<WorkerRequest>) {
let mut produce_requests = Vec::with_capacity(buffer.len());
let mut do_checkpoint = false;
for req in buffer.drain(..) {
match req {
WorkerRequest::Produce(req) => produce_requests.push(req),
WorkerRequest::Checkpoint => do_checkpoint = true,
WorkerRequest::TruncateIndex(TruncateIndexRequest {
region_id,
entry_id,
}) => self.index_collector.truncate(region_id, entry_id),
WorkerRequest::DumpIndex(req) => {
self.index_collector.dump(req.encoder.as_ref());
let _ = req.sender.send(());
}
}
}
let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes);
self.try_flush_pending_requests(pending_requests).await;
if do_checkpoint {
self.do_checkpoint().await;
}
}
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::error;
use rskafka::client::partition::OffsetAt;
use snafu::ResultExt;
use crate::error;
use crate::kafka::worker::BackgroundProducerWorker;
impl BackgroundProducerWorker {
pub(crate) async fn do_checkpoint(&mut self) {
match self
.client
.get_offset(OffsetAt::Latest)
.await
.context(error::GetOffsetSnafu {
topic: &self.provider.topic,
}) {
Ok(offset) => self.index_collector.set_latest_entry_id(offset as u64),
Err(err) => error!(err; "Failed to do checkpoint"),
}
}
}

View File

@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::warn;
use snafu::ResultExt;
use crate::error;
use crate::kafka::worker::{BackgroundProducerWorker, PendingRequest};
impl BackgroundProducerWorker {
async fn do_flush(
&mut self,
PendingRequest {
batch,
region_ids,
sender,
size: _size,
}: PendingRequest,
) {
let result = self
.client
.produce(batch, self.compression)
.await
.context(error::BatchProduceSnafu);
if let Ok(result) = &result {
for (idx, region_id) in result.iter().zip(region_ids) {
self.index_collector.append(region_id, *idx as u64);
}
}
if let Err(err) = sender.send(result) {
warn!(err; "BatchFlushState Receiver is dropped");
}
}
pub(crate) async fn try_flush_pending_requests(
&mut self,
pending_requests: Vec<PendingRequest>,
) {
// TODO(weny): Considering merge `PendingRequest`s.
for req in pending_requests {
self.do_flush(req).await
}
}
}

View File

@@ -0,0 +1,83 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::tracing::warn;
use tokio::sync::oneshot;
use crate::kafka::worker::{
BackgroundProducerWorker, PendingRequest, ProduceRequest, ProduceResultReceiver,
};
impl BackgroundProducerWorker {
/// Aggregates records into batches, ensuring that the size of each batch does not exceed a specified maximum (`max_batch_bytes`).
///
/// ## Panic
/// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`.
pub(crate) fn aggregate_records(
&self,
requests: &mut Vec<ProduceRequest>,
max_batch_bytes: usize,
) -> Vec<PendingRequest> {
let mut records_buffer = vec![];
let mut region_ids = vec![];
let mut batch_size = 0;
let mut pending_requests = Vec::with_capacity(requests.len());
for ProduceRequest {
batch,
sender,
region_id,
} in std::mem::take(requests)
{
let mut receiver = ProduceResultReceiver::default();
for record in batch {
assert!(record.approximate_size() <= max_batch_bytes);
// Yields the `PendingRequest` if buffer is full.
if batch_size + record.approximate_size() > max_batch_bytes {
let (tx, rx) = oneshot::channel();
pending_requests.push(PendingRequest {
batch: std::mem::take(&mut records_buffer),
region_ids: std::mem::take(&mut region_ids),
size: batch_size,
sender: tx,
});
batch_size = 0;
receiver.add_receiver(rx);
}
batch_size += record.approximate_size();
records_buffer.push(record);
region_ids.push(region_id);
}
// The remaining records.
if batch_size > 0 {
// Yields `PendingRequest`
let (tx, rx) = oneshot::channel();
pending_requests.push(PendingRequest {
batch: std::mem::take(&mut records_buffer),
region_ids: std::mem::take(&mut region_ids),
size: batch_size,
sender: tx,
});
batch_size = 0;
receiver.add_receiver(rx);
}
if sender.send(receiver).is_err() {
warn!("The Receiver of ProduceResultReceiver is dropped");
}
}
pending_requests
}
}