mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
@@ -528,9 +528,6 @@ pub enum Error {
|
||||
source: common_wal::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
|
||||
ResolveKafkaEndpoint { source: common_wal::error::Error },
|
||||
|
||||
#[snafu(display("Failed to build a Kafka controller client"))]
|
||||
BuildKafkaCtrlClient {
|
||||
#[snafu(implicit)]
|
||||
@@ -1040,7 +1037,6 @@ impl ErrorExt for Error {
|
||||
| BuildKafkaClient { .. }
|
||||
| BuildKafkaCtrlClient { .. }
|
||||
| KafkaPartitionClient { .. }
|
||||
| ResolveKafkaEndpoint { .. }
|
||||
| ProduceRecord { .. }
|
||||
| CreateKafkaWalTopic { .. }
|
||||
| EmptyTopicPool { .. }
|
||||
|
||||
@@ -25,8 +25,7 @@ use snafu::ResultExt;
|
||||
|
||||
use crate::error::{
|
||||
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
|
||||
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
|
||||
Result, TlsConfigSnafu,
|
||||
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
|
||||
};
|
||||
|
||||
// Each topic only has one partition for now.
|
||||
@@ -209,10 +208,8 @@ impl KafkaTopicCreator {
|
||||
/// Builds a kafka [Client](rskafka::client::Client).
|
||||
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
|
||||
// Builds an kafka controller client for creating topics.
|
||||
let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
|
||||
.await
|
||||
.context(ResolveKafkaEndpointSnafu)?;
|
||||
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
|
||||
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
|
||||
.backoff_config(DEFAULT_BACKOFF_CONFIG);
|
||||
if let Some(sasl) = &connection.sasl {
|
||||
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
|
||||
};
|
||||
|
||||
@@ -218,6 +218,7 @@ impl HeartbeatTask {
|
||||
if let Some(message) = message {
|
||||
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
|
||||
} else {
|
||||
warn!("Sender has been dropped, exiting the heartbeat loop");
|
||||
// Receives None that means Sender was dropped, we need to break the current loop
|
||||
break
|
||||
}
|
||||
@@ -259,7 +260,11 @@ impl HeartbeatTask {
|
||||
error!(e; "Error while handling heartbeat response");
|
||||
}
|
||||
}
|
||||
Ok(None) => break,
|
||||
Ok(None) => {
|
||||
warn!("Heartbeat response stream closed");
|
||||
capture_self.start_with_retry(retry_interval).await;
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(e; "Occur error while reading heartbeat response");
|
||||
capture_self.start_with_retry(retry_interval).await;
|
||||
|
||||
@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
|
||||
};
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
use servers::addrs;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
@@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
|
||||
pub struct HeartbeatTask {
|
||||
peer_addr: String,
|
||||
meta_client: Arc<MetaClient>,
|
||||
report_interval: u64,
|
||||
retry_interval: u64,
|
||||
report_interval: Duration,
|
||||
retry_interval: Duration,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
start_time_ms: u64,
|
||||
}
|
||||
@@ -58,8 +58,8 @@ impl HeartbeatTask {
|
||||
HeartbeatTask {
|
||||
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
|
||||
meta_client,
|
||||
report_interval: heartbeat_opts.interval.as_millis() as u64,
|
||||
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
|
||||
report_interval: heartbeat_opts.interval,
|
||||
retry_interval: heartbeat_opts.retry_interval,
|
||||
resp_handler_executor,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
}
|
||||
@@ -103,13 +103,15 @@ impl HeartbeatTask {
|
||||
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
|
||||
}
|
||||
}
|
||||
Ok(None) => break,
|
||||
Ok(None) => {
|
||||
warn!("Heartbeat response stream closed");
|
||||
capture_self.start_with_retry(retry_interval).await;
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
|
||||
error!(e; "Occur error while reading heartbeat response");
|
||||
capture_self
|
||||
.start_with_retry(Duration::from_millis(retry_interval))
|
||||
.await;
|
||||
capture_self.start_with_retry(retry_interval).await;
|
||||
|
||||
break;
|
||||
}
|
||||
@@ -177,12 +179,13 @@ impl HeartbeatTask {
|
||||
if let Some(message) = message {
|
||||
Self::new_heartbeat_request(&heartbeat_request, Some(message))
|
||||
} else {
|
||||
warn!("Sender has been dropped, exiting the heartbeat loop");
|
||||
// Receives None that means Sender was dropped, we need to break the current loop
|
||||
break
|
||||
}
|
||||
}
|
||||
_ = &mut sleep => {
|
||||
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
|
||||
sleep.as_mut().reset(Instant::now() + report_interval);
|
||||
Self::new_heartbeat_request(&heartbeat_request, None)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -139,9 +139,6 @@ pub enum Error {
|
||||
error: rskafka::client::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
|
||||
ResolveKafkaEndpoint { source: common_wal::error::Error },
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to build a Kafka partition client, topic: {}, partition: {}",
|
||||
topic,
|
||||
@@ -343,7 +340,6 @@ impl ErrorExt for Error {
|
||||
StartWalTask { .. }
|
||||
| StopWalTask { .. }
|
||||
| IllegalState { .. }
|
||||
| ResolveKafkaEndpoint { .. }
|
||||
| NoMaxValue { .. }
|
||||
| Cast { .. }
|
||||
| EncodeJson { .. }
|
||||
|
||||
@@ -24,9 +24,7 @@ use snafu::ResultExt;
|
||||
use store_api::logstore::provider::KafkaProvider;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use crate::error::{
|
||||
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
|
||||
};
|
||||
use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result, TlsConfigSnafu};
|
||||
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
|
||||
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};
|
||||
|
||||
@@ -78,11 +76,8 @@ impl ClientManager {
|
||||
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
|
||||
) -> Result<Self> {
|
||||
// Sets backoff config for the top-level kafka client and all clients constructed by it.
|
||||
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
|
||||
.await
|
||||
.context(ResolveKafkaEndpointSnafu)?;
|
||||
let mut builder =
|
||||
ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
|
||||
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
|
||||
.backoff_config(DEFAULT_BACKOFF_CONFIG);
|
||||
if let Some(sasl) = &config.connection.sasl {
|
||||
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
|
||||
};
|
||||
|
||||
@@ -27,10 +27,9 @@ use snafu::OptionExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::{Request, Response, Streaming};
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::error::{self, Result};
|
||||
use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
|
||||
use crate::metasrv::{Context, Metasrv};
|
||||
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
|
||||
@@ -109,6 +108,12 @@ impl heartbeat_server::Heartbeat for Metasrv {
|
||||
|
||||
if is_not_leader {
|
||||
warn!("Quit because it is no longer the leader");
|
||||
let _ = tx
|
||||
.send(Err(Status::aborted(format!(
|
||||
"The requested metasrv node is not leader, node addr: {}",
|
||||
ctx.server_addr
|
||||
))))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user