feat(remote_wal): introduce kafka remote wal (#3001)

* feat: integrate remote wal to standalone

* fix: test

* chore: ready to debug kafka remote wal

* fix: test

* chore: add some logs for remote wal

* chore: add logs for topic manager

* fix: properly terminate stream consumer

* fix: properly handle TopicAlreadyExists error

* fix: parse config file error

* fix: properly handle last entry id

* chore: prepare for merge

* fix: test

* fix: typo

* fix: set replication_factor properly

* fix: CR

* test: tmp for test

* Revert "test: tmp for test"

This reverts commit 093a3e0038.

* fix: serde

* fix selector type deserialize
niebayes
2023-12-26 20:35:24 +08:00
committed by GitHub
parent 8ce8a8f3c7
commit d061bf3d07
14 changed files with 182 additions and 80 deletions

View File

@@ -56,7 +56,7 @@ sync_write = false
# produce_record_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
# backoff_base = 2
# backoff_deadline = "5mins"
# Storage options, see `standalone.example.toml`.

View File

@@ -65,7 +65,7 @@ provider = "raft_engine"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# replication_factor = 1
# The timeout above which a topic creation operation will be cancelled.
# create_topic_timeout = "30s"
# The initial backoff for kafka clients.
@@ -73,7 +73,7 @@ provider = "raft_engine"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, reconnecting will not stop.
# backoff_deadline = "5mins"

View File

@@ -103,7 +103,7 @@ provider = "raft_engine"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# replication_factor = 1
# The maximum log size a kafka batch producer could buffer.
# max_batch_size = "4MB"
@@ -119,7 +119,7 @@ provider = "raft_engine"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, reconnecting will not stop.
# backoff_deadline = "5mins"
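
The backoff comments above describe an exponential schedule: start at `backoff_init`, multiply by `backoff_base` after each retry, cap at `backoff_max`, and stop once the accumulated wait reaches `backoff_deadline`. A self-contained sketch of that schedule, using an illustrative struct rather than rskafka's actual `BackoffConfig`:

```rust
use std::time::Duration;

/// Illustrative backoff settings mirroring the config comments above;
/// a sketch, not the rskafka `BackoffConfig` type.
struct Backoff {
    init: Duration,
    max: Duration,
    base: u32,
    deadline: Duration,
}

impl Backoff {
    /// Returns the sequence of waits until the deadline is exhausted.
    fn waits(&self) -> Vec<Duration> {
        let mut waits = Vec::new();
        let mut current = self.init;
        let mut total = Duration::ZERO;
        while total + current <= self.deadline {
            waits.push(current);
            total += current;
            // next backoff = base * current backoff, capped at `max`.
            current = (current * self.base).min(self.max);
        }
        waits
    }
}

fn main() {
    let backoff = Backoff {
        init: Duration::from_millis(500),   // backoff_init = "500ms"
        max: Duration::from_secs(10),       // backoff_max = "10s"
        base: 2,                            // backoff_base = 2
        deadline: Duration::from_secs(300), // backoff_deadline = "5mins"
    };
    // 500ms, 1s, 2s, 4s, 8s, then 10s repeatedly until ~5 minutes total.
    println!("{:?}", backoff.waits());
}
```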

View File

@@ -121,13 +121,16 @@ pub struct StandaloneKafkaConfig {
impl Default for StandaloneKafkaConfig {
fn default() -> Self {
let base = KafkaConfig::default();
let replication_factor = base.broker_endpoints.len() as i16;
Self {
base: KafkaConfig::default(),
base,
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
}
}

View File

@@ -89,7 +89,7 @@ mod tests {
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_topic"
num_partitions = 1
replication_factor = 3
replication_factor = 1
create_topic_timeout = "30s"
backoff_init = "500ms"
backoff_max = "10s"
@@ -103,7 +103,7 @@ mod tests {
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig {
init: Duration::from_millis(500),

View File

@@ -50,13 +50,16 @@ pub struct KafkaConfig {
impl Default for KafkaConfig {
fn default() -> Self {
let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
let replication_factor = broker_endpoints.len() as i16;
Self {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
broker_endpoints,
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig::default(),
}
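
The hunks above replace the hard-coded `replication_factor: 3` with a value derived from the number of configured broker endpoints, which is why the example configs now show `replication_factor = 1` for the single default broker. A trimmed-down sketch of that defaulting; the real `KafkaConfig` carries many more fields:

```rust
/// Trimmed stand-in for the config shown above; only the two fields
/// relevant to the defaulting change are kept.
struct KafkaConfig {
    broker_endpoints: Vec<String>,
    replication_factor: i16,
}

impl Default for KafkaConfig {
    fn default() -> Self {
        let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
        // One default broker means a replication factor of 1, not the old hard-coded 3.
        let replication_factor = broker_endpoints.len() as i16;
        Self {
            broker_endpoints,
            replication_factor,
        }
    }
}

fn main() {
    let config = KafkaConfig::default();
    assert_eq!(config.broker_endpoints.len(), 1);
    assert_eq!(config.replication_factor, 1);
}
```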

View File

@@ -17,10 +17,13 @@ use std::sync::Arc;
use std::time::Duration;
use common_config::wal::kafka::TopicSelectorType;
use common_telemetry::debug;
use common_telemetry::{debug, error, info};
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::ClientBuilder;
use rskafka::BackoffConfig;
use snafu::{ensure, ResultExt};
use snafu::{ensure, AsErrorSource, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu,
@@ -79,7 +82,6 @@ impl TopicManager {
.await?
.into_iter()
.collect::<HashSet<Topic>>();
debug!("Restored {} topics", created_topics.len());
// Creates missing topics.
let to_be_created = topics
@@ -92,10 +94,10 @@ impl TopicManager {
Some(i)
})
.collect::<Vec<_>>();
if !to_be_created.is_empty() {
self.try_create_topics(topics, &to_be_created).await?;
Self::persist_created_topics(topics, &self.kv_backend).await?;
debug!("Persisted {} topics", topics.len());
}
Ok(())
}
@@ -119,23 +121,12 @@ impl TopicManager {
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;
// Spawns tokio tasks for creating missing topics.
// Try to create missing topics.
let tasks = to_be_created
.iter()
.map(|i| {
client.create_topic(
topics[*i].clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
)
})
.map(|i| self.try_create_topic(&topics[*i], &client))
.collect::<Vec<_>>();
// FIXME(niebayes): try to create an already-exist topic would raise an error.
futures::future::try_join_all(tasks)
.await
.context(CreateKafkaWalTopicSnafu)
.map(|_| ())
futures::future::try_join_all(tasks).await.map(|_| ())
}
/// Selects one topic from the topic pool through the topic selector.
@@ -150,6 +141,32 @@ impl TopicManager {
.collect()
}
async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> {
match client
.create_topic(
topic.clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
)
.await
{
Ok(_) => {
info!("Successfully created topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_topic_already_exist_err(&e) {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!("Failed to create a topic {}, error {:?}", topic, e);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}
async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
kv_backend
.get(CREATED_TOPICS_KEY.as_bytes())
@@ -171,6 +188,16 @@ impl TopicManager {
.await
.map(|_| ())
}
fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: TopicAlreadyExists,
..
}
)
}
}
#[cfg(test)]
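
`try_create_topic` above treats a `TopicAlreadyExists` protocol error as success, so re-running topic creation (for example after a partial failure, or when another node created the topic first) is harmless, which is what the removed FIXME asked for. A self-contained sketch of that idempotent-create pattern, with a stand-in error type rather than rskafka's:

```rust
/// Stand-in error type; the real code matches on
/// `rskafka::client::error::Error::ServerError { protocol_error: TopicAlreadyExists, .. }`.
#[derive(Debug)]
enum CreateTopicError {
    TopicAlreadyExists,
    Other(String),
}

/// Treat "already exists" as success so topic creation stays idempotent.
fn ensure_created(result: Result<(), CreateTopicError>) -> Result<(), CreateTopicError> {
    match result {
        Ok(()) | Err(CreateTopicError::TopicAlreadyExists) => Ok(()),
        Err(e) => Err(e),
    }
}

fn main() {
    assert!(ensure_created(Ok(())).is_ok());
    assert!(ensure_created(Err(CreateTopicError::TopicAlreadyExists)).is_ok());
    assert!(ensure_created(Err(CreateTopicError::Other("broker unreachable".into()))).is_err());
}
```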

View File

@@ -20,6 +20,8 @@ use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use snafu::{Location, Snafu};
use crate::kafka::NamespaceImpl as KafkaNamespace;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
@@ -152,16 +154,17 @@ pub enum Error {
error: rskafka::client::producer::Error,
},
#[snafu(display(
"Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}",
topic,
region_id,
offset,
))]
#[snafu(display("Failed to read a record from Kafka, ns: {}", ns))]
ConsumeRecord {
topic: String,
region_id: u64,
offset: i64,
ns: KafkaNamespace,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to get the latest offset, ns: {}", ns))]
GetOffset {
ns: KafkaNamespace,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,

View File

@@ -17,6 +17,8 @@ pub mod log_store;
mod offset;
mod record_utils;
use std::fmt::Display;
use common_meta::wal::KafkaWalTopic as Topic;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
@@ -37,6 +39,12 @@ impl Namespace for NamespaceImpl {
}
}
impl Display for NamespaceImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.topic, self.region_id)
}
}
/// Kafka Entry implementation.
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
@@ -64,3 +72,15 @@ impl Entry for EntryImpl {
self.ns.clone()
}
}
impl Display for EntryImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Entry (ns: {}, id: {}, data_len: {})",
self.ns,
self.id,
self.data.len()
)
}
}

View File

@@ -16,17 +16,20 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_config::wal::{KafkaConfig, WalOptions};
use common_telemetry::{debug, warn};
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
use snafu::ResultExt;
use store_api::logstore::entry::Id as EntryId;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
use crate::error::{Error, Result};
use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::offset::Offset;
use crate::kafka::record_utils::{handle_consume_result, RecordProducer};
use crate::kafka::record_utils::{decode_from_record, RecordProducer};
use crate::kafka::{EntryImpl, NamespaceImpl};
/// A log store backed by Kafka.
@@ -82,6 +85,8 @@ impl LogStore for KafkaLogStore {
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// while the value is the id of the last successfully written entry of the region.
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
debug!("LogStore handles append_batch with entries {:?}", entries);
if entries.is_empty() {
return Ok(AppendBatchResponse::default());
}
@@ -97,6 +102,7 @@ impl LogStore for KafkaLogStore {
// Builds a record from entries belonging to a region and produces them to the Kafka server.
let region_ids = producers.keys().cloned().collect::<Vec<_>>();
let tasks = producers
.into_values()
.map(|producer| producer.produce(&self.client_manager))
@@ -108,6 +114,8 @@ impl LogStore for KafkaLogStore {
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
debug!("The entries are appended at offsets {:?}", entry_ids);
Ok(AppendBatchResponse {
last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
})
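
As the doc comment above says, `append_batch` returns a map from region id to the id of the last entry successfully written for that region, built by zipping the region ids with the produced offsets. A minimal stand-in illustrating the shape of that response (not the real `AppendBatchResponse` definition):

```rust
use std::collections::HashMap;

/// Stand-in for the response described above: region id -> last persisted entry id.
struct AppendBatchResponse {
    last_entry_ids: HashMap<u64, u64>,
}

fn main() {
    // Pretend one batch carried entries for two regions.
    let response = AppendBatchResponse {
        last_entry_ids: HashMap::from([(1, 7), (2, 3)]),
    };
    for (region_id, last_entry_id) in &response.last_entry_ids {
        println!("region {region_id} persisted up to entry {last_entry_id}");
    }
}
```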
@@ -131,15 +139,72 @@ impl LogStore for KafkaLogStore {
.raw_client
.clone();
// Reads the entries starting from exactly the specified offset.
let offset = Offset::try_from(entry_id)?.0;
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset))
// Gets the offset of the latest record in the topic; since each topic has a single partition, this is the latest record of that partition.
// The read operation terminates when this record is consumed.
// Warning: `get_offset(OffsetAt::Latest)` returns the offset one past the latest record, so it must be decremented for our usage.
// See: https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)
let end_offset = client
.get_offset(OffsetAt::Latest)
.await
.context(GetOffsetSnafu { ns: ns.clone() })?
- 1;
// Reads entries with offsets in the inclusive range [start_offset, end_offset].
let start_offset = Offset::try_from(entry_id)?.0;
// Abort if there are no new entries.
// FIXME(niebayes): why does this case happen?
if start_offset > end_offset {
warn!(
"No new entries for ns {} in range [{}, {})",
ns, start_offset, end_offset
);
return Ok(futures_util::stream::empty().boxed());
}
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
.with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
.with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32)
.build();
debug!(
"Built a stream consumer for ns {} to consume entries in range [{}, {})",
ns, start_offset, end_offset
);
let ns_clone = ns.clone();
let stream = async_stream::stream!({
while let Some(consume_result) = stream_consumer.next().await {
yield handle_consume_result(consume_result, &topic, region_id, offset);
// Each call to `next` produces a `RecordAndOffset` and a high watermark offset.
// The `RecordAndOffset` contains the record data and its start offset.
// The high watermark offset is the end offset of the latest record in the partition.
let (record, high_watermark) = consume_result.context(ConsumeRecordSnafu {
ns: ns_clone.clone(),
})?;
let record_offset = record.offset;
debug!(
"Read a record at offset {} for ns {}, high watermark: {}",
record_offset, ns_clone, high_watermark
);
let entries = decode_from_record(record.record)?;
// Filters entries by region id.
if let Some(entry) = entries.first()
&& entry.ns.region_id == region_id
{
yield Ok(entries);
} else {
yield Ok(vec![]);
}
// Terminates the stream once the record at the end offset has been read.
if record_offset >= end_offset {
debug!(
"Stream consumer for ns {} terminates at offset {}",
ns_clone, record_offset
);
break;
}
}
});
Ok(Box::pin(stream))
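
The comments in this hunk spell out the read window: `get_offset(OffsetAt::Latest)` returns the offset one past the last record, so it is decremented, and the stream stops once the record at that end offset has been consumed. A self-contained sketch of just the range arithmetic, with illustrative names rather than the log store's API:

```rust
/// Computes the inclusive offset window to read, mirroring the logic above.
/// `latest` stands in for what `client.get_offset(OffsetAt::Latest)` would return.
fn read_window(start_offset: i64, latest: i64) -> Option<(i64, i64)> {
    // The latest readable record sits one before the returned "latest" offset.
    let end_offset = latest - 1;
    if start_offset > end_offset {
        // Nothing new to read; the caller returns an empty stream.
        return None;
    }
    Some((start_offset, end_offset))
}

fn main() {
    // Records at offsets 0..=9 exist, so `latest` is 10.
    assert_eq!(read_window(7, 10), Some((7, 9)));
    // The caller is already caught up.
    assert_eq!(read_window(10, 10), None);
}
```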

View File

@@ -12,21 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_config::wal::KafkaWalTopic as Topic;
use rskafka::record::{Record, RecordAndOffset};
use rskafka::record::Record;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
ConsumeRecordSnafu, DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu,
MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result,
DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu, MissingKeySnafu,
MissingValueSnafu, ProduceRecordSnafu, Result,
};
use crate::kafka::client_manager::ClientManagerRef;
use crate::kafka::offset::Offset;
use crate::kafka::{EntryId, EntryImpl, NamespaceImpl};
type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>;
/// Record metadata which will be serialized/deserialized to/from the `key` of a Record.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct RecordMeta {
@@ -125,7 +122,7 @@ fn encode_to_record(ns: NamespaceImpl, entries: Vec<EntryImpl>) -> Result<Record
})
}
fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
pub(crate) fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
let key = record.key.context(MissingKeySnafu)?;
let value = record.value.context(MissingValueSnafu)?;
let meta: RecordMeta = serde_json::from_slice(&key).context(DecodeMetaSnafu)?;
@@ -144,34 +141,6 @@ fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
Ok(entries)
}
/// Handles the result of a consume operation on a kafka topic.
pub(crate) fn handle_consume_result(
result: ConsumeResult,
topic: &Topic,
region_id: u64,
offset: i64,
) -> Result<Vec<EntryImpl>> {
match result {
Ok((record_and_offset, _)) => {
// Only produces entries belong to the region with the given region id.
// Since a record only contains entries from a single region, it suffices to check the first entry only.
let entries = decode_from_record(record_and_offset.record)?;
if let Some(entry) = entries.first()
&& entry.id == region_id
{
Ok(entries)
} else {
Ok(vec![])
}
}
Err(e) => Err(e).context(ConsumeRecordSnafu {
topic,
region_id,
offset,
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
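
`decode_from_record` recovers the metadata from a record's key with `serde_json` and the entries from its value; because a record only ever carries entries of a single region, filtering by the first entry's region id is sufficient. A round-trip sketch of the key encoding; the real `RecordMeta` fields are not visible in this diff, so the ones below are assumptions:

```rust
use serde::{Deserialize, Serialize};

/// Illustrative stand-in for the metadata stored in a record's key;
/// the actual `RecordMeta` fields are not shown in this diff.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct RecordMeta {
    topic: String,
    region_id: u64,
}

fn main() -> Result<(), serde_json::Error> {
    let meta = RecordMeta {
        topic: "greptimedb_wal_topic_0".to_string(),
        region_id: 42,
    };
    // Encode the metadata into the record key when producing ...
    let key = serde_json::to_vec(&meta)?;
    // ... and decode it back when consuming, as decode_from_record does.
    let decoded: RecordMeta = serde_json::from_slice(&key)?;
    assert_eq!(meta, decoded);
    Ok(())
}
```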

View File

@@ -56,6 +56,7 @@ impl Default for SelectorOptions {
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(try_from = "String")]
pub enum SelectorType {
#[default]
LoadBased,
@@ -77,6 +78,14 @@ impl TryFrom<&str> for SelectorType {
}
}
impl TryFrom<String> for SelectorType {
type Error = error::Error;
fn try_from(value: String) -> Result<Self> {
SelectorType::try_from(value.as_str())
}
}
#[cfg(test)]
mod tests {
use super::SelectorType;
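
`#[serde(try_from = "String")]` routes deserialization through the new `TryFrom<String>` impl, so a config value such as `selector_type = "round_robin"` parses straight into the enum. A self-contained sketch of the same pattern with a stand-in enum; the accepted strings are assumed to mirror the crate's `"load_based"` and `"round_robin"`:

```rust
use serde::Deserialize;

/// Stand-in enum demonstrating the `try_from = "String"` pattern added above.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(try_from = "String")]
enum Selector {
    LoadBased,
    RoundRobin,
}

impl TryFrom<String> for Selector {
    type Error = String;

    fn try_from(value: String) -> Result<Self, Self::Error> {
        match value.as_str() {
            "load_based" => Ok(Selector::LoadBased),
            "round_robin" => Ok(Selector::RoundRobin),
            other => Err(format!("unknown selector type: {other}")),
        }
    }
}

#[derive(Deserialize)]
struct Config {
    selector_type: Selector,
}

fn main() {
    let config: Config = toml::from_str(r#"selector_type = "round_robin""#).unwrap();
    assert_eq!(config.selector_type, Selector::RoundRobin);
}
```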

View File

@@ -256,6 +256,10 @@ impl RegionOpener {
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
if !self.skip_wal_replay {
info!(
"Start replaying memtable at flushed_entry_id {} for region {}",
flushed_entry_id, region_id
);
replay_memtable(
wal,
&wal_options,

View File

@@ -49,7 +49,6 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Creates a new `EntryStream` that asynchronously generates `Entry`s with ids
/// starting from `id`.
// TODO(niebayes): update docs for entry id.
async fn read(
&self,
ns: &Self::Namespace,