feat(remote_wal): impl kafka log store (#2971)

* feat: introduce client manager

* chore: add errors for client manager

* chore: add record utils

* chore: impl kafka log store

* chore: build kafka log store upon starting datanode

* chore: update comments for kafka log store

* chore: add a todo for getting entry offset

* fix: typo

* chore: remove unused

* chore: update comments

* fix: typo

* fix: resolve some review conversations

* chore: move commonly referenced crates to workspace Cargo.toml

* fix: style

* fix: style

* chore: unify topic name prefix

* chore: make backoff config configurable by users

* chore: properly use backoff config in wal config

* refactor: read/write of kafka log store

* fix: typo

* fix: typo

* fix: resolve review conversations
niebayes
2023-12-25 17:21:52 +08:00
committed by GitHub
parent d4ac8734bc
commit bab198ae68
19 changed files with 655 additions and 106 deletions

View File

@@ -41,7 +41,7 @@ tcp_nodelay = true
# WAL data directory
provider = "raft_engine"
# Raft-engine wal options, see `standalone.example.toml`
# Raft-engine wal options, see `standalone.example.toml`.
# dir = "/tmp/greptimedb/wal"
file_size = "256MB"
purge_threshold = "4GB"
@@ -49,21 +49,15 @@ purge_interval = "10m"
read_batch_size = 128
sync_write = false
# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# Kafka wal options, see `standalone.example.toml`.
# broker_endpoints = ["127.0.0.1:9090"]
# Number of topics shall be created beforehand.
# num_topics = 64
# Topic name prefix.
# topic_name_prefix = "greptimedb_wal_kafka_topic"
# Number of partitions per topic.
# num_partitions = 1
# The maximum log size an rskafka batch producer could buffer.
# max_batch_size = "4MB"
# The linger duration of an rskafka batch producer.
# linger = "200ms"
# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
# max_wait_time = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
# backoff_deadline = "5mins"
# Storage options, see `standalone.example.toml`.
[storage]

View File

@@ -61,7 +61,7 @@ provider = "raft_engine"
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_wal_kafka"
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
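For illustration, a minimal sketch of how a topic name could be composed under this naming scheme; the underscore separator is an assumption inferred from the default prefix above, not taken from the topic selector code:

```rust
// Hedged sketch: deriving a topic name from the configured prefix and a topic id.
// The underscore separator is an assumption for illustration only.
fn topic_name(topic_name_prefix: &str, topic_id: usize) -> String {
    format!("{topic_name_prefix}_{topic_id}")
}

fn main() {
    // With `num_topics = 64`, topic ids would range over 0..64.
    assert_eq!(topic_name("greptimedb_wal_topic", 0), "greptimedb_wal_topic_0");
}
```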

View File

@@ -87,7 +87,23 @@ enable = true
# - "Kafka"
provider = "raft_engine"
# There's no kafka wal config for standalone mode.
# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# The maximum log size a kafka batch producer could buffer.
# max_batch_size = "4MB"
# The linger duration of a kafka batch producer.
# linger = "200ms"
# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
# max_wait_time = "100ms"
# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
# backoff_deadline = "5mins"
# WAL data directory
# dir = "/tmp/greptimedb/wal"
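For illustration, a hedged sketch of the retry-delay sequence these four backoff knobs describe; this mirrors the documented semantics, not the rskafka implementation:

```rust
use std::time::Duration;

// A sketch of the retry delays implied by backoff_init, backoff_max, backoff_base,
// and backoff_deadline, assuming the semantics documented in the comments above.
fn backoff_delays(init: Duration, max: Duration, base: u32, deadline: Duration) -> Vec<Duration> {
    let (mut delays, mut current, mut total) = (Vec::new(), init, Duration::ZERO);
    while total + current <= deadline {
        delays.push(current);
        total += current;
        current = (current * base).min(max); // next backoff = base * current backoff, capped.
    }
    delays
}

fn main() {
    // backoff_init = 500ms, backoff_max = 10s, backoff_base = 2, backoff_deadline = 5mins.
    let delays = backoff_delays(
        Duration::from_millis(500),
        Duration::from_secs(10),
        2,
        Duration::from_secs(300),
    );
    // Delays grow 500ms, 1s, 2s, 4s, 8s, then stay at the 10s cap until the deadline.
    assert_eq!(delays[0], Duration::from_millis(500));
    assert_eq!(delays[4], Duration::from_secs(8));
    assert_eq!(delays[5], Duration::from_secs(10));
}
```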

View File

@@ -70,23 +70,25 @@ mod tests {
fn test_serde_kafka_config() {
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 32
topic_name_prefix = "greptimedb_wal_kafka_topic"
num_partitions = 1
max_batch_size = "4MB"
linger = "200ms"
max_wait_time = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
"#;
let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 32,
topic_name_prefix: "greptimedb_wal_kafka_topic".to_string(),
num_partitions: 1,
compression: RsKafkaCompression::default(),
max_batch_size: ReadableSize::mb(4),
linger: Duration::from_millis(200),
max_wait_time: Duration::from_millis(100),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)),
};
assert_eq!(decoded, expected);
}
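The duration fields are deserialized through `humantime_serde`, which delegates to the `humantime` parser, so the TOML strings above correspond to the `Duration` values in the expected struct; a quick sketch, assuming a direct dev-dependency on `humantime`:

```rust
use std::time::Duration;

fn main() {
    // `humantime_serde` uses the `humantime` parser, so the TOML strings above map to
    // these Durations, matching the `expected` struct in the test.
    assert_eq!(humantime::parse_duration("500ms").unwrap(), Duration::from_millis(500));
    assert_eq!(humantime::parse_duration("10s").unwrap(), Duration::from_secs(10));
    assert_eq!(humantime::parse_duration("5mins").unwrap(), Duration::from_secs(300));
}
```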

View File

@@ -19,7 +19,7 @@ use rskafka::client::partition::Compression as RsKafkaCompression;
use serde::{Deserialize, Serialize};
/// Topic name prefix.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_kafka_topic";
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
/// Kafka wal topic.
pub type Topic = String;
@@ -29,37 +29,45 @@ pub type Topic = String;
pub struct KafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// Number of topics to be created beforehand.
pub num_topics: usize,
/// Topic name prefix.
pub topic_name_prefix: String,
/// Number of partitions per topic.
pub num_partitions: i32,
/// The compression algorithm used to compress log entries.
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
/// The maximum log size an rskafka batch producer could buffer.
/// The maximum log size a kafka batch producer could buffer.
pub max_batch_size: ReadableSize,
/// The linger duration of an rskafka batch producer.
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
pub linger: Duration,
/// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
#[serde(with = "humantime_serde")]
pub max_wait_time: Duration,
/// The initial backoff for kafka clients.
#[serde(with = "humantime_serde")]
pub backoff_init: Duration,
/// The maximum backoff for kafka clients.
#[serde(with = "humantime_serde")]
pub backoff_max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
// Uses u32 rather than f64 because some structs containing the KafkaConfig need to derive the Eq trait, which f64 does not support.
pub backoff_base: u32,
/// Stop reconnecting if the total wait time reaches the deadline.
/// If it's None, the reconnecting won't terminate.
#[serde(with = "humantime_serde")]
pub backoff_deadline: Option<Duration>,
}
impl Default for KafkaConfig {
fn default() -> Self {
Self {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 64,
topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
num_partitions: 1,
compression: RsKafkaCompression::NoCompression,
max_batch_size: ReadableSize::mb(4),
linger: Duration::from_millis(200),
max_wait_time: Duration::from_millis(100),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
}
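Because `KafkaConfig` implements `Default`, callers can override only the fields they care about; a hedged usage sketch (the re-export path `common_config::wal::KafkaConfig` is taken from the imports used elsewhere in this change):

```rust
use std::time::Duration;

// Re-export path taken from the imports in this change; adjust if the layout differs.
use common_config::wal::KafkaConfig;

fn main() {
    // Override only the broker list and the linger duration; every other field keeps
    // the defaults shown above (e.g. backoff_base = 2, backoff_max = 10s).
    let config = KafkaConfig {
        broker_endpoints: vec!["10.0.0.1:9092".to_string()],
        linger: Duration::from_millis(50),
        ..Default::default()
    };
    assert_eq!(config.backoff_base, 2);
}
```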

View File

@@ -304,7 +304,7 @@ pub enum Error {
},
#[snafu(display(
"Failed to build a kafka client, broker endpoints: {:?}",
"Failed to build a Kafka client, broker endpoints: {:?}",
broker_endpoints
))]
BuildKafkaClient {
@@ -314,14 +314,14 @@ pub enum Error {
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to build a kafka controller client"))]
#[snafu(display("Failed to build a Kafka controller client"))]
BuildKafkaCtrlClient {
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to create a kafka wal topic"))]
#[snafu(display("Failed to create a Kafka wal topic"))]
CreateKafkaWalTopic {
location: Location,
#[snafu(source)]

View File

@@ -69,13 +69,13 @@ mod tests {
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_kafka"
topic_name_prefix = "greptimedb_wal_topic"
num_partitions = 1
replication_factor = 3
create_topic_timeout = "30s"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2.0
backoff_base = 2
backoff_deadline = "5mins"
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
@@ -83,13 +83,13 @@ mod tests {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 32,
selector_type: KafkaTopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_kafka".to_string(),
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
create_topic_timeout: Duration::from_secs(30),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2.0,
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)),
};
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));

View File

@@ -49,7 +49,9 @@ pub struct KafkaConfig {
#[serde(with = "humantime_serde")]
pub backoff_max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
pub backoff_base: f64,
// Uses u32 to match the type of the `backoff_base` field in the datanode's KafkaConfig,
// so that the two configs stay consistent.
pub backoff_base: u32,
/// Stop reconnecting if the total wait time reaches the deadline.
/// If it's None, the reconnecting won't terminate.
#[serde(with = "humantime_serde")]
@@ -62,13 +64,13 @@ impl Default for KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_kafka".to_string(),
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
create_topic_timeout: Duration::from_secs(30),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2.0,
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}

View File

@@ -105,7 +105,7 @@ impl TopicManager {
let backoff_config = BackoffConfig {
init_backoff: self.config.backoff_init,
max_backoff: self.config.backoff_max,
base: self.config.backoff_base,
base: self.config.backoff_base as f64,
deadline: self.config.backoff_deadline,
};
let client = ClientBuilder::new(self.config.broker_endpoints.clone())
@@ -181,7 +181,7 @@ mod tests {
#[tokio::test]
async fn test_restore_persisted_topics() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_name_prefix = "greptimedb_wal_kafka";
let topic_name_prefix = "greptimedb_wal_topic";
let num_topics = 16;
// Constructs mock topics.

View File

@@ -504,8 +504,11 @@ impl DatanodeBuilder {
/// Builds [KafkaLogStore].
async fn build_kafka_log_store(config: &KafkaConfig) -> Result<Arc<KafkaLogStore>> {
let _ = config;
todo!()
KafkaLogStore::try_new(config)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)
.map(Arc::new)
}
/// Builds [ObjectStoreManager]

View File

@@ -14,6 +14,7 @@
use std::any::Any;
use common_config::wal::KafkaWalTopic;
use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
@@ -84,6 +85,90 @@ pub enum Error {
attempt_index: u64,
location: Location,
},
#[snafu(display(
"Failed to build a Kafka client, broker endpoints: {:?}",
broker_endpoints
))]
BuildClient {
broker_endpoints: Vec<String>,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
partition
))]
BuildPartitionClient {
topic: String,
partition: i32,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display(
"Failed to get a Kafka topic client, topic: {}, source: {}",
topic,
error
))]
GetClient {
topic: KafkaWalTopic,
location: Location,
error: String,
},
#[snafu(display("Failed to encode a record meta"))]
EncodeMeta {
location: Location,
#[snafu(source)]
error: serde_json::Error,
},
#[snafu(display("Failed to decode a record meta"))]
DecodeMeta {
location: Location,
#[snafu(source)]
error: serde_json::Error,
},
#[snafu(display("Missing required key in a record"))]
MissingKey { location: Location },
#[snafu(display("Missing required value in a record"))]
MissingValue { location: Location },
#[snafu(display("Cannot build a record from empty entries"))]
EmptyEntries { location: Location },
#[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
ProduceRecord {
topic: KafkaWalTopic,
location: Location,
#[snafu(source)]
error: rskafka::client::producer::Error,
},
#[snafu(display(
"Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}",
topic,
region_id,
offset,
))]
ConsumeRecord {
topic: String,
region_id: u64,
offset: i64,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to do a cast"))]
Cast { location: Location },
}
impl ErrorExt for Error {

View File

@@ -12,35 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod client_manager;
pub mod log_store;
mod offset;
mod record_utils;
use common_meta::wal::KafkaWalTopic as Topic;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::namespace::Namespace;
use crate::error::Error;
/// Kafka Namespace implementation.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct NamespaceImpl {
region_id: u64,
topic: Topic,
}
impl NamespaceImpl {
fn new(region_id: u64, topic: Topic) -> Self {
Self { region_id, topic }
}
fn region_id(&self) -> u64 {
self.region_id
}
fn topic(&self) -> &Topic {
&self.topic
}
}
impl Namespace for NamespaceImpl {
fn id(&self) -> u64 {
self.region_id
@@ -48,6 +38,7 @@ impl Namespace for NamespaceImpl {
}
/// Kafka Entry implementation.
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
/// Entry payload.
data: Vec<u8>,
@@ -57,16 +48,6 @@ pub struct EntryImpl {
ns: NamespaceImpl,
}
impl EntryImpl {
fn new(data: Vec<u8>, entry_id: EntryId, ns: NamespaceImpl) -> Self {
Self {
data,
id: entry_id,
ns,
}
}
}
impl Entry for EntryImpl {
type Error = Error;
type Namespace = NamespaceImpl;

View File

@@ -0,0 +1,126 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::BackoffConfig;
use snafu::ResultExt;
use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
/// Arc wrapper of ClientManager.
pub(crate) type ClientManagerRef = Arc<ClientManager>;
/// A client through which to contact the Kafka cluster. Each client is associated with one partition of a topic.
/// Since a topic only has one partition in our design, the mapping between clients and topics is one-to-one.
#[derive(Debug, Clone)]
pub(crate) struct Client {
/// A raw client used to construct a batch producer and/or a stream consumer for a specific topic.
pub(crate) raw_client: Arc<PartitionClient>,
/// A producer used to buffer log entries for a specific topic before sending them in a batching manner.
pub(crate) producer: Arc<BatchProducer<RecordAggregator>>,
}
impl Client {
/// Creates a Client from the raw client.
pub(crate) fn new(raw_client: Arc<PartitionClient>, config: &KafkaConfig) -> Self {
let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize);
let batch_producer = BatchProducerBuilder::new(raw_client.clone())
.with_compression(config.compression)
.with_linger(config.linger)
.build(record_aggregator);
Self {
raw_client,
producer: Arc::new(batch_producer),
}
}
}
/// Manages client construction and accesses.
#[derive(Debug)]
pub(crate) struct ClientManager {
config: KafkaConfig,
/// Top-level Kafka client. All partition clients are constructed by this client.
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: DashMap<Topic, Client>,
}
impl ClientManager {
/// Tries to create a ClientManager.
pub(crate) async fn try_new(config: &KafkaConfig) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let backoff_config = BackoffConfig {
init_backoff: config.backoff_init,
max_backoff: config.backoff_max,
base: config.backoff_base as f64,
deadline: config.backoff_deadline,
};
let client = ClientBuilder::new(config.broker_endpoints.clone())
.backoff_config(backoff_config)
.build()
.await
.with_context(|_| BuildClientSnafu {
broker_endpoints: config.broker_endpoints.clone(),
})?;
Ok(Self {
config: config.clone(),
client_factory: client,
client_pool: DashMap::new(),
})
}
/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
match self.client_pool.entry(topic.to_string()) {
DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
DashMapEntry::Vacant(entry) => {
let topic_client = self.try_create_client(topic).await?;
Ok(entry.insert(topic_client).clone())
}
}
}
async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
// Sets the handling to Retry so the client retries connecting if the Kafka cluster replies with an UnknownTopic error.
// That's because the topic is believed to exist, as the metasrv is expected to create the required topics upon start.
// The retries won't stop until one succeeds or a different error is returned.
let raw_client = self
.client_factory
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
.context(BuildPartitionClientSnafu {
topic,
partition: DEFAULT_PARTITION,
})
.map(Arc::new)?;
Ok(Client::new(raw_client, &self.config))
}
}
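A hedged usage sketch of the manager, written as if inside the crate's `kafka` module since the types are `pub(crate)`; the topic name is hypothetical and a reachable Kafka cluster is assumed:

```rust
use std::sync::Arc;

use common_config::wal::KafkaConfig;

use crate::error::Result;
use crate::kafka::client_manager::ClientManager;

async fn produce_smoke_test() -> Result<()> {
    let manager = Arc::new(ClientManager::try_new(&KafkaConfig::default()).await?);
    // The first lookup builds a partition client plus a batch producer for the topic;
    // later lookups for the same topic reuse the cached pair.
    let client = manager.get_or_insert(&"greptimedb_wal_topic_0".to_string()).await?;
    let _raw_client = client.raw_client.clone();
    Ok(())
}
```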

View File

@@ -13,22 +13,37 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_config::wal::{KafkaConfig, WalOptions};
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use store_api::logstore::entry::Id as EntryId;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
use crate::error::{Error, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::offset::Offset;
use crate::kafka::record_utils::{handle_consume_result, RecordProducer};
use crate::kafka::{EntryImpl, NamespaceImpl};
/// A log store backed by Kafka.
#[derive(Debug)]
pub struct KafkaLogStore;
pub struct KafkaLogStore {
config: KafkaConfig,
/// Manages kafka clients through which the log store contacts the Kafka cluster.
client_manager: ClientManagerRef,
}
impl KafkaLogStore {
pub async fn try_new(config: KafkaConfig) -> Result<Self> {
todo!()
/// Tries to create a Kafka log store.
pub async fn try_new(config: &KafkaConfig) -> Result<Self> {
Ok(Self {
client_manager: Arc::new(ClientManager::try_new(config).await?),
config: config.clone(),
})
}
}
@@ -38,68 +53,135 @@ impl LogStore for KafkaLogStore {
type Entry = EntryImpl;
type Namespace = NamespaceImpl;
/// Create an entry of the associate Entry type.
/// Creates an entry of the associated Entry type.
fn entry<D: AsRef<[u8]>>(
&self,
data: D,
entry_id: EntryId,
ns: Self::Namespace,
) -> Self::Entry {
EntryImpl::new(data.as_ref().to_vec(), entry_id, ns)
EntryImpl {
data: data.as_ref().to_vec(),
id: entry_id,
ns,
}
}
/// Append an `Entry` to WAL with given namespace and return append response containing
/// the entry id.
/// Appends an entry to the log store and returns a response containing the entry id of the appended entry.
async fn append(&self, entry: Self::Entry) -> Result<AppendResponse> {
todo!()
let entry_id = RecordProducer::new(entry.ns.clone())
.with_entries(vec![entry])
.produce(&self.client_manager)
.await
.map(TryInto::try_into)??;
Ok(AppendResponse {
last_entry_id: entry_id,
})
}
/// For a batch of log entries belonging to multiple regions, each assigned to a specific topic,
/// we need to determine the minimum log offset returned for each region in this batch.
/// During replay, we use this offset to fetch log entries for a region from its assigned topic.
/// After fetching, we filter the entries to obtain log entries relevant to that specific region.
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// while the value is the id of the last successfully written entry of the region.
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
todo!()
if entries.is_empty() {
return Ok(AppendBatchResponse::default());
}
// Groups entries by region id and pushes them to an associated record producer.
let mut producers = HashMap::with_capacity(entries.len());
for entry in entries {
producers
.entry(entry.ns.region_id)
.or_insert(RecordProducer::new(entry.ns.clone()))
.push(entry);
}
// Builds a record from the entries belonging to each region and produces it to the Kafka server.
let region_ids = producers.keys().cloned().collect::<Vec<_>>();
let tasks = producers
.into_values()
.map(|producer| producer.produce(&self.client_manager))
.collect::<Vec<_>>();
// Each produce operation returns a kafka offset of the produced record.
// The offsets are then converted to entry ids.
let entry_ids = futures::future::try_join_all(tasks)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
Ok(AppendBatchResponse {
last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
})
}
/// Create a new `EntryStream` to asynchronously generates `Entry` with ids
/// starting from `id`. The generated entries will be filtered by the namespace.
/// Creates a new `EntryStream` to asynchronously generate `Entry` with entry ids
/// starting from `entry_id`. The generated entries will be filtered by the namespace.
async fn read(
&self,
ns: &Self::Namespace,
entry_id: EntryId,
) -> Result<SendableEntryStream<Self::Entry, Self::Error>> {
todo!()
let topic = ns.topic.clone();
let region_id = ns.region_id;
// Gets the client associated with the topic.
let client = self
.client_manager
.get_or_insert(&topic)
.await?
.raw_client
.clone();
// Reads the entries starting from exactly the specified offset.
let offset = Offset::try_from(entry_id)?.0;
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset))
.with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
.with_max_wait_ms(self.config.max_wait_time.as_millis() as i32)
.build();
let stream = async_stream::stream!({
while let Some(consume_result) = stream_consumer.next().await {
yield handle_consume_result(consume_result, &topic, region_id, offset);
}
});
Ok(Box::pin(stream))
}
/// Create a namespace of the associate Namespace type
/// Creates a namespace of the associated Namespace type.
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace {
todo!()
// Safety: upon start, the datanode checks that the wal provider in its own wal config is
// consistent with that of the metasrv. Therefore, the wal options passed into the kafka log store
// must be of type WalOptions::Kafka.
let WalOptions::Kafka(kafka_options) = wal_options else {
unreachable!()
};
NamespaceImpl {
region_id: ns_id,
topic: kafka_options.topic.clone(),
}
}
/// Create a new `Namespace`.
/// Creates a new `Namespace` from the given ref.
async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
Ok(())
}
/// Delete an existing `Namespace` with given ref.
/// Deletes an existing `Namespace` specified by the given ref.
async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
Ok(())
}
/// List all existing namespaces.
/// Lists all existing namespaces.
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
Ok(vec![])
}
/// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete
/// the log files if all entries inside are obsolete. This method may not delete log
/// files immediately.
/// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
/// so that the log store can safely delete those entries. This method does not guarantee
/// that the obsolete entries are deleted immediately.
async fn obsolete(&self, _ns: Self::Namespace, _entry_id: EntryId) -> Result<()> {
Ok(())
}
/// Stop components of logstore.
/// Stops components of the logstore.
async fn stop(&self) -> Result<()> {
Ok(())
}
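A hedged end-to-end sketch of the append path, written as if inside the crate's `kafka` module; the `KafkaWalOptions { topic }` shape is an assumption based on the `WalOptions::Kafka` destructuring above, and the topic name is hypothetical:

```rust
// Assumes a reachable Kafka cluster and a topic pre-created by the metasrv.
use common_config::wal::{KafkaConfig, KafkaWalOptions, WalOptions};
use store_api::logstore::LogStore;

use crate::error::Result;
use crate::kafka::log_store::KafkaLogStore;

async fn append_one_entry() -> Result<()> {
    let store = KafkaLogStore::try_new(&KafkaConfig::default()).await?;
    // Region 42 writes to the (hypothetical) topic assigned to it by the metasrv.
    let wal_options = WalOptions::Kafka(KafkaWalOptions {
        topic: "greptimedb_wal_topic_0".to_string(),
    });
    let ns = store.namespace(42, &wal_options);
    let entry = store.entry(b"mutation payload", 0, ns);
    // `last_entry_id` is derived from the Kafka offset of the produced record.
    let response = store.append(entry).await?;
    let _entry_id = response.last_entry_id;
    Ok(())
}
```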

View File

@@ -0,0 +1,37 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{CastSnafu, Result};
use crate::kafka::EntryId;
/// A wrapper of kafka offset.
pub(crate) struct Offset(pub i64);
impl TryFrom<Offset> for EntryId {
type Error = crate::error::Error;
fn try_from(offset: Offset) -> Result<Self> {
EntryId::try_from(offset.0).map_err(|_| CastSnafu.build())
}
}
impl TryFrom<EntryId> for Offset {
type Error = crate::error::Error;
fn try_from(entry_id: EntryId) -> Result<Self> {
i64::try_from(entry_id)
.map(Offset)
.map_err(|_| CastSnafu.build())
}
}
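A small sketch of the two conversions, written as if inside the `kafka` module since `Offset` is crate-private: a non-negative Kafka offset round-trips to an `EntryId`, while a negative offset surfaces the `Cast` error:

```rust
use crate::error::Result;
use crate::kafka::offset::Offset;
use crate::kafka::EntryId;

fn offset_roundtrip() -> Result<()> {
    // A non-negative Kafka offset converts to a u64 entry id and back.
    let entry_id = EntryId::try_from(Offset(42))?;
    assert_eq!(Offset::try_from(entry_id)?.0, 42);
    // A negative offset cannot be represented as an entry id and yields the Cast error.
    assert!(EntryId::try_from(Offset(-1)).is_err());
    Ok(())
}
```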

View File

@@ -0,0 +1,219 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_config::wal::KafkaWalTopic as Topic;
use rskafka::record::{Record, RecordAndOffset};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
ConsumeRecordSnafu, DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu,
MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result,
};
use crate::kafka::client_manager::ClientManagerRef;
use crate::kafka::offset::Offset;
use crate::kafka::{EntryId, EntryImpl, NamespaceImpl};
type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>;
/// Record metadata which will be serialized/deserialized to/from the `key` of a Record.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct RecordMeta {
/// Meta version. Used for backward compatibility.
version: u32,
/// The namespace of the entries wrapped in the record.
ns: NamespaceImpl,
/// Ids of the entries built into the record.
entry_ids: Vec<EntryId>,
/// entry_offsets[i] is the end offset (exclusive) of the data of the i-th entry in the record value.
entry_offsets: Vec<usize>,
}
impl RecordMeta {
fn new(ns: NamespaceImpl, entries: &[EntryImpl]) -> Self {
Self {
version: 0,
ns,
entry_ids: entries.iter().map(|entry| entry.id).collect(),
entry_offsets: entries
.iter()
.map(|entry| entry.data.len())
.scan(0, |presum, x| {
*presum += x;
Some(*presum)
})
.collect(),
}
}
}
/// Produces a record to a kafka topic.
pub(crate) struct RecordProducer {
/// The namespace of the entries.
ns: NamespaceImpl,
/// Entries are buffered before being built into a record.
entries: Vec<EntryImpl>,
}
impl RecordProducer {
/// Creates a new producer for producing entries with the given namespace.
pub(crate) fn new(ns: NamespaceImpl) -> Self {
Self {
ns,
entries: Vec::new(),
}
}
/// Populates the entry buffer with the given entries.
pub(crate) fn with_entries(self, entries: Vec<EntryImpl>) -> Self {
Self { entries, ..self }
}
/// Pushes an entry into the entry buffer.
pub(crate) fn push(&mut self, entry: EntryImpl) {
self.entries.push(entry);
}
/// Produces the buffered entries to the Kafka server as a single Kafka record.
/// Returns the kafka offset of the produced record.
// TODO(niebayes): since the total size of a region's entries may be too large,
// the producer may need to support splitting entries into multiple records.
pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result<Offset> {
ensure!(!self.entries.is_empty(), EmptyEntriesSnafu);
// Produces the record through a client. The client determines when to send the record to the Kafka server.
let client = client_manager
.get_or_insert(&self.ns.topic)
.await
.map_err(|e| {
GetClientSnafu {
topic: &self.ns.topic,
error: e.to_string(),
}
.build()
})?;
client
.producer
.produce(encode_to_record(self.ns.clone(), self.entries)?)
.await
.map(Offset)
.context(ProduceRecordSnafu {
topic: &self.ns.topic,
})
}
}
fn encode_to_record(ns: NamespaceImpl, entries: Vec<EntryImpl>) -> Result<Record> {
let meta = RecordMeta::new(ns, &entries);
let data = entries.into_iter().flat_map(|entry| entry.data).collect();
Ok(Record {
key: Some(serde_json::to_vec(&meta).context(EncodeMetaSnafu)?),
value: Some(data),
timestamp: rskafka::chrono::Utc::now(),
headers: Default::default(),
})
}
fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
let key = record.key.context(MissingKeySnafu)?;
let value = record.value.context(MissingValueSnafu)?;
let meta: RecordMeta = serde_json::from_slice(&key).context(DecodeMetaSnafu)?;
let mut entries = Vec::with_capacity(meta.entry_ids.len());
let mut start_offset = 0;
for (i, end_offset) in meta.entry_offsets.iter().enumerate() {
entries.push(EntryImpl {
// TODO(niebayes): try to avoid the clone.
data: value[start_offset..*end_offset].to_vec(),
id: meta.entry_ids[i],
ns: meta.ns.clone(),
});
start_offset = *end_offset;
}
Ok(entries)
}
/// Handles the result of a consume operation on a kafka topic.
pub(crate) fn handle_consume_result(
result: ConsumeResult,
topic: &Topic,
region_id: u64,
offset: i64,
) -> Result<Vec<EntryImpl>> {
match result {
Ok((record_and_offset, _)) => {
// Only returns entries belonging to the region with the given region id.
// Since a record only contains entries from a single region, it suffices to check the first entry only.
let entries = decode_from_record(record_and_offset.record)?;
if let Some(entry) = entries.first()
&& entry.ns.region_id == region_id
{
Ok(entries)
} else {
Ok(vec![])
}
}
Err(e) => Err(e).context(ConsumeRecordSnafu {
topic,
region_id,
offset,
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
EntryImpl {
data: data.as_ref().to_vec(),
id: entry_id,
ns,
}
}
#[test]
fn test_serde_record_meta() {
let ns = NamespaceImpl {
region_id: 1,
topic: "test_topic".to_string(),
};
let entries = vec![
new_test_entry(b"111", 1, ns.clone()),
new_test_entry(b"2222", 2, ns.clone()),
new_test_entry(b"33333", 3, ns.clone()),
];
let meta = RecordMeta::new(ns, &entries);
let encoded = serde_json::to_vec(&meta).unwrap();
let decoded: RecordMeta = serde_json::from_slice(&encoded).unwrap();
assert_eq!(meta, decoded);
}
#[test]
fn test_encdec_record() {
let ns = NamespaceImpl {
region_id: 1,
topic: "test_topic".to_string(),
};
let entries = vec![
new_test_entry(b"111", 1, ns.clone()),
new_test_entry(b"2222", 2, ns.clone()),
new_test_entry(b"33333", 3, ns.clone()),
];
let record = encode_to_record(ns, entries.clone()).unwrap();
let decoded_entries = decode_from_record(record).unwrap();
assert_eq!(entries, decoded_entries);
}
}
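The record encoding concatenates all entry payloads into one value and stores a prefix sum of their lengths as `entry_offsets`; a small worked sketch of that arithmetic, mirroring the test entries above:

```rust
fn main() {
    // The test entries b"111", b"2222", b"33333" have lengths 3, 4, 5.
    let lens = [3usize, 4, 5];
    // RecordMeta keeps the exclusive end offset of each entry as a prefix sum.
    let entry_offsets: Vec<usize> = lens
        .iter()
        .scan(0, |presum, len| {
            *presum += *len;
            Some(*presum)
        })
        .collect();
    assert_eq!(entry_offsets, vec![3, 7, 12]);
    // Decoding slices the concatenated record value back apart: [0..3), [3..7), [7..12).
    let value = b"111222233333";
    assert_eq!(&value[3..7], b"2222".as_slice());
}
```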

View File

@@ -15,7 +15,6 @@
#![feature(let_chains)]
pub mod error;
#[allow(unused)]
pub mod kafka;
mod noop;
pub mod raft_engine;

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_config::wal::WalOptions;
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
@@ -65,15 +63,11 @@ impl LogStore for NoopLogStore {
}
async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
Ok(AppendResponse {
last_entry_id: Default::default(),
})
Ok(AppendResponse::default())
}
async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
Ok(AppendBatchResponse {
last_entry_ids: HashMap::new(),
})
Ok(AppendBatchResponse::default())
}
async fn read(

View File

@@ -49,6 +49,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Creates a new `EntryStream` to asynchronously generate `Entry` with ids
/// starting from `id`.
// TODO(niebayes): update docs for entry id.
async fn read(
&self,
ns: &Self::Namespace,
@@ -79,7 +80,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
}
/// The response of an `append` operation.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct AppendResponse {
/// The id of the entry appended to the log store.
pub last_entry_id: EntryId,