From 5fc725031b67889b33d53dbdc7b4ada14042e97d Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Tue, 30 May 2023 15:10:36 +0300
Subject: [PATCH] rest of tracing changes

Demote chatty per-tenant and per-timeline lifecycle messages from info
to debug/trace, add #[instrument(skip_all)] spans to the remaining
entry points, and link detached task spans back to their spawner with
follows_from instead of parenting them under the current span.

---
 pageserver/src/disk_usage_eviction_task.rs    |  6 +--
 pageserver/src/tenant.rs                      | 10 ++--
 pageserver/src/tenant/mgr.rs                  |  2 +-
 pageserver/src/tenant/tasks.rs                | 16 +++---
 pageserver/src/tenant/timeline.rs             | 50 +++++++++++--------
 .../src/tenant/timeline/eviction_task.rs      |  9 ++--
 pageserver/src/tenant/timeline/walreceiver.rs | 14 ++++--
 .../walreceiver/connection_manager.rs         | 14 +++---
 .../walreceiver/walreceiver_connection.rs     |  6 ++-
 9 files changed, 73 insertions(+), 54 deletions(-)

diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs
index 1a8886935c..278b6c730b 100644
--- a/pageserver/src/disk_usage_eviction_task.rs
+++ b/pageserver/src/disk_usage_eviction_task.rs
@@ -111,7 +111,7 @@ pub fn launch_disk_usage_global_eviction_task(
                 task_mgr::shutdown_token(),
             )
             .await;
-            info!("disk usage based eviction task finishing");
+            debug!("disk usage based eviction task finishing");
             Ok(())
         },
     );
@@ -133,7 +133,7 @@ async fn disk_usage_eviction_task(
             .await
             .is_err()
         {
-            info!("shutting down");
+            debug!("shutting down");
             return;
         }
     }
@@ -168,7 +168,7 @@ async fn disk_usage_eviction_task(
         tokio::select! {
             _ = tokio::time::sleep_until(sleep_until) => {},
             _ = cancel.cancelled() => {
-                info!("shutting down");
+                debug!("shutting down");
                 break
             }
         }
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index ff975db601..1e843e0f9f 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -1102,14 +1102,14 @@ impl Tenant {
     /// Subroutine of `load_tenant`, to load an individual timeline
     ///
     /// NB: The parent is assumed to be already loaded!
-    #[instrument(skip_all, fields(timeline_id))]
+    #[instrument(skip_all, fields(timeline_id=%timeline_id))]
     async fn load_local_timeline(
         &self,
         timeline_id: TimelineId,
         local_metadata: TimelineMetadata,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        debug_assert_current_span_has_tenant_id();
+        debug_assert_current_span_has_tenant_and_timeline_id();

         let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
             RemoteTimelineClient::new(
@@ -1394,6 +1394,8 @@ impl Tenant {
         pitr: Duration,
         ctx: &RequestContext,
     ) -> anyhow::Result<GcResult> {
+        debug_assert_current_span_has_tenant_id();
+
         anyhow::ensure!(
             self.is_active(),
             "Cannot run GC iteration on inactive tenant"
         );
@@ -1408,6 +1410,8 @@ impl Tenant {
     /// Also it can be explicitly requested per timeline through page server
     /// api's 'compact' command.
     pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
+        debug_assert_current_span_has_tenant_id();
+
         anyhow::ensure!(
             self.is_active(),
             "Cannot run compaction iteration on inactive tenant"
         );
@@ -2141,7 +2145,7 @@ impl Tenant {
         let target_config_path = conf.tenant_config_path(tenant_id);
         let target_config_display = target_config_path.display();

-        info!("loading tenantconf from {target_config_display}");
+        debug!("loading tenantconf from {target_config_display}");

         // FIXME If the config file is not found, assume that we're attaching
         // a detached tenant and config is passed via attach command.
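A note on the tenant.rs hunks above: with skip_all, #[instrument] records no
arguments on its own, and the previous bare fields(timeline_id) evidently left
the field unset on the span (that is what the strengthened
debug_assert_current_span_has_tenant_and_timeline_id() checks for), while
fields(timeline_id=%timeline_id) records the argument eagerly via its Display
impl. A minimal standalone sketch of the working form, assuming only the
tracing, tracing-subscriber and tokio crates (names and types here are
illustrative, not from the patch):

    use tracing::{info, instrument};

    #[instrument(skip_all, fields(timeline_id = %timeline_id))]
    async fn load_local_timeline(timeline_id: u64, _payload: Vec<u8>) {
        // With skip_all, _payload is not recorded; timeline_id is on the
        // span, so this event carries timeline_id=7 in its span context.
        info!("loading");
    }

    #[tokio::main]
    async fn main() {
        tracing_subscriber::fmt().init();
        load_local_timeline(7, Vec::new()).await;
    }
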
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index d3cd914037..0a680afa44 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -212,7 +212,7 @@ pub fn schedule_local_tenant_processing(
             )
         }
     } else {
-        info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
+        trace!("tenant {tenant_id} is assumed to be loadable, starting load operation");
         // Start loading the tenant into memory. It will initially be in Loading state.
         Tenant::spawn_load(
             conf,
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 02aed11114..6838c090df 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -61,7 +61,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
 ///
 async fn compaction_loop(tenant: Arc<Tenant>) {
     let wait_duration = Duration::from_secs(2);
-    info!("starting");
+    debug!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
         let cancel = task_mgr::shutdown_token();
@@ -72,7 +72,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {

         tokio::select! {
             _ = cancel.cancelled() => {
-                info!("received cancellation request");
+                debug!("received cancellation request");
                 return;
             },
             tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
@@ -95,7 +95,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
             let started_at = Instant::now();

             let sleep_duration = if period == Duration::ZERO {
-                info!("automatic compaction is disabled");
+                debug!("automatic compaction is disabled");
                 // check again in 10 seconds, in case it's been enabled again.
                 Duration::from_secs(10)
             } else {
@@ -113,7 +113,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
             // Sleep
             tokio::select! {
                 _ = cancel.cancelled() => {
-                    info!("received cancellation request during idling");
+                    debug!("received cancellation request during idling");
                     break;
                 },
                 _ = tokio::time::sleep(sleep_duration) => {},
@@ -131,7 +131,7 @@
 ///
 async fn gc_loop(tenant: Arc<Tenant>) {
     let wait_duration = Duration::from_secs(2);
-    info!("starting");
+    debug!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
         let cancel = task_mgr::shutdown_token();
@@ -145,7 +145,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {

         tokio::select! {
             _ = cancel.cancelled() => {
-                info!("received cancellation request");
+                debug!("received cancellation request");
                 return;
             },
             tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
@@ -167,7 +167,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
             let gc_horizon = tenant.get_gc_horizon();

             let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
-                info!("automatic GC is disabled");
+                debug!("automatic GC is disabled");
                 // check again in 10 seconds, in case it's been enabled again.
                 Duration::from_secs(10)
             } else {
@@ -188,7 +188,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
             // Sleep
             tokio::select! {
                 _ = cancel.cancelled() => {
-                    info!("received cancellation request during idling");
+                    debug!("received cancellation request during idling");
                    break;
                },
                _ = tokio::time::sleep(sleep_duration) => {},
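The tasks.rs hunks above only demote routine lifecycle logging; the loop
structure they live in is unchanged. For reference, the shape of these
background loops as a self-contained sketch, assuming the tokio, tokio-util
and tracing crates (the function name and period handling are illustrative):

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    async fn background_loop(cancel: CancellationToken, period: Duration) {
        tracing::debug!("starting");
        loop {
            // ... one iteration of compaction/GC work would run here ...

            tokio::select! {
                _ = cancel.cancelled() => {
                    // demoted to debug above: this fires on every shutdown
                    tracing::debug!("received cancellation request during idling");
                    break;
                }
                _ = tokio::time::sleep(period) => {}
            }
        }
    }
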
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index ee7b002450..dadfe1df4a 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -674,7 +674,10 @@ impl Timeline {
     }

     /// Outermost timeline compaction operation; downloads needed layers.
+    #[instrument(skip_all)]
     pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
+        debug_assert_current_span_has_tenant_and_timeline_id();
+
         const ROUNDS: usize = 2;

         let last_record_lsn = self.get_last_record_lsn();
@@ -1434,7 +1437,7 @@ impl Timeline {
         match *flush_loop_state {
             FlushLoopState::NotStarted => (),
             FlushLoopState::Running => {
-                info!(
+                debug!(
                     "skipping attempt to start flush_loop twice {}/{}",
                     self.tenant_id, self.timeline_id
                 );
@@ -1452,7 +1455,7 @@ impl Timeline {
         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
         let self_clone = Arc::clone(self);

-        info!("spawning flush loop");
+        debug!("spawning flush loop");
         task_mgr::spawn(
             task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::LayerFlushTask,
@@ -1468,7 +1471,11 @@ impl Timeline {
                 *flush_loop_state = FlushLoopState::Exited;
                 Ok(())
             }
-            .instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
+            .instrument({
+                let span = info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id);
+                span.follows_from(Span::current());
+                span
+            })
         );

         *flush_loop_state = FlushLoopState::Running;
@@ -1483,7 +1490,7 @@ impl Timeline {
         ctx: &RequestContext,
         broker_client: BrokerClientChannel,
     ) {
-        info!(
+        debug!(
             "launching WAL receiver for timeline {} of tenant {}",
             self.timeline_id, self.tenant_id
         );
@@ -1604,7 +1611,7 @@ impl Timeline {
             } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
                 // ignore these
             } else if remote_timeline_client::is_temp_download_file(&direntry_path) {
-                info!(
+                debug!(
                     "skipping temp download file, reconcile_with_remote will resume / clean up: {}",
                     fname
                 );
@@ -1613,7 +1620,7 @@ impl Timeline {
                 trace!("deleting old ephemeral file in timeline dir: {}", fname);
                 fs::remove_file(&direntry_path)?;
             } else if is_temporary(&direntry_path) {
-                info!("removing temp timeline file at {}", direntry_path.display());
+                debug!("removing temp timeline file at {}", direntry_path.display());
                 fs::remove_file(&direntry_path).with_context(|| {
                     format!(
                         "failed to remove temp download file at {}",
@@ -1710,7 +1717,7 @@ impl Timeline {
                 }
             }

-            info!(
+            debug!(
                 "remote layer does not exist locally, creating remote layer: {}",
                 remote_layer_name.file_name()
             );
@@ -1795,7 +1802,7 @@ impl Timeline {
         up_to_date_metadata: &TimelineMetadata,
         index_part: Option<&IndexPart>,
     ) -> anyhow::Result<()> {
-        info!("starting");
+        trace!("starting");
         let remote_client = self
             .remote_client
             .as_ref()
@@ -1815,7 +1822,7 @@ impl Timeline {
         let has_local_layers = !local_layers.is_empty();
         let local_only_layers = match index_part {
             Some(index_part) => {
-                info!(
+                debug!(
                     "initializing upload queue from remote index with {} layer files",
                     index_part.timeline_layers.len()
                 );
@@ -1824,7 +1831,7 @@ impl Timeline {
                     .await?
             }
             None => {
-                info!("initializing upload queue as empty");
+                debug!("initializing upload queue as empty");
                 remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?;
                 local_layers
             }
@@ -1842,7 +1849,7 @@ impl Timeline {
                 .metadata()
                 .with_context(|| format!("failed to get file {layer_path:?} metadata"))?
                 .len();
-            info!("scheduling {layer_path:?} for upload");
+            debug!("scheduling {layer_path:?} for upload");
             remote_client
                 .schedule_layer_file_upload(layer_name, &LayerFileMetadata::new(layer_size))?;
         }
@@ -1865,7 +1872,7 @@ impl Timeline {
             // Local timeline has a metadata file, remote one too, both have no layers to sync.
         }

-        info!("Done");
+        trace!("Done");

         Ok(())
     }
@@ -1887,7 +1894,7 @@ impl Timeline {
             .get()
             .is_none());

-        info!(
+        debug!(
             "spawning logical size computation from context of task kind {:?}",
             ctx.task_kind()
         );
@@ -2081,6 +2088,7 @@ impl Timeline {
     ///
     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
     /// especially if we need to download remote layers.
+    #[instrument(skip_all)]
     pub async fn calculate_logical_size(
         &self,
         up_to_lsn: Lsn,
@@ -2088,7 +2096,7 @@ impl Timeline {
         cancel: CancellationToken,
         ctx: &RequestContext,
     ) -> Result<u64, CalculateLogicalSizeError> {
-        info!(
+        debug!(
             "Calculating logical size for timeline {} at {}",
             self.timeline_id, up_to_lsn
         );
@@ -2130,8 +2138,8 @@ impl Timeline {
         let logical_size = self
             .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
             .await?;
-        debug!("calculated logical size: {logical_size}");
         timer.stop_and_record();
+        info!("calculated logical size: {logical_size}");

         Ok(logical_size)
     }
@@ -2643,11 +2651,11 @@ impl Timeline {
         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
         ctx: &RequestContext,
     ) {
-        info!("started flush loop");
+        debug!("started flush loop");
         loop {
             tokio::select! {
                 _ = task_mgr::shutdown_watcher() => {
-                    info!("shutting down layer flush task");
+                    debug!("shutting down layer flush task");
                     break;
                 },
                 _ = layer_flush_start_rx.changed() => {}
@@ -2910,6 +2918,7 @@ impl Timeline {
         Ok((new_delta_filename, LayerFileMetadata::new(sz)))
     }

+    #[instrument(skip_all)]
     async fn repartition(
         &self,
         lsn: Lsn,
@@ -3015,6 +3024,7 @@ impl Timeline {
         Ok(false)
     }

+    #[instrument(skip_all)]
     async fn create_image_layers(
         &self,
         partitioning: &KeyPartitioning,
@@ -3238,7 +3248,7 @@ impl Timeline {
         let remotes = deltas_to_compact
             .iter()
             .filter(|l| l.is_remote_layer())
-            .inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
+            .inspect(|l| debug!("compact requires download of {}", l.filename().file_name()))
             .map(|l| {
                 l.clone()
                     .downcast_remote_layer()
@@ -3262,7 +3272,7 @@ impl Timeline {
         );

         for l in deltas_to_compact.iter() {
-            info!("compact includes {}", l.filename().file_name());
+            debug!("compact includes {}", l.filename().file_name());
         }

         // We don't need the original list of layers anymore. Drop it so that
@@ -3790,7 +3800,7 @@ impl Timeline {
             write_guard.store_and_unlock(new_gc_cutoff).wait();
         }

-        info!("GC starting");
+        debug!("GC starting");

         debug!("retain_lsns: {:?}", retain_lsns);
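The flush-loop hunk above introduces a pattern reused later in this patch:
the long-lived task gets a fresh root span (parent: None) so that whatever
request span happened to spawn it does not become its parent, while
follows_from(Span::current()) still records the causal link for tracing
backends. The same idiom in isolation, assuming tokio and tracing
(identifiers are illustrative):

    use tracing::{info_span, Instrument, Span};

    fn spawn_flush_task(tenant_id: u32, timeline_id: u32) {
        let task = async move {
            tracing::debug!("started flush loop");
            // ... flush layers until shutdown ...
        };
        tokio::spawn(task.instrument({
            // A root span: not a child of the current (e.g. request) span ...
            let span = info_span!(parent: None, "layer flush task",
                tenant_id = %tenant_id, timeline_id = %timeline_id);
            // ... but still causally linked back to the spawner.
            span.follows_from(Span::current());
            span
        }));
    }
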
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index 558600692e..df8f0341eb 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -22,7 +22,7 @@ use std::{

 use tokio::time::Instant;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
+use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};

 use crate::{
     context::{DownloadBehavior, RequestContext},
@@ -58,7 +58,7 @@ impl Timeline {
             false,
             async move {
                 self_clone.eviction_task(task_mgr::shutdown_token()).await;
-                info!("eviction task finishing");
+                debug!("eviction task finishing");
                 Ok(())
             },
         );
@@ -74,7 +74,7 @@ impl Timeline {
                 EvictionPolicy::NoEviction => Duration::from_secs(10),
             };
             if random_init_delay(period, &cancel).await.is_err() {
-                info!("shutting down");
+                trace!("shutting down");
                 return;
            }
        }
@@ -89,7 +89,7 @@ impl Timeline {
                ControlFlow::Continue(sleep_until) => {
                    tokio::select! {
                        _ = cancel.cancelled() => {
-                            info!("shutting down");
+                            trace!("shutting down");
                            break;
                        }
                        _ = tokio::time::sleep_until(sleep_until) => { }
                    }
@@ -348,7 +348,6 @@ impl Timeline {
                 cancel.clone(),
                 ctx,
             )
-            .instrument(info_span!("calculate_logical_size"))
             .await;

         match &size {
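The last hunk above is the flip side of the #[instrument(skip_all)] added to
calculate_logical_size in timeline.rs: once the callee opens its own span,
wrapping the call site in another span of the same name would only nest a
duplicate. A sketch of the resulting division of labor, with assumed
signatures rather than the real ones:

    use tracing::instrument;

    #[instrument(skip_all)]
    async fn calculate_logical_size(up_to_lsn: u64) -> u64 {
        // the "calculate_logical_size" span is opened here now
        tracing::debug!(%up_to_lsn, "calculating");
        up_to_lsn
    }

    async fn eviction_iteration() -> u64 {
        // no .instrument(...) needed at the call site anymore
        calculate_logical_size(42).await
    }
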
diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs
index 7ebf3cf172..3a61c64f70 100644
--- a/pageserver/src/tenant/timeline/walreceiver.rs
+++ b/pageserver/src/tenant/timeline/walreceiver.rs
@@ -85,7 +85,7 @@ impl WalReceiver {
             &format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
             false,
             async move {
-                info!("WAL receiver manager started, connecting to broker");
+                debug!("WAL receiver manager started, connecting to broker");
                 let mut connection_manager_state = ConnectionManagerState::new(
                     timeline,
                     conf,
@@ -93,7 +93,7 @@ impl WalReceiver {
                 loop {
                     select! {
                         _ = task_mgr::shutdown_watcher() => {
-                            info!("WAL receiver shutdown requested, shutting down");
+                            trace!("WAL receiver shutdown requested, shutting down");
                             break;
                         },
                         loop_step_result = connection_manager_loop_step(
@@ -104,7 +104,7 @@ impl WalReceiver {
                         ) => match loop_step_result {
                             ControlFlow::Continue(()) => continue,
                             ControlFlow::Break(()) => {
-                                info!("Connection manager loop ended, shutting down");
+                                trace!("Connection manager loop ended, shutting down");
                                 break;
                             }
                         },
@@ -115,7 +115,11 @@ impl WalReceiver {
                 *loop_status.write().unwrap() = None;
                 Ok(())
             }
-            .instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
+            .instrument({
+                let span = info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id);
+                span.follows_from(Span::current());
+                span
+            })
         );

         Self {
@@ -214,7 +218,7 @@ impl TaskHandle {
             // So, tone them down to info-level.
             //
             // XXX: rewrite this module to eliminate the race condition.
-            info!("sender is dropped while join handle is still alive");
+            debug!("sender is dropped while join handle is still alive");
         }

         let res = jh
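The manager loop above races shutdown against a single step of work and lets
the step decide, via ControlFlow, whether the loop keeps going. A condensed,
self-contained sketch of that shape, assuming tokio, tokio-util and tracing
(loop_step is a stand-in for connection_manager_loop_step):

    use std::ops::ControlFlow;
    use tokio_util::sync::CancellationToken;

    async fn manager_loop(cancel: CancellationToken) {
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    tracing::trace!("shutdown requested, shutting down");
                    break;
                }
                step = loop_step() => match step {
                    ControlFlow::Continue(()) => continue,
                    ControlFlow::Break(()) => {
                        tracing::trace!("loop ended, shutting down");
                        break;
                    }
                },
            }
        }
    }

    async fn loop_step() -> ControlFlow<(), ()> {
        // stand-in: one round of subscription/connection management
        ControlFlow::Continue(())
    }
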
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index 6b65e1fd42..931a80ac21 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -56,7 +56,7 @@ pub(super) async fn connection_manager_loop_step(
     {
         Ok(()) => {}
         Err(_) => {
-            info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
+            debug!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
             return ControlFlow::Break(());
         }
     }
@@ -79,7 +79,7 @@ pub(super) async fn connection_manager_loop_step(
     // with other streams on this client (other connection managers). When
     // object goes out of scope, stream finishes in drop() automatically.
     let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
-    info!("Subscribed for broker timeline updates");
+    debug!("Subscribed for broker timeline updates");

     loop {
         let time_until_next_retry = connection_manager_state.time_until_next_retry();
@@ -151,7 +151,7 @@ pub(super) async fn connection_manager_loop_step(
                     // we're already active as walreceiver, no need to reactivate
                     TimelineState::Active => continue,
                     TimelineState::Broken | TimelineState::Stopping => {
-                        info!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
+                        debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
                         return ControlFlow::Break(());
                     }
                     TimelineState::Loading => {
@@ -165,11 +165,11 @@ pub(super) async fn connection_manager_loop_step(
                 }
             } => match new_event {
                 ControlFlow::Continue(new_state) => {
-                    info!("observed timeline state change, new state is {new_state:?}");
+                    debug!("observed timeline state change, new state is {new_state:?}");
                     return ControlFlow::Continue(());
                 }
                 ControlFlow::Break(()) => {
-                    info!("Timeline dropped state updates sender, stopping wal connection manager loop");
+                    debug!("Timeline dropped state updates sender, stopping wal connection manager loop");
                     return ControlFlow::Break(());
                 }
             },
@@ -470,7 +470,7 @@ impl ConnectionManagerState {

         if let Some(next) = &retry.next_retry_at {
             if next > &now {
-                info!(
+                debug!(
                     "Next connection retry to {:?} is at {}",
                     wal_connection.sk_id, next
                 );
@@ -515,7 +515,7 @@ impl ConnectionManagerState {
         );

         if old_entry.is_none() {
-            info!("New SK node was added: {new_safekeeper_id}");
+            debug!("New SK node was added: {new_safekeeper_id}");
             WALRECEIVER_CANDIDATES_ADDED.inc();
         }
     }
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 1cbed3416c..c465769f89 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -119,6 +119,7 @@ pub(super) async fn handle_walreceiver_connection(
         ctx.download_behavior(),
     );
     let connection_cancellation = cancellation.clone();
+    use tracing::Instrument;
     task_mgr::spawn(
         WALRECEIVER_RUNTIME.handle(),
         TaskKind::WalReceiverConnectionPoller,
@@ -140,7 +141,8 @@ pub(super) async fn handle_walreceiver_connection(
                 _ = connection_cancellation.cancelled() => info!("Connection cancelled"),
             }
             Ok(())
-        },
+        }
+        .instrument(tracing::info_span!("walreceiver")),
     );

     // Immediately increment the gauge, then create a job to decrement it on task exit.
@@ -153,7 +155,7 @@ pub(super) async fn handle_walreceiver_connection(
     }

     let identify = identify_system(&mut replication_client).await?;
-    info!("{identify:?}");
+    debug!("{identify:?}");
     let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
     let mut caught_up = false;
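Net effect of the whole patch: the demoted messages disappear at the default
info level and come back under a more verbose filter. A hedged sketch of what
re-enabling them could look like at subscriber setup, assuming
tracing-subscriber with its env-filter feature (the target names are
examples, not taken from this repository's actual logging setup):

    use tracing_subscriber::EnvFilter;

    fn init_logging() {
        // honours e.g. RUST_LOG="info,pageserver=debug" from the environment
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .init();
    }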