Rework etcd timeline updates and their handling

This commit is contained in:
Kirill Bulatov
2022-05-02 23:28:54 +03:00
committed by Kirill Bulatov
parent d059e588a6
commit 7c49abe7d1
5 changed files with 1322 additions and 1115 deletions

View File

@@ -6,17 +6,13 @@ pub mod subscription_key;
/// All broker values, possible to use when dealing with etcd.
pub mod subscription_value;
use std::{
collections::{hash_map, HashMap},
str::FromStr,
};
use std::str::FromStr;
use serde::de::DeserializeOwned;
use subscription_key::SubscriptionKey;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::*;
use utils::zid::{NodeId, ZTenantTimelineId};
use crate::subscription_key::SubscriptionFullKey;
@@ -28,18 +24,17 @@ pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon";
/// A way to control the data retrieval from a certain subscription.
pub struct BrokerSubscription<V> {
value_updates: mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>>,
/// An unbounded channel to fetch the relevant etcd updates from.
pub value_updates: mpsc::UnboundedReceiver<BrokerUpdate<V>>,
key: SubscriptionKey,
watcher_handle: JoinHandle<Result<(), BrokerError>>,
/// A subscription task handle, to allow waiting on it for the task to complete.
/// Both the updates channel and the handle require `&mut`, so it's better to keep
/// both `pub` to allow using both in the same structures without borrow checker complaining.
pub watcher_handle: JoinHandle<Result<(), BrokerError>>,
watcher: Watcher,
}
impl<V> BrokerSubscription<V> {
/// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet.
pub async fn fetch_data(&mut self) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>> {
self.value_updates.recv().await
}
/// Cancels the subscription, stopping the data poller and waiting for it to shut down.
pub async fn cancel(mut self) -> Result<(), BrokerError> {
self.watcher.cancel().await.map_err(|e| {
@@ -48,15 +43,41 @@ impl<V> BrokerSubscription<V> {
format!("Failed to cancel broker subscription, kind: {:?}", self.key),
)
})?;
self.watcher_handle.await.map_err(|e| {
BrokerError::InternalError(format!(
"Failed to join the broker value updates task, kind: {:?}, error: {e}",
self.key
))
})?
match (&mut self.watcher_handle).await {
Ok(res) => res,
Err(e) => {
if e.is_cancelled() {
// don't error on the tasks that are cancelled already
Ok(())
} else {
Err(BrokerError::InternalError(format!(
"Panicked during broker subscription task, kind: {:?}, error: {e}",
self.key
)))
}
}
}
}
}
impl<V> Drop for BrokerSubscription<V> {
fn drop(&mut self) {
// we poll data from etcd into the channel in the same struct, so if the whole struct gets dropped,
// no more data is used by the receiver and it's safe to cancel and drop the whole etcd subscription task.
self.watcher_handle.abort();
}
}
/// An update from the etcd broker.
pub struct BrokerUpdate<V> {
/// Etcd generation version, the bigger the more actual the data is.
pub etcd_version: i64,
/// Etcd key for the corresponding value, parsed from the broker KV.
pub key: SubscriptionFullKey,
/// Current etcd value, parsed from the broker KV.
pub value: V,
}
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error("Etcd client error: {0}. Context: {1}")]
@@ -124,41 +145,21 @@ where
break;
}
let mut value_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, V>> = HashMap::new();
// Keep track that the timeline data updates from etcd arrive in the right order.
// https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
// > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
let mut value_etcd_versions: HashMap<ZTenantTimelineId, i64> = HashMap::new();
let events = resp.events();
debug!("Processing {} events", events.len());
for event in events {
if EventType::Put == event.event_type() {
if let Some(new_etcd_kv) = event.kv() {
let new_kv_version = new_etcd_kv.version();
match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) {
Ok(Some((key, value))) => match value_updates
.entry(key.id)
.or_default()
.entry(key.node_id)
{
hash_map::Entry::Occupied(mut o) => {
let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN);
if old_etcd_kv_version < new_kv_version {
o.insert(value);
value_etcd_versions.insert(key.id,new_kv_version);
} else {
debug!("Skipping etcd timeline update due to older version compared to one that's already stored");
}
}
hash_map::Entry::Vacant(v) => {
v.insert(value);
value_etcd_versions.insert(key.id,new_kv_version);
}
},
Ok(Some((key, value))) => if let Err(e) = value_updates_sender.send(BrokerUpdate {
etcd_version: new_etcd_kv.version(),
key,
value,
}) {
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
break;
},
Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"),
Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"),
Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"),
@@ -166,13 +167,6 @@ where
}
}
}
if !value_updates.is_empty() {
if let Err(e) = value_updates_sender.send(value_updates) {
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
break;
}
}
}
Ok(())

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
//! Actual Postgres connection handler to stream WAL to the server.
//! Runs as a separate, cancellable Tokio task.
use std::{
str::FromStr,
sync::Arc,
@@ -10,113 +10,29 @@ use anyhow::{bail, ensure, Context};
use bytes::BytesMut;
use fail::fail_point;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::{
lsn::Lsn,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantTimelineId},
};
use super::TaskEvent;
use crate::{
http::models::WalReceiverEntry,
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
#[derive(Debug, Clone)]
pub enum WalConnectionEvent {
Started,
NewWal(ReplicationFeedback),
End(Result<(), String>),
}
/// A wrapper around standalone Tokio task, to poll its updates or cancel the task.
#[derive(Debug)]
pub struct WalReceiverConnection {
handle: tokio::task::JoinHandle<()>,
cancellation: watch::Sender<()>,
events_receiver: watch::Receiver<WalConnectionEvent>,
}
impl WalReceiverConnection {
/// Initializes the connection task, returning a set of handles on top of it.
/// The task is started immediately after the creation, fails if no connection is established during the timeout given.
pub fn open(
id: ZTenantTimelineId,
safekeeper_id: NodeId,
wal_producer_connstr: String,
connect_timeout: Duration,
) -> Self {
let (cancellation, mut cancellation_receiver) = watch::channel(());
let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started);
let handle = tokio::spawn(
async move {
let connection_result = handle_walreceiver_connection(
id,
&wal_producer_connstr,
&events_sender,
&mut cancellation_receiver,
connect_timeout,
)
.await
.map_err(|e| {
format!("Walreceiver connection for id {id} failed with error: {e:#}")
});
match &connection_result {
Ok(()) => {
debug!("Walreceiver connection for id {id} ended successfully")
}
Err(e) => warn!("{e}"),
}
events_sender
.send(WalConnectionEvent::End(connection_result))
.ok();
}
.instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)),
);
Self {
handle,
cancellation,
events_receiver,
}
}
/// Polls for the next WAL receiver event, if there's any available since the last check.
/// Blocks if there's no new event available, returns `None` if no new events will ever occur.
/// Only the last event is returned, all events received between observatins are lost.
pub async fn next_event(&mut self) -> Option<WalConnectionEvent> {
match self.events_receiver.changed().await {
Ok(()) => Some(self.events_receiver.borrow().clone()),
Err(_cancellation_error) => None,
}
}
/// Gracefully aborts current WAL streaming task, waiting for the current WAL streamed.
pub async fn shutdown(&mut self) -> anyhow::Result<()> {
self.cancellation.send(()).ok();
let handle = &mut self.handle;
handle
.await
.context("Failed to join on a walreceiver connection task")?;
Ok(())
}
}
async fn handle_walreceiver_connection(
/// Opens a conneciton to the given wal producer and streams the WAL, sending progress messages during streaming.
pub async fn handle_walreceiver_connection(
id: ZTenantTimelineId,
wal_producer_connstr: &str,
events_sender: &watch::Sender<WalConnectionEvent>,
cancellation: &mut watch::Receiver<()>,
events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
mut cancellation: watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
// Connect to the database in replication mode.
@@ -214,8 +130,6 @@ async fn handle_walreceiver_connection(
while let Some(replication_message) = {
select! {
// check for shutdown first
biased;
_ = cancellation.changed() => {
info!("walreceiver interrupted");
None
@@ -344,7 +258,7 @@ async fn handle_walreceiver_connection(
.as_mut()
.zenith_status_update(data.len() as u64, &data)
.await?;
if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) {
if let Err(e) = events_sender.send(TaskEvent::NewEvent(zenith_status_update)) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}

View File

@@ -221,15 +221,12 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
.await
.context("failed to subscribe for safekeeper info")?;
loop {
match subscription.fetch_data().await {
match subscription.value_updates.recv().await {
Some(new_info) => {
for (zttid, sk_info) in new_info {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
for (safekeeper_id, info) in sk_info {
tli.record_safekeeper_info(&info, safekeeper_id).await?
}
}
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) {
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}
}
None => {