feat: implement the OrderedBatchProducer (#4134)

* feat: implement the `OrderedBatchProducer`

* test: add test of cancel safety

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: simplify the `BackgroundProducerWorker`

* feat: implement the OrderedBatchProducer v2

* refactor: switch to `OrderedBatchProducer`

* chore: rename to `MAX_FLUSH_QUEUE_SIZE`

* refactor: switch to `OrderedBatchProducerV2`

* refactor: remove `OrderedBatchProducerV1`

* test: add tests

* refactor: make config configurable

* refactor: minor refactor

* chore: remove unused code

* chore: remove `benchmarks` crate

* chore: update config doc

* chore: remove unused comment

* refactor: refactor client registry

* refactor: rename `max_batch_size` to `max_batch_bytes`

* chore: use constant value

* chore: ensure serialized meta < ESTIMATED_META_SIZE

* chore: apply suggestions from CR

* chore: remove the `CHANNEL_SIZE`

* chore: apply suggestions from CR

* fix: ensure serialized meta < ESTIMATED_META_SIZE

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-06-18 15:20:01 +08:00
committed by GitHub
parent 70d113a355
commit ea2d067cf1
29 changed files with 753 additions and 1263 deletions

View File

@@ -101,8 +101,7 @@ impl From<StandaloneWalConfig> for DatanodeWalConfig {
StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
compression: config.compression,
max_batch_size: config.max_batch_size,
linger: config.linger,
max_batch_bytes: config.max_batch_bytes,
consumer_wait_timeout: config.consumer_wait_timeout,
backoff: config.backoff,
}),
@@ -176,7 +175,7 @@ mod tests {
topic_name_prefix = "greptimedb_wal_topic"
replication_factor = 1
create_topic_timeout = "30s"
max_batch_size = "1MB"
max_batch_bytes = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
@@ -209,8 +208,7 @@ mod tests {
let expected = DatanodeKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: Compression::default(),
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
init: Duration::from_millis(500),
@@ -232,8 +230,7 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
compression: Compression::default(),
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
init: Duration::from_millis(500),

View File

@@ -30,11 +30,10 @@ pub struct DatanodeKafkaConfig {
/// The compression algorithm used to compress kafka records.
#[serde(skip)]
pub compression: Compression,
/// TODO(weny): Remove the alias once we release v0.9.
/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
pub linger: Duration,
#[serde(alias = "max_batch_size")]
pub max_batch_bytes: ReadableSize,
/// The consumer wait timeout.
#[serde(with = "humantime_serde")]
pub consumer_wait_timeout: Duration,
@@ -49,8 +48,7 @@ impl Default for DatanodeKafkaConfig {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
}

View File

@@ -43,11 +43,10 @@ pub struct StandaloneKafkaConfig {
/// The compression algorithm used to compress kafka records.
#[serde(skip)]
pub compression: Compression,
/// TODO(weny): Remove the alias once we release v0.9.
/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
pub linger: Duration,
#[serde(alias = "max_batch_size")]
pub max_batch_bytes: ReadableSize,
/// The consumer wait timeout.
#[serde(with = "humantime_serde")]
pub consumer_wait_timeout: Duration,
@@ -70,8 +69,7 @@ impl Default for StandaloneKafkaConfig {
create_topic_timeout: Duration::from_secs(30),
compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
}

View File

@@ -44,4 +44,5 @@ common-wal = { workspace = true, features = ["testing"] }
itertools.workspace = true
rand.workspace = true
rand_distr = "0.4"
rskafka = { workspace = true, features = ["unstable-fuzzing"] }
uuid.workspace = true

View File

@@ -21,6 +21,8 @@ use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use crate::kafka::producer::ProduceRequest;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
@@ -129,6 +131,12 @@ pub enum Error {
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to found client"))]
ClientNotFount {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },
@@ -186,6 +194,14 @@ pub enum Error {
error: rskafka::client::producer::Error,
},
#[snafu(display("Failed to produce batch records to Kafka"))]
BatchProduce {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to read a record from Kafka, topic: {}", topic))]
ConsumeRecord {
topic: String,
@@ -244,6 +260,40 @@ pub enum Error {
last_index: u64,
attempt_index: u64,
},
#[snafu(display("Failed to send produce request"))]
SendProduceRequest {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::mpsc::error::SendError<ProduceRequest>,
},
#[snafu(display("Failed to send produce request"))]
WaitProduceResultReceiver {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},
#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,
actual
))]
MetaLengthExceededLimit {
#[snafu(implicit)]
location: Location,
limit: usize,
actual: usize,
},
#[snafu(display("No max value"))]
NoMaxValue {
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {

View File

@@ -14,6 +14,7 @@
pub(crate) mod client_manager;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
use serde::{Deserialize, Serialize};

View File

@@ -16,62 +16,58 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_wal::config::kafka::DatanodeKafkaConfig;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
use rskafka::client::ClientBuilder;
use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::sync::RwLock;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};
use super::producer::OrderedBatchProducer;
use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result,
};
use crate::kafka::util::record::MIN_BATCH_SIZE;
use crate::kafka::producer::OrderedBatchProducerRef;
// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
// Max batch size for a `OrderedBatchProducer` to handle requests.
const REQUEST_BATCH_SIZE: usize = 64;
/// Arc wrapper of ClientManager.
pub(crate) type ClientManagerRef = Arc<ClientManager>;
/// A client through which to contact Kafka cluster. Each client associates with one partition of a topic.
/// Since a topic only has one partition in our design, the mapping between clients and topics are one-one.
/// Topic client.
#[derive(Debug, Clone)]
pub(crate) struct Client {
/// A raw client used to construct a batch producer and/or a stream consumer for a specific topic.
pub(crate) raw_client: Arc<PartitionClient>,
/// A producer used to buffer log entries for a specific topic before sending them in a batching manner.
pub(crate) producer: Arc<BatchProducer<RecordAggregator>>,
client: Arc<PartitionClient>,
producer: OrderedBatchProducerRef,
}
impl Client {
/// Creates a Client from the raw client.
pub(crate) fn new(raw_client: Arc<PartitionClient>, config: &DatanodeKafkaConfig) -> Self {
let record_aggregator =
RecordAggregator::new((config.max_batch_size.as_bytes() as usize).max(MIN_BATCH_SIZE));
let batch_producer = BatchProducerBuilder::new(raw_client.clone())
.with_compression(config.compression)
.with_linger(config.linger)
.build(record_aggregator);
pub(crate) fn client(&self) -> &Arc<PartitionClient> {
&self.client
}
Self {
raw_client,
producer: Arc::new(batch_producer),
}
pub(crate) fn producer(&self) -> &OrderedBatchProducerRef {
&self.producer
}
}
/// Manages client construction and accesses.
#[derive(Debug)]
pub(crate) struct ClientManager {
pub(crate) config: DatanodeKafkaConfig,
/// Top-level client in kafka. All clients are constructed by this client.
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: RwLock<HashMap<String, Client>>,
client: rskafka::client::Client,
/// Used to initialize a new [Client].
mutex: Mutex<()>,
instances: RwLock<HashMap<Arc<KafkaProvider>, Client>>,
producer_channel_size: usize,
producer_request_batch_size: usize,
flush_batch_size: usize,
compression: Compression,
}
impl ClientManager {
@@ -96,48 +92,70 @@ impl ClientManager {
})?;
Ok(Self {
config: config.clone(),
client_factory: client,
client_pool: RwLock::new(HashMap::new()),
client,
mutex: Mutex::new(()),
instances: RwLock::new(HashMap::new()),
producer_channel_size: REQUEST_BATCH_SIZE * 2,
producer_request_batch_size: REQUEST_BATCH_SIZE,
flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
compression: config.compression,
})
}
/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &String) -> Result<Client> {
{
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
return Ok(client.clone());
}
}
async fn try_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
let _guard = self.mutex.lock().await;
let mut client_pool = self.client_pool.write().await;
match client_pool.get(topic) {
Some(client) => Ok(client.clone()),
let client = self.instances.read().await.get(provider).cloned();
match client {
Some(client) => Ok(client),
None => {
let client = self.try_create_client(topic).await?;
client_pool.insert(topic.clone(), client.clone());
let client = self.try_create_client(provider).await?;
self.instances
.write()
.await
.insert(provider.clone(), client.clone());
Ok(client)
}
}
}
async fn try_create_client(&self, topic: &String) -> Result<Client> {
/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
let client = self.instances.read().await.get(provider).cloned();
match client {
Some(client) => Ok(client),
None => self.try_insert(provider).await,
}
}
async fn try_create_client(&self, provider: &Arc<KafkaProvider>) -> Result<Client> {
// Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error.
// That's because the topic is believed to exist as the metasrv is expected to create required topics upon start.
// The reconnecting won't stop until succeed or a different error returns.
let raw_client = self
.client_factory
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
let client = self
.client
.partition_client(
provider.topic.as_str(),
DEFAULT_PARTITION,
UnknownTopicHandling::Retry,
)
.await
.context(BuildPartitionClientSnafu {
topic,
topic: &provider.topic,
partition: DEFAULT_PARTITION,
})
.map(Arc::new)?;
Ok(Client::new(raw_client, &self.config))
let producer = Arc::new(OrderedBatchProducer::new(
client.clone(),
self.compression,
self.producer_channel_size,
self.producer_request_batch_size,
self.flush_batch_size,
));
Ok(Client { client, producer })
}
}
@@ -147,7 +165,32 @@ mod tests {
use tokio::sync::Barrier;
use super::*;
use crate::test_util::kafka::create_topics;
/// Creates `num_topiocs` number of topics each will be decorated by the given decorator.
pub async fn create_topics<F>(
num_topics: usize,
decorator: F,
broker_endpoints: &[String],
) -> Vec<String>
where
F: Fn(usize) -> String,
{
assert!(!broker_endpoints.is_empty());
let client = ClientBuilder::new(broker_endpoints.to_vec())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();
let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
.map(|i| {
let topic = decorator(i);
let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
(topic, task)
})
.unzip();
futures::future::try_join_all(tasks).await.unwrap();
topics
}
/// Prepares for a test in that a collection of topics and a client manager are created.
async fn prepare(
@@ -184,12 +227,16 @@ mod tests {
// Gets all clients sequentially.
for (_, topic) in region_topic {
manager.get_or_insert(topic).await.unwrap();
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
manager.get_or_insert(&provider).await.unwrap();
}
// Ensures all clients exist.
let client_pool = manager.client_pool.read().await;
let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
let client_pool = manager.instances.read().await;
let all_exist = topics.iter().all(|topic| {
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
client_pool.contains_key(&provider)
});
assert!(all_exist);
})
})
@@ -215,17 +262,22 @@ mod tests {
.map(|topic| {
let manager = manager.clone();
let barrier = barrier.clone();
tokio::spawn(async move {
barrier.wait().await;
assert!(manager.get_or_insert(&topic).await.is_ok());
let provider = Arc::new(KafkaProvider::new(topic));
assert!(manager.get_or_insert(&provider).await.is_ok());
})
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.unwrap();
// Ensures all clients exist.
let client_pool = manager.client_pool.read().await;
let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
let client_pool = manager.instances.read().await;
let all_exist = topics.iter().all(|topic| {
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
client_pool.contains_key(&provider)
});
assert!(all_exist);
})
})

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{debug, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use futures::future::try_join_all;
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
@@ -30,26 +32,32 @@ use store_api::storage::RegionId;
use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::util::offset::Offset;
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
maybe_emit_entry, remaining_entries, Record, RecordProducer, ESTIMATED_META_SIZE,
convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
};
use crate::metrics;
/// A log store backed by Kafka.
#[derive(Debug)]
pub struct KafkaLogStore {
config: DatanodeKafkaConfig,
/// Manages kafka clients through which the log store contact the Kafka cluster.
/// The manager of topic clients.
client_manager: ClientManagerRef,
/// The max size of a batch.
max_batch_bytes: usize,
/// The consumer wait timeout.
consumer_wait_timeout: Duration,
}
impl KafkaLogStore {
/// Tries to create a Kafka log store.
pub async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
let client_manager = Arc::new(ClientManager::try_new(config).await?);
Ok(Self {
client_manager: Arc::new(ClientManager::try_new(config).await?),
config: config.clone(),
client_manager,
max_batch_bytes: config.max_batch_bytes.as_bytes() as usize,
consumer_wait_timeout: config.consumer_wait_timeout,
})
}
}
@@ -109,8 +117,7 @@ impl LogStore for KafkaLogStore {
actual: provider.type_name(),
})?;
let max_data_size =
self.client_manager.config.max_batch_size.as_bytes() as usize - ESTIMATED_META_SIZE;
let max_data_size = self.max_batch_bytes - ESTIMATED_META_SIZE;
Ok(build_entry(
data,
entry_id,
@@ -120,7 +127,6 @@ impl LogStore for KafkaLogStore {
))
}
// TODO(weny): refactor the writing.
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// while the value is the id of the last successfully written entry of the region.
async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse> {
@@ -137,39 +143,55 @@ impl LogStore for KafkaLogStore {
return Ok(AppendBatchResponse::default());
}
// Groups entries by region id and pushes them to an associated record producer.
let mut producers = HashMap::with_capacity(entries.len());
let region_ids = entries
.iter()
.map(|entry| entry.region_id())
.collect::<HashSet<_>>();
let mut region_grouped_records: HashMap<RegionId, (OrderedBatchProducerRef, Vec<_>)> =
HashMap::with_capacity(region_ids.len());
for entry in entries {
let provider = entry
.provider()
.as_kafka_provider()
.context(error::InvalidProviderSnafu {
let provider = entry.provider().as_kafka_provider().with_context(|| {
error::InvalidProviderSnafu {
expected: KafkaProvider::type_name(),
actual: entry.provider().type_name(),
})?
.clone();
producers
.entry(entry.region_id())
.or_insert_with(|| RecordProducer::new(provider))
.push(entry);
}
})?;
let region_id = entry.region_id();
match region_grouped_records.entry(region_id) {
std::collections::hash_map::Entry::Occupied(mut slot) => {
slot.get_mut().1.extend(convert_to_kafka_records(entry)?);
}
std::collections::hash_map::Entry::Vacant(slot) => {
let producer = self
.client_manager
.get_or_insert(provider)
.await?
.producer()
.clone();
slot.insert((producer, convert_to_kafka_records(entry)?));
}
}
}
// Produces entries for each region and gets the offset those entries written to.
// The returned offset is then converted into an entry id.
let last_entry_ids = futures::future::try_join_all(producers.into_iter().map(
|(region_id, producer)| async move {
let entry_id = producer
.produce(&self.client_manager)
.await
.map(TryInto::try_into)??;
Ok((region_id, entry_id))
},
))
.await?
.into_iter()
.collect::<HashMap<_, _>>();
let mut region_grouped_result_receivers = Vec::with_capacity(region_ids.len());
for (region_id, (producer, records)) in region_grouped_records {
// Safety: `KafkaLogStore::entry` will ensure that the
// `Record`'s `approximate_size` must be less or equal to `max_batch_bytes`.
region_grouped_result_receivers.push((region_id, producer.produce(records).await?))
}
Ok(AppendBatchResponse { last_entry_ids })
let region_grouped_max_offset =
try_join_all(region_grouped_result_receivers.into_iter().map(
|(region_id, receiver)| async move {
receiver.wait().await.map(|offset| (region_id, offset))
},
))
.await?;
Ok(AppendBatchResponse {
last_entry_ids: region_grouped_max_offset.into_iter().collect(),
})
}
/// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids.
@@ -192,9 +214,9 @@ impl LogStore for KafkaLogStore {
// Gets the client associated with the topic.
let client = self
.client_manager
.get_or_insert(&provider.topic)
.get_or_insert(provider)
.await?
.raw_client
.client()
.clone();
// Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic.
@@ -209,7 +231,7 @@ impl LogStore for KafkaLogStore {
})?
- 1;
// Reads entries with offsets in the range [start_offset, end_offset].
let start_offset = Offset::try_from(entry_id)?.0;
let start_offset = entry_id as i64;
debug!(
"Start reading entries in range [{}, {}] for ns {}",
@@ -227,8 +249,8 @@ impl LogStore for KafkaLogStore {
}
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
.with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
.with_max_wait_ms(self.config.consumer_wait_timeout.as_millis() as i32)
.with_max_batch_size(self.max_batch_bytes as i32)
.with_max_wait_ms(self.consumer_wait_timeout.as_millis() as i32)
.build();
debug!(
@@ -440,7 +462,7 @@ mod tests {
.collect::<Vec<_>>();
let config = DatanodeKafkaConfig {
broker_endpoints,
max_batch_size: ReadableSize::kb(32),
max_batch_bytes: ReadableSize::kb(32),
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();
@@ -509,7 +531,7 @@ mod tests {
.collect::<Vec<_>>();
let config = DatanodeKafkaConfig {
broker_endpoints,
max_batch_size: ReadableSize::kb(8),
max_batch_bytes: ReadableSize::kb(8),
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();

View File

@@ -0,0 +1,474 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_telemetry::{debug, warn};
use futures::future::try_join_all;
use rskafka::client::partition::Compression;
use rskafka::client::producer::ProducerClient;
use rskafka::record::Record;
use snafu::{OptionExt, ResultExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;
use crate::error::{self, NoMaxValueSnafu, Result};
pub struct ProduceRequest {
batch: Vec<Record>,
sender: oneshot::Sender<ProduceResultReceiver>,
}
#[derive(Default)]
struct ProduceResultReceiver {
receivers: Vec<oneshot::Receiver<Result<Vec<i64>>>>,
}
impl ProduceResultReceiver {
fn add_receiver(&mut self, receiver: oneshot::Receiver<Result<Vec<i64>>>) {
self.receivers.push(receiver)
}
async fn wait(self) -> Result<u64> {
Ok(try_join_all(self.receivers)
.await
.into_iter()
.flatten()
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.max()
.context(NoMaxValueSnafu)? as u64)
}
}
struct BackgroundProducerWorker {
/// The [`ProducerClient`].
client: Arc<dyn ProducerClient>,
// The compression configuration.
compression: Compression,
// The running flag.
running: Arc<AtomicBool>,
/// Receiver of [ProduceRequest].
receiver: Receiver<ProduceRequest>,
/// Max batch size for a worker to handle requests.
request_batch_size: usize,
/// Max bytes size for a single flush.
max_batch_bytes: usize,
/// The [PendingRequest]s.
pending_requests: Vec<PendingRequest>,
}
struct PendingRequest {
batch: Vec<Record>,
size: usize,
sender: oneshot::Sender<Result<Vec<i64>>>,
}
/// ## Panic
/// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`.
fn handle_produce_requests(
requests: &mut Vec<ProduceRequest>,
max_batch_bytes: usize,
) -> Vec<PendingRequest> {
let mut records_buffer = vec![];
let mut batch_size = 0;
let mut pending_requests = Vec::with_capacity(requests.len());
for ProduceRequest { batch, sender } in requests.drain(..) {
let mut receiver = ProduceResultReceiver::default();
for record in batch {
assert!(record.approximate_size() <= max_batch_bytes);
// Yields the `PendingRequest` if buffer is full.
if batch_size + record.approximate_size() > max_batch_bytes {
let (tx, rx) = oneshot::channel();
pending_requests.push(PendingRequest {
batch: std::mem::take(&mut records_buffer),
size: batch_size,
sender: tx,
});
batch_size = 0;
receiver.add_receiver(rx);
}
batch_size += record.approximate_size();
records_buffer.push(record);
}
// The remaining records.
if batch_size > 0 {
// Yields `PendingRequest`
let (tx, rx) = oneshot::channel();
pending_requests.push(PendingRequest {
batch: std::mem::take(&mut records_buffer),
size: batch_size,
sender: tx,
});
batch_size = 0;
receiver.add_receiver(rx);
}
let _ = sender.send(receiver);
}
pending_requests
}
async fn do_flush(
client: &Arc<dyn ProducerClient>,
PendingRequest {
batch,
sender,
size: _size,
}: PendingRequest,
compression: Compression,
) {
let result = client
.produce(batch, compression)
.await
.context(error::BatchProduceSnafu);
if let Err(err) = sender.send(result) {
warn!(err; "BatchFlushState Receiver is dropped");
}
}
impl BackgroundProducerWorker {
async fn run(&mut self) {
let mut buffer = Vec::with_capacity(self.request_batch_size);
while self.running.load(Ordering::Relaxed) {
// Processes pending requests first.
if !self.pending_requests.is_empty() {
// TODO(weny): Considering merge `PendingRequest`s.
for req in self.pending_requests.drain(..) {
do_flush(&self.client, req, self.compression).await
}
}
match self.receiver.recv().await {
Some(req) => {
buffer.clear();
buffer.push(req);
for _ in 1..self.request_batch_size {
match self.receiver.try_recv() {
Ok(req) => buffer.push(req),
Err(_) => break,
}
}
self.pending_requests =
handle_produce_requests(&mut buffer, self.max_batch_bytes);
}
None => {
debug!("The sender is dropped, BackgroundProducerWorker exited");
// Exits the loop if the `sender` is dropped.
break;
}
}
}
}
}
pub type OrderedBatchProducerRef = Arc<OrderedBatchProducer>;
/// [`OrderedBatchProducer`] attempts to aggregate multiple produce requests together
#[derive(Debug)]
pub(crate) struct OrderedBatchProducer {
sender: Sender<ProduceRequest>,
/// Used to control the [`BackgroundProducerWorker`].
running: Arc<AtomicBool>,
}
impl Drop for OrderedBatchProducer {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
}
}
/// Receives the committed offsets when data has been committed to Kafka
/// or an unrecoverable error has been encountered.
pub(crate) struct ProduceResultHandle {
receiver: oneshot::Receiver<ProduceResultReceiver>,
}
impl ProduceResultHandle {
/// Waits for the data has been committed to Kafka.
/// Returns the **max** committed offsets.
pub(crate) async fn wait(self) -> Result<u64> {
self.receiver
.await
.context(error::WaitProduceResultReceiverSnafu)?
.wait()
.await
}
}
impl OrderedBatchProducer {
/// Constructs a new [`OrderedBatchProducer`].
pub(crate) fn new(
client: Arc<dyn ProducerClient>,
compression: Compression,
channel_size: usize,
request_batch_size: usize,
max_batch_bytes: usize,
) -> Self {
let (tx, rx) = mpsc::channel(channel_size);
let running = Arc::new(AtomicBool::new(true));
let mut worker = BackgroundProducerWorker {
client,
compression,
running: running.clone(),
receiver: rx,
request_batch_size,
max_batch_bytes,
pending_requests: vec![],
};
tokio::spawn(async move { worker.run().await });
Self {
sender: tx,
running,
}
}
/// Writes `data` to the [`OrderedBatchProducer`].
///
/// Returns the [ProduceResultHandle], which will receive a result when data has been committed to Kafka
/// or an unrecoverable error has been encountered.
///
/// ## Panic
/// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`.
pub(crate) async fn produce(&self, batch: Vec<Record>) -> Result<ProduceResultHandle> {
let receiver = {
let (tx, rx) = oneshot::channel();
self.sender
.send(ProduceRequest { batch, sender: tx })
.await
.context(error::SendProduceRequestSnafu)?;
rx
};
Ok(ProduceResultHandle { receiver })
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use std::time::Duration;
use chrono::{TimeZone, Utc};
use common_base::readable_size::ReadableSize;
use common_telemetry::debug;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use rskafka::client::error::{Error as ClientError, RequestContext};
use rskafka::client::partition::Compression;
use rskafka::client::producer::ProducerClient;
use rskafka::protocol::error::Error as ProtocolError;
use rskafka::record::Record;
use crate::kafka::producer::OrderedBatchProducer;
#[derive(Debug)]
struct MockClient {
error: Option<ProtocolError>,
panic: Option<String>,
delay: Duration,
batch_sizes: Mutex<Vec<usize>>,
}
impl ProducerClient for MockClient {
fn produce(
&self,
records: Vec<Record>,
_compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
Box::pin(async move {
tokio::time::sleep(self.delay).await;
if let Some(e) = self.error {
return Err(ClientError::ServerError {
protocol_error: e,
error_message: None,
request: RequestContext::Partition("foo".into(), 1),
response: None,
is_virtual: false,
});
}
if let Some(p) = self.panic.as_ref() {
panic!("{}", p);
}
let mut batch_sizes = self.batch_sizes.lock().unwrap();
let offset_base = batch_sizes.iter().sum::<usize>();
let offsets = (0..records.len())
.map(|x| (x + offset_base) as i64)
.collect();
batch_sizes.push(records.len());
debug!("Return offsets: {offsets:?}");
Ok(offsets)
})
}
}
fn record() -> Record {
Record {
key: Some(vec![0; 4]),
value: Some(vec![0; 6]),
headers: Default::default(),
timestamp: Utc.timestamp_millis_opt(320).unwrap(),
}
}
#[tokio::test]
async fn test_producer() {
common_telemetry::init_default_ut_logging();
let record = record();
let delay = Duration::from_secs(0);
let client = Arc::new(MockClient {
error: None,
panic: None,
delay,
batch_sizes: Default::default(),
});
let producer = OrderedBatchProducer::new(
client.clone(),
Compression::NoCompression,
128,
64,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
);
// Produces 3 records
let handle = producer
.produce(vec![record.clone(), record.clone(), record.clone()])
.await
.unwrap();
assert_eq!(handle.wait().await.unwrap(), 2);
assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1]);
// Produces 2 records
let handle = producer
.produce(vec![record.clone(), record.clone()])
.await
.unwrap();
assert_eq!(handle.wait().await.unwrap(), 4);
assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1, 2]);
// Produces 1 records
let handle = producer.produce(vec![record.clone()]).await.unwrap();
assert_eq!(handle.wait().await.unwrap(), 5);
assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1, 2, 1]);
}
#[tokio::test]
async fn test_producer_client_error() {
let record = record();
let client = Arc::new(MockClient {
error: Some(ProtocolError::NetworkException),
panic: None,
delay: Duration::from_millis(1),
batch_sizes: Default::default(),
});
let producer = OrderedBatchProducer::new(
client.clone(),
Compression::NoCompression,
128,
64,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
);
let mut futures = FuturesUnordered::new();
futures.push(
producer
.produce(vec![record.clone(), record.clone(), record.clone()])
.await
.unwrap()
.wait(),
);
futures.push(
producer
.produce(vec![record.clone(), record.clone()])
.await
.unwrap()
.wait(),
);
futures.push(producer.produce(vec![record.clone()]).await.unwrap().wait());
futures.next().await.unwrap().unwrap_err();
futures.next().await.unwrap().unwrap_err();
futures.next().await.unwrap().unwrap_err();
}
#[tokio::test]
async fn test_producer_cancel() {
let record = record();
let client = Arc::new(MockClient {
error: None,
panic: None,
delay: Duration::from_millis(1),
batch_sizes: Default::default(),
});
let producer = OrderedBatchProducer::new(
client.clone(),
Compression::NoCompression,
128,
64,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
);
let a = producer
.produce(vec![record.clone(), record.clone(), record.clone()])
.await
.unwrap()
.wait()
.fuse();
let b = producer.produce(vec![record]).await.unwrap().wait().fuse();
let mut b = Box::pin(b);
{
// Cancel a when it exits this block
let mut a = Box::pin(a);
// Select biased to encourage `a` to be the one with the linger that
// expires first and performs the produce operation
futures::select_biased! {
_ = &mut a => panic!("a should not have flushed"),
_ = &mut b => panic!("b should not have flushed"),
_ = tokio::time::sleep(Duration::from_millis(1)).fuse() => {},
}
}
// But `b` should still complete successfully
tokio::time::timeout(Duration::from_secs(1), b)
.await
.unwrap()
.unwrap();
assert_eq!(
client
.batch_sizes
.lock()
.unwrap()
.as_slice()
.iter()
.sum::<usize>(),
4
);
}
}

View File

@@ -12,5 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod offset;
pub mod record;

View File

@@ -1,37 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{CastSnafu, Result};
use crate::kafka::EntryId;
/// A wrapper of kafka offset.
pub(crate) struct Offset(pub i64);
impl TryFrom<Offset> for EntryId {
type Error = crate::error::Error;
fn try_from(offset: Offset) -> Result<Self> {
EntryId::try_from(offset.0).map_err(|_| CastSnafu.build())
}
}
impl TryFrom<EntryId> for Offset {
type Error = crate::error::Error;
fn try_from(entry_id: EntryId) -> Result<Self> {
i64::try_from(entry_id)
.map(Offset)
.map_err(|_| CastSnafu.build())
}
}

View File

@@ -23,24 +23,18 @@ use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;
use crate::error::{
DecodeJsonSnafu, EmptyEntriesSnafu, EncodeJsonSnafu, GetClientSnafu, IllegalSequenceSnafu,
MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result,
DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::client_manager::ClientManagerRef;
use crate::kafka::util::offset::Offset;
use crate::kafka::{EntryId, NamespaceImpl};
use crate::metrics;
/// The current version of Record.
pub(crate) const VERSION: u32 = 0;
/// The estimated size in bytes of a serialized RecordMeta.
/// A record is guaranteed to have sizeof(meta) + sizeof(data) <= max_batch_size - ESTIMATED_META_SIZE.
/// A record is guaranteed to have sizeof(meta) + sizeof(data) <= max_batch_byte - ESTIMATED_META_SIZE.
pub(crate) const ESTIMATED_META_SIZE: usize = 256;
/// The minimum batch size
pub(crate) const MIN_BATCH_SIZE: usize = 4 * 1024;
/// The type of a record.
///
/// - If the entry is able to fit into a Kafka record, it's converted into a Full record.
@@ -96,6 +90,13 @@ impl TryFrom<Record> for KafkaRecord {
fn try_from(record: Record) -> Result<Self> {
let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
ensure!(
key.len() < ESTIMATED_META_SIZE,
MetaLengthExceededLimitSnafu {
limit: ESTIMATED_META_SIZE,
actual: key.len()
}
);
Ok(KafkaRecord {
key: Some(key),
value: Some(record.data),
@@ -117,77 +118,9 @@ impl TryFrom<KafkaRecord> for Record {
}
}
/// Produces a record to a kafka topic.
pub(crate) struct RecordProducer {
/// The provide of the entries.
provider: Arc<KafkaProvider>,
/// Entries are buffered before being built into a record.
entries: Vec<Entry>,
}
impl RecordProducer {
/// Creates a new producer for producing entries with the given namespace.
pub(crate) fn new(provider: Arc<KafkaProvider>) -> Self {
Self {
provider,
entries: Vec::new(),
}
}
/// Pushes an entry into the entry buffer.
pub(crate) fn push(&mut self, entry: Entry) {
self.entries.push(entry);
}
/// Produces the buffered entries to Kafka sever. Those entries may span several Kafka records.
/// Returns the offset of the last successfully produced record.
// TODO(niebayes): maybe requires more fine-grained metrics to measure stages of writing to kafka.
pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result<Offset> {
ensure!(!self.entries.is_empty(), EmptyEntriesSnafu);
// Gets the producer in which a record buffer is maintained.
let producer = client_manager
.get_or_insert(&self.provider.topic)
.await
.map_err(|e| {
GetClientSnafu {
topic: &self.provider.topic,
error: e.to_string(),
}
.build()
})?
.producer;
// Stores the offset of the last successfully produced record.
let mut last_offset = None;
for entry in self.entries {
for record in convert_to_records(entry) {
let kafka_record = KafkaRecord::try_from(record)?;
metrics::METRIC_KAFKA_PRODUCE_RECORD_COUNTS.inc();
metrics::METRIC_KAFKA_PRODUCE_RECORD_BYTES_TOTAL
.inc_by(kafka_record.approximate_size() as u64);
// Records of a certain region cannot be produced in parallel since their order must be static.
let offset = producer
.produce(kafka_record.clone())
.await
.map(Offset)
.with_context(|_| ProduceRecordSnafu {
topic: &self.provider.topic,
size: kafka_record.approximate_size(),
})?;
last_offset = Some(offset);
}
}
// Safety: there must be at least one record produced when the entries are guaranteed not empty.
Ok(last_offset.unwrap())
}
}
fn convert_to_records(entry: Entry) -> Vec<Record> {
pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result<Vec<KafkaRecord>> {
match entry {
Entry::Naive(entry) => vec![Record {
Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record {
meta: RecordMeta {
version: VERSION,
tp: RecordType::Full,
@@ -200,7 +133,7 @@ fn convert_to_records(entry: Entry) -> Vec<Record> {
},
},
data: entry.data,
}],
})?]),
Entry::MultiplePart(entry) => {
let mut entries = Vec::with_capacity(entry.parts.len());
@@ -210,7 +143,7 @@ fn convert_to_records(entry: Entry) -> Vec<Record> {
MultiplePartHeader::Middle(i) => RecordType::Middle(i),
MultiplePartHeader::Last => RecordType::Last,
};
entries.push(Record {
entries.push(KafkaRecord::try_from(Record {
meta: RecordMeta {
version: VERSION,
tp,
@@ -222,9 +155,9 @@ fn convert_to_records(entry: Entry) -> Vec<Record> {
},
},
data: part,
})
})?)
}
entries
Ok(entries)
}
}
}
@@ -511,4 +444,20 @@ mod tests {
let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
assert_matches!(err, error::Error::IllegalSequence { .. });
}
#[test]
fn test_meta_size() {
let meta = RecordMeta {
version: VERSION,
tp: RecordType::Middle(usize::MAX),
entry_id: u64::MAX,
ns: NamespaceImpl {
region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(),
topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()),
},
};
let serialized = serde_json::to_vec(&meta).unwrap();
// The len of serialized data is 202.
assert!(serialized.len() < ESTIMATED_META_SIZE);
}
}

View File

@@ -12,6 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
pub mod kafka;
pub mod log_store_util;

View File

@@ -1,125 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
use std::sync::Mutex;
use rand::distributions::Alphanumeric;
use rand::rngs::ThreadRng;
use rand::{thread_rng, Rng};
use rskafka::client::ClientBuilder;
use store_api::logstore::EntryId;
use crate::kafka::{EntryImpl, NamespaceImpl};
/// Creates `num_topiocs` number of topics each will be decorated by the given decorator.
pub async fn create_topics<F>(
num_topics: usize,
decorator: F,
broker_endpoints: &[String],
) -> Vec<String>
where
F: Fn(usize) -> String,
{
assert!(!broker_endpoints.is_empty());
let client = ClientBuilder::new(broker_endpoints.to_vec())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();
let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
.map(|i| {
let topic = decorator(i);
let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
(topic, task)
})
.unzip();
futures::future::try_join_all(tasks).await.unwrap();
topics
}
/// Creates a new Kafka namespace with the given topic and region id.
pub fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl {
NamespaceImpl {
topic: topic.to_string(),
region_id,
}
}
/// A builder for building entries for a namespace.
pub struct EntryBuilder {
/// The namespace of the entries.
ns: NamespaceImpl,
/// The next entry id to allocate. It starts from 0 by default.
next_entry_id: AtomicEntryId,
/// A generator for supporting random data generation.
/// Wrapped with Mutex<Option<_>> to provide interior mutability.
rng: Mutex<Option<ThreadRng>>,
}
impl EntryBuilder {
/// Creates an EntryBuilder for the given namespace.
pub fn new(ns: NamespaceImpl) -> Self {
Self {
ns,
next_entry_id: AtomicEntryId::new(0),
rng: Mutex::new(Some(thread_rng())),
}
}
/// Sets the next entry id to the given entry id.
pub fn next_entry_id(self, entry_id: EntryId) -> Self {
Self {
next_entry_id: AtomicEntryId::new(entry_id),
..self
}
}
/// Skips the next `step` entry ids and returns the next entry id after the stepping.
pub fn skip(&mut self, step: EntryId) -> EntryId {
let old = self.next_entry_id.fetch_add(step, Ordering::Relaxed);
old + step
}
/// Builds an entry with the given data.
pub fn with_data<D: AsRef<[u8]>>(&self, data: D) -> EntryImpl {
EntryImpl {
data: data.as_ref().to_vec(),
id: self.alloc_entry_id(),
ns: self.ns.clone(),
}
}
/// Builds an entry with random data.
pub fn with_random_data(&self) -> EntryImpl {
self.with_data(self.make_random_data())
}
fn alloc_entry_id(&self) -> EntryId {
self.next_entry_id.fetch_add(1, Ordering::Relaxed)
}
fn make_random_data(&self) -> Vec<u8> {
let mut guard = self.rng.lock().unwrap();
let rng = guard.as_mut().unwrap();
(0..42).map(|_| rng.sample(Alphanumeric)).collect()
}
}
/// Builds a batch of entries each with random data.
pub fn entries_with_random_data(batch_size: usize, builder: &EntryBuilder) -> Vec<EntryImpl> {
(0..batch_size)
.map(|_| builder.with_random_data())
.collect()
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::path::Path;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_wal::config::kafka::DatanodeKafkaConfig;
@@ -36,7 +35,6 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEng
pub async fn create_kafka_log_store(broker_endpoints: Vec<String>) -> KafkaLogStore {
KafkaLogStore::try_new(&DatanodeKafkaConfig {
broker_endpoints,
linger: Duration::from_millis(1),
..Default::default()
})
.await

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use crate::storage::RegionId;
// The Provider of kafka log store
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KafkaProvider {
pub topic: String,
}