Preserve task result in TaskHandle by keeping join handle around (#2521)

* Preserve task result in TaskHandle by keeping join handle around

The solution is not great, but it should hep to debug staging issue
I tried to do it in a least destructive way. TaskHandle used only in
one place so it is ok to use something less generic unless we want
to extend its usage across the codebase. In its current current form
for its single usage place it looks too abstract

Some problems around this code:
1. Task can drop event sender and continue running
2. Task cannot be joined several times (probably not needed,
    but still, can be surprising)
3. Had to split task event into two types because ahyhow::Error
    does not implement clone. So TaskContinueEvent derives clone
    but usual task evend does not. Clone requirement appears
    because we clone the current value in next_task_event.
    Taking it by reference is complicated.
4. Split between Init and Started is artificial and comes from
    watch::channel requirement to have some initial value.

    To summarize from 3 and 4. It may be a better idea to use
    RWLock or a bounded channel instead
This commit is contained in:
Dmitry Rodionov
2022-09-26 23:57:02 +03:00
committed by GitHub
parent d15116f2cc
commit fb68d01449
3 changed files with 89 additions and 46 deletions

View File

@@ -31,7 +31,6 @@ use etcd_broker::Client;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::*;
use url::Url;
@@ -88,37 +87,44 @@ pub fn is_etcd_client_initialized() -> bool {
/// That may lead to certain events not being observed by the listener.
#[derive(Debug)]
pub struct TaskHandle<E> {
events_receiver: watch::Receiver<TaskEvent<E>>,
join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
events_receiver: watch::Receiver<TaskStateUpdate<E>>,
cancellation: watch::Sender<()>,
}
#[derive(Debug, Clone)]
pub enum TaskEvent<E> {
Update(TaskStateUpdate<E>),
End(anyhow::Result<()>),
}
#[derive(Debug, Clone)]
pub enum TaskStateUpdate<E> {
Init,
Started,
NewEvent(E),
End,
Progress(E),
}
impl<E: Clone> TaskHandle<E> {
/// Initializes the task, starting it immediately after the creation.
pub fn spawn<Fut>(
task: impl FnOnce(Arc<watch::Sender<TaskEvent<E>>>, watch::Receiver<()>) -> Fut + Send + 'static,
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, watch::Receiver<()>) -> Fut
+ Send
+ 'static,
) -> Self
where
Fut: Future<Output = Result<(), String>> + Send,
E: Sync + Send + 'static,
Fut: Future<Output = anyhow::Result<()>> + Send,
E: Send + Sync + 'static,
{
let (cancellation, cancellation_receiver) = watch::channel(());
let (events_sender, events_receiver) = watch::channel(TaskEvent::Started);
let events_sender = Arc::new(events_sender);
let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
let sender = Arc::clone(&events_sender);
let _ = WALRECEIVER_RUNTIME.spawn(async move {
events_sender.send(TaskEvent::Started).ok();
task(sender, cancellation_receiver).await
let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
events_sender.send(TaskStateUpdate::Started).ok();
task(events_sender, cancellation_receiver).await
});
TaskHandle {
join_handle: Some(join_handle),
events_receiver,
cancellation,
}
@@ -126,15 +132,45 @@ impl<E: Clone> TaskHandle<E> {
async fn next_task_event(&mut self) -> TaskEvent<E> {
match self.events_receiver.changed().await {
Ok(()) => self.events_receiver.borrow().clone(),
Err(_task_channel_part_dropped) => TaskEvent::End,
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
Err(_task_channel_part_dropped) => {
TaskEvent::End(match self.join_handle.take() {
Some(jh) => {
if !jh.is_finished() {
warn!("sender is dropped while join handle is still alive");
}
jh.await
.map_err(|e| anyhow::anyhow!("Failed to join task: {e}"))
.and_then(|x| x)
}
None => {
// Another option is to have an enum, join handle or result and give away the reference to it
Err(anyhow::anyhow!("Task was joined more than once"))
}
})
}
}
}
/// Aborts current task, waiting for it to finish.
pub async fn shutdown(mut self) {
self.cancellation.send(()).ok();
// wait until the sender is dropped
while self.events_receiver.changed().await.is_ok() {}
pub async fn shutdown(self) {
match self.join_handle {
Some(jh) => {
self.cancellation.send(()).ok();
match jh.await {
Ok(Ok(())) => debug!("Shutdown success"),
Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
Err(join_error) => {
if join_error.is_cancelled() {
error!("Shutdown task was cancelled");
} else {
error!("Shutdown task join error: {join_error}")
}
}
}
}
None => {}
}
}
}

View File

@@ -16,10 +16,10 @@ use std::{
time::Duration,
};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::tenant::Timeline;
use crate::{task_mgr, walreceiver::TaskStateUpdate};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{
@@ -145,19 +145,26 @@ async fn connection_manager_loop_step(
let wal_connection = walreceiver_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Started => {},
TaskEvent::NewEvent(status) => {
if status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
TaskEvent::Update(c) => {
match c {
TaskStateUpdate::Init | TaskStateUpdate::Started => {},
TaskStateUpdate::Progress(status) => {
if status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = status.to_owned();
}
}
wal_connection.status = status;
},
TaskEvent::End => {
debug!("WAL receiving task finished");
TaskEvent::End(walreceiver_task_result) => {
match walreceiver_task_result {
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => error!("wal receiver task finished with an error: {e:?}"),
}
walreceiver_state.drop_old_connection(false).await;
},
}
@@ -363,13 +370,13 @@ impl WalreceiverState {
async move {
super::walreceiver_connection::handle_walreceiver_connection(
timeline,
&new_wal_source_connstr,
events_sender.as_ref(),
new_wal_source_connstr,
events_sender,
cancellation,
connect_timeout,
)
.await
.map_err(|e| format!("walreceiver connection handling failure: {e:#}"))
.context("walreceiver connection handling failure")
}
.instrument(info_span!("walreceiver_connection", id = %id))
});
@@ -885,7 +892,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status.clone()))
.ok();
Ok(())
}),
@@ -1145,7 +1152,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status.clone()))
.ok();
Ok(())
}),
@@ -1233,7 +1240,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskEvent::NewEvent(connection_status.clone()))
.send(TaskStateUpdate::Progress(connection_status.clone()))
.ok();
Ok(())
}),

View File

@@ -18,8 +18,7 @@ use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tracing::{debug, error, info, trace, warn};
use super::TaskEvent;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
use crate::{
task_mgr,
task_mgr::TaskKind,
@@ -55,8 +54,8 @@ pub struct WalConnectionStatus {
/// messages as we go.
pub async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
wal_source_connstr: &str,
events_sender: &watch::Sender<TaskEvent<WalConnectionStatus>>,
wal_source_connstr: String,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
mut cancellation: watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
@@ -81,7 +80,7 @@ pub async fn handle_walreceiver_connection(
streaming_lsn: None,
commit_lsn: None,
};
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
return Ok(());
}
@@ -133,7 +132,7 @@ pub async fn handle_walreceiver_connection(
connection_status.latest_connection_update = Utc::now().naive_utc();
connection_status.latest_wal_update = Utc::now().naive_utc();
connection_status.commit_lsn = Some(end_of_wal);
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
return Ok(());
}
@@ -201,7 +200,7 @@ pub async fn handle_walreceiver_connection(
}
&_ => {}
};
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
@@ -267,7 +266,8 @@ pub async fn handle_walreceiver_connection(
if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
// We have successfully processed at least one WAL record.
connection_status.has_processed_wal = true;
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone()))
{
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}