Compare commits

...

9 Commits

Author      SHA1        Message                                        Date
Vlad Lazar  95bb6ce2e4  another fix                                    2025-06-26 11:11:05 +02:00
Vlad Lazar  858f5f2ddc  wip: try out a fix                             2025-06-25 14:26:34 +02:00
Vlad Lazar  a4fdef69ad  wip: one more log                              2025-06-25 12:24:40 +02:00
Vlad Lazar  dae1b58964  wip: more logging                              2025-06-24 16:30:58 +02:00
Vlad Lazar  e91a410472  wip: more logs                                 2025-06-24 13:07:59 +02:00
Vlad Lazar  9553a2670e  Merge branch 'main' into vlad/debug-test-sharding-auto-split  2025-06-23 17:47:59 +03:00
Vlad Lazar  74d2d233e4  wip: more logs                                 2025-06-23 13:51:53 +02:00
Vlad Lazar  57edf217b7  trigger bench                                  2025-06-20 10:45:55 +02:00
Vlad Lazar  ab1335cba0  wip: add some info logs for timeline shutdown  2025-06-20 10:40:52 +02:00
4 changed files with 66 additions and 26 deletions

View File

@@ -2146,12 +2146,13 @@ impl Timeline {
  // Regardless of whether we're going to try_freeze_and_flush
  // or not, stop ingesting any more data.
  let walreceiver = self.walreceiver.lock().unwrap().take();
- tracing::debug!(
+ tracing::info!(
  is_some = walreceiver.is_some(),
  "Waiting for WalReceiverManager..."
  );
  if let Some(walreceiver) = walreceiver {
  walreceiver.shutdown().await;
+ tracing::info!("WalReceiverManager shut down");
  }
  // ... and inform any waiters for newer LSNs that there won't be any.
  self.last_record_lsn.shutdown();
@@ -2248,6 +2249,7 @@ impl Timeline {
  // As documented in remote_client.stop()'s doc comment, it's our responsibility
  // to shut down the upload queue tasks.
  // TODO: fix that, task management should be encapsulated inside remote_client.
+ tracing::info!("Waiting for remote uploads tasks...");
  task_mgr::shutdown_tasks(
  Some(TaskKind::RemoteUploadTask),
  Some(self.tenant_shard_id),
@@ -2256,12 +2258,13 @@ impl Timeline {
  .await;
  // TODO: work toward making this a no-op. See this function's doc comment for more context.
- tracing::debug!("Waiting for tasks...");
+ tracing::info!("Waiting for tasks...");
  task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
  {
  // Allow any remaining in-memory layers to do cleanup -- until that, they hold the gate
  // open.
+ tracing::info!("Waiting for layer manager shutdown...");
  let mut write_guard = self.write_lock.lock().await;
  self.layers
  .write(LayerManagerLockHolder::Shutdown)
@@ -2273,6 +2276,7 @@ impl Timeline {
  //
  // TODO: once above shutdown_tasks is a no-op, we can close the gate before calling shutdown_tasks
  // and use a TBD variant of shutdown_tasks that asserts that there were no tasks left.
+ tracing::info!("Waiting for timeline gate close...");
  self.gate.close().await;
  self.metrics.shutdown();
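
Editor's note: the hunks above all apply one pattern — every await point on the Timeline shutdown path gets an info-level log immediately before it, so a hung shutdown can be localized from the last "Waiting for ..." line in the logs. A minimal standalone sketch of that pattern, with made-up stage functions instead of the real Timeline fields (assumes the tokio, tracing and tracing-subscriber crates):

use std::time::Duration;
use tracing::info;

// Hypothetical shutdown stages standing in for walreceiver/task/gate shutdown.
async fn stop_walreceiver() { tokio::time::sleep(Duration::from_millis(10)).await }
async fn stop_background_tasks() { tokio::time::sleep(Duration::from_millis(10)).await }
async fn close_gate() { tokio::time::sleep(Duration::from_millis(10)).await }

async fn shutdown_sequence() {
    // Log before every await so a hang is attributable to a specific stage.
    info!("Waiting for walreceiver shutdown...");
    stop_walreceiver().await;
    info!("walreceiver shut down");

    info!("Waiting for background tasks...");
    stop_background_tasks().await;

    info!("Waiting for gate close...");
    close_gate().await;
    info!("shutdown complete");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    shutdown_sequence().await;
}
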
@@ -4670,6 +4674,7 @@ impl Timeline {
  };
  info!("started flush loop");
  loop {
  tokio::select! {
  _ = self.cancel.cancelled() => {
@@ -4684,13 +4689,12 @@ impl Timeline {
  // The highest LSN to which we flushed in the loop over frozen layers
  let mut flushed_to_lsn = Lsn(0);
- let result = loop {
+ // Force not bailing early by wrapping the code into a closure.
+ #[allow(clippy::redundant_closure_call)]
+ let result = (async || { loop {
  if self.cancel.is_cancelled() {
  info!("dropping out of flush loop for timeline shutdown");
  // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
  // anyone waiting on that will respect self.cancel as well: they will stop
  // waiting at the same time we as drop out of this loop.
- return;
+ break Err(FlushLayerError::Cancelled);
  }
  // Break to notify potential waiters as soon as we've flushed the requested LSN. If
@@ -4703,8 +4707,8 @@ impl Timeline {
  let (layer, l0_count, frozen_count, frozen_size) = {
  let layers = self.layers.read(LayerManagerLockHolder::FlushLoop).await;
  let Ok(lm) = layers.layer_map() else {
- info!("dropping out of flush loop for timeline shutdown");
- return;
+ info!("dropping out of flush loop for layer map shutdown");
+ break Err(FlushLayerError::Cancelled);
  };
  let l0_count = lm.level0_deltas().len();
  let frozen_count = lm.frozen_layers.len();
@@ -4752,8 +4756,8 @@ impl Timeline {
  match self.flush_frozen_layer(layer, ctx).await {
  Ok(layer_lsn) => flushed_to_lsn = max(flushed_to_lsn, layer_lsn),
  Err(FlushLayerError::Cancelled) => {
- info!("dropping out of flush loop for timeline shutdown");
- return;
+ info!("dropping out of flush loop for remote client shutdown");
+ break Err(FlushLayerError::Cancelled);
  }
  err @ Err(
  FlushLayerError::NotRunning(_)
@@ -4794,7 +4798,7 @@ impl Timeline {
  }
  }
  }
- };
+ }})().await;
  // Unsharded tenants should never advance their LSN beyond the end of the
  // highest layer they write: such gaps between layer data and the frozen LSN
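
Editor's note: the rewrite above swaps bare `return`s for an immediately invoked async closure whose loop breaks with a `Result`, so each exit path logs a distinct message and the code after the loop still runs. A standalone sketch of the same pattern, not the actual flush loop (requires a toolchain with async closures, Rust 1.85+; `LoopError` is a stand-in for `FlushLayerError`):

// Sketch of the immediately-invoked-async-closure pattern used above.
#[derive(Debug)]
enum LoopError {
    Cancelled, // stand-in for FlushLayerError::Cancelled
}

async fn flush_like_loop(cancel_at: Option<u32>) -> Result<u32, LoopError> {
    #[allow(clippy::redundant_closure_call)]
    let result = (async || {
        let mut flushed = 0u32;
        loop {
            if Some(flushed) == cancel_at {
                // Previously a bare `return` here would have skipped the
                // post-loop code below.
                break Err(LoopError::Cancelled);
            }
            flushed += 1;
            if flushed == 3 {
                break Ok(flushed);
            }
        }
    })()
    .await;

    // Runs on every exit path, e.g. recording metrics or notifying waiters.
    println!("flush loop finished: {result:?}");
    result
}

#[tokio::main]
async fn main() {
    assert!(flush_like_loop(None).await.is_ok());
    assert!(flush_like_loop(Some(1)).await.is_err());
}
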
@@ -7408,7 +7412,7 @@ impl TimelineWriter<'_> {
  if let Some(wait_threshold) = wait_threshold {
  if l0_count >= wait_threshold {
- debug!(
+ info!(
  "layer roll waiting for flush due to compaction backpressure at {l0_count} L0 layers"
  );
  self.tl.wait_flush_completion(flush_id).await?;

View File

@@ -106,11 +106,12 @@ impl WalReceiver {
  match loop_step_result {
  Ok(()) => continue,
  Err(_cancelled) => {
- trace!("Connection manager loop ended, shutting down");
+ info!("Connection manager loop ended, shutting down");
  break;
  }
  }
  }
+ info!("Awaiting connection manager state shutdown ...");
  connection_manager_state.shutdown().await;
  *loop_status.write().unwrap() = None;
  info!("task exits");
@@ -128,7 +129,7 @@ impl WalReceiver {
  #[instrument(skip_all, level = tracing::Level::DEBUG)]
  pub async fn shutdown(self) {
  debug_assert_current_span_has_tenant_and_timeline_id();
- debug!("cancelling walreceiver tasks");
+ info!("cancelling walreceiver tasks");
  self.cancel.cancel();
  match self.task.await {
  Ok(()) => debug!("Shutdown success"),
@@ -171,7 +172,7 @@ enum TaskStateUpdate<E> {
  Progress(E),
  }
- impl<E: Clone> TaskHandle<E> {
+ impl<E: Clone + std::fmt::Debug> TaskHandle<E> {
  /// Initializes the task, starting it immediately after the creation.
  ///
  /// The second argument to `task` is a child token of `cancel_parent` ([`CancellationToken::child_token`]).
@@ -243,10 +244,30 @@ impl<E: Clone> TaskHandle<E> {
  }
  /// Aborts current task, waiting for it to finish.
- async fn shutdown(self) {
- if let Some(jh) = self.join_handle {
+ async fn shutdown(mut self) {
+ if let Some(mut jh) = self.join_handle {
  self.cancellation.cancel();
- match jh.await {
+ let res = loop {
+ tokio::select! {
+ res = &mut jh => {
+ break res;
+ },
+ received = self.events_receiver.changed() => {
+ match received {
+ Ok(()) => {
+ let event = self.events_receiver.borrow();
+ tracing::info!("Received update after cancellation: {event:?}");
+ },
+ Err(err) => {
+ tracing::info!("Sender dropped after cancellation: {err}");
+ }
+ }
+ }
+ }
+ };
+ match res {
  Ok(Ok(())) => debug!("Shutdown success"),
  Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
  Err(je) if je.is_cancelled() => unreachable!("not used"),
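
Editor's note: the new `shutdown` body above waits on the join handle and the task's event channel at the same time, so any status update the task publishes after cancellation is logged instead of being silently dropped while we wait. A standalone sketch of that select-and-drain pattern, using tokio::sync::watch and tokio_util's CancellationToken directly rather than the TaskHandle wrapper (the worker task and crate setup here are assumptions, not the real code):

use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::info;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let (events_tx, mut events_rx) = watch::channel(String::from("init"));
    let cancel = CancellationToken::new();
    let child = cancel.child_token();

    // Hypothetical worker: publishes one last status update once it sees cancellation.
    let mut jh = tokio::spawn(async move {
        child.cancelled().await;
        let _ = events_tx.send(String::from("cleaning up"));
        Ok::<(), String>(())
    });

    cancel.cancel();

    // Wait for the task while logging any updates it still sends.
    let res = loop {
        tokio::select! {
            res = &mut jh => break res,
            changed = events_rx.changed() => match changed {
                Ok(()) => info!("update after cancellation: {}", *events_rx.borrow()),
                Err(err) => info!("sender dropped after cancellation: {err}"),
            },
        }
    };

    match res {
        Ok(Ok(())) => info!("shutdown success"),
        Ok(Err(e)) => info!("shutdown task error: {e}"),
        Err(join_err) => info!("join error: {join_err}"),
    }
}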

View File

@@ -66,7 +66,7 @@ pub(super) async fn connection_manager_loop_step(
  } {
  Ok(()) => {}
  Err(new_state) => {
- debug!(
+ info!(
  ?new_state,
  "state changed, stopping wal connection manager loop"
  );
@@ -145,7 +145,7 @@ pub(super) async fn connection_manager_loop_step(
  }
  TaskEvent::End(walreceiver_task_result) => {
  match walreceiver_task_result {
- Ok(()) => debug!("WAL receiving task finished"),
+ Ok(()) => info!("WAL receiving task finished"),
  Err(e) => error!("wal receiver task finished with an error: {e:?}"),
  }
  connection_manager_state.drop_old_connection(false).await;

View File

@@ -193,7 +193,7 @@ pub(super) async fn handle_walreceiver_connection(
  debug_assert_current_span_has_tenant_and_timeline_id();
  select! {
  connection_result = connection => match connection_result {
- Ok(()) => debug!("Walreceiver db connection closed"),
+ Ok(()) => info!("Walreceiver db connection closed"),
  Err(connection_error) => {
  match WalReceiverError::from(connection_error) {
  WalReceiverError::ExpectedSafekeeperError(_) => {
@@ -202,7 +202,7 @@ pub(super) async fn handle_walreceiver_connection(
  },
  WalReceiverError::SuccessfulCompletion(_) => {}
  WalReceiverError::Cancelled => {
- debug!("Connection cancelled")
+ info!("Connection cancelled")
  }
  WalReceiverError::ClosedGate => {
  // doesn't happen at runtime
@@ -213,7 +213,7 @@ pub(super) async fn handle_walreceiver_connection(
  }
  }
  },
- _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
+ _ = connection_cancellation.cancelled() => info!("Connection cancelled"),
  }
  drop(poller_guard);
  }
@@ -299,7 +299,7 @@ pub(super) async fn handle_walreceiver_connection(
  select! {
  biased;
  _ = cancellation.cancelled() => {
- debug!("walreceiver interrupted");
+ info!("walreceiver interrupted");
  None
  }
  replication_message = physical_stream.next() => replication_message,
@@ -307,6 +307,19 @@ pub(super) async fn handle_walreceiver_connection(
  } {
  let replication_message = replication_message?;
+ match &replication_message {
+ ReplicationMessage::XLogData(_) => {
+ tracing::info!("Received XLogData replication message")
+ }
+ ReplicationMessage::PrimaryKeepAlive(_) => {
+ tracing::info!("Received PrimaryKeepAlive replication message")
+ }
+ ReplicationMessage::RawInterpretedWalRecords(_) => {
+ tracing::info!("Received RawInterpretedWalRecords replication message")
+ }
+ unknown => tracing::info!("Received unknown replication message: {unknown:?}"),
+ }
  let now = Utc::now().naive_utc();
  let last_rec_lsn_before_msg = last_rec_lsn;
@@ -577,7 +590,7 @@ pub(super) async fn handle_walreceiver_connection(
  shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
  };
- debug!("neon_status_update {status_update:?}");
+ info!("sending neon_status_update {status_update:?}");
  let mut data = BytesMut::new();
  status_update.serialize(&mut data);
@@ -585,6 +598,8 @@
  .as_mut()
  .zenith_status_update(data.len() as u64, &data)
  .await?;
+ info!("sent neon_status_update");
  }
  }