feat(remote_wal): split an entry if it's too large (#3092)

* feat: split an entry if it's too large

* chore: rewrite check records

* test: add some unit tests for record

* chore: rewrite entry splitting

* chore: add unit tests for build records

* chore: add more unit tests for record

* chore: rewrite encdec of record

* revert: ignored test

* fix: set limit for max_batch_size

* fix: clippy

* chore: remove heavy logging

* fix: CR

* fix: properly terminate

* fix: CR

* fix: compiling

* fix: sqlness

* fix: CR

* fix: license

* fix: license
This commit is contained in:
niebayes
2024-01-05 20:41:43 +08:00
committed by GitHub
parent bd1a5dc265
commit 78303639db
17 changed files with 820 additions and 282 deletions

2
Cargo.lock generated
View File

@@ -4498,6 +4498,7 @@ dependencies = [
"async-trait",
"byteorder",
"bytes",
"chrono",
"common-base",
"common-config",
"common-error",
@@ -4509,6 +4510,7 @@ dependencies = [
"dashmap",
"futures",
"futures-util",
"itertools 0.10.5",
"protobuf",
"protobuf-build",
"raft-engine",

View File

@@ -51,7 +51,8 @@ sync_write = false
# Kafka wal options, see `standalone.example.toml`.
# broker_endpoints = ["127.0.0.1:9092"]
# max_batch_size = "4MB"
# Warning: Kafka has a default limit of 1MB per message in a topic.
# max_batch_size = "1MB"
# linger = "200ms"
# consumer_wait_timeout = "100ms"
# backoff_init = "500ms"

View File

@@ -108,7 +108,8 @@ provider = "raft_engine"
# replication_factor = 1
# The max size of a single producer batch.
# max_batch_size = "4MB"
# Warning: Kafka has a default limit of 1MB per message in a topic.
# max_batch_size = "1MB"
# The linger duration.
# linger = "200ms"
# The consumer wait timeout.

View File

@@ -90,9 +90,10 @@ mod tests {
#[test]
fn test_serde_kafka_config() {
// With all fields.
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9092"]
max_batch_size = "4MB"
max_batch_size = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
@@ -104,7 +105,7 @@ mod tests {
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: RsKafkaCompression::default(),
max_batch_size: ReadableSize::mb(4),
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig {
@@ -115,6 +116,19 @@ mod tests {
},
};
assert_eq!(decoded, expected);
// With some fields missing.
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9092"]
linger = "200ms"
"#;
let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
linger: Duration::from_millis(200),
..Default::default()
};
assert_eq!(decoded, expected);
}
#[test]

View File

@@ -60,7 +60,8 @@ impl Default for KafkaConfig {
Self {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: RsKafkaCompression::NoCompression,
max_batch_size: ReadableSize::mb(4),
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig::default(),

View File

@@ -14,6 +14,7 @@ async-stream.workspace = true
async-trait.workspace = true
byteorder = "1.4"
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
@@ -24,6 +25,7 @@ common-telemetry.workspace = true
dashmap.workspace = true
futures-util.workspace = true
futures.workspace = true
itertools.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine.workspace = true
rskafka.workspace = true

View File

@@ -18,6 +18,7 @@ use common_config::wal::KafkaWalTopic;
use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use crate::kafka::NamespaceImpl as KafkaNamespace;
@@ -123,20 +124,6 @@ pub enum Error {
error: String,
},
#[snafu(display("Failed to encode a record meta"))]
EncodeMeta {
location: Location,
#[snafu(source)]
error: serde_json::Error,
},
#[snafu(display("Failed to decode a record meta"))]
DecodeMeta {
location: Location,
#[snafu(source)]
error: serde_json::Error,
},
#[snafu(display("Missing required key in a record"))]
MissingKey { location: Location },
@@ -146,9 +133,16 @@ pub enum Error {
#[snafu(display("Cannot build a record from empty entries"))]
EmptyEntries { location: Location },
#[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
#[snafu(display(
"Failed to produce records to Kafka, topic: {}, size: {}, limit: {}",
topic,
size,
limit,
))]
ProduceRecord {
topic: KafkaWalTopic,
size: usize,
limit: usize,
location: Location,
#[snafu(source)]
error: rskafka::client::producer::Error,
@@ -172,6 +166,23 @@ pub enum Error {
#[snafu(display("Failed to do a cast"))]
Cast { location: Location },
#[snafu(display("Failed to encode object into json"))]
EncodeJson {
location: Location,
#[snafu(source)]
error: JsonError,
},
#[snafu(display("Failed to decode object from json"))]
DecodeJson {
location: Location,
#[snafu(source)]
error: JsonError,
},
#[snafu(display("The record sequence is not legal, error: {}", error))]
IllegalSequence { location: Location, error: String },
}
impl ErrorExt for Error {

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod client_manager;
pub(crate) mod client_manager;
pub mod log_store;
mod offset;
mod record_utils;
pub(crate) mod util;
use std::fmt::Display;
@@ -29,8 +28,8 @@ use crate::error::Error;
/// Kafka Namespace implementation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct NamespaceImpl {
region_id: u64,
topic: Topic,
pub region_id: u64,
pub topic: Topic,
}
impl Namespace for NamespaceImpl {

View File

@@ -62,7 +62,7 @@ impl Client {
/// Manages client construction and accesses.
#[derive(Debug)]
pub(crate) struct ClientManager {
config: KafkaConfig,
pub(crate) config: KafkaConfig,
/// Top-level client in kafka. All clients are constructed by this client.
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.

View File

@@ -26,10 +26,10 @@ use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, Result};
use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, IllegalSequenceSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::offset::Offset;
use crate::kafka::record_utils::{decode_from_record, RecordProducer};
use crate::kafka::util::offset::Offset;
use crate::kafka::util::record::{maybe_emit_entry, Record, RecordProducer};
use crate::kafka::{EntryImpl, NamespaceImpl};
/// A log store backed by Kafka.
@@ -85,8 +85,6 @@ impl LogStore for KafkaLogStore {
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// while the value is the id of the last successfully written entry of the region.
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
debug!("LogStore handles append_batch with entries {:?}", entries);
if entries.is_empty() {
return Ok(AppendBatchResponse::default());
}
@@ -96,7 +94,7 @@ impl LogStore for KafkaLogStore {
for entry in entries {
producers
.entry(entry.ns.region_id)
.or_insert(RecordProducer::new(entry.ns.clone()))
.or_insert_with(|| RecordProducer::new(entry.ns.clone()))
.push(entry);
}
@@ -115,8 +113,6 @@ impl LogStore for KafkaLogStore {
.into_iter()
.collect::<HashMap<_, _>>();
debug!("Append batch result: {:?}", last_entry_ids);
Ok(AppendBatchResponse { last_entry_ids })
}
@@ -127,13 +123,10 @@ impl LogStore for KafkaLogStore {
ns: &Self::Namespace,
entry_id: EntryId,
) -> Result<SendableEntryStream<Self::Entry, Self::Error>> {
let topic = ns.topic.clone();
let region_id = ns.region_id;
// Gets the client associated with the topic.
let client = self
.client_manager
.get_or_insert(&topic)
.get_or_insert(&ns.topic)
.await?
.raw_client
.clone();
@@ -148,13 +141,13 @@ impl LogStore for KafkaLogStore {
.context(GetOffsetSnafu { ns: ns.clone() })?
- 1;
// Reads entries with offsets in the range [start_offset, end_offset).
let start_offset = Offset::try_from(entry_id)?.0;
let start_offset: i64 = Offset::try_from(entry_id)?.0;
// Abort if there're no new entries.
// FIXME(niebayes): how come this case happens?
if start_offset > end_offset {
warn!(
"No new entries for ns {} in range [{}, {})",
"No new entries for ns {} in range [{}, {}]",
ns, start_offset, end_offset
);
return Ok(futures_util::stream::empty().boxed());
@@ -166,44 +159,52 @@ impl LogStore for KafkaLogStore {
.build();
debug!(
"Built a stream consumer for ns {} to consume entries in range [{}, {})",
"Built a stream consumer for ns {} to consume entries in range [{}, {}]",
ns, start_offset, end_offset
);
// Key: entry id, Value: the records associated with the entry.
let mut entry_records: HashMap<_, Vec<_>> = HashMap::new();
let ns_clone = ns.clone();
let stream = async_stream::stream!({
while let Some(consume_result) = stream_consumer.next().await {
// Each next will prdoce a `RecordAndOffset` and a high watermark offset.
// Each next on the stream consumer produces a `RecordAndOffset` and a high watermark offset.
// The `RecordAndOffset` contains the record data and its start offset.
// The high watermark offset is the end offset of the latest record in the partition.
let (record, high_watermark) = consume_result.context(ConsumeRecordSnafu {
ns: ns_clone.clone(),
})?;
let record_offset = record.offset;
// The high watermark offset is the offset of the last record plus one.
let (record_and_offset, high_watermark) =
consume_result.with_context(|_| ConsumeRecordSnafu {
ns: ns_clone.clone(),
})?;
let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset);
debug!(
"Read a record at offset {} for ns {}, high watermark: {}",
record_offset, ns_clone, high_watermark
offset, ns_clone, high_watermark
);
// Ignores noop records.
if record.record.value.is_none() {
// Ignores no-op records.
if kafka_record.value.is_none() {
if check_termination(offset, end_offset, &entry_records)? {
break;
}
continue;
}
let entries = decode_from_record(record.record)?;
// Filters entries by region id.
if let Some(entry) = entries.first()
&& entry.ns.region_id == region_id
{
yield Ok(entries);
// Filters records by namespace.
let record = Record::try_from(kafka_record)?;
if record.meta.ns != ns_clone {
if check_termination(offset, end_offset, &entry_records)? {
break;
}
continue;
}
// Terminates the stream if the entry with the end offset was read.
if record_offset >= end_offset {
debug!(
"Stream consumer for ns {} terminates at offset {}",
ns_clone, record_offset
);
// Tries to construct an entry from records consumed so far.
if let Some(entry) = maybe_emit_entry(record, &mut entry_records)? {
yield Ok(vec![entry]);
}
if check_termination(offset, end_offset, &entry_records)? {
break;
}
}
@@ -252,3 +253,24 @@ impl LogStore for KafkaLogStore {
Ok(())
}
}
/// Decides whether the stream consumer should stop after handling the record at `offset`.
///
/// Returns `Ok(false)` while `offset` is still below `end_offset`. Once `offset` reaches or
/// passes `end_offset`, returns `Ok(true)` if `entry_records` holds no buffered records and
/// fails with an `IllegalSequence` error otherwise.
fn check_termination(
    offset: i64,
    end_offset: i64,
    entry_records: &HashMap<EntryId, Vec<Record>>,
) -> Result<bool> {
    // The record at the end offset has not been read yet, so keep consuming.
    if offset < end_offset {
        return Ok(false);
    }

    debug!("Stream consumer terminates at offset {}", offset);

    // No records may be left buffered when the stream terminates: leftovers mean
    // that some entry was only partially read.
    if entry_records.is_empty() {
        Ok(true)
    } else {
        IllegalSequenceSnafu {
            error: "Found records leftover",
        }
        .fail()
    }
}

View File

@@ -1,188 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rskafka::record::Record;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu, MissingKeySnafu,
MissingValueSnafu, ProduceRecordSnafu, Result,
};
use crate::kafka::client_manager::ClientManagerRef;
use crate::kafka::offset::Offset;
use crate::kafka::{EntryId, EntryImpl, NamespaceImpl};
/// Record metadata which will be serialized/deserialized to/from the `key` of a Record.
///
/// `entry_ids` and `entry_offsets` are parallel: the i-th element of each describes the
/// i-th entry whose data is concatenated into the record value.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct RecordMeta {
/// Meta version. Used for backward compatibility.
version: u32,
/// The namespace of the entries wrapped in the record.
ns: NamespaceImpl,
/// Ids of the entries built into the record.
entry_ids: Vec<EntryId>,
/// entry_offsets[i] is the end offset (exclusive) of the data of the i-th entry in the record value.
/// The data of the i-th entry therefore starts at entry_offsets[i - 1], or at 0 for the first entry.
entry_offsets: Vec<usize>,
}
impl RecordMeta {
    /// Builds the metadata describing `entries`, all of which belong to the namespace `ns`.
    ///
    /// The ids are recorded in order, and each offset is the running total of the data
    /// lengths up to and including the corresponding entry.
    fn new(ns: NamespaceImpl, entries: &[EntryImpl]) -> Self {
        let mut entry_ids = Vec::with_capacity(entries.len());
        let mut entry_offsets = Vec::with_capacity(entries.len());

        // Running end offset (exclusive) of the entries seen so far.
        let mut end_offset = 0;
        for entry in entries {
            end_offset += entry.data.len();
            entry_ids.push(entry.id);
            entry_offsets.push(end_offset);
        }

        Self {
            version: 0,
            ns,
            entry_ids,
            entry_offsets,
        }
    }
}
/// Produces a record to a kafka topic.
///
/// All buffered entries are encoded into one single record when `produce` is called.
pub(crate) struct RecordProducer {
/// The namespace of the entries.
ns: NamespaceImpl,
/// Entries are buffered before being built into a record.
entries: Vec<EntryImpl>,
}
impl RecordProducer {
    /// Creates a producer with an empty entry buffer for the given namespace.
    pub(crate) fn new(ns: NamespaceImpl) -> Self {
        let entries = Vec::new();
        Self { ns, entries }
    }

    /// Replaces the entry buffer with the given entries.
    pub(crate) fn with_entries(mut self, entries: Vec<EntryImpl>) -> Self {
        self.entries = entries;
        self
    }

    /// Appends an entry to the entry buffer.
    pub(crate) fn push(&mut self, entry: EntryImpl) {
        self.entries.push(entry);
    }

    /// Produces the buffered entries to the kafka server as a single kafka record.
    /// Returns the kafka offset of the produced record.
    ///
    /// Fails if the buffer is empty, if the client of the topic cannot be obtained,
    /// if the record cannot be encoded, or if producing the record fails.
    // TODO(niebayes): since the total size of a region's entries may be way-too large,
    // the producer may need to support splitting entries into multiple records.
    pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result<Offset> {
        ensure!(!self.entries.is_empty(), EmptyEntriesSnafu);

        let Self { ns, entries } = self;

        // The record is produced through a client. The client determines when to send
        // the record to the kafka server.
        let client = match client_manager.get_or_insert(&ns.topic).await {
            Ok(client) => client,
            Err(e) => {
                return Err(GetClientSnafu {
                    topic: &ns.topic,
                    error: e.to_string(),
                }
                .build())
            }
        };

        let record = encode_to_record(ns.clone(), entries)?;
        let produced = client.producer.produce(record).await;
        produced
            .map(Offset)
            .context(ProduceRecordSnafu { topic: &ns.topic })
    }
}
/// Encodes the entries into one kafka record.
///
/// The record key carries the JSON-encoded `RecordMeta` while the record value is the
/// concatenation of the data of all entries, in order.
fn encode_to_record(ns: NamespaceImpl, entries: Vec<EntryImpl>) -> Result<Record> {
    let meta = RecordMeta::new(ns, &entries);
    let key = serde_json::to_vec(&meta).context(EncodeMetaSnafu)?;
    let value: Vec<u8> = entries
        .into_iter()
        .flat_map(|entry| entry.data)
        .collect();

    Ok(Record {
        key: Some(key),
        value: Some(value),
        timestamp: rskafka::chrono::Utc::now(),
        headers: Default::default(),
    })
}
pub(crate) fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
let key = record.key.context(MissingKeySnafu)?;
let value = record.value.context(MissingValueSnafu)?;
let meta: RecordMeta = serde_json::from_slice(&key).context(DecodeMetaSnafu)?;
let mut entries = Vec::with_capacity(meta.entry_ids.len());
let mut start_offset = 0;
for (i, end_offset) in meta.entry_offsets.iter().enumerate() {
entries.push(EntryImpl {
// TODO(niebayes): try to avoid the clone.
data: value[start_offset..*end_offset].to_vec(),
id: meta.entry_ids[i],
ns: meta.ns.clone(),
});
start_offset = *end_offset;
}
Ok(entries)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an entry owning a copy of `data`.
    fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
        let data = data.as_ref().to_vec();
        EntryImpl {
            data,
            id: entry_id,
            ns,
        }
    }

    /// Returns the namespace and the three entries shared by the tests below.
    fn test_fixture() -> (NamespaceImpl, Vec<EntryImpl>) {
        let ns = NamespaceImpl {
            region_id: 1,
            topic: "test_topic".to_string(),
        };
        let payloads: [(&[u8], EntryId); 3] = [(b"111", 1), (b"2222", 2), (b"33333", 3)];
        let entries = payloads
            .iter()
            .map(|&(data, id)| new_test_entry(data, id, ns.clone()))
            .collect();
        (ns, entries)
    }

    /// The record meta survives a JSON round trip.
    #[test]
    fn test_serde_record_meta() {
        let (ns, entries) = test_fixture();
        let meta = RecordMeta::new(ns, &entries);

        let bytes = serde_json::to_vec(&meta).unwrap();
        let roundtrip: RecordMeta = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(meta, roundtrip);
    }

    /// Entries survive being encoded into a record and decoded back.
    #[test]
    fn test_encdec_record() {
        let (ns, entries) = test_fixture();

        let record = encode_to_record(ns, entries.clone()).unwrap();
        let roundtrip = decode_from_record(record).unwrap();

        assert_eq!(entries, roundtrip);
    }
}

View File

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod offset;
pub mod record;

View File

@@ -0,0 +1,564 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use rskafka::record::Record as KafkaRecord;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
DecodeJsonSnafu, EmptyEntriesSnafu, EncodeJsonSnafu, GetClientSnafu, IllegalSequenceSnafu,
MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result,
};
use crate::kafka::client_manager::ClientManagerRef;
use crate::kafka::util::offset::Offset;
use crate::kafka::{EntryId, EntryImpl, NamespaceImpl};
/// The current version of Record.
pub(crate) const VERSION: u32 = 0;
/// The estimated size in bytes of a serialized RecordMeta.
/// The producer subtracts it from max_batch_size to get the maximum size of a record's data,
/// i.e. a record is guaranteed to have sizeof(data) <= max_batch_size - ESTIMATED_META_SIZE.
/// NOTE(review): this is an estimate rather than a bound. The serialized meta embeds the
/// topic name, so sizeof(meta) may exceed this value for long topic names.
const ESTIMATED_META_SIZE: usize = 256;
/// The type of a record.
///
/// - If the entry is able to fit into a Kafka record, it's converted into a Full record.
///
/// - If the entry is too large to fit into a Kafka record, it's converted into a collection of records.
/// Those records must contain exactly one First record and one Last record, and potentially several
/// Middle records. There may be no Middle record.
///
/// The records of a split entry are expected in the order First, Middle(1), Middle(2), ..., Last.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum RecordType {
/// The record is self-contained, i.e. an entry's data is fully stored into this record.
Full,
/// The record contains the first part of an entry's data.
First,
/// The record contains one of the middle parts of an entry's data.
/// The sequence of the record is identified by the inner field, which is the index of the
/// part within the entry. The First record takes index 0, so Middle sequences start from 1.
Middle(usize),
/// The record contains the last part of an entry's data.
Last,
}
/// The metadata of a record.
///
/// It is serialized as JSON into the key of the Kafka record the record is converted into.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecordMeta {
/// The version of the record. Used for backward compatibility.
version: u32,
/// The type of the record.
pub tp: RecordType,
/// The id of the entry the record is associated with.
pub entry_id: EntryId,
/// The namespace of the entry the record is associated with.
pub ns: NamespaceImpl,
}
/// The minimal storage unit in the Kafka log store.
///
/// An entry is first converted into one or several Records before being produced.
/// If an entry is able to fit into a KafkaRecord, it converts to a single Record.
/// Otherwise, it is split into a collection of Records.
///
/// A KafkaRecord is the minimal storage unit used by Kafka client and Kafka server.
/// The Kafka client produces KafkaRecords and consumes KafkaRecords, and Kafka server stores
/// a collection of KafkaRecords.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
/// The metadata of the record.
pub(crate) meta: RecordMeta,
/// The payload of the record, i.e. the whole data of an entry or one chunk of it.
data: Vec<u8>,
}
impl TryFrom<Record> for KafkaRecord {
type Error = crate::error::Error;
fn try_from(record: Record) -> Result<Self> {
let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
Ok(KafkaRecord {
key: Some(key),
value: Some(record.data),
timestamp: chrono::Utc::now(),
headers: Default::default(),
})
}
}
impl TryFrom<KafkaRecord> for Record {
type Error = crate::error::Error;
fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
let key = kafka_record.key.context(MissingKeySnafu)?;
let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
let data = kafka_record.value.context(MissingValueSnafu)?;
Ok(Self { meta, data })
}
}
impl From<Vec<Record>> for EntryImpl {
fn from(records: Vec<Record>) -> Self {
let entry_id = records[0].meta.entry_id;
let ns = records[0].meta.ns.clone();
let data = records.into_iter().flat_map(|record| record.data).collect();
EntryImpl {
data,
id: entry_id,
ns,
}
}
}
/// Produces the entries of a namespace to a kafka topic.
///
/// Each buffered entry is converted into one or several records, depending on its size,
/// and the records are produced one after another.
pub(crate) struct RecordProducer {
/// The namespace of the entries.
ns: NamespaceImpl,
/// Entries are buffered before being built into records.
entries: Vec<EntryImpl>,
}
impl RecordProducer {
    /// Creates a new producer for producing entries with the given namespace.
    pub(crate) fn new(ns: NamespaceImpl) -> Self {
        Self {
            ns,
            entries: Vec::new(),
        }
    }

    /// Populates the entry buffer with the given entries, replacing any buffered ones.
    pub(crate) fn with_entries(self, entries: Vec<EntryImpl>) -> Self {
        Self { entries, ..self }
    }

    /// Pushes an entry into the entry buffer.
    pub(crate) fn push(&mut self, entry: EntryImpl) {
        self.entries.push(entry);
    }

    /// Produces the buffered entries to Kafka server. Those entries may span several Kafka records.
    ///
    /// Returns the offset of the last produced record.
    ///
    /// # Errors
    ///
    /// - `EmptyEntries` if no entries are buffered.
    /// - `GetClient` if the client of the topic cannot be obtained.
    /// - `EncodeJson` if a record cannot be converted into a Kafka record.
    /// - `ProduceRecord` if producing a record fails. Records produced before the failure are
    ///   not rolled back.
    pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result<Offset> {
        ensure!(!self.entries.is_empty(), EmptyEntriesSnafu);

        // Gets the producer in which a record buffer is maintained.
        let producer = client_manager
            .get_or_insert(&self.ns.topic)
            .await
            .map_err(|e| {
                GetClientSnafu {
                    topic: &self.ns.topic,
                    error: e.to_string(),
                }
                .build()
            })?
            .producer;

        // Leaves room for the serialized meta. The subtraction saturates so that a batch size
        // smaller than the estimated meta size cannot underflow, and the result is kept at
        // least 1 since a record must be able to carry some data.
        let max_batch_size = client_manager.config.max_batch_size.as_bytes() as usize;
        let max_record_size = max_batch_size
            .saturating_sub(ESTIMATED_META_SIZE)
            .max(1);

        // Stores the offset of the last successfully produced record.
        let mut last_offset = None;
        for entry in self.entries {
            for record in build_records(entry, max_record_size) {
                let kafka_record = KafkaRecord::try_from(record)?;
                // The size is only needed for error reporting. It's measured up front so that
                // the record can be moved into the producer instead of being cloned.
                let size = kafka_record.approximate_size();

                // Records of a certain region cannot be produced in parallel since their order must be static.
                let offset = producer
                    .produce(kafka_record)
                    .await
                    .map(Offset)
                    .with_context(|_| ProduceRecordSnafu {
                        topic: &self.ns.topic,
                        size,
                        limit: max_record_size,
                    })?;
                last_offset = Some(offset);
            }
        }

        // Safety: the entries are guaranteed not empty and each entry is built into at least
        // one record, hence at least one record was produced.
        Ok(last_offset.unwrap())
    }
}
/// Determines the type of the `seq`-th record (0-based) out of the `num_records` records
/// a split entry is built into.
///
/// The record at index 0 is always the First record, which takes precedence over Last.
fn record_type(seq: usize, num_records: usize) -> RecordType {
    match seq {
        0 => RecordType::First,
        last if last == num_records - 1 => RecordType::Last,
        middle => RecordType::Middle(middle),
    }
}
/// Builds the records an entry is stored as.
///
/// An entry whose data is at most `max_record_size` bytes is built into a single Full record,
/// which includes an entry with empty data. A larger entry is split into chunks of
/// `max_record_size` bytes, where the last chunk may be shorter, and each chunk is built into
/// a First, Middle, or Last record.
///
/// A `max_record_size` of 0 is treated as 1 when splitting, so the function never panics.
fn build_records(entry: EntryImpl, max_record_size: usize) -> Vec<Record> {
    if entry.data.len() <= max_record_size {
        let record = Record {
            meta: RecordMeta {
                version: VERSION,
                tp: RecordType::Full,
                entry_id: entry.id,
                ns: entry.ns,
            },
            data: entry.data,
        };
        return vec![record];
    }

    // `chunks` panics on a chunk size of 0.
    let chunk_size = max_record_size.max(1);
    let chunks = entry.data.chunks(chunk_size);
    // The data is larger than a single chunk here, so there are at least two chunks.
    let num_chunks = chunks.len();
    chunks
        .enumerate()
        .map(|(i, chunk)| Record {
            meta: RecordMeta {
                version: VERSION,
                tp: record_type(i, num_chunks),
                entry_id: entry.id,
                ns: entry.ns.clone(),
            },
            data: chunk.to_vec(),
        })
        .collect()
}
/// Feeds a consumed record into the reconstruction state and emits an entry once complete.
///
/// `entry_records` maps an entry id to the records of that entry consumed so far.
///
/// - A Full record is emitted as an entry right away.
/// - A First record starts a new sequence for its entry id.
/// - A Middle record is appended to the sequence of its entry id.
/// - A Last record completes the sequence of its entry id, which is removed from
///   `entry_records` and emitted as an entry.
///
/// Returns `Ok(None)` if no entry is complete yet, and an `IllegalSequence` error if the
/// record does not legally continue the sequence of its entry id.
pub fn maybe_emit_entry(
    record: Record,
    entry_records: &mut HashMap<EntryId, Vec<Record>>,
) -> Result<Option<EntryImpl>> {
    let entry_id = record.meta.entry_id;

    let emitted = match record.meta.tp {
        RecordType::Full => Some(EntryImpl::from(vec![record])),
        RecordType::First => {
            // A sequence may be started only once.
            ensure!(
                !entry_records.contains_key(&entry_id),
                IllegalSequenceSnafu {
                    error: "First record must be the first"
                }
            );
            entry_records.insert(entry_id, vec![record]);
            None
        }
        RecordType::Middle(seq) => {
            let buffered = entry_records
                .get_mut(&entry_id)
                .context(IllegalSequenceSnafu {
                    error: "Middle record must not be the first",
                })?;
            // Safety: the records are guaranteed not empty if the key exists.
            let previous = buffered.last().unwrap();
            // The sequence this record must carry to directly follow the previous record.
            let expected_seq = match previous.meta.tp {
                RecordType::First => Some(1),
                RecordType::Middle(previous_seq) => Some(previous_seq + 1),
                RecordType::Full | RecordType::Last => None,
            };
            ensure!(
                expected_seq == Some(seq),
                IllegalSequenceSnafu {
                    error: "Illegal prefix for a Middle record"
                }
            );
            buffered.push(record);
            None
        }
        RecordType::Last => {
            // A sequence prefix must have been read before a Last record arrives.
            let mut records = entry_records
                .remove(&entry_id)
                .context(IllegalSequenceSnafu {
                    error: "Missing prefix for a Last record",
                })?;
            records.push(record);
            Some(EntryImpl::from(records))
        }
    };

    Ok(emitted)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_config::wal::KafkaConfig;
use rand::Rng;
use super::*;
use crate::kafka::client_manager::ClientManager;
// Implements some utility methods for testing.
impl Default for Record {
    /// A Full record of entry 0 in region 0 of the default test topic, carrying no data.
    fn default() -> Self {
        let ns = NamespaceImpl {
            region_id: 0,
            topic: "greptimedb_wal_topic".to_string(),
        };
        let meta = RecordMeta {
            version: VERSION,
            tp: RecordType::Full,
            ns,
            entry_id: 0,
        };
        Self {
            meta,
            data: Vec::new(),
        }
    }
}
impl Record {
/// Overrides tp.
fn with_tp(&self, tp: RecordType) -> Self {
Self {
meta: RecordMeta {
tp,
..self.meta.clone()
},
..self.clone()
}
}
/// Overrides data with the given data.
fn with_data(&self, data: &[u8]) -> Self {
Self {
data: data.to_vec(),
..self.clone()
}
}
/// Overrides entry id.
fn with_entry_id(&self, entry_id: EntryId) -> Self {
Self {
meta: RecordMeta {
entry_id,
..self.meta.clone()
},
..self.clone()
}
}
/// Overrides namespace.
fn with_ns(&self, ns: NamespaceImpl) -> Self {
Self {
meta: RecordMeta { ns, ..self.meta },
..self.clone()
}
}
}
/// Builds an entry owning a copy of `data`.
fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
    let data = data.as_ref().to_vec();
    EntryImpl {
        data,
        id: entry_id,
        ns,
    }
}
/// Tests that `build_records` keeps a small entry in one record and splits larger entries
/// into chunks of at most `max_record_size` bytes.
#[test]
fn test_build_records() {
    let max_record_size = 128;
    let ns = NamespaceImpl {
        region_id: 1,
        topic: "greptimedb_wal_topic".to_string(),
    };

    // On a small entry.
    let small = new_test_entry([b'1'; 100], 0, ns.clone());
    let records = build_records(small.clone(), max_record_size);
    assert_eq!(records.len(), 1);
    assert_eq!(small.data, records[0].data);

    // On a large entry.
    let large = new_test_entry([b'1'; 150], 0, ns.clone());
    let records = build_records(large, max_record_size);
    assert_eq!(records.len(), 2);
    assert_eq!(records[0].data, vec![b'1'; 128]);
    assert_eq!(records[1].data, vec![b'1'; 22]);

    // On a way-too large entry.
    let huge = new_test_entry([b'1'; 5000], 0, ns);
    let records = build_records(huge.clone(), max_record_size);
    for (i, chunk) in huge.data.chunks(max_record_size).enumerate() {
        assert_eq!(records[i].data, chunk);
    }
}
/// Tests that a Record survives the round trip through a KafkaRecord.
#[test]
fn test_record_conversion() {
    let ns = NamespaceImpl {
        region_id: 1,
        topic: "greptimedb_wal_topic".to_string(),
    };
    let meta = RecordMeta {
        version: VERSION,
        tp: RecordType::Full,
        entry_id: 1,
        ns,
    };
    let record = Record {
        meta,
        data: b"12345".to_vec(),
    };

    let kafka_record = KafkaRecord::try_from(record.clone()).unwrap();
    let roundtrip: Record = kafka_record.try_into().unwrap();

    assert_eq!(record, roundtrip);
}
/// Tests that an entry reconstructed from records takes the id and the namespace of the
/// first record and concatenates the payloads in order.
#[test]
fn test_reconstruct_entry() {
    let template = Record::default();
    let parts = [
        (&b"111"[..], RecordType::First),
        (&b"222"[..], RecordType::Middle(1)),
        (&b"333"[..], RecordType::Last),
    ];
    let records: Vec<Record> = parts
        .iter()
        .map(|&(data, tp)| template.with_data(data).with_tp(tp))
        .collect();

    let expected_data: Vec<u8> = records
        .iter()
        .flat_map(|record| record.data.clone())
        .collect();

    let entry = EntryImpl::from(records.clone());

    assert_eq!(records[0].meta.entry_id, entry.id);
    assert_eq!(records[0].meta.ns, entry.ns);
    assert_eq!(entry.data, expected_data);
}
/// Tests that `maybe_emit_entry` works as expected.
/// Covers both legal record sequences, which emit an entry or extend the buffered prefix,
/// and illegal record sequences, which must be rejected with an error.
#[test]
fn test_maybe_emit_entry() {
let ns = NamespaceImpl {
region_id: 1,
topic: "greptimedb_wal_topic".to_string(),
};
let template = Record::default().with_ns(ns);
// Initial state: entries 1 and 2 have a First record buffered, entry 3 has a First record
// and a Middle record buffered.
let mut entry_records = HashMap::from([
(
1,
vec![template.with_entry_id(1).with_tp(RecordType::First)],
),
(
2,
vec![template.with_entry_id(2).with_tp(RecordType::First)],
),
(
3,
vec![
template.with_entry_id(3).with_tp(RecordType::First),
template.with_entry_id(3).with_tp(RecordType::Middle(1)),
],
),
]);
// A Full record arrives.
let got = maybe_emit_entry(
template.with_entry_id(0).with_tp(RecordType::Full),
&mut entry_records,
)
.unwrap();
assert!(got.is_some());
// A First record arrives with no prefix.
let got = maybe_emit_entry(
template.with_entry_id(0).with_tp(RecordType::First),
&mut entry_records,
)
.unwrap();
assert!(got.is_none());
// A First record arrives with some prefix.
let got = maybe_emit_entry(
template.with_entry_id(1).with_tp(RecordType::First),
&mut entry_records,
);
assert!(got.is_err());
// A Middle record arrives with legal prefix (First).
let got = maybe_emit_entry(
template.with_entry_id(2).with_tp(RecordType::Middle(1)),
&mut entry_records,
)
.unwrap();
assert!(got.is_none());
// A Middle record arrives with legal prefix (Middle).
let got = maybe_emit_entry(
template.with_entry_id(2).with_tp(RecordType::Middle(2)),
&mut entry_records,
)
.unwrap();
assert!(got.is_none());
// A Middle record arrives with illegal prefix.
let got = maybe_emit_entry(
template.with_entry_id(2).with_tp(RecordType::Middle(1)),
&mut entry_records,
);
assert!(got.is_err());
// A Middle record arrives with no prefix.
let got = maybe_emit_entry(
template.with_entry_id(22).with_tp(RecordType::Middle(1)),
&mut entry_records,
);
assert!(got.is_err());
// A Last record arrives with no prefix.
let got = maybe_emit_entry(
template.with_entry_id(33).with_tp(RecordType::Last),
&mut entry_records,
);
assert!(got.is_err());
// A Last record arrives with legal prefix.
let got = maybe_emit_entry(
template.with_entry_id(3).with_tp(RecordType::Last),
&mut entry_records,
)
.unwrap();
assert!(got.is_some());
// Check state.
// Entry 3 was emitted and removed while entry 0 was added by its First record, so entries
// 0, 1 and 2 remain. Rejected records must not have been buffered.
assert_eq!(entry_records.len(), 3);
assert_eq!(entry_records[&0].len(), 1);
assert_eq!(entry_records[&1].len(), 1);
assert_eq!(entry_records[&2].len(), 3);
}
/// Tests that an entry larger than `max_batch_size` can be produced.
/// The entry carries 2000000 bytes while `max_batch_size` is 1MB, so it has to be split into
/// several records.
/// NOTE(review): presumably requires a Kafka broker reachable at localhost:9092 — confirm
/// that the test environment provides one.
#[tokio::test]
async fn test_produce_large_entry() {
// A random suffix is appended to the topic name, presumably to isolate test runs from each other.
let topic = format!("greptimedb_wal_topic_{}", rand::thread_rng().gen::<usize>());
let ns = NamespaceImpl {
region_id: 1,
topic,
};
let entry = new_test_entry([b'1'; 2000000], 0, ns.clone());
let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]);
// TODO(niebayes): get broker endpoints from env vars.
let config = KafkaConfig {
broker_endpoints: vec!["localhost:9092".to_string()],
max_batch_size: ReadableSize::mb(1),
..Default::default()
};
let manager = Arc::new(ClientManager::try_new(&config).await.unwrap());
// Only checks that producing succeeds; the returned offset is not inspected.
producer.produce(&manager).await.unwrap();
}
}

View File

@@ -165,6 +165,7 @@ impl RegionWriteCtx {
&self.wal_entry,
&self.wal_options,
)?;
self.next_entry_id += 1;
Ok(())
}

View File

@@ -126,22 +126,111 @@ SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
| 128 | 128 | 10000 | 1280000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
Affected Rows: 128
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 256 | 256 | 10000 | 2560000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
Affected Rows: 256
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 512 | 512 | 10000 | 5120000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
Affected Rows: 512
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 1024 | 1024 | 10000 | 10240000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
Affected Rows: 1024
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 2048 | 2048 | 10000 | 20480000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
Affected Rows: 2048
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 4096 | 4096 | 10000 | 40960000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
Affected Rows: 4096
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 8192 | 8192 | 10000 | 81920000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
-- SQLNESS ARG restart=true
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 8192 | 8192 | 10000 | 81920000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
Affected Rows: 8192
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 16384 | 16384 | 10000 | 163840000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
Affected Rows: 16384
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 32768 | 32768 | 10000 | 327680000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
DROP TABLE test;
Affected Rows: 0

View File

@@ -51,38 +51,41 @@ INSERT INTO bigtable SELECT a, to_unixtime(ts) * 51 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
-- SQLNESS ARG restart=true
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
DROP TABLE test;