Mirror of https://github.com/neondatabase/neon.git

Compare commits: rc/2024-10...try_startu (7 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 677f46301f | |
| | 2725f6197f | |
| | 2c55c5a0ea | |
| | 777bee85fa | |
| | 5fc725031b | |
| | 7963237c43 | |
| | 2d9d679238 | |
Cargo.lock (generated): 12 lines changed
@@ -4590,6 +4590,17 @@ dependencies = [
 "tracing-subscriber",
]

[[package]]
name = "tracing-flame"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
dependencies = [
 "lazy_static",
 "tracing",
 "tracing-subscriber",
]

[[package]]
name = "tracing-futures"
version = "0.2.5"

@@ -4843,6 +4854,7 @@ dependencies = [
 "tokio",
 "tracing",
 "tracing-error",
 "tracing-flame",
 "tracing-subscriber",
 "url",
 "uuid",
@@ -106,6 +106,17 @@ where
    );

    for retries in 0..RETRIES {
        match spawned_process.try_wait() {
            Ok(Some(exit)) => {
                anyhow::bail!("process {process_name} already exited: {exit}")
            }
            Ok(None) => { /* not dead yet */ }
            Err(e) => {
                return Err(
                    anyhow::Error::new(e).context(format!("try_wait failed on process {pid}"))
                )
            }
        }
        match process_started(pid, Some(pid_file_to_check), &process_status_check) {
            Ok(true) => {
                println!("\n{process_name} started, pid: {pid}");
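The retry loop above combines `Child::try_wait`, which detects a process that died right after spawning without blocking, with an external readiness probe. A minimal, self-contained sketch of that polling pattern (a plain sleep stands in for the pid-file and status checks, and the `sleep` command is just a placeholder child):

```rust
use std::{process::Command, thread, time::Duration};

fn main() -> anyhow::Result<()> {
    const RETRIES: usize = 10;
    let mut child = Command::new("sleep").arg("5").spawn()?;

    for _retry in 0..RETRIES {
        // Has the child already exited? `try_wait` does not block.
        match child.try_wait() {
            Ok(Some(exit)) => anyhow::bail!("process already exited: {exit}"),
            Ok(None) => { /* not dead yet */ }
            Err(e) => return Err(anyhow::Error::new(e).context("try_wait failed")),
        }
        // A real caller would check a pid file or a status endpoint here.
        thread::sleep(Duration::from_millis(100));
    }

    let _ = child.kill();
    Ok(())
}
```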
@@ -219,7 +230,12 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
    let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);

    // Pass through these environment variables to the command
    for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
    for var in [
        "LLVM_PROFILE_FILE",
        "FAILPOINTS",
        "RUST_LOG",
        "PAGESERVER_THREADS_PER_RUNTIME",
    ] {
        if let Some(val) = std::env::var_os(var) {
            filled_cmd = filled_cmd.env(var, val);
        }
@@ -40,6 +40,7 @@ uuid.workspace = true
pq_proto.workspace = true
metrics.workspace = true
workspace_hack.workspace = true
tracing-flame = "0.2"

[dev-dependencies]
byteorder.workspace = true
@@ -104,6 +104,55 @@ pub fn init(
    Ok(())
}

pub fn init_with_flame(
    log_format: LogFormat,
    tracing_error_layer_enablement: TracingErrorLayerEnablement,
) -> anyhow::Result<impl Drop> {
    // We fall back to printing all spans at info-level or above if
    // the RUST_LOG environment variable is not set.
    let rust_log_env_filter = || {
        tracing_subscriber::EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
    };

    // NB: the order of the with() calls does not matter.
    // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
    use tracing_subscriber::prelude::*;
    let r = tracing_subscriber::registry();
    let r = r.with({
        let log_layer = tracing_subscriber::fmt::layer()
            .with_target(false)
            .with_ansi(atty::is(atty::Stream::Stdout))
            .with_writer(std::io::stdout);
        let log_layer = match log_format {
            LogFormat::Json => log_layer.json().boxed(),
            LogFormat::Plain => log_layer.boxed(),
            LogFormat::Test => log_layer.with_test_writer().boxed(),
        };
        log_layer.with_filter(rust_log_env_filter())
    });
    let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
    let (flame_layer, guard) = {
        let (l, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
        let l = l
            .with_empty_samples(false)
            .with_module_path(false)
            .with_file_and_line(false)
            .with_threads_collapsed(true);
        (l, guard)
    };
    let r = r.with(flame_layer);
    match tracing_error_layer_enablement {
        TracingErrorLayerEnablement::EnableWithRustLogFilter => {
            r.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
                .init();
        }
        TracingErrorLayerEnablement::Disabled => r.init(),
    }

    Ok(guard)
}

/// Disable the default rust panic hook by using `set_hook`.
///
/// For neon binaries, the assumption is that tracing is configured before with [`init`], after
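`init_with_flame` layers `tracing_flame::FlameLayer` into the existing subscriber stack and hands the flush guard back to the caller; the folded stack samples written to `./tracing.folded` can then be rendered, e.g. with inferno's `inferno-flamegraph` tool. A stripped-down sketch of the same pattern outside the pageserver (span names and the output path here are illustrative only):

```rust
use tracing_subscriber::prelude::*;

fn main() {
    // Write folded stack samples to ./tracing.folded while the program runs.
    let (flame_layer, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded")
        .expect("failed to create ./tracing.folded");

    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(flame_layer)
        .init();

    let span = tracing::info_span!("startup");
    let entered = span.enter();
    tracing::info!("time spent inside this span shows up in the flame graph");
    drop(entered);

    // Dropping the guard flushes buffered samples; render afterwards with
    // `inferno-flamegraph < tracing.folded > flame.svg`.
    drop(guard);
}
```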
@@ -98,7 +98,7 @@ fn main() -> anyhow::Result<()> {
    } else {
        TracingErrorLayerEnablement::Disabled
    };
    logging::init(conf.log_format, tracing_error_layer_enablement)?;
    let _guard = logging::init_with_flame(conf.log_format, tracing_error_layer_enablement)?;

    // mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
    // disarming this hook on pageserver, because we never tear down tracing.

@@ -127,7 +127,7 @@ fn main() -> anyhow::Result<()> {
    virtual_file::init(conf.max_file_descriptors);
    page_cache::init(conf.page_cache_size);

    start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
    start_pageserver(launch_ts, conf, _guard).context("Failed to start pageserver")?;

    scenario.teardown();
    Ok(())
@@ -225,6 +225,7 @@ fn initialize_config(
fn start_pageserver(
    launch_ts: &'static LaunchTimestamp,
    conf: &'static PageServerConf,
    guard: impl Drop,
) -> anyhow::Result<()> {
    // Print version and launch timestamp to the log,
    // and expose them as prometheus metrics.
@@ -341,16 +342,24 @@ fn start_pageserver(
    let (init_done_tx, init_done_rx) = utils::completion::channel();

    // Scan the local 'tenants/' directory and start loading the tenants
    let span = tracing::info_span!("initial load");
    let init_started_at = std::time::Instant::now();
    BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
        conf,
        broker_client.clone(),
        remote_storage.clone(),
        (init_done_tx, init_done_rx.clone()),
    ))?;

    // initial load started by initializing tenant manager but stopped when initialization is
    // actually complete
    BACKGROUND_RUNTIME.block_on(
        mgr::init_tenant_mgr(
            conf,
            broker_client.clone(),
            remote_storage.clone(),
            (init_done_tx, init_done_rx.clone()),
        )
        .instrument(span.clone()),
    )?;

    BACKGROUND_RUNTIME.spawn({
        let init_done_rx = init_done_rx.clone();

        async move {
            init_done_rx.wait().await;

@@ -361,6 +370,7 @@ fn start_pageserver(
                "Initial load completed."
            );
        }
        .instrument(span)
    });

    // shared state between the disk-usage backed eviction background task and the http endpoint

@@ -484,7 +494,8 @@ fn start_pageserver(
    }

    // All started up! Now just sit and wait for shutdown signal.
    ShutdownSignals::handle(|signal| match signal {
    let mut guard = Some(guard);
    ShutdownSignals::handle(move |signal| match signal {
        Signal::Quit => {
            info!(
                "Got {}. Terminating in immediate shutdown mode",

@@ -498,7 +509,7 @@ fn start_pageserver(
                "Got {}. Terminating gracefully in fast shutdown mode",
                signal.name()
            );
            BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
            BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0, guard.take()));
            unreachable!()
        }
    })
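One detail worth calling out: the flush guard is moved into the signal-handler closure wrapped in an `Option`, so the graceful-shutdown arm can hand it to `shutdown_pageserver` exactly once via `Option::take`, even though the handler closure itself is not a one-shot `FnOnce`. A tiny standalone sketch of that move-into-closure pattern (no pageserver types, purely illustrative):

```rust
fn install_handler(guard: impl Drop) {
    // Wrap the guard so a FnMut closure can move it out exactly once.
    let mut guard = Some(guard);
    let mut on_shutdown_signal = move || {
        let taken = guard.take(); // Some(..) on the first call, None afterwards
        drop(taken);              // the guard's Drop impl runs here
    };
    on_shutdown_signal();
}

fn main() {
    install_handler(Box::new("flush-on-drop resource"));
}
```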
@@ -111,7 +111,7 @@ pub fn launch_disk_usage_global_eviction_task(
                task_mgr::shutdown_token(),
            )
            .await;
            info!("disk usage based eviction task finishing");
            debug!("disk usage based eviction task finishing");
            Ok(())
        },
    );

@@ -133,7 +133,7 @@ async fn disk_usage_eviction_task(
        .await
        .is_err()
    {
        info!("shutting down");
        debug!("shutting down");
        return;
    }
}

@@ -168,7 +168,7 @@ async fn disk_usage_eviction_task(
        tokio::select! {
            _ = tokio::time::sleep_until(sleep_until) => {},
            _ = cancel.cancelled() => {
                info!("shutting down");
                debug!("shutting down");
                break
            }
        }
@@ -45,8 +45,8 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);

pub use crate::metrics::preinitialize_metrics;

#[tracing::instrument]
pub async fn shutdown_pageserver(exit_code: i32) {
#[tracing::instrument(skip(guard))]
pub async fn shutdown_pageserver(exit_code: i32, guard: Option<impl Drop>) {
    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None).await;

@@ -72,6 +72,9 @@ pub async fn shutdown_pageserver(exit_code: i32) {
    // There should be nothing left, but let's be sure
    task_mgr::shutdown_tasks(None, None, None).await;
    info!("Shut down successfully completed");

    drop(guard);

    std::process::exit(exit_code);
}
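Dropping the guard explicitly right before `std::process::exit` matters because `exit` terminates the process without unwinding the stack, so destructors, including the flame-layer flush, would otherwise never run. A minimal sketch of that ordering; the guard type here is just a stand-in:

```rust
struct FlushOnDrop;

impl Drop for FlushOnDrop {
    fn drop(&mut self) {
        // In the pageserver, this is where buffered profile data gets written out.
        eprintln!("flushed");
    }
}

fn exit_with_flush(exit_code: i32, guard: Option<impl Drop>) -> ! {
    drop(guard); // must happen before exit: process::exit runs no destructors
    std::process::exit(exit_code)
}

fn main() {
    exit_with_flush(0, Some(FlushOnDrop));
}
```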
@@ -103,9 +103,23 @@ use crate::shutdown_pageserver;
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
//

pub static THREADS_PER_RUNTIME: Lazy<std::num::NonZeroUsize> = Lazy::new(|| {
    let num = std::env::var_os("PAGESERVER_THREADS_PER_RUNTIME")
        .and_then(|var| var.to_str().map(|v| v.parse::<usize>()))
        .expect("PAGESERVER_THREADS_PER_RUNTIME is unset")
        .expect("PAGESERVER_THREADS_PER_RUNTIME is not an usize");

    let num =
        std::num::NonZeroUsize::new(num).expect("PAGESERVER_THREADS_PER_RUNTIME out of range");

    num
});

pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("compute request worker")
        .worker_threads(THREADS_PER_RUNTIME.get())
        .enable_all()
        .build()
        .expect("Failed to create compute request runtime")

@@ -114,6 +128,7 @@ pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("mgmt request worker")
        .worker_threads(THREADS_PER_RUNTIME.get())
        .enable_all()
        .build()
        .expect("Failed to create mgmt request runtime")

@@ -122,6 +137,7 @@ pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("walreceiver worker")
        .worker_threads(THREADS_PER_RUNTIME.get())
        .enable_all()
        .build()
        .expect("Failed to create walreceiver runtime")

@@ -130,6 +146,7 @@ pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("background op worker")
        .worker_threads(THREADS_PER_RUNTIME.get())
        .enable_all()
        .build()
        .expect("Failed to create background op runtime")

@@ -433,7 +450,7 @@ async fn task_finish(
    }

    if shutdown_process {
        shutdown_pageserver(1).await;
        shutdown_pageserver(1, None::<Box<u32>>).await;
    }
}
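As written, the lazy `THREADS_PER_RUNTIME` panics at first use if `PAGESERVER_THREADS_PER_RUNTIME` is unset or unparsable, which is presumably intentional for this experimental branch: misconfiguration fails loudly at startup. For comparison only, a hypothetical variant that falls back to a default instead of panicking could look like this; it is not what the branch does:

```rust
use std::num::NonZeroUsize;

// Hypothetical fallback variant: default to 1 worker thread per runtime when
// PAGESERVER_THREADS_PER_RUNTIME is missing or invalid.
fn threads_per_runtime() -> NonZeroUsize {
    std::env::var("PAGESERVER_THREADS_PER_RUNTIME")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .and_then(NonZeroUsize::new)
        .unwrap_or(NonZeroUsize::new(1).unwrap())
}

fn main() {
    println!("threads per runtime: {}", threads_per_runtime());
}
```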
@@ -955,8 +955,7 @@ impl Tenant {
                Ok(())
            }
            .instrument({
                let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
                span.follows_from(Span::current());
                let span = tracing::info_span!("load", tenant_id=%tenant_id);
                span
            }),
        );

@@ -1102,14 +1101,14 @@ impl Tenant {
    /// Subroutine of `load_tenant`, to load an individual timeline
    ///
    /// NB: The parent is assumed to be already loaded!
    #[instrument(skip_all, fields(timeline_id))]
    #[instrument(skip_all, fields(timeline_id=%timeline_id))]
    async fn load_local_timeline(
        &self,
        timeline_id: TimelineId,
        local_metadata: TimelineMetadata,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        debug_assert_current_span_has_tenant_id();
        debug_assert_current_span_has_tenant_and_timeline_id();

        let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
            RemoteTimelineClient::new(

@@ -1394,6 +1393,8 @@ impl Tenant {
        pitr: Duration,
        ctx: &RequestContext,
    ) -> anyhow::Result<GcResult> {
        debug_assert_current_span_has_tenant_id();

        anyhow::ensure!(
            self.is_active(),
            "Cannot run GC iteration on inactive tenant"

@@ -1408,6 +1409,8 @@ impl Tenant {
    /// Also it can be explicitly requested per timeline through page server
    /// api's 'compact' command.
    pub async fn compaction_iteration(&self, ctx: &RequestContext) -> anyhow::Result<()> {
        debug_assert_current_span_has_tenant_id();

        anyhow::ensure!(
            self.is_active(),
            "Cannot run compaction iteration on inactive tenant"

@@ -2141,7 +2144,7 @@ impl Tenant {
        let target_config_path = conf.tenant_config_path(tenant_id);
        let target_config_display = target_config_path.display();

        info!("loading tenantconf from {target_config_display}");
        debug!("loading tenantconf from {target_config_display}");

        // FIXME If the config file is not found, assume that we're attaching
        // a detached tenant and config is passed via attach command.
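The branch also changes the tenant "load" span from a detached root span linked back with `follows_from` into an ordinary child span, so the load work nests under whatever span is current when it is spawned. A minimal sketch, with illustrative span names rather than pageserver ones, contrasting the two relationships:

```rust
use tracing_subscriber::prelude::*;

fn main() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .init();

    let request = tracing::info_span!("request");
    let entered = request.enter();

    // Variant 1 (before): detached root span that merely *follows from* "request".
    let detached = tracing::info_span!(parent: None, "load");
    detached.follows_from(tracing::Span::current());

    // Variant 2 (after): regular child span; "request" is its parent.
    let child = tracing::info_span!("load");
    child.in_scope(|| tracing::info!("work done under the child span"));

    drop(entered);
}
```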
@@ -212,7 +212,7 @@ pub fn schedule_local_tenant_processing(
            )
        }
    } else {
        info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
        trace!("tenant {tenant_id} is assumed to be loadable, starting load operation");
        // Start loading the tenant into memory. It will initially be in Loading state.
        Tenant::spawn_load(
            conf,
@@ -61,7 +61,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
///
async fn compaction_loop(tenant: Arc<Tenant>) {
    let wait_duration = Duration::from_secs(2);
    info!("starting");
    debug!("starting");
    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        let cancel = task_mgr::shutdown_token();

@@ -72,7 +72,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {

            tokio::select! {
                _ = cancel.cancelled() => {
                    info!("received cancellation request");
                    debug!("received cancellation request");
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {

@@ -95,7 +95,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
            let started_at = Instant::now();

            let sleep_duration = if period == Duration::ZERO {
                info!("automatic compaction is disabled");
                debug!("automatic compaction is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {

@@ -113,7 +113,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
            // Sleep
            tokio::select! {
                _ = cancel.cancelled() => {
                    info!("received cancellation request during idling");
                    debug!("received cancellation request during idling");
                    break;
                },
                _ = tokio::time::sleep(sleep_duration) => {},

@@ -131,7 +131,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
///
async fn gc_loop(tenant: Arc<Tenant>) {
    let wait_duration = Duration::from_secs(2);
    info!("starting");
    debug!("starting");
    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        let cancel = task_mgr::shutdown_token();

@@ -145,7 +145,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {

            tokio::select! {
                _ = cancel.cancelled() => {
                    info!("received cancellation request");
                    debug!("received cancellation request");
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {

@@ -167,7 +167,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {

            let gc_horizon = tenant.get_gc_horizon();
            let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
                info!("automatic GC is disabled");
                debug!("automatic GC is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {

@@ -188,7 +188,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
            // Sleep
            tokio::select! {
                _ = cancel.cancelled() => {
                    info!("received cancellation request during idling");
                    debug!("received cancellation request during idling");
                    break;
                },
                _ = tokio::time::sleep(sleep_duration) => {},
@@ -674,7 +674,10 @@ impl Timeline {
    }

    /// Outermost timeline compaction operation; downloads needed layers.
    #[instrument(skip_all)]
    pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
        debug_assert_current_span_has_tenant_and_timeline_id();

        const ROUNDS: usize = 2;

        let last_record_lsn = self.get_last_record_lsn();
@@ -1434,7 +1437,7 @@ impl Timeline {
        match *flush_loop_state {
            FlushLoopState::NotStarted => (),
            FlushLoopState::Running => {
                info!(
                debug!(
                    "skipping attempt to start flush_loop twice {}/{}",
                    self.tenant_id, self.timeline_id
                );

@@ -1452,7 +1455,7 @@ impl Timeline {
        let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
        let self_clone = Arc::clone(self);

        info!("spawning flush loop");
        debug!("spawning flush loop");
        task_mgr::spawn(
            task_mgr::BACKGROUND_RUNTIME.handle(),
            task_mgr::TaskKind::LayerFlushTask,

@@ -1468,7 +1471,11 @@ impl Timeline {
                *flush_loop_state = FlushLoopState::Exited;
                Ok(())
            }
            .instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
            .instrument({
                let span = info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id);
                span.follows_from(Span::current());
                span
            })
        );

        *flush_loop_state = FlushLoopState::Running;
@@ -1483,7 +1490,7 @@ impl Timeline {
        ctx: &RequestContext,
        broker_client: BrokerClientChannel,
    ) {
        info!(
        debug!(
            "launching WAL receiver for timeline {} of tenant {}",
            self.timeline_id, self.tenant_id
        );

@@ -1604,7 +1611,7 @@ impl Timeline {
            } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
                // ignore these
            } else if remote_timeline_client::is_temp_download_file(&direntry_path) {
                info!(
                debug!(
                    "skipping temp download file, reconcile_with_remote will resume / clean up: {}",
                    fname
                );

@@ -1613,7 +1620,7 @@ impl Timeline {
                trace!("deleting old ephemeral file in timeline dir: {}", fname);
                fs::remove_file(&direntry_path)?;
            } else if is_temporary(&direntry_path) {
                info!("removing temp timeline file at {}", direntry_path.display());
                debug!("removing temp timeline file at {}", direntry_path.display());
                fs::remove_file(&direntry_path).with_context(|| {
                    format!(
                        "failed to remove temp download file at {}",

@@ -1710,7 +1717,7 @@ impl Timeline {
                }
            }

            info!(
            debug!(
                "remote layer does not exist locally, creating remote layer: {}",
                remote_layer_name.file_name()
            );

@@ -1795,7 +1802,7 @@ impl Timeline {
        up_to_date_metadata: &TimelineMetadata,
        index_part: Option<&IndexPart>,
    ) -> anyhow::Result<()> {
        info!("starting");
        trace!("starting");
        let remote_client = self
            .remote_client
            .as_ref()

@@ -1815,7 +1822,7 @@ impl Timeline {
        let has_local_layers = !local_layers.is_empty();
        let local_only_layers = match index_part {
            Some(index_part) => {
                info!(
                debug!(
                    "initializing upload queue from remote index with {} layer files",
                    index_part.timeline_layers.len()
                );

@@ -1824,7 +1831,7 @@ impl Timeline {
                    .await?
            }
            None => {
                info!("initializing upload queue as empty");
                debug!("initializing upload queue as empty");
                remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?;
                local_layers
            }

@@ -1842,7 +1849,7 @@ impl Timeline {
                .metadata()
                .with_context(|| format!("failed to get file {layer_path:?} metadata"))?
                .len();
            info!("scheduling {layer_path:?} for upload");
            debug!("scheduling {layer_path:?} for upload");
            remote_client
                .schedule_layer_file_upload(layer_name, &LayerFileMetadata::new(layer_size))?;
        }

@@ -1865,7 +1872,7 @@ impl Timeline {
            // Local timeline has a metadata file, remote one too, both have no layers to sync.
        }

        info!("Done");
        trace!("Done");

        Ok(())
    }

@@ -1887,7 +1894,7 @@ impl Timeline {
            .get()
            .is_none());

        info!(
        debug!(
            "spawning logical size computation from context of task kind {:?}",
            ctx.task_kind()
        );
@@ -2081,6 +2088,7 @@ impl Timeline {
    ///
    /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    /// especially if we need to download remote layers.
    #[instrument(skip_all)]
    pub async fn calculate_logical_size(
        &self,
        up_to_lsn: Lsn,

@@ -2088,7 +2096,7 @@ impl Timeline {
        cancel: CancellationToken,
        ctx: &RequestContext,
    ) -> Result<u64, CalculateLogicalSizeError> {
        info!(
        debug!(
            "Calculating logical size for timeline {} at {}",
            self.timeline_id, up_to_lsn
        );

@@ -2130,8 +2138,8 @@ impl Timeline {
        let logical_size = self
            .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
            .await?;
        debug!("calculated logical size: {logical_size}");
        timer.stop_and_record();
        info!("calculated logical size: {logical_size}");
        Ok(logical_size)
    }
@@ -2643,11 +2651,11 @@ impl Timeline {
        mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
        ctx: &RequestContext,
    ) {
        info!("started flush loop");
        debug!("started flush loop");
        loop {
            tokio::select! {
                _ = task_mgr::shutdown_watcher() => {
                    info!("shutting down layer flush task");
                    debug!("shutting down layer flush task");
                    break;
                },
                _ = layer_flush_start_rx.changed() => {}

@@ -2910,6 +2918,7 @@ impl Timeline {
        Ok((new_delta_filename, LayerFileMetadata::new(sz)))
    }

    #[instrument(skip_all)]
    async fn repartition(
        &self,
        lsn: Lsn,

@@ -3015,6 +3024,7 @@ impl Timeline {
        Ok(false)
    }

    #[instrument(skip_all)]
    async fn create_image_layers(
        &self,
        partitioning: &KeyPartitioning,

@@ -3238,7 +3248,7 @@ impl Timeline {
        let remotes = deltas_to_compact
            .iter()
            .filter(|l| l.is_remote_layer())
            .inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
            .inspect(|l| debug!("compact requires download of {}", l.filename().file_name()))
            .map(|l| {
                l.clone()
                    .downcast_remote_layer()

@@ -3262,7 +3272,7 @@ impl Timeline {
        );

        for l in deltas_to_compact.iter() {
            info!("compact includes {}", l.filename().file_name());
            debug!("compact includes {}", l.filename().file_name());
        }

        // We don't need the original list of layers anymore. Drop it so that

@@ -3790,7 +3800,7 @@ impl Timeline {
            write_guard.store_and_unlock(new_gc_cutoff).wait();
        }

        info!("GC starting");
        debug!("GC starting");

        debug!("retain_lsns: {:?}", retain_lsns);
@@ -22,7 +22,7 @@ use std::{

use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},

@@ -58,7 +58,7 @@ impl Timeline {
            false,
            async move {
                self_clone.eviction_task(task_mgr::shutdown_token()).await;
                info!("eviction task finishing");
                debug!("eviction task finishing");
                Ok(())
            },
        );

@@ -74,7 +74,7 @@ impl Timeline {
            EvictionPolicy::NoEviction => Duration::from_secs(10),
        };
        if random_init_delay(period, &cancel).await.is_err() {
            info!("shutting down");
            trace!("shutting down");
            return;
        }
    }

@@ -89,7 +89,7 @@ impl Timeline {
            ControlFlow::Continue(sleep_until) => {
                tokio::select! {
                    _ = cancel.cancelled() => {
                        info!("shutting down");
                        trace!("shutting down");
                        break;
                    }
                    _ = tokio::time::sleep_until(sleep_until) => { }

@@ -348,7 +348,6 @@ impl Timeline {
            cancel.clone(),
            ctx,
        )
        .instrument(info_span!("calculate_logical_size"))
        .await;

        match &size {
@@ -85,7 +85,7 @@ impl WalReceiver {
            &format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
            false,
            async move {
                info!("WAL receiver manager started, connecting to broker");
                debug!("WAL receiver manager started, connecting to broker");
                let mut connection_manager_state = ConnectionManagerState::new(
                    timeline,
                    conf,

@@ -93,7 +93,7 @@ impl WalReceiver {
                loop {
                    select! {
                        _ = task_mgr::shutdown_watcher() => {
                            info!("WAL receiver shutdown requested, shutting down");
                            trace!("WAL receiver shutdown requested, shutting down");
                            break;
                        },
                        loop_step_result = connection_manager_loop_step(

@@ -104,7 +104,7 @@ impl WalReceiver {
                        ) => match loop_step_result {
                            ControlFlow::Continue(()) => continue,
                            ControlFlow::Break(()) => {
                                info!("Connection manager loop ended, shutting down");
                                trace!("Connection manager loop ended, shutting down");
                                break;
                            }
                        },

@@ -115,7 +115,11 @@ impl WalReceiver {
                *loop_status.write().unwrap() = None;
                Ok(())
            }
            .instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
            .instrument({
                let span = info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id);
                span.follows_from(Span::current());
                span
            })
        );

        Self {

@@ -214,7 +218,7 @@ impl<E: Clone> TaskHandle<E> {
            // So, tone them down to info-level.
            //
            // XXX: rewrite this module to eliminate the race condition.
            info!("sender is dropped while join handle is still alive");
            debug!("sender is dropped while join handle is still alive");
        }

        let res = jh
@@ -56,7 +56,7 @@ pub(super) async fn connection_manager_loop_step(
    {
        Ok(()) => {}
        Err(_) => {
            info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
            debug!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
            return ControlFlow::Break(());
        }
    }

@@ -79,7 +79,7 @@ pub(super) async fn connection_manager_loop_step(
    // with other streams on this client (other connection managers). When
    // object goes out of scope, stream finishes in drop() automatically.
    let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
    info!("Subscribed for broker timeline updates");
    debug!("Subscribed for broker timeline updates");

    loop {
        let time_until_next_retry = connection_manager_state.time_until_next_retry();

@@ -151,7 +151,7 @@ pub(super) async fn connection_manager_loop_step(
                    // we're already active as walreceiver, no need to reactivate
                    TimelineState::Active => continue,
                    TimelineState::Broken | TimelineState::Stopping => {
                        info!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
                        debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
                        return ControlFlow::Break(());
                    }
                    TimelineState::Loading => {

@@ -165,11 +165,11 @@ pub(super) async fn connection_manager_loop_step(
                }
            } => match new_event {
                ControlFlow::Continue(new_state) => {
                    info!("observed timeline state change, new state is {new_state:?}");
                    debug!("observed timeline state change, new state is {new_state:?}");
                    return ControlFlow::Continue(());
                }
                ControlFlow::Break(()) => {
                    info!("Timeline dropped state updates sender, stopping wal connection manager loop");
                    debug!("Timeline dropped state updates sender, stopping wal connection manager loop");
                    return ControlFlow::Break(());
                }
            },

@@ -470,7 +470,7 @@ impl ConnectionManagerState {

        if let Some(next) = &retry.next_retry_at {
            if next > &now {
                info!(
                debug!(
                    "Next connection retry to {:?} is at {}",
                    wal_connection.sk_id, next
                );

@@ -515,7 +515,7 @@ impl ConnectionManagerState {
        );

        if old_entry.is_none() {
            info!("New SK node was added: {new_safekeeper_id}");
            debug!("New SK node was added: {new_safekeeper_id}");
            WALRECEIVER_CANDIDATES_ADDED.inc();
        }
    }
@@ -119,6 +119,7 @@ pub(super) async fn handle_walreceiver_connection(
        ctx.download_behavior(),
    );
    let connection_cancellation = cancellation.clone();
    use tracing::Instrument;
    task_mgr::spawn(
        WALRECEIVER_RUNTIME.handle(),
        TaskKind::WalReceiverConnectionPoller,

@@ -140,7 +141,8 @@ pub(super) async fn handle_walreceiver_connection(
                _ = connection_cancellation.cancelled() => info!("Connection cancelled"),
            }
            Ok(())
        },
        }
        .instrument(tracing::info_span!("walreceiver")),
    );

    // Immediately increment the gauge, then create a job to decrement it on task exit.

@@ -153,7 +155,7 @@ pub(super) async fn handle_walreceiver_connection(
    }

    let identify = identify_system(&mut replication_client).await?;
    info!("{identify:?}");
    debug!("{identify:?}");

    let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
    let mut caught_up = false;