Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-27 23:30:38 +00:00

Compare commits: `min_prefet...vlad/debug` (9 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 95bb6ce2e4 | |
| | 858f5f2ddc | |
| | a4fdef69ad | |
| | dae1b58964 | |
| | e91a410472 | |
| | 9553a2670e | |
| | 74d2d233e4 | |
| | 57edf217b7 | |
| | ab1335cba0 | |
```diff
@@ -2146,12 +2146,13 @@ impl Timeline {
         // Regardless of whether we're going to try_freeze_and_flush
         // or not, stop ingesting any more data.
         let walreceiver = self.walreceiver.lock().unwrap().take();
-        tracing::debug!(
+        tracing::info!(
             is_some = walreceiver.is_some(),
             "Waiting for WalReceiverManager..."
         );
         if let Some(walreceiver) = walreceiver {
             walreceiver.shutdown().await;
+            tracing::info!("WalReceiverManager shut down");
         }
         // ... and inform any waiters for newer LSNs that there won't be any.
         self.last_record_lsn.shutdown();
```
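The hunk above takes the walreceiver handle out of its mutex before awaiting its shutdown. A minimal sketch of that take-then-shutdown idiom, with hypothetical stand-in types (`WalReceiverHandle` and `TimelineLike` are illustrative, not Neon's; assumed dep: tokio = "1" with the "full" feature):

```rust
use std::sync::Mutex;

// Hypothetical stand-in for the real walreceiver manager handle.
struct WalReceiverHandle;

impl WalReceiverHandle {
    async fn shutdown(self) {
        // Cancel and join the manager task here.
    }
}

struct TimelineLike {
    walreceiver: Mutex<Option<WalReceiverHandle>>,
}

impl TimelineLike {
    async fn stop_ingest(&self) {
        // Move the handle out while holding the lock only briefly: a std
        // Mutex guard must not be held across an .await, and `take()` makes
        // a second shutdown call a no-op because it finds `None`.
        let walreceiver = self.walreceiver.lock().unwrap().take();
        if let Some(walreceiver) = walreceiver {
            walreceiver.shutdown().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let tl = TimelineLike {
        walreceiver: Mutex::new(Some(WalReceiverHandle)),
    };
    tl.stop_ingest().await;
    tl.stop_ingest().await; // second call finds None and returns immediately
}
```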
```diff
@@ -2248,6 +2249,7 @@ impl Timeline {
         // As documented in remote_client.stop()'s doc comment, it's our responsibility
         // to shut down the upload queue tasks.
         // TODO: fix that, task management should be encapsulated inside remote_client.
+        tracing::info!("Waiting for remote uploads tasks...");
         task_mgr::shutdown_tasks(
             Some(TaskKind::RemoteUploadTask),
             Some(self.tenant_shard_id),
@@ -2256,12 +2258,13 @@ impl Timeline {
         .await;

         // TODO: work toward making this a no-op. See this function's doc comment for more context.
-        tracing::debug!("Waiting for tasks...");
+        tracing::info!("Waiting for tasks...");
         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;

         {
             // Allow any remaining in-memory layers to do cleanup -- until that, they hold the gate
             // open.
+            tracing::info!("Waiting for layer manager shutdown...");
             let mut write_guard = self.write_lock.lock().await;
             self.layers
                 .write(LayerManagerLockHolder::Shutdown)
@@ -2273,6 +2276,7 @@ impl Timeline {
         //
         // TODO: once above shutdown_tasks is a no-op, we can close the gate before calling shutdown_tasks
         // and use a TBD variant of shutdown_tasks that asserts that there were no tasks left.
+        tracing::info!("Waiting for timeline gate close...");
         self.gate.close().await;

         self.metrics.shutdown();
```
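The `self.gate.close().await` that the new log line brackets waits until every holder of the gate has let go. Neon's real gate lives in its utils crate; purely to illustrate the semantics, here is a toy semaphore-based stand-in (not the real implementation): `close()` can only acquire every permit once all outstanding guards have been dropped.

```rust
use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// Toy capacity; the real gate has no fixed holder limit.
const MAX_HOLDERS: u32 = 1024;

struct ToyGate {
    permits: Arc<Semaphore>,
}

impl ToyGate {
    fn new() -> Self {
        Self {
            permits: Arc::new(Semaphore::new(MAX_HOLDERS as usize)),
        }
    }

    // Entering yields a guard; dropping the guard releases the slot.
    async fn enter(&self) -> OwnedSemaphorePermit {
        self.permits.clone().acquire_owned().await.expect("gate closed")
    }

    // Resolves only once every guard has been dropped, which is what
    // `self.gate.close().await` waits for in the hunk above.
    async fn close(&self) {
        let all = self.permits.acquire_many(MAX_HOLDERS).await.expect("gate closed");
        all.forget(); // keep all permits so no one can enter again
    }
}

#[tokio::main]
async fn main() {
    let gate = Arc::new(ToyGate::new());
    let guard = gate.enter().await;
    let g2 = Arc::clone(&gate);
    let closer = tokio::spawn(async move { g2.close().await });
    drop(guard); // releasing the last guard is what lets close() finish
    closer.await.unwrap();
}
```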
```diff
@@ -4670,6 +4674,7 @@ impl Timeline {
         };

+        info!("started flush loop");

         loop {
             tokio::select! {
                 _ = self.cancel.cancelled() => {
@@ -4684,13 +4689,12 @@ impl Timeline {
             // The highest LSN to which we flushed in the loop over frozen layers
             let mut flushed_to_lsn = Lsn(0);

-            let result = loop {
+            // Force not bailing early by wrapping the code into a closure.
+            #[allow(clippy::redundant_closure_call)]
+            let result = (async || { loop {
                 if self.cancel.is_cancelled() {
                     info!("dropping out of flush loop for timeline shutdown");
-                    // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
-                    // anyone waiting on that will respect self.cancel as well: they will stop
-                    // waiting at the same time we as drop out of this loop.
-                    return;
+                    break Err(FlushLayerError::Cancelled);
                 }

                 // Break to notify potential waiters as soon as we've flushed the requested LSN. If
@@ -4703,8 +4707,8 @@ impl Timeline {
                 let (layer, l0_count, frozen_count, frozen_size) = {
                     let layers = self.layers.read(LayerManagerLockHolder::FlushLoop).await;
                     let Ok(lm) = layers.layer_map() else {
-                        info!("dropping out of flush loop for timeline shutdown");
-                        return;
+                        info!("dropping out of flush loop for layer map shutdown");
+                        break Err(FlushLayerError::Cancelled);
                     };
                     let l0_count = lm.level0_deltas().len();
                     let frozen_count = lm.frozen_layers.len();
@@ -4752,8 +4756,8 @@ impl Timeline {
                 match self.flush_frozen_layer(layer, ctx).await {
                     Ok(layer_lsn) => flushed_to_lsn = max(flushed_to_lsn, layer_lsn),
                     Err(FlushLayerError::Cancelled) => {
-                        info!("dropping out of flush loop for timeline shutdown");
-                        return;
+                        info!("dropping out of flush loop for remote client shutdown");
+                        break Err(FlushLayerError::Cancelled);
                     }
                     err @ Err(
                         FlushLayerError::NotRunning(_)
@@ -4794,7 +4798,7 @@ impl Timeline {
                     }
                 }
             }
-            };
+            }})().await;

             // Unsharded tenants should never advance their LSN beyond the end of the
             // highest layer they write: such gaps between layer data and the frozen LSN
```
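The refactor above funnels every exit of the flush loop through one `result`: the loop body is wrapped in an immediately invoked closure, so the old early `return;` becomes `break Err(..)` and the bookkeeping after the loop always runs. A minimal synchronous sketch of that pattern (the `FlushError` type and `flush_step` function are hypothetical, not Neon's):

```rust
// Hypothetical error type standing in for FlushLayerError.
#[derive(Debug)]
enum FlushError {
    Cancelled,
}

// Pretend flush step: produces a few fake "LSNs", then runs dry.
fn flush_step(i: u32) -> Option<u64> {
    if i < 3 { Some(u64::from(i) * 8) } else { None }
}

fn run_flush(cancelled: bool) {
    // Immediately invoked closure: `break Err(..)` leaves only the loop,
    // and a stray `return` could only leave the closure, never skip the
    // cleanup below -- the property the refactor is after.
    #[allow(clippy::redundant_closure_call)]
    let result = (|| {
        let mut flushed_to = 0u64;
        let mut i = 0;
        loop {
            if cancelled {
                break Err(FlushError::Cancelled);
            }
            match flush_step(i) {
                Some(lsn) => flushed_to = flushed_to.max(lsn),
                None => break Ok(flushed_to),
            }
            i += 1;
        }
    })();

    // Runs on every exit path, cancelled or not.
    println!("flush loop done: {result:?}");
}

fn main() {
    run_flush(false); // flush loop done: Ok(16)
    run_flush(true);  // flush loop done: Err(Cancelled)
}
```

The diff itself uses `(async || { loop { ... }})().await`, the async-closure form of the same trick, with the clippy `redundant_closure_call` lint allowed explicitly.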
```diff
@@ -7408,7 +7412,7 @@ impl TimelineWriter<'_> {

         if let Some(wait_threshold) = wait_threshold {
             if l0_count >= wait_threshold {
-                debug!(
+                info!(
                     "layer roll waiting for flush due to compaction backpressure at {l0_count} L0 layers"
                 );
                 self.tl.wait_flush_completion(flush_id).await?;
```
```diff
@@ -106,11 +106,12 @@ impl WalReceiver {
                 match loop_step_result {
                     Ok(()) => continue,
                     Err(_cancelled) => {
-                        trace!("Connection manager loop ended, shutting down");
+                        info!("Connection manager loop ended, shutting down");
                         break;
                     }
                 }
             }
+            info!("Awaiting connection manager state shutdown ...");
             connection_manager_state.shutdown().await;
             *loop_status.write().unwrap() = None;
             info!("task exits");
@@ -128,7 +129,7 @@ impl WalReceiver {
     #[instrument(skip_all, level = tracing::Level::DEBUG)]
     pub async fn shutdown(self) {
         debug_assert_current_span_has_tenant_and_timeline_id();
-        debug!("cancelling walreceiver tasks");
+        info!("cancelling walreceiver tasks");
         self.cancel.cancel();
         match self.task.await {
             Ok(()) => debug!("Shutdown success"),
```
```diff
@@ -171,7 +172,7 @@ enum TaskStateUpdate<E> {
     Progress(E),
 }

-impl<E: Clone> TaskHandle<E> {
+impl<E: Clone + std::fmt::Debug> TaskHandle<E> {
     /// Initializes the task, starting it immediately after the creation.
     ///
     /// The second argument to `task` is a child token of `cancel_parent` ([`CancellationToken::child_token`]).
@@ -243,10 +244,30 @@ impl<E: Clone> TaskHandle<E> {
     }

     /// Aborts current task, waiting for it to finish.
-    async fn shutdown(self) {
-        if let Some(jh) = self.join_handle {
+    async fn shutdown(mut self) {
+        if let Some(mut jh) = self.join_handle {
             self.cancellation.cancel();
-            match jh.await {
+
+            let res = loop {
+                tokio::select! {
+                    res = &mut jh => {
+                        break res;
+                    },
+                    received = self.events_receiver.changed() => {
+                        match received {
+                            Ok(()) => {
+                                let event = self.events_receiver.borrow();
+                                tracing::info!("Received update after cancellation: {event:?}");
+                            },
+                            Err(err) => {
+                                tracing::info!("Sender dropped after cancellation: {err}");
+                            }
+                        }
+                    }
+                }
+            };
+
+            match res {
                 Ok(Ok(())) => debug!("Shutdown success"),
                 Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
                 Err(je) if je.is_cancelled() => unreachable!("not used"),
```
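The reworked `shutdown` keeps draining the `watch` events channel while awaiting the join handle, so any final progress updates the task sends during its wind-down still reach the log; the new `std::fmt::Debug` bound on `E` in the hunk above is what permits logging them with `{event:?}`. A hedged sketch of the same pattern (not Neon's code; `String` stands in for `E`, and the `Err` arm diverges from the hunk by falling back to a plain join so a dropped sender cannot make that arm fire in a busy loop; assumed dep: tokio = "1" with the "full" feature):

```rust
use std::time::Duration;

use tokio::sync::watch;
use tokio::task::JoinHandle;

async fn shutdown_with_events(mut jh: JoinHandle<()>, mut events: watch::Receiver<String>) {
    let res = loop {
        tokio::select! {
            // The task finished joining; stop waiting.
            res = &mut jh => break res,
            changed = events.changed() => match changed {
                Ok(()) => {
                    // Log whatever the task reported while winding down.
                    let event = events.borrow().clone();
                    println!("update after cancellation: {event:?}");
                }
                Err(_sender_dropped) => {
                    // Once the sender is gone there is nothing left to
                    // drain; just wait for the join handle.
                    break jh.await;
                }
            },
        }
    };
    println!("join result: {res:?}");
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(String::from("started"));
    let jh = tokio::spawn(async move {
        tx.send(String::from("flushing")).ok();
        tokio::time::sleep(Duration::from_millis(10)).await;
        tx.send(String::from("done")).ok();
        // tx drops here, which closes the events channel.
    });
    shutdown_with_events(jh, rx).await;
}
```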
```diff
@@ -66,7 +66,7 @@ pub(super) async fn connection_manager_loop_step(
     } {
         Ok(()) => {}
         Err(new_state) => {
-            debug!(
+            info!(
                 ?new_state,
                 "state changed, stopping wal connection manager loop"
             );
@@ -145,7 +145,7 @@ pub(super) async fn connection_manager_loop_step(
             }
             TaskEvent::End(walreceiver_task_result) => {
                 match walreceiver_task_result {
-                    Ok(()) => debug!("WAL receiving task finished"),
+                    Ok(()) => info!("WAL receiving task finished"),
                     Err(e) => error!("wal receiver task finished with an error: {e:?}"),
                 }
                 connection_manager_state.drop_old_connection(false).await;
```
```diff
@@ -193,7 +193,7 @@ pub(super) async fn handle_walreceiver_connection(
     debug_assert_current_span_has_tenant_and_timeline_id();
     select! {
         connection_result = connection => match connection_result {
-            Ok(()) => debug!("Walreceiver db connection closed"),
+            Ok(()) => info!("Walreceiver db connection closed"),
             Err(connection_error) => {
                 match WalReceiverError::from(connection_error) {
                     WalReceiverError::ExpectedSafekeeperError(_) => {
@@ -202,7 +202,7 @@ pub(super) async fn handle_walreceiver_connection(
                     },
                     WalReceiverError::SuccessfulCompletion(_) => {}
                     WalReceiverError::Cancelled => {
-                        debug!("Connection cancelled")
+                        info!("Connection cancelled")
                     }
                     WalReceiverError::ClosedGate => {
                         // doesn't happen at runtime
@@ -213,7 +213,7 @@ pub(super) async fn handle_walreceiver_connection(
                 }
             }
         },
-        _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
+        _ = connection_cancellation.cancelled() => info!("Connection cancelled"),
     }
     drop(poller_guard);
 }
@@ -299,7 +299,7 @@ pub(super) async fn handle_walreceiver_connection(
         select! {
             biased;
             _ = cancellation.cancelled() => {
-                debug!("walreceiver interrupted");
+                info!("walreceiver interrupted");
                 None
            }
             replication_message = physical_stream.next() => replication_message,
```
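In the `@@ -299` hunk above, `biased;` makes `tokio::select!` poll its arms top to bottom instead of in random order, so a pending cancellation is always observed before the stream is polled again. A small sketch of that ordering (assumed deps: tokio = "1" with the "full" feature, tokio-util = "0.7"; the mpsc receiver is a stand-in for `physical_stream`):

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

// Returns the next message, or None once cancellation wins the race.
async fn next_or_cancel(
    cancel: &CancellationToken,
    rx: &mut mpsc::Receiver<String>,
) -> Option<String> {
    tokio::select! {
        // With `biased;` the arms are polled in order, so a pending
        // cancellation is always seen before another recv().
        biased;
        _ = cancel.cancelled() => {
            println!("receiver interrupted");
            None
        }
        msg = rx.recv() => msg,
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let (tx, mut rx) = mpsc::channel(8);
    tx.send(String::from("wal record")).await.unwrap();
    println!("{:?}", next_or_cancel(&cancel, &mut rx).await); // Some("wal record")
    cancel.cancel();
    println!("{:?}", next_or_cancel(&cancel, &mut rx).await); // None
}
```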
```diff
@@ -307,6 +307,19 @@ pub(super) async fn handle_walreceiver_connection(
     } {
         let replication_message = replication_message?;

+        match &replication_message {
+            ReplicationMessage::XLogData(_) => {
+                tracing::info!("Received XLogData replication message")
+            }
+            ReplicationMessage::PrimaryKeepAlive(_) => {
+                tracing::info!("Received PrimaryKeepAlive replication message")
+            }
+            ReplicationMessage::RawInterpretedWalRecords(_) => {
+                tracing::info!("Received RawInterpretedWalRecords replication message")
+            }
+            unknown => tracing::info!("Received unknown replication message: {unknown:?}"),
+        }
+
         let now = Utc::now().naive_utc();
         let last_rec_lsn_before_msg = last_rec_lsn;

@@ -577,7 +590,7 @@ pub(super) async fn handle_walreceiver_connection(
             shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
         };

-        debug!("neon_status_update {status_update:?}");
+        info!("sending neon_status_update {status_update:?}");

         let mut data = BytesMut::new();
         status_update.serialize(&mut data);
@@ -585,6 +598,8 @@ pub(super) async fn handle_walreceiver_connection(
             .as_mut()
             .zenith_status_update(data.len() as u64, &data)
             .await?;
+
+        info!("sent neon_status_update");
         }
     }

```
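The paired "sending …"/"sent …" logs around the final `.await` in the last hunk are a simple hang-localization idiom: a "sending" line with no matching "sent" line points at the await that is stuck. A generic sketch of the same idea (the `traced` helper is hypothetical, not part of Neon; assumed dep: tokio = "1" with the "full" feature):

```rust
use std::future::Future;
use std::time::Duration;

// Hypothetical helper: wrap an .await with before/after logs so a hang
// shows up as a "started" line with no matching "finished" line.
async fn traced<F: Future>(what: &str, fut: F) -> F::Output {
    println!("started: {what}");
    let out = fut.await;
    println!("finished: {what}");
    out
}

#[tokio::main]
async fn main() {
    traced("neon_status_update", tokio::time::sleep(Duration::from_millis(5))).await;
}
```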