diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index f4a231f217..ef616c0a39 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -15,9 +15,9 @@ use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
 use pageserver::control_plane_client::ControlPlaneClient;
 use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
 use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
+use pageserver::task_mgr::WALRECEIVER_RUNTIME;
 use pageserver::tenant::{secondary, TenantSharedResources};
 use remote_storage::GenericRemoteStorage;
-use tokio::signal::unix::SignalKind;
 use tokio::time::Instant;
 use tracing::*;
 
@@ -28,7 +28,7 @@ use pageserver::{
     deletion_queue::DeletionQueue,
     http, page_cache, page_service, task_mgr,
     task_mgr::TaskKind,
-    task_mgr::THE_RUNTIME,
+    task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
     tenant::mgr,
     virtual_file,
 };
@@ -323,7 +323,7 @@ fn start_pageserver(
 
     // Launch broker client
     // The storage_broker::connect call needs to happen inside a tokio runtime thread.
-    let broker_client = THE_RUNTIME
+    let broker_client = WALRECEIVER_RUNTIME
         .block_on(async {
             // Note: we do not attempt connecting here (but validate endpoints sanity).
             storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
@@ -391,7 +391,7 @@ fn start_pageserver(
         conf,
     );
     if let Some(deletion_workers) = deletion_workers {
-        deletion_workers.spawn_with(THE_RUNTIME.handle());
+        deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
     }
 
     // Up to this point no significant I/O has been done: this should have been fast. Record
@@ -423,7 +423,7 @@ fn start_pageserver(
 
     // Scan the local 'tenants/' directory and start loading the tenants
     let deletion_queue_client = deletion_queue.new_client();
-    let tenant_manager = THE_RUNTIME.block_on(mgr::init_tenant_mgr(
+    let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
         conf,
         TenantSharedResources {
             broker_client: broker_client.clone(),
@@ -435,7 +435,7 @@ fn start_pageserver(
     ))?;
     let tenant_manager = Arc::new(tenant_manager);
 
-    THE_RUNTIME.spawn({
+    BACKGROUND_RUNTIME.spawn({
        let shutdown_pageserver = shutdown_pageserver.clone();
        let drive_init = async move {
            // NOTE: unlike many futures in pageserver, this one is cancellation-safe
@@ -545,7 +545,7 @@ fn start_pageserver(
     // Start up the service to handle HTTP mgmt API request. We created the
     // listener earlier already.
     {
-        let _rt_guard = THE_RUNTIME.enter();
+        let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
 
         let router_state = Arc::new(
             http::routes::State::new(
@@ -569,6 +569,7 @@ fn start_pageserver(
             .with_graceful_shutdown(task_mgr::shutdown_watcher());
 
         task_mgr::spawn(
+            MGMT_REQUEST_RUNTIME.handle(),
             TaskKind::HttpEndpointListener,
             None,
             None,
@@ -593,6 +594,7 @@ fn start_pageserver(
         let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
 
         task_mgr::spawn(
+            crate::BACKGROUND_RUNTIME.handle(),
             TaskKind::MetricsCollection,
             None,
             None,
@@ -641,6 +643,7 @@ fn start_pageserver(
             DownloadBehavior::Error,
         );
         task_mgr::spawn(
+            COMPUTE_REQUEST_RUNTIME.handle(),
             TaskKind::LibpqEndpointListener,
             None,
             None,
@@ -664,37 +667,42 @@ fn start_pageserver(
     let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
 
     // All started up! Now just sit and wait for shutdown signal.
-    {
-        THE_RUNTIME.block_on(async move {
-            let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
-            let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
-            let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
-            let signal = tokio::select! {
-                _ = sigquit.recv() => {
-                    info!("Got signal SIGQUIT. Terminating in immediate shutdown mode",);
-                    std::process::exit(111);
-                }
-                _ = sigint.recv() => { "SIGINT" },
-                _ = sigterm.recv() => { "SIGTERM" },
-            };
+    {
+        use signal_hook::consts::*;
+        let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
+            let mut signals =
+                signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
+            return signals
+                .forever()
+                .next()
+                .expect("forever() never returns None unless explicitly closed");
+        });
+        let signal = BACKGROUND_RUNTIME
+            .block_on(signal_handler)
+            .expect("join error");
+        match signal {
+            SIGQUIT => {
+                info!("Got signal {signal}. Terminating in immediate shutdown mode",);
+                std::process::exit(111);
+            }
+            SIGINT | SIGTERM => {
+                info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);
 
-            info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);
-
-            // This cancels the `shutdown_pageserver` cancellation tree.
-            // Right now that tree doesn't reach very far, and `task_mgr` is used instead.
-            // The plan is to change that over time.
-            shutdown_pageserver.take();
-            let bg_remote_storage = remote_storage.clone();
-            let bg_deletion_queue = deletion_queue.clone();
-            pageserver::shutdown_pageserver(
-                &tenant_manager,
-                bg_remote_storage.map(|_| bg_deletion_queue),
-                0,
-            )
-            .await;
-            unreachable!()
-        })
+                // This cancels the `shutdown_pageserver` cancellation tree.
+                // Right now that tree doesn't reach very far, and `task_mgr` is used instead.
+                // The plan is to change that over time.
+                shutdown_pageserver.take();
+                let bg_remote_storage = remote_storage.clone();
+                let bg_deletion_queue = deletion_queue.clone();
+                BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
+                    &tenant_manager,
+                    bg_remote_storage.map(|_| bg_deletion_queue),
+                    0,
+                ));
+                unreachable!()
+            }
+            _ => unreachable!(),
+        }
     }
 }
diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index c82be8c581..3429e3a0a6 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -1,7 +1,7 @@
 //! Periodically collect consumption metrics for all active tenants
 //! and push them to a HTTP endpoint.
 use crate::context::{DownloadBehavior, RequestContext};
-use crate::task_mgr::{self, TaskKind};
+use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::tasks::BackgroundLoopKind;
 use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
 use camino::Utf8PathBuf;
@@ -61,6 +61,7 @@
     let worker_ctx =
         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
     task_mgr::spawn(
+        BACKGROUND_RUNTIME.handle(),
         TaskKind::CalculateSyntheticSize,
         None,
         None,
diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs
index 55d80c2966..42c800822b 100644
--- a/pageserver/src/control_plane_client.rs
+++ b/pageserver/src/control_plane_client.rs
@@ -173,6 +173,8 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
             register,
         };
 
+        fail::fail_point!("control-plane-client-re-attach");
+
         let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
         tracing::info!(
             "Received re-attach response with {} tenants",
@@ -208,7 +210,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
                 .collect(),
         };
 
-        crate::tenant::pausable_failpoint!("control-plane-client-validate");
+        fail::fail_point!("control-plane-client-validate");
 
         let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
 
diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs
index 6b68acd1c7..92c1475aef 100644
--- a/pageserver/src/disk_usage_eviction_task.rs
+++ b/pageserver/src/disk_usage_eviction_task.rs
@@ -59,7 +59,7 @@ use utils::{completion, id::TimelineId};
 use crate::{
     config::PageServerConf,
     metrics::disk_usage_based_eviction::METRICS,
-    task_mgr::{self, TaskKind},
+    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
     tenant::{
         self,
         mgr::TenantManager,
@@ -202,6 +202,7 @@ pub fn launch_disk_usage_global_eviction_task(
     info!("launching disk usage based eviction task");
 
     task_mgr::spawn(
+        BACKGROUND_RUNTIME.handle(),
         TaskKind::DiskUsageEviction,
         None,
         None,
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index fa1a0f535b..f3ceb7d3e6 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -180,6 +180,7 @@ pub async fn libpq_listener_main(
         // only deal with a particular timeline, but we don't know which one
         // yet.
         task_mgr::spawn(
+            &tokio::runtime::Handle::current(),
             TaskKind::PageRequestHandler,
             None,
             None,
diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs
index 2d97389982..69e163effa 100644
--- a/pageserver/src/task_mgr.rs
+++ b/pageserver/src/task_mgr.rs
@@ -98,22 +98,42 @@ use utils::id::TimelineId;
 // other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
 // happen, but still.
 //
-
-/// The single tokio runtime used by all pageserver code.
-/// In the past, we had multiple runtimes, and in the future we should weed out
-/// remaining references to this global field and rely on ambient runtime instead,
-/// i.e., use `tokio::spawn` instead of `THE_RUNTIME.spawn()`, etc.
-pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     tokio::runtime::Builder::new_multi_thread()
+        .thread_name("compute request worker")
+        .enable_all()
+        .build()
+        .expect("Failed to create compute request runtime")
+});
+
+pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+    tokio::runtime::Builder::new_multi_thread()
+        .thread_name("mgmt request worker")
+        .enable_all()
+        .build()
+        .expect("Failed to create mgmt request runtime")
+});
+
+pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+    tokio::runtime::Builder::new_multi_thread()
+        .thread_name("walreceiver worker")
+        .enable_all()
+        .build()
+        .expect("Failed to create walreceiver runtime")
+});
+
+pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+    tokio::runtime::Builder::new_multi_thread()
+        .thread_name("background op worker")
         // if you change the number of worker threads please change the constant below
         .enable_all()
         .build()
         .expect("Failed to create background op runtime")
 });
 
-pub(crate) static THE_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
+pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
     // force init and thus panics
-    let _ = THE_RUNTIME.handle();
+    let _ = BACKGROUND_RUNTIME.handle();
     // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
     // tokio would had already panicked for parsing errors or NotUnicode
     //
@@ -305,6 +325,7 @@ struct PageServerTask {
 /// Note: if shutdown_process_on_error is set to true failure
 /// of the task will lead to shutdown of entire process
 pub fn spawn<F>(
+    runtime: &tokio::runtime::Handle,
     kind: TaskKind,
     tenant_shard_id: Option<TenantShardId>,
     timeline_id: Option<TimelineId>,
@@ -333,7 +354,7 @@ where
 
     let task_name = name.to_string();
     let task_cloned = Arc::clone(&task);
-    let join_handle = THE_RUNTIME.spawn(task_wrapper(
+    let join_handle = runtime.spawn(task_wrapper(
         task_name,
         task_id,
         task_cloned,
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index b923e473ce..dcf9b1a605 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -144,7 +144,6 @@ macro_rules! pausable_failpoint {
         }
     };
 }
-pub(crate) use pausable_failpoint;
 
 pub mod blob_io;
 pub mod block_io;
@@ -662,6 +661,7 @@ impl Tenant {
         let tenant_clone = Arc::clone(&tenant);
         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
         task_mgr::spawn(
+            &tokio::runtime::Handle::current(),
             TaskKind::Attach,
             Some(tenant_shard_id),
             None,
diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs
index 3866136dbd..7d37873a67 100644
--- a/pageserver/src/tenant/delete.rs
+++ b/pageserver/src/tenant/delete.rs
@@ -482,6 +482,7 @@ impl DeleteTenantFlow {
         let tenant_shard_id = tenant.tenant_shard_id;
 
         task_mgr::spawn(
+            task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::TimelineDeletionWorker,
             Some(tenant_shard_id),
             None,
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 34ca43a173..97a505ded9 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -1850,6 +1850,7 @@ impl TenantManager {
         let task_tenant_id = None;
 
         task_mgr::spawn(
+            task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::MgmtRequest,
             task_tenant_id,
             None,
@@ -2815,12 +2816,15 @@ pub(crate) fn immediate_gc(
 
     // TODO: spawning is redundant now, need to hold the gate
     task_mgr::spawn(
+        &tokio::runtime::Handle::current(),
         TaskKind::GarbageCollector,
         Some(tenant_shard_id),
         Some(timeline_id),
        &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
         false,
         async move {
+            fail::fail_point!("immediate_gc_task_pre");
+
             #[allow(unused_mut)]
             let mut result = tenant
                 .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index b4b3243d11..cbd942d706 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -223,6 +223,7 @@ use crate::{
     config::PageServerConf,
     task_mgr,
     task_mgr::TaskKind,
+    task_mgr::BACKGROUND_RUNTIME,
     tenant::metadata::TimelineMetadata,
     tenant::upload_queue::{
         UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
@@ -297,6 +298,8 @@ pub enum PersistIndexPartWithDeletedFlagError {
 pub struct RemoteTimelineClient {
     conf: &'static PageServerConf,
 
+    runtime: tokio::runtime::Handle,
+
     tenant_shard_id: TenantShardId,
     timeline_id: TimelineId,
     generation: Generation,
@@ -329,6 +332,12 @@ impl RemoteTimelineClient {
     ) -> RemoteTimelineClient {
         RemoteTimelineClient {
             conf,
+            runtime: if cfg!(test) {
+                // remote_timeline_client.rs tests rely on current-thread runtime
+                tokio::runtime::Handle::current()
+            } else {
+                BACKGROUND_RUNTIME.handle().clone()
+            },
             tenant_shard_id,
             timeline_id,
             generation,
@@ -1264,6 +1273,7 @@ impl RemoteTimelineClient {
         let tenant_shard_id = self.tenant_shard_id;
         let timeline_id = self.timeline_id;
         task_mgr::spawn(
+            &self.runtime,
             TaskKind::RemoteUploadTask,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -1857,6 +1867,7 @@ mod tests {
         fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
             Arc::new(RemoteTimelineClient {
                 conf: self.harness.conf,
+                runtime: tokio::runtime::Handle::current(),
                 tenant_shard_id: self.harness.tenant_shard_id,
                 timeline_id: TIMELINE_ID,
                 generation,
diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs
index b0babb1308..19f36c722e 100644
--- a/pageserver/src/tenant/secondary.rs
+++ b/pageserver/src/tenant/secondary.rs
@@ -8,7 +8,7 @@ use std::{sync::Arc, time::SystemTime};
 
 use crate::{
     config::PageServerConf,
     disk_usage_eviction_task::DiskUsageEvictionInfo,
-    task_mgr::{self, TaskKind},
+    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
     virtual_file::MaybeFatalIo,
 };
 
@@ -317,6 +317,7 @@ pub fn spawn_tasks(
         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
 
     task_mgr::spawn(
+        BACKGROUND_RUNTIME.handle(),
         TaskKind::SecondaryDownloads,
         None,
         None,
@@ -337,6 +338,7 @@
     );
 
     task_mgr::spawn(
+        BACKGROUND_RUNTIME.handle(),
         TaskKind::SecondaryUploads,
         None,
         None,
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index e101a40da4..8ba37b5a86 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -1447,7 +1447,7 @@ impl LayerInner {
         #[cfg(test)]
         tokio::task::spawn(fut);
         #[cfg(not(test))]
-        crate::task_mgr::THE_RUNTIME.spawn(fut);
+        crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
     }
 
     /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
@@ -1458,7 +1458,7 @@ impl LayerInner {
         #[cfg(test)]
         tokio::task::spawn_blocking(f);
         #[cfg(not(test))]
-        crate::task_mgr::THE_RUNTIME.spawn_blocking(f);
+        crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
     }
 }
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index db32223a60..e4f5f75132 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -8,7 +8,7 @@ use std::time::{Duration, Instant};
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr;
-use crate::task_mgr::TaskKind;
+use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::throttle::Stats;
 use crate::tenant::timeline::CompactionError;
 use crate::tenant::{Tenant, TenantState};
@@ -18,7 +18,7 @@ use utils::{backoff, completion};
 
 static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
     once_cell::sync::Lazy::new(|| {
-        let total_threads = *crate::task_mgr::THE_RUNTIME_WORKER_THREADS;
+        let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
         let permits = usize::max(
             1,
             // while a lot of the work is done on spawn_blocking, we still do
@@ -85,6 +85,7 @@ pub fn start_background_loops(
 ) {
     let tenant_shard_id = tenant.tenant_shard_id;
     task_mgr::spawn(
+        BACKGROUND_RUNTIME.handle(),
         TaskKind::Compaction,
         Some(tenant_shard_id),
         None,
@@ -108,6 +109,7 @@
         },
     );
     task_mgr::spawn(
+        BACKGROUND_RUNTIME.handle(),
         TaskKind::GarbageCollector,
         Some(tenant_shard_id),
         None,
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 6c6bb4b788..0b8cdac1cc 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1714,6 +1714,7 @@ impl Timeline {
             initdb_optimization_count: 0,
         };
         task_mgr::spawn(
+            task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::LayerFlushTask,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -2076,6 +2077,7 @@ impl Timeline {
             DownloadBehavior::Download,
         );
         task_mgr::spawn(
+            task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::InitialLogicalSizeCalculation,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -2253,6 +2255,7 @@ impl Timeline {
             DownloadBehavior::Download,
         );
         task_mgr::spawn(
+            task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -3828,7 +3831,7 @@ impl Timeline {
         };
         let timer = self.metrics.garbage_collect_histo.start_timer();
 
-        pausable_failpoint!("before-timeline-gc");
+        fail_point!("before-timeline-gc");
 
         // Is the timeline being deleted?
         if self.is_stopping() {
@@ -4139,6 +4142,7 @@ impl Timeline {
 
         let self_clone = Arc::clone(&self);
         let task_id = task_mgr::spawn(
+            task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::DownloadAllRemoteLayers,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index e9afbfd8ba..ab0a88c764 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -429,6 +429,7 @@ impl DeleteTimelineFlow {
         let timeline_id = timeline.timeline_id;
 
         task_mgr::spawn(
+            task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::TimelineDeletionWorker,
             Some(tenant_shard_id),
             Some(timeline_id),
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index f84a4b0dac..dd769d4121 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -28,7 +28,7 @@ use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
 use crate::{
     context::{DownloadBehavior, RequestContext},
     pgdatadir_mapping::CollectKeySpaceError,
-    task_mgr::{self, TaskKind},
+    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
     tenant::{
         tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
     },
@@ -56,6 +56,7 @@ impl Timeline {
         let self_clone = Arc::clone(self);
         let background_tasks_can_start = background_tasks_can_start.cloned();
         task_mgr::spawn(
+            BACKGROUND_RUNTIME.handle(),
             TaskKind::Eviction,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs
index 3592dda8d7..2fab6722b8 100644
--- a/pageserver/src/tenant/timeline/walreceiver.rs
+++ b/pageserver/src/tenant/timeline/walreceiver.rs
@@ -24,7 +24,7 @@ mod connection_manager;
 mod walreceiver_connection;
 
 use crate::context::{DownloadBehavior, RequestContext};
-use crate::task_mgr::{self, TaskKind};
+use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
 use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::timeline::walreceiver::connection_manager::{
     connection_manager_loop_step, ConnectionManagerState,
@@ -82,6 +82,7 @@ impl WalReceiver {
         let loop_status = Arc::new(std::sync::RwLock::new(None));
         let manager_status = Arc::clone(&loop_status);
         task_mgr::spawn(
+            WALRECEIVER_RUNTIME.handle(),
             TaskKind::WalReceiverManager,
             Some(timeline.tenant_shard_id),
             Some(timeline_id),
@@ -180,7 +181,7 @@ impl TaskHandle {
         let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
 
         let cancellation_clone = cancellation.clone();
-        let join_handle = tokio::spawn(async move {
+        let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
             events_sender.send(TaskStateUpdate::Started).ok();
             task(events_sender, cancellation_clone).await
             // events_sender is dropped at some point during the .await above.
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index cf87cc6ce0..d9f780cfd1 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -11,6 +11,7 @@ use std::{
 use anyhow::{anyhow, Context};
 use bytes::BytesMut;
 use chrono::{NaiveDateTime, Utc};
+use fail::fail_point;
 use futures::StreamExt;
 use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
 use postgres_ffi::WAL_SEGMENT_SIZE;
@@ -26,7 +27,9 @@ use super::TaskStateUpdate;
 use crate::{
     context::RequestContext,
     metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
-    task_mgr::{self, TaskKind},
+    task_mgr,
+    task_mgr::TaskKind,
+    task_mgr::WALRECEIVER_RUNTIME,
     tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
     walingest::WalIngest,
     walrecord::DecodedWALRecord,
@@ -160,6 +163,7 @@ pub(super) async fn handle_walreceiver_connection(
     );
     let connection_cancellation = cancellation.clone();
     task_mgr::spawn(
+        WALRECEIVER_RUNTIME.handle(),
         TaskKind::WalReceiverConnectionPoller,
         Some(timeline.tenant_shard_id),
         Some(timeline.timeline_id),
@@ -325,17 +329,7 @@ pub(super) async fn handle_walreceiver_connection(
                     filtered_records += 1;
                 }
 
-                // don't simply use pausable_failpoint here because its spawn_blocking slows
-                // slows down the tests too much.
-                fail::fail_point!("walreceiver-after-ingest-blocking");
-                if let Err(()) = (|| {
-                    fail::fail_point!("walreceiver-after-ingest-pause-activate", |_| {
-                        Err(())
-                    });
-                    Ok(())
-                })() {
-                    pausable_failpoint!("walreceiver-after-ingest-pause");
-                }
+                fail_point!("walreceiver-after-ingest");
 
                 last_rec_lsn = lsn;
diff --git a/test_runner/regress/test_backpressure.py b/test_runner/regress/test_backpressure.py
index af17a2e89d..819912dd05 100644
--- a/test_runner/regress/test_backpressure.py
+++ b/test_runner/regress/test_backpressure.py
@@ -116,7 +116,7 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
     # Configure failpoint to slow down walreceiver ingest
     with closing(env.pageserver.connect()) as psconn:
         with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
-            pscur.execute("failpoints walreceiver-after-ingest-blocking=sleep(20)")
+            pscur.execute("failpoints walreceiver-after-ingest=sleep(20)")
 
     # FIXME
     # Wait for the check thread to start
diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py
index efd257900d..628c484fbd 100644
--- a/test_runner/regress/test_timeline_size.py
+++ b/test_runner/regress/test_timeline_size.py
@@ -931,7 +931,7 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
     env.pageserver.stop()
     env.pageserver.start(
         extra_env_vars={
-            "FAILPOINTS": "initial-size-calculation-permit-pause=pause;walreceiver-after-ingest-pause-activate=return(1);walreceiver-after-ingest-pause=pause"
+            "FAILPOINTS": "initial-size-calculation-permit-pause=pause;walreceiver-after-ingest=pause"
         }
     )
 
@@ -953,11 +953,7 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
         assert details["current_logical_size_is_accurate"] is True
 
     client.configure_failpoints(
-        [
-            ("initial-size-calculation-permit-pause", "off"),
-            ("walreceiver-after-ingest-pause-activate", "off"),
-            ("walreceiver-after-ingest-pause", "off"),
-        ]
+        [("initial-size-calculation-permit-pause", "off"), ("walreceiver-after-ingest", "off")]
"off")] ) @@ -987,7 +983,7 @@ def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder): # pause at logical size calculation, also pause before walreceiver can give feedback so it will give priority to logical size calculation env.pageserver.start( extra_env_vars={ - "FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest-pause-activate=return(1);walreceiver-after-ingest-pause=pause" + "FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest=pause" } ) @@ -1033,11 +1029,7 @@ def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder): other_is_attaching() client.configure_failpoints( - [ - ("timeline-calculate-logical-size-pause", "off"), - ("walreceiver-after-ingest-pause-activate", "off"), - ("walreceiver-after-ingest-pause", "off"), - ] + [("timeline-calculate-logical-size-pause", "off"), ("walreceiver-after-ingest", "off")] ) @@ -1067,7 +1059,7 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met # pause at logical size calculation, also pause before walreceiver can give feedback so it will give priority to logical size calculation env.pageserver.start( extra_env_vars={ - "FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest-pause-activate=return(1);walreceiver-after-ingest-pause=pause" + "FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest=pause" } ) @@ -1119,11 +1111,3 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met delete_lazy_activating(lazy_tenant, env.pageserver, expect_attaching=True) else: raise RuntimeError(activation_method) - - client.configure_failpoints( - [ - ("timeline-calculate-logical-size-pause", "off"), - ("walreceiver-after-ingest-pause-activate", "off"), - ("walreceiver-after-ingest-pause", "off"), - ] - )