rest of tracing changes

Joonas Koivunen
2023-05-30 15:10:36 +03:00
parent 7963237c43
commit 5fc725031b
9 changed files with 73 additions and 54 deletions

View File

@@ -111,7 +111,7 @@ pub fn launch_disk_usage_global_eviction_task(
task_mgr::shutdown_token(),
)
.await;
info!("disk usage based eviction task finishing");
debug!("disk usage based eviction task finishing");
Ok(())
},
);
@@ -133,7 +133,7 @@ async fn disk_usage_eviction_task(
.await
.is_err()
{
info!("shutting down");
debug!("shutting down");
return;
}
}
@@ -168,7 +168,7 @@ async fn disk_usage_eviction_task(
tokio::select! {
_ = tokio::time::sleep_until(sleep_until) => {},
_ = cancel.cancelled() => {
info!("shutting down");
debug!("shutting down");
break
}
}
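The task body here follows a common shape: do a unit of work, then wait under tokio::select! until either the timer or the shutdown token fires. The demotions above reflect that routine teardown is not operator-relevant. A standalone sketch of the shape, with a hypothetical task body rather than the pageserver's:

use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::debug;

async fn periodic_task(cancel: CancellationToken, period: Duration) {
    loop {
        tokio::select! {
            _ = tokio::time::sleep(period) => {
                // one iteration of work goes here
            }
            _ = cancel.cancelled() => {
                debug!("shutting down"); // routine teardown: debug, not info
                return;
            }
        }
    }
}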

View File

@@ -1102,14 +1102,14 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
-#[instrument(skip_all, fields(timeline_id))]
+#[instrument(skip_all, fields(timeline_id=%timeline_id))]
async fn load_local_timeline(
&self,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
ctx: &RequestContext,
) -> anyhow::Result<()> {
-debug_assert_current_span_has_tenant_id();
+debug_assert_current_span_has_tenant_and_timeline_id();
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
RemoteTimelineClient::new(
@@ -1394,6 +1394,8 @@ impl Tenant {
pitr: Duration,
ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
+debug_assert_current_span_has_tenant_id();
+
anyhow::ensure!(
self.is_active(),
"Cannot run GC iteration on inactive tenant"
@@ -1408,6 +1410,8 @@ impl Tenant {
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
+debug_assert_current_span_has_tenant_id();
+
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
@@ -2141,7 +2145,7 @@ impl Tenant {
let target_config_path = conf.tenant_config_path(tenant_id);
let target_config_display = target_config_path.display();
info!("loading tenantconf from {target_config_display}");
debug!("loading tenantconf from {target_config_display}");
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
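Two changes in this file do real work. First, the bare fields(timeline_id) form only declares a span field and leaves it empty, while fields(timeline_id=%timeline_id) records the argument's Display value when the span is created; that is what allows the assertion to tighten from ..._has_tenant_id to ..._has_tenant_and_timeline_id. Second, gc_iteration and compaction_iteration now debug-assert that callers invoke them inside a tenant-scoped span. A minimal sketch of the field-recording half, using a hypothetical TimelineId stand-in (the pageserver's assertion helpers are not reproduced here):

use tracing::instrument;

// Hypothetical stand-in for the real TimelineId type.
#[derive(Debug, Clone, Copy)]
struct TimelineId(u128);

impl std::fmt::Display for TimelineId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:032x}", self.0)
    }
}

// skip_all: capture no arguments implicitly;
// timeline_id=%timeline_id: record the ID via Display at span creation,
// so every event emitted inside inherits it.
#[instrument(skip_all, fields(timeline_id = %timeline_id))]
async fn load_local_timeline(timeline_id: TimelineId) {
    tracing::debug!("loading"); // carries timeline_id automatically
}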

View File

@@ -212,7 +212,7 @@ pub fn schedule_local_tenant_processing(
)
}
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
trace!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,

View File

@@ -61,7 +61,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
///
async fn compaction_loop(tenant: Arc<Tenant>) {
let wait_duration = Duration::from_secs(2);
info!("starting");
debug!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
@@ -72,7 +72,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
debug!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
@@ -95,7 +95,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
let started_at = Instant::now();
let sleep_duration = if period == Duration::ZERO {
info!("automatic compaction is disabled");
debug!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
Duration::from_secs(10)
} else {
@@ -113,7 +113,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
// Sleep
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
debug!("received cancellation request during idling");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
@@ -131,7 +131,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
///
async fn gc_loop(tenant: Arc<Tenant>) {
let wait_duration = Duration::from_secs(2);
info!("starting");
debug!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
@@ -145,7 +145,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
debug!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
@@ -167,7 +167,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
info!("automatic GC is disabled");
debug!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
Duration::from_secs(10)
} else {
@@ -188,7 +188,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
// Sleep
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
debug!("received cancellation request during idling");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},

View File

@@ -674,7 +674,10 @@ impl Timeline {
}
/// Outermost timeline compaction operation; downloads needed layers.
+#[instrument(skip_all)]
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
+debug_assert_current_span_has_tenant_and_timeline_id();
+
const ROUNDS: usize = 2;
let last_record_lsn = self.get_last_record_lsn();
@@ -1434,7 +1437,7 @@ impl Timeline {
match *flush_loop_state {
FlushLoopState::NotStarted => (),
FlushLoopState::Running => {
-info!(
+debug!(
"skipping attempt to start flush_loop twice {}/{}",
self.tenant_id, self.timeline_id
);
@@ -1452,7 +1455,7 @@ impl Timeline {
let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
let self_clone = Arc::clone(self);
info!("spawning flush loop");
debug!("spawning flush loop");
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
@@ -1468,7 +1471,11 @@ impl Timeline {
*flush_loop_state = FlushLoopState::Exited;
Ok(())
}
-.instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
+.instrument({
+let span = info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id);
+span.follows_from(Span::current());
+span
+})
);
*flush_loop_state = FlushLoopState::Running;
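This rewritten .instrument(...) block is the heart of the commit: the flush task runs under a root span (parent: None), so its long-lived output is not nested beneath whichever request happened to spawn it, while follows_from records the causal link for trace consumers. The same treatment is applied to wal_connection_manager below. A standalone sketch with hypothetical string IDs in place of the real TenantId/TimelineId:

use tracing::{info_span, Instrument, Span};

fn spawn_flush_loop(tenant_id: String, timeline_id: String) {
    let fut = async move {
        tracing::debug!("started flush loop");
        // ... flush layers until shutdown ...
    };
    tokio::spawn(fut.instrument({
        // parent: None makes this a root span; follows_from still links it
        // back to the span that was current at spawn time.
        let span = info_span!(parent: None, "layer flush task",
            tenant_id = %tenant_id, timeline_id = %timeline_id);
        span.follows_from(Span::current());
        span
    }));
}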
@@ -1483,7 +1490,7 @@ impl Timeline {
ctx: &RequestContext,
broker_client: BrokerClientChannel,
) {
-info!(
+debug!(
"launching WAL receiver for timeline {} of tenant {}",
self.timeline_id, self.tenant_id
);
@@ -1604,7 +1611,7 @@ impl Timeline {
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
} else if remote_timeline_client::is_temp_download_file(&direntry_path) {
-info!(
+debug!(
"skipping temp download file, reconcile_with_remote will resume / clean up: {}",
fname
);
@@ -1613,7 +1620,7 @@ impl Timeline {
trace!("deleting old ephemeral file in timeline dir: {}", fname);
fs::remove_file(&direntry_path)?;
} else if is_temporary(&direntry_path) {
info!("removing temp timeline file at {}", direntry_path.display());
debug!("removing temp timeline file at {}", direntry_path.display());
fs::remove_file(&direntry_path).with_context(|| {
format!(
"failed to remove temp download file at {}",
@@ -1710,7 +1717,7 @@ impl Timeline {
}
}
-info!(
+debug!(
"remote layer does not exist locally, creating remote layer: {}",
remote_layer_name.file_name()
);
@@ -1795,7 +1802,7 @@ impl Timeline {
up_to_date_metadata: &TimelineMetadata,
index_part: Option<&IndexPart>,
) -> anyhow::Result<()> {
info!("starting");
trace!("starting");
let remote_client = self
.remote_client
.as_ref()
@@ -1815,7 +1822,7 @@ impl Timeline {
let has_local_layers = !local_layers.is_empty();
let local_only_layers = match index_part {
Some(index_part) => {
-info!(
+debug!(
"initializing upload queue from remote index with {} layer files",
index_part.timeline_layers.len()
);
@@ -1824,7 +1831,7 @@ impl Timeline {
.await?
}
None => {
info!("initializing upload queue as empty");
debug!("initializing upload queue as empty");
remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?;
local_layers
}
@@ -1842,7 +1849,7 @@ impl Timeline {
.metadata()
.with_context(|| format!("failed to get file {layer_path:?} metadata"))?
.len();
info!("scheduling {layer_path:?} for upload");
debug!("scheduling {layer_path:?} for upload");
remote_client
.schedule_layer_file_upload(layer_name, &LayerFileMetadata::new(layer_size))?;
}
@@ -1865,7 +1872,7 @@ impl Timeline {
// Local timeline has a metadata file, remote one too, both have no layers to sync.
}
info!("Done");
trace!("Done");
Ok(())
}
@@ -1887,7 +1894,7 @@ impl Timeline {
.get()
.is_none());
-info!(
+debug!(
"spawning logical size computation from context of task kind {:?}",
ctx.task_kind()
);
@@ -2081,6 +2088,7 @@ impl Timeline {
///
/// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
/// especially if we need to download remote layers.
+#[instrument(skip_all)]
pub async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
@@ -2088,7 +2096,7 @@ impl Timeline {
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
-info!(
+debug!(
"Calculating logical size for timeline {} at {}",
self.timeline_id, up_to_lsn
);
@@ -2130,8 +2138,8 @@ impl Timeline {
let logical_size = self
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
.await?;
debug!("calculated logical size: {logical_size}");
timer.stop_and_record();
info!("calculated logical size: {logical_size}");
Ok(logical_size)
}
@@ -2643,11 +2651,11 @@ impl Timeline {
mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
ctx: &RequestContext,
) {
info!("started flush loop");
debug!("started flush loop");
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("shutting down layer flush task");
debug!("shutting down layer flush task");
break;
},
_ = layer_flush_start_rx.changed() => {}
@@ -2910,6 +2918,7 @@ impl Timeline {
Ok((new_delta_filename, LayerFileMetadata::new(sz)))
}
+#[instrument(skip_all)]
async fn repartition(
&self,
lsn: Lsn,
@@ -3015,6 +3024,7 @@ impl Timeline {
Ok(false)
}
+#[instrument(skip_all)]
async fn create_image_layers(
&self,
partitioning: &KeyPartitioning,
@@ -3238,7 +3248,7 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.filter(|l| l.is_remote_layer())
-.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
+.inspect(|l| debug!("compact requires download of {}", l.filename().file_name()))
.map(|l| {
l.clone()
.downcast_remote_layer()
@@ -3262,7 +3272,7 @@ impl Timeline {
);
for l in deltas_to_compact.iter() {
info!("compact includes {}", l.filename().file_name());
debug!("compact includes {}", l.filename().file_name());
}
// We don't need the original list of layers anymore. Drop it so that
@@ -3790,7 +3800,7 @@ impl Timeline {
write_guard.store_and_unlock(new_gc_cutoff).wait();
}
info!("GC starting");
debug!("GC starting");
debug!("retain_lsns: {:?}", retain_lsns);

View File

@@ -22,7 +22,7 @@ use std::{
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
+use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
use crate::{
context::{DownloadBehavior, RequestContext},
@@ -58,7 +58,7 @@ impl Timeline {
false,
async move {
self_clone.eviction_task(task_mgr::shutdown_token()).await;
info!("eviction task finishing");
debug!("eviction task finishing");
Ok(())
},
);
@@ -74,7 +74,7 @@ impl Timeline {
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &cancel).await.is_err() {
info!("shutting down");
trace!("shutting down");
return;
}
}
@@ -89,7 +89,7 @@ impl Timeline {
ControlFlow::Continue(sleep_until) => {
tokio::select! {
_ = cancel.cancelled() => {
info!("shutting down");
trace!("shutting down");
break;
}
_ = tokio::time::sleep_until(sleep_until) => { }
@@ -348,7 +348,6 @@ impl Timeline {
cancel.clone(),
ctx,
)
.instrument(info_span!("calculate_logical_size"))
.await;
match &size {
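This removal pairs with the #[instrument(skip_all)] added to calculate_logical_size above: once the callee opens its own span, wrapping the call site in info_span!("calculate_logical_size") would just add a duplicate nesting level. A minimal sketch of the idea, with a hypothetical function body:

use tracing::instrument;

// The callee now instruments itself...
#[instrument(skip_all)]
async fn calculate_logical_size() -> u64 {
    tracing::debug!("calculating"); // emitted inside the callee's own span
    0
}

async fn caller() -> u64 {
    // ...so the call site needs no .instrument(...) of its own.
    calculate_logical_size().await
}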

View File

@@ -85,7 +85,7 @@ impl WalReceiver {
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver manager started, connecting to broker");
debug!("WAL receiver manager started, connecting to broker");
let mut connection_manager_state = ConnectionManagerState::new(
timeline,
conf,
@@ -93,7 +93,7 @@ impl WalReceiver {
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
trace!("WAL receiver shutdown requested, shutting down");
break;
},
loop_step_result = connection_manager_loop_step(
@@ -104,7 +104,7 @@ impl WalReceiver {
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
trace!("Connection manager loop ended, shutting down");
break;
}
},
@@ -115,7 +115,11 @@ impl WalReceiver {
*loop_status.write().unwrap() = None;
Ok(())
}
-.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
+.instrument({
+let span = info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id);
+span.follows_from(Span::current());
+span
+})
);
Self {
@@ -214,7 +218,7 @@ impl<E: Clone> TaskHandle<E> {
// So, tone them down to info-level.
//
// XXX: rewrite this module to eliminate the race condition.
info!("sender is dropped while join handle is still alive");
debug!("sender is dropped while join handle is still alive");
}
let res = jh

View File

@@ -56,7 +56,7 @@ pub(super) async fn connection_manager_loop_step(
{
Ok(()) => {}
Err(_) => {
info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
debug!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
return ControlFlow::Break(());
}
}
@@ -79,7 +79,7 @@ pub(super) async fn connection_manager_loop_step(
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
info!("Subscribed for broker timeline updates");
debug!("Subscribed for broker timeline updates");
loop {
let time_until_next_retry = connection_manager_state.time_until_next_retry();
@@ -151,7 +151,7 @@ pub(super) async fn connection_manager_loop_step(
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Stopping => {
info!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
return ControlFlow::Break(());
}
TimelineState::Loading => {
@@ -165,11 +165,11 @@ pub(super) async fn connection_manager_loop_step(
}
} => match new_event {
ControlFlow::Continue(new_state) => {
info!("observed timeline state change, new state is {new_state:?}");
debug!("observed timeline state change, new state is {new_state:?}");
return ControlFlow::Continue(());
}
ControlFlow::Break(()) => {
info!("Timeline dropped state updates sender, stopping wal connection manager loop");
debug!("Timeline dropped state updates sender, stopping wal connection manager loop");
return ControlFlow::Break(());
}
},
@@ -470,7 +470,7 @@ impl ConnectionManagerState {
if let Some(next) = &retry.next_retry_at {
if next > &now {
-info!(
+debug!(
"Next connection retry to {:?} is at {}",
wal_connection.sk_id, next
);
@@ -515,7 +515,7 @@ impl ConnectionManagerState {
);
if old_entry.is_none() {
info!("New SK node was added: {new_safekeeper_id}");
debug!("New SK node was added: {new_safekeeper_id}");
WALRECEIVER_CANDIDATES_ADDED.inc();
}
}

View File

@@ -119,6 +119,7 @@ pub(super) async fn handle_walreceiver_connection(
ctx.download_behavior(),
);
let connection_cancellation = cancellation.clone();
+use tracing::Instrument;
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverConnectionPoller,
@@ -140,7 +141,8 @@ pub(super) async fn handle_walreceiver_connection(
_ = connection_cancellation.cancelled() => info!("Connection cancelled"),
}
Ok(())
-},
+}
+.instrument(tracing::info_span!("walreceiver")),
);
// Immediately increment the gauge, then create a job to decrement it on task exit.
@@ -153,7 +155,7 @@ pub(super) async fn handle_walreceiver_connection(
}
let identify = identify_system(&mut replication_client).await?;
info!("{identify:?}");
debug!("{identify:?}");
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
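Unlike the detached manager span above, the poller future spawned in this hunk gets an ordinary child span: its lifetime matches the connection, so nesting under the current context fits. A minimal sketch of attaching a span to a spawned future (hypothetical body):

use tracing::Instrument;

fn spawn_poller() {
    tokio::spawn(
        async {
            tracing::debug!("polling for WAL"); // grouped under "walreceiver"
            // ... poll the connection until cancelled ...
        }
        .instrument(tracing::info_span!("walreceiver")),
    );
}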