mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
fix(remote_wal): some known issues (#3052)
* fix: some known issues * fix: CR * fix: CR * chore: replace Mutex with RwLock
This commit is contained in:
@@ -12,17 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
|
||||
use dashmap::mapref::entry::Entry as DashMapEntry;
|
||||
use dashmap::DashMap;
|
||||
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
|
||||
use rskafka::client::producer::aggregator::RecordAggregator;
|
||||
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
|
||||
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
|
||||
use rskafka::BackoffConfig;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
|
||||
|
||||
@@ -67,7 +67,7 @@ pub(crate) struct ClientManager {
|
||||
client_factory: RsKafkaClient,
|
||||
/// A pool maintaining a collection of clients.
|
||||
/// Key: a topic. Value: the associated client of the topic.
|
||||
client_pool: DashMap<Topic, Client>,
|
||||
client_pool: RwLock<HashMap<Topic, Client>>,
|
||||
}
|
||||
|
||||
impl ClientManager {
|
||||
@@ -91,18 +91,28 @@ impl ClientManager {
|
||||
Ok(Self {
|
||||
config: config.clone(),
|
||||
client_factory: client,
|
||||
client_pool: DashMap::new(),
|
||||
client_pool: RwLock::new(HashMap::new()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Gets the client associated with the topic. If the client does not exist, a new one will
|
||||
/// be created and returned.
|
||||
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
|
||||
match self.client_pool.entry(topic.to_string()) {
|
||||
DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
|
||||
DashMapEntry::Vacant(entry) => {
|
||||
let topic_client = self.try_create_client(topic).await?;
|
||||
Ok(entry.insert(topic_client).clone())
|
||||
let client_pool = self.client_pool.read().await;
|
||||
if let Some(client) = client_pool.get(topic) {
|
||||
return Ok(client.clone());
|
||||
}
|
||||
// Manullay releases the read lock.
|
||||
drop(client_pool);
|
||||
|
||||
// Acquires the write lock.
|
||||
let mut client_pool = self.client_pool.write().await;
|
||||
match client_pool.get(topic) {
|
||||
Some(client) => Ok(client.clone()),
|
||||
None => {
|
||||
let client = self.try_create_client(topic).await?;
|
||||
client_pool.insert(topic.clone(), client.clone());
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,25 +100,24 @@ impl LogStore for KafkaLogStore {
|
||||
.push(entry);
|
||||
}
|
||||
|
||||
// Builds a record from entries belong to a region and produces them to kafka server.
|
||||
let region_ids = producers.keys().cloned().collect::<Vec<_>>();
|
||||
// Produces entries for each region and gets the offset those entries written to.
|
||||
// The returned offset is then converted into an entry id.
|
||||
let last_entry_ids = futures::future::try_join_all(producers.into_iter().map(
|
||||
|(region_id, producer)| async move {
|
||||
let entry_id = producer
|
||||
.produce(&self.client_manager)
|
||||
.await
|
||||
.map(TryInto::try_into)??;
|
||||
Ok((region_id, entry_id))
|
||||
},
|
||||
))
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let tasks = producers
|
||||
.into_values()
|
||||
.map(|producer| producer.produce(&self.client_manager))
|
||||
.collect::<Vec<_>>();
|
||||
// Each produce operation returns a kafka offset of the produced record.
|
||||
// The offsets are then converted to entry ids.
|
||||
let entry_ids = futures::future::try_join_all(tasks)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(TryInto::try_into)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
debug!("The entries are appended at offsets {:?}", entry_ids);
|
||||
debug!("Append batch result: {:?}", last_entry_ids);
|
||||
|
||||
Ok(AppendBatchResponse {
|
||||
last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
|
||||
})
|
||||
Ok(AppendBatchResponse { last_entry_ids })
|
||||
}
|
||||
|
||||
/// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids
|
||||
@@ -186,7 +185,7 @@ impl LogStore for KafkaLogStore {
|
||||
record_offset, ns_clone, high_watermark
|
||||
);
|
||||
|
||||
// Ignores the noop record.
|
||||
// Ignores noop records.
|
||||
if record.record.value.is_none() {
|
||||
continue;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user