Compare commits

...

7 Commits

Author SHA1 Message Date
Joonas Koivunen
677f46301f try: enclose load operations to initial load span
this makes only sense if we would then have a "new span" start after
tenants and timelines have been activated, not doing that now.
2023-05-31 15:34:56 +03:00
Joonas Koivunen
2725f6197f tracing_flame: disable empty samples -- seem just like noise 2023-05-31 15:34:34 +03:00
Joonas Koivunen
2c55c5a0ea neon_local: exit early if process died 2023-05-31 14:13:12 +03:00
Joonas Koivunen
777bee85fa to be dropped: tracing-flame second attempt
with configuring it no post processing is needed. the time spent in
loading is very small so one needs to use:

cat .neon/tracing.folded | inferno-flamegraph --minwidth=0 >| flamegraph.svg

To find them, otherwise it will be just walreceiver and compaction.
Noticed also some trouble with the shutdown propagating from a signal
with single threaded runtime; it very likely is that all of the
compactions are already spawned, and as they are mostly blocking code,
they will block the shutdown procedure, which will be quite late
anyways.
2023-05-31 10:54:48 +03:00
Joonas Koivunen
5fc725031b rest of tracing changes 2023-05-30 18:38:44 +03:00
Joonas Koivunen
7963237c43 to be dropped: mandatory PAGESERVER_THREADS_PER_RUNTIME 2023-05-30 18:38:20 +03:00
Joonas Koivunen
2d9d679238 to be dropped: tracing-flame usage attempt
for our purposes the folded need post processing:

```
cat .neon/tracing.folded \
  | sed -Ee 's/^ThreadId\([0-9]+\)-//' \
  | inferno-flamegraph >| flamegraph.svg
```
2023-05-30 18:38:20 +03:00
16 changed files with 197 additions and 70 deletions

12
Cargo.lock generated
View File

@@ -4590,6 +4590,17 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-flame"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
dependencies = [
"lazy_static",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@@ -4843,6 +4854,7 @@ dependencies = [
"tokio",
"tracing",
"tracing-error",
"tracing-flame",
"tracing-subscriber",
"url",
"uuid",

View File

@@ -106,6 +106,17 @@ where
);
for retries in 0..RETRIES {
match spawned_process.try_wait() {
Ok(Some(exit)) => {
anyhow::bail!("process {process_name} already exited: {exit}")
}
Ok(None) => { /* not dead yet */ }
Err(e) => {
return Err(
anyhow::Error::new(e).context(format!("try_wait failed on process {pid}"))
)
}
}
match process_started(pid, Some(pid_file_to_check), &process_status_check) {
Ok(true) => {
println!("\n{process_name} started, pid: {pid}");
@@ -219,7 +230,12 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
// Pass through these environment variables to the command
for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
for var in [
"LLVM_PROFILE_FILE",
"FAILPOINTS",
"RUST_LOG",
"PAGESERVER_THREADS_PER_RUNTIME",
] {
if let Some(val) = std::env::var_os(var) {
filled_cmd = filled_cmd.env(var, val);
}

View File

@@ -40,6 +40,7 @@ uuid.workspace = true
pq_proto.workspace = true
metrics.workspace = true
workspace_hack.workspace = true
tracing-flame = "0.2"
[dev-dependencies]
byteorder.workspace = true

View File

@@ -104,6 +104,55 @@ pub fn init(
Ok(())
}
pub fn init_with_flame(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
) -> anyhow::Result<impl Drop> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
let r = tracing_subscriber::registry();
let r = r.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
LogFormat::Test => log_layer.with_test_writer().boxed(),
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
let (flame_layer, guard) = {
let (l, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
let l = l
.with_empty_samples(false)
.with_module_path(false)
.with_file_and_line(false)
.with_threads_collapsed(true);
(l, guard)
};
let r = r.with(flame_layer);
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => {
r.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init();
}
TracingErrorLayerEnablement::Disabled => r.init(),
}
Ok(guard)
}
/// Disable the default rust panic hook by using `set_hook`.
///
/// For neon binaries, the assumption is that tracing is configured before with [`init`], after

View File

@@ -98,7 +98,7 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(conf.log_format, tracing_error_layer_enablement)?;
let _guard = logging::init_with_flame(conf.log_format, tracing_error_layer_enablement)?;
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
@@ -127,7 +127,7 @@ fn main() -> anyhow::Result<()> {
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
start_pageserver(launch_ts, conf, _guard).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
@@ -225,6 +225,7 @@ fn initialize_config(
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
guard: impl Drop,
) -> anyhow::Result<()> {
// Print version and launch timestamp to the log,
// and expose them as prometheus metrics.
@@ -341,16 +342,24 @@ fn start_pageserver(
let (init_done_tx, init_done_rx) = utils::completion::channel();
// Scan the local 'tenants/' directory and start loading the tenants
let span = tracing::info_span!("initial load");
let init_started_at = std::time::Instant::now();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
(init_done_tx, init_done_rx.clone()),
))?;
// initial load started by initializing tenant manager but stopped when initialization is
// actually complete
BACKGROUND_RUNTIME.block_on(
mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
(init_done_tx, init_done_rx.clone()),
)
.instrument(span.clone()),
)?;
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx.clone();
async move {
init_done_rx.wait().await;
@@ -361,6 +370,7 @@ fn start_pageserver(
"Initial load completed."
);
}
.instrument(span)
});
// shared state between the disk-usage backed eviction background task and the http endpoint
@@ -484,7 +494,8 @@ fn start_pageserver(
}
// All started up! Now just sit and wait for shutdown signal.
ShutdownSignals::handle(|signal| match signal {
let mut guard = Some(guard);
ShutdownSignals::handle(move |signal| match signal {
Signal::Quit => {
info!(
"Got {}. Terminating in immediate shutdown mode",
@@ -498,7 +509,7 @@ fn start_pageserver(
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
);
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0, guard.take()));
unreachable!()
}
})

View File

@@ -111,7 +111,7 @@ pub fn launch_disk_usage_global_eviction_task(
task_mgr::shutdown_token(),
)
.await;
info!("disk usage based eviction task finishing");
debug!("disk usage based eviction task finishing");
Ok(())
},
);
@@ -133,7 +133,7 @@ async fn disk_usage_eviction_task(
.await
.is_err()
{
info!("shutting down");
debug!("shutting down");
return;
}
}
@@ -168,7 +168,7 @@ async fn disk_usage_eviction_task(
tokio::select! {
_ = tokio::time::sleep_until(sleep_until) => {},
_ = cancel.cancelled() => {
info!("shutting down");
debug!("shutting down");
break
}
}

View File

@@ -45,8 +45,8 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument]
pub async fn shutdown_pageserver(exit_code: i32) {
#[tracing::instrument(skip(guard))]
pub async fn shutdown_pageserver(exit_code: i32, guard: Option<impl Drop>) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None).await;
@@ -72,6 +72,9 @@ pub async fn shutdown_pageserver(exit_code: i32) {
// There should be nothing left, but let's be sure
task_mgr::shutdown_tasks(None, None, None).await;
info!("Shut down successfully completed");
drop(guard);
std::process::exit(exit_code);
}

View File

@@ -103,9 +103,23 @@ use crate::shutdown_pageserver;
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
//
pub static THREADS_PER_RUNTIME: Lazy<std::num::NonZeroUsize> = Lazy::new(|| {
let num = std::env::var_os("PAGESERVER_THREADS_PER_RUNTIME")
.and_then(|var| var.to_str().map(|v| v.parse::<usize>()))
.expect("PAGESERVER_THREADS_PER_RUNTIME is unset")
.expect("PAGESERVER_THREADS_PER_RUNTIME is not an usize");
let num =
std::num::NonZeroUsize::new(num).expect("PAGESERVER_THREADS_PER_RUNTIME out of range");
num
});
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("compute request worker")
.worker_threads(THREADS_PER_RUNTIME.get())
.enable_all()
.build()
.expect("Failed to create compute request runtime")
@@ -114,6 +128,7 @@ pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("mgmt request worker")
.worker_threads(THREADS_PER_RUNTIME.get())
.enable_all()
.build()
.expect("Failed to create mgmt request runtime")
@@ -122,6 +137,7 @@ pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("walreceiver worker")
.worker_threads(THREADS_PER_RUNTIME.get())
.enable_all()
.build()
.expect("Failed to create walreceiver runtime")
@@ -130,6 +146,7 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background op worker")
.worker_threads(THREADS_PER_RUNTIME.get())
.enable_all()
.build()
.expect("Failed to create background op runtime")
@@ -433,7 +450,7 @@ async fn task_finish(
}
if shutdown_process {
shutdown_pageserver(1).await;
shutdown_pageserver(1, None::<Box<u32>>).await;
}
}

View File

@@ -955,8 +955,7 @@ impl Tenant {
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
span.follows_from(Span::current());
let span = tracing::info_span!("load", tenant_id=%tenant_id);
span
}),
);
@@ -1102,14 +1101,14 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
#[instrument(skip_all, fields(timeline_id))]
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
async fn load_local_timeline(
&self,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
debug_assert_current_span_has_tenant_and_timeline_id();
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
RemoteTimelineClient::new(
@@ -1394,6 +1393,8 @@ impl Tenant {
pitr: Duration,
ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
debug_assert_current_span_has_tenant_id();
anyhow::ensure!(
self.is_active(),
"Cannot run GC iteration on inactive tenant"
@@ -1408,6 +1409,8 @@ impl Tenant {
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
@@ -2141,7 +2144,7 @@ impl Tenant {
let target_config_path = conf.tenant_config_path(tenant_id);
let target_config_display = target_config_path.display();
info!("loading tenantconf from {target_config_display}");
debug!("loading tenantconf from {target_config_display}");
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.

View File

@@ -212,7 +212,7 @@ pub fn schedule_local_tenant_processing(
)
}
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
trace!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,

View File

@@ -61,7 +61,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
///
async fn compaction_loop(tenant: Arc<Tenant>) {
let wait_duration = Duration::from_secs(2);
info!("starting");
debug!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
@@ -72,7 +72,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
debug!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
@@ -95,7 +95,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
let started_at = Instant::now();
let sleep_duration = if period == Duration::ZERO {
info!("automatic compaction is disabled");
debug!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
Duration::from_secs(10)
} else {
@@ -113,7 +113,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
// Sleep
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
debug!("received cancellation request during idling");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
@@ -131,7 +131,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
///
async fn gc_loop(tenant: Arc<Tenant>) {
let wait_duration = Duration::from_secs(2);
info!("starting");
debug!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
@@ -145,7 +145,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
debug!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
@@ -167,7 +167,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
info!("automatic GC is disabled");
debug!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
Duration::from_secs(10)
} else {
@@ -188,7 +188,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
// Sleep
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
debug!("received cancellation request during idling");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},

View File

@@ -674,7 +674,10 @@ impl Timeline {
}
/// Outermost timeline compaction operation; downloads needed layers.
#[instrument(skip_all)]
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
const ROUNDS: usize = 2;
let last_record_lsn = self.get_last_record_lsn();
@@ -1434,7 +1437,7 @@ impl Timeline {
match *flush_loop_state {
FlushLoopState::NotStarted => (),
FlushLoopState::Running => {
info!(
debug!(
"skipping attempt to start flush_loop twice {}/{}",
self.tenant_id, self.timeline_id
);
@@ -1452,7 +1455,7 @@ impl Timeline {
let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
let self_clone = Arc::clone(self);
info!("spawning flush loop");
debug!("spawning flush loop");
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
@@ -1468,7 +1471,11 @@ impl Timeline {
*flush_loop_state = FlushLoopState::Exited;
Ok(())
}
.instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
.instrument({
let span = info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id);
span.follows_from(Span::current());
span
})
);
*flush_loop_state = FlushLoopState::Running;
@@ -1483,7 +1490,7 @@ impl Timeline {
ctx: &RequestContext,
broker_client: BrokerClientChannel,
) {
info!(
debug!(
"launching WAL receiver for timeline {} of tenant {}",
self.timeline_id, self.tenant_id
);
@@ -1604,7 +1611,7 @@ impl Timeline {
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
} else if remote_timeline_client::is_temp_download_file(&direntry_path) {
info!(
debug!(
"skipping temp download file, reconcile_with_remote will resume / clean up: {}",
fname
);
@@ -1613,7 +1620,7 @@ impl Timeline {
trace!("deleting old ephemeral file in timeline dir: {}", fname);
fs::remove_file(&direntry_path)?;
} else if is_temporary(&direntry_path) {
info!("removing temp timeline file at {}", direntry_path.display());
debug!("removing temp timeline file at {}", direntry_path.display());
fs::remove_file(&direntry_path).with_context(|| {
format!(
"failed to remove temp download file at {}",
@@ -1710,7 +1717,7 @@ impl Timeline {
}
}
info!(
debug!(
"remote layer does not exist locally, creating remote layer: {}",
remote_layer_name.file_name()
);
@@ -1795,7 +1802,7 @@ impl Timeline {
up_to_date_metadata: &TimelineMetadata,
index_part: Option<&IndexPart>,
) -> anyhow::Result<()> {
info!("starting");
trace!("starting");
let remote_client = self
.remote_client
.as_ref()
@@ -1815,7 +1822,7 @@ impl Timeline {
let has_local_layers = !local_layers.is_empty();
let local_only_layers = match index_part {
Some(index_part) => {
info!(
debug!(
"initializing upload queue from remote index with {} layer files",
index_part.timeline_layers.len()
);
@@ -1824,7 +1831,7 @@ impl Timeline {
.await?
}
None => {
info!("initializing upload queue as empty");
debug!("initializing upload queue as empty");
remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?;
local_layers
}
@@ -1842,7 +1849,7 @@ impl Timeline {
.metadata()
.with_context(|| format!("failed to get file {layer_path:?} metadata"))?
.len();
info!("scheduling {layer_path:?} for upload");
debug!("scheduling {layer_path:?} for upload");
remote_client
.schedule_layer_file_upload(layer_name, &LayerFileMetadata::new(layer_size))?;
}
@@ -1865,7 +1872,7 @@ impl Timeline {
// Local timeline has a metadata file, remote one too, both have no layers to sync.
}
info!("Done");
trace!("Done");
Ok(())
}
@@ -1887,7 +1894,7 @@ impl Timeline {
.get()
.is_none());
info!(
debug!(
"spawning logical size computation from context of task kind {:?}",
ctx.task_kind()
);
@@ -2081,6 +2088,7 @@ impl Timeline {
///
/// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
/// especially if we need to download remote layers.
#[instrument(skip_all)]
pub async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
@@ -2088,7 +2096,7 @@ impl Timeline {
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
info!(
debug!(
"Calculating logical size for timeline {} at {}",
self.timeline_id, up_to_lsn
);
@@ -2130,8 +2138,8 @@ impl Timeline {
let logical_size = self
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
.await?;
debug!("calculated logical size: {logical_size}");
timer.stop_and_record();
info!("calculated logical size: {logical_size}");
Ok(logical_size)
}
@@ -2643,11 +2651,11 @@ impl Timeline {
mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
ctx: &RequestContext,
) {
info!("started flush loop");
debug!("started flush loop");
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("shutting down layer flush task");
debug!("shutting down layer flush task");
break;
},
_ = layer_flush_start_rx.changed() => {}
@@ -2910,6 +2918,7 @@ impl Timeline {
Ok((new_delta_filename, LayerFileMetadata::new(sz)))
}
#[instrument(skip_all)]
async fn repartition(
&self,
lsn: Lsn,
@@ -3015,6 +3024,7 @@ impl Timeline {
Ok(false)
}
#[instrument(skip_all)]
async fn create_image_layers(
&self,
partitioning: &KeyPartitioning,
@@ -3238,7 +3248,7 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.inspect(|l| debug!("compact requires download of {}", l.filename().file_name()))
.map(|l| {
l.clone()
.downcast_remote_layer()
@@ -3262,7 +3272,7 @@ impl Timeline {
);
for l in deltas_to_compact.iter() {
info!("compact includes {}", l.filename().file_name());
debug!("compact includes {}", l.filename().file_name());
}
// We don't need the original list of layers anymore. Drop it so that
@@ -3790,7 +3800,7 @@ impl Timeline {
write_guard.store_and_unlock(new_gc_cutoff).wait();
}
info!("GC starting");
debug!("GC starting");
debug!("retain_lsns: {:?}", retain_lsns);

View File

@@ -22,7 +22,7 @@ use std::{
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
use crate::{
context::{DownloadBehavior, RequestContext},
@@ -58,7 +58,7 @@ impl Timeline {
false,
async move {
self_clone.eviction_task(task_mgr::shutdown_token()).await;
info!("eviction task finishing");
debug!("eviction task finishing");
Ok(())
},
);
@@ -74,7 +74,7 @@ impl Timeline {
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &cancel).await.is_err() {
info!("shutting down");
trace!("shutting down");
return;
}
}
@@ -89,7 +89,7 @@ impl Timeline {
ControlFlow::Continue(sleep_until) => {
tokio::select! {
_ = cancel.cancelled() => {
info!("shutting down");
trace!("shutting down");
break;
}
_ = tokio::time::sleep_until(sleep_until) => { }
@@ -348,7 +348,6 @@ impl Timeline {
cancel.clone(),
ctx,
)
.instrument(info_span!("calculate_logical_size"))
.await;
match &size {

View File

@@ -85,7 +85,7 @@ impl WalReceiver {
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver manager started, connecting to broker");
debug!("WAL receiver manager started, connecting to broker");
let mut connection_manager_state = ConnectionManagerState::new(
timeline,
conf,
@@ -93,7 +93,7 @@ impl WalReceiver {
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
trace!("WAL receiver shutdown requested, shutting down");
break;
},
loop_step_result = connection_manager_loop_step(
@@ -104,7 +104,7 @@ impl WalReceiver {
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
trace!("Connection manager loop ended, shutting down");
break;
}
},
@@ -115,7 +115,11 @@ impl WalReceiver {
*loop_status.write().unwrap() = None;
Ok(())
}
.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
.instrument({
let span = info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id);
span.follows_from(Span::current());
span
})
);
Self {
@@ -214,7 +218,7 @@ impl<E: Clone> TaskHandle<E> {
// So, tone them down to info-level.
//
// XXX: rewrite this module to eliminate the race condition.
info!("sender is dropped while join handle is still alive");
debug!("sender is dropped while join handle is still alive");
}
let res = jh

View File

@@ -56,7 +56,7 @@ pub(super) async fn connection_manager_loop_step(
{
Ok(()) => {}
Err(_) => {
info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
debug!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
return ControlFlow::Break(());
}
}
@@ -79,7 +79,7 @@ pub(super) async fn connection_manager_loop_step(
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
info!("Subscribed for broker timeline updates");
debug!("Subscribed for broker timeline updates");
loop {
let time_until_next_retry = connection_manager_state.time_until_next_retry();
@@ -151,7 +151,7 @@ pub(super) async fn connection_manager_loop_step(
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Stopping => {
info!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
return ControlFlow::Break(());
}
TimelineState::Loading => {
@@ -165,11 +165,11 @@ pub(super) async fn connection_manager_loop_step(
}
} => match new_event {
ControlFlow::Continue(new_state) => {
info!("observed timeline state change, new state is {new_state:?}");
debug!("observed timeline state change, new state is {new_state:?}");
return ControlFlow::Continue(());
}
ControlFlow::Break(()) => {
info!("Timeline dropped state updates sender, stopping wal connection manager loop");
debug!("Timeline dropped state updates sender, stopping wal connection manager loop");
return ControlFlow::Break(());
}
},
@@ -470,7 +470,7 @@ impl ConnectionManagerState {
if let Some(next) = &retry.next_retry_at {
if next > &now {
info!(
debug!(
"Next connection retry to {:?} is at {}",
wal_connection.sk_id, next
);
@@ -515,7 +515,7 @@ impl ConnectionManagerState {
);
if old_entry.is_none() {
info!("New SK node was added: {new_safekeeper_id}");
debug!("New SK node was added: {new_safekeeper_id}");
WALRECEIVER_CANDIDATES_ADDED.inc();
}
}

View File

@@ -119,6 +119,7 @@ pub(super) async fn handle_walreceiver_connection(
ctx.download_behavior(),
);
let connection_cancellation = cancellation.clone();
use tracing::Instrument;
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverConnectionPoller,
@@ -140,7 +141,8 @@ pub(super) async fn handle_walreceiver_connection(
_ = connection_cancellation.cancelled() => info!("Connection cancelled"),
}
Ok(())
},
}
.instrument(tracing::info_span!("walreceiver")),
);
// Immediately increment the gauge, then create a job to decrement it on task exit.
@@ -153,7 +155,7 @@ pub(super) async fn handle_walreceiver_connection(
}
let identify = identify_system(&mut replication_client).await?;
info!("{identify:?}");
debug!("{identify:?}");
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;