mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
only one runtime
This commit is contained in:
@@ -15,7 +15,6 @@ use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
|
||||
use pageserver::control_plane_client::ControlPlaneClient;
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
|
||||
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
|
||||
use pageserver::tenant::{secondary, TenantSharedResources};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio::time::Instant;
|
||||
@@ -28,7 +27,7 @@ use pageserver::{
|
||||
deletion_queue::DeletionQueue,
|
||||
http, page_cache, page_service, task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
|
||||
task_mgr::THE_RUNTIME,
|
||||
tenant::mgr,
|
||||
virtual_file,
|
||||
};
|
||||
@@ -323,7 +322,7 @@ fn start_pageserver(
|
||||
|
||||
// Launch broker client
|
||||
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
|
||||
let broker_client = WALRECEIVER_RUNTIME
|
||||
let broker_client = THE_RUNTIME
|
||||
.block_on(async {
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
|
||||
@@ -391,7 +390,7 @@ fn start_pageserver(
|
||||
conf,
|
||||
);
|
||||
if let Some(deletion_workers) = deletion_workers {
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
deletion_workers.spawn_with(THE_RUNTIME.handle());
|
||||
}
|
||||
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
@@ -423,7 +422,7 @@ fn start_pageserver(
|
||||
|
||||
// Scan the local 'tenants/' directory and start loading the tenants
|
||||
let deletion_queue_client = deletion_queue.new_client();
|
||||
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
|
||||
let tenant_manager = THE_RUNTIME.block_on(mgr::init_tenant_mgr(
|
||||
conf,
|
||||
TenantSharedResources {
|
||||
broker_client: broker_client.clone(),
|
||||
@@ -435,7 +434,7 @@ fn start_pageserver(
|
||||
))?;
|
||||
let tenant_manager = Arc::new(tenant_manager);
|
||||
|
||||
BACKGROUND_RUNTIME.spawn({
|
||||
THE_RUNTIME.spawn({
|
||||
let shutdown_pageserver = shutdown_pageserver.clone();
|
||||
let drive_init = async move {
|
||||
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
|
||||
@@ -545,7 +544,7 @@ fn start_pageserver(
|
||||
// Start up the service to handle HTTP mgmt API request. We created the
|
||||
// listener earlier already.
|
||||
{
|
||||
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
|
||||
let _rt_guard = THE_RUNTIME.enter();
|
||||
|
||||
let router_state = Arc::new(
|
||||
http::routes::State::new(
|
||||
@@ -569,7 +568,6 @@ fn start_pageserver(
|
||||
.with_graceful_shutdown(task_mgr::shutdown_watcher());
|
||||
|
||||
task_mgr::spawn(
|
||||
MGMT_REQUEST_RUNTIME.handle(),
|
||||
TaskKind::HttpEndpointListener,
|
||||
None,
|
||||
None,
|
||||
@@ -594,7 +592,6 @@ fn start_pageserver(
|
||||
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
|
||||
|
||||
task_mgr::spawn(
|
||||
crate::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::MetricsCollection,
|
||||
None,
|
||||
None,
|
||||
@@ -642,7 +639,6 @@ fn start_pageserver(
|
||||
DownloadBehavior::Error,
|
||||
);
|
||||
task_mgr::spawn(
|
||||
COMPUTE_REQUEST_RUNTIME.handle(),
|
||||
TaskKind::LibpqEndpointListener,
|
||||
None,
|
||||
None,
|
||||
@@ -668,7 +664,7 @@ fn start_pageserver(
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
{
|
||||
use signal_hook::consts::*;
|
||||
let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let signal_handler = THE_RUNTIME.spawn_blocking(move || {
|
||||
let mut signals =
|
||||
signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
|
||||
return signals
|
||||
@@ -676,9 +672,7 @@ fn start_pageserver(
|
||||
.next()
|
||||
.expect("forever() never returns None unless explicitly closed");
|
||||
});
|
||||
let signal = BACKGROUND_RUNTIME
|
||||
.block_on(signal_handler)
|
||||
.expect("join error");
|
||||
let signal = THE_RUNTIME.block_on(signal_handler).expect("join error");
|
||||
match signal {
|
||||
SIGQUIT => {
|
||||
info!("Got signal {signal}. Terminating in immediate shutdown mode",);
|
||||
@@ -693,7 +687,7 @@ fn start_pageserver(
|
||||
shutdown_pageserver.take();
|
||||
let bg_remote_storage = remote_storage.clone();
|
||||
let bg_deletion_queue = deletion_queue.clone();
|
||||
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
|
||||
THE_RUNTIME.block_on(pageserver::shutdown_pageserver(
|
||||
&tenant_manager,
|
||||
bg_remote_storage.map(|_| bg_deletion_queue),
|
||||
0,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Periodically collect consumption metrics for all active tenants
|
||||
//! and push them to a HTTP endpoint.
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
|
||||
use camino::Utf8PathBuf;
|
||||
@@ -59,7 +59,6 @@ pub async fn collect_metrics(
|
||||
let worker_ctx =
|
||||
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::CalculateSyntheticSize,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -59,7 +59,7 @@ use utils::{completion, id::TimelineId};
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
metrics::disk_usage_based_eviction::METRICS,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
self,
|
||||
mgr::TenantManager,
|
||||
@@ -202,7 +202,6 @@ pub fn launch_disk_usage_global_eviction_task(
|
||||
info!("launching disk usage based eviction task");
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::DiskUsageEviction,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -180,7 +180,6 @@ pub async fn libpq_listener_main(
|
||||
// only deal with a particular timeline, but we don't know which one
|
||||
// yet.
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::PageRequestHandler,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -98,42 +98,22 @@ use utils::id::TimelineId;
|
||||
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
|
||||
// happen, but still.
|
||||
//
|
||||
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("compute request worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create compute request runtime")
|
||||
});
|
||||
|
||||
pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
/// The single tokio runtime used by all pageserver code.
|
||||
/// In the past, we had multiple runtimes, and in the future we should weed out
|
||||
/// remaining references to this global field and rely on ambient runtime instead,
|
||||
/// i.e., use `tokio::spawn` instead of `THE_RUNTIME.spawn()`, etc.
|
||||
pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("mgmt request worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create mgmt request runtime")
|
||||
});
|
||||
|
||||
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("walreceiver worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create walreceiver runtime")
|
||||
});
|
||||
|
||||
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("background op worker")
|
||||
// if you change the number of worker threads please change the constant below
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create background op runtime")
|
||||
});
|
||||
|
||||
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
|
||||
pub(crate) static THE_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
|
||||
// force init and thus panics
|
||||
let _ = BACKGROUND_RUNTIME.handle();
|
||||
let _ = THE_RUNTIME.handle();
|
||||
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
|
||||
// tokio would had already panicked for parsing errors or NotUnicode
|
||||
//
|
||||
@@ -325,7 +305,6 @@ struct PageServerTask {
|
||||
/// Note: if shutdown_process_on_error is set to true failure
|
||||
/// of the task will lead to shutdown of entire process
|
||||
pub fn spawn<F>(
|
||||
runtime: &tokio::runtime::Handle,
|
||||
kind: TaskKind,
|
||||
tenant_shard_id: Option<TenantShardId>,
|
||||
timeline_id: Option<TimelineId>,
|
||||
@@ -354,7 +333,7 @@ where
|
||||
|
||||
let task_name = name.to_string();
|
||||
let task_cloned = Arc::clone(&task);
|
||||
let join_handle = runtime.spawn(task_wrapper(
|
||||
let join_handle = tokio::spawn(task_wrapper(
|
||||
task_name,
|
||||
task_id,
|
||||
task_cloned,
|
||||
|
||||
@@ -661,7 +661,6 @@ impl Tenant {
|
||||
let tenant_clone = Arc::clone(&tenant);
|
||||
let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::Attach,
|
||||
Some(tenant_shard_id),
|
||||
None,
|
||||
|
||||
@@ -482,7 +482,6 @@ impl DeleteTenantFlow {
|
||||
let tenant_shard_id = tenant.tenant_shard_id;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::TimelineDeletionWorker,
|
||||
Some(tenant_shard_id),
|
||||
None,
|
||||
|
||||
@@ -1850,7 +1850,6 @@ impl TenantManager {
|
||||
let task_tenant_id = None;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::MgmtRequest,
|
||||
task_tenant_id,
|
||||
None,
|
||||
@@ -2816,7 +2815,6 @@ pub(crate) fn immediate_gc(
|
||||
|
||||
// TODO: spawning is redundant now, need to hold the gate
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::GarbageCollector,
|
||||
Some(tenant_shard_id),
|
||||
Some(timeline_id),
|
||||
|
||||
@@ -223,7 +223,6 @@ use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::BACKGROUND_RUNTIME,
|
||||
tenant::metadata::TimelineMetadata,
|
||||
tenant::upload_queue::{
|
||||
UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
|
||||
@@ -307,8 +306,6 @@ pub enum PersistIndexPartWithDeletedFlagError {
|
||||
pub struct RemoteTimelineClient {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
runtime: tokio::runtime::Handle,
|
||||
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
generation: Generation,
|
||||
@@ -341,12 +338,6 @@ impl RemoteTimelineClient {
|
||||
) -> RemoteTimelineClient {
|
||||
RemoteTimelineClient {
|
||||
conf,
|
||||
runtime: if cfg!(test) {
|
||||
// remote_timeline_client.rs tests rely on current-thread runtime
|
||||
tokio::runtime::Handle::current()
|
||||
} else {
|
||||
BACKGROUND_RUNTIME.handle().clone()
|
||||
},
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
generation,
|
||||
@@ -1281,7 +1272,6 @@ impl RemoteTimelineClient {
|
||||
let tenant_shard_id = self.tenant_shard_id;
|
||||
let timeline_id = self.timeline_id;
|
||||
task_mgr::spawn(
|
||||
&self.runtime,
|
||||
TaskKind::RemoteUploadTask,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
@@ -1876,7 +1866,6 @@ mod tests {
|
||||
fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
|
||||
Arc::new(RemoteTimelineClient {
|
||||
conf: self.harness.conf,
|
||||
runtime: tokio::runtime::Handle::current(),
|
||||
tenant_shard_id: self.harness.tenant_shard_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
generation,
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::{sync::Arc, time::SystemTime};
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
disk_usage_eviction_task::DiskUsageEvictionInfo,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
task_mgr::{self, TaskKind},
|
||||
virtual_file::MaybeFatalIo,
|
||||
};
|
||||
|
||||
@@ -317,7 +317,6 @@ pub fn spawn_tasks(
|
||||
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::SecondaryDownloads,
|
||||
None,
|
||||
None,
|
||||
@@ -338,7 +337,6 @@ pub fn spawn_tasks(
|
||||
);
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::SecondaryUploads,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -1447,7 +1447,7 @@ impl LayerInner {
|
||||
#[cfg(test)]
|
||||
tokio::task::spawn(fut);
|
||||
#[cfg(not(test))]
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
|
||||
crate::task_mgr::THE_RUNTIME.spawn(fut);
|
||||
}
|
||||
|
||||
/// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
|
||||
@@ -1458,7 +1458,7 @@ impl LayerInner {
|
||||
#[cfg(test)]
|
||||
tokio::task::spawn_blocking(f);
|
||||
#[cfg(not(test))]
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
|
||||
crate::task_mgr::THE_RUNTIME.spawn_blocking(f);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::time::{Duration, Instant};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
@@ -18,7 +18,7 @@ use utils::{backoff, completion};
|
||||
|
||||
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let total_threads = *crate::task_mgr::THE_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
@@ -85,7 +85,6 @@ pub fn start_background_loops(
|
||||
) {
|
||||
let tenant_shard_id = tenant.tenant_shard_id;
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Compaction,
|
||||
Some(tenant_shard_id),
|
||||
None,
|
||||
@@ -109,7 +108,6 @@ pub fn start_background_loops(
|
||||
},
|
||||
);
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::GarbageCollector,
|
||||
Some(tenant_shard_id),
|
||||
None,
|
||||
|
||||
@@ -1723,7 +1723,6 @@ impl Timeline {
|
||||
initdb_optimization_count: 0,
|
||||
};
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
@@ -2086,7 +2085,6 @@ impl Timeline {
|
||||
DownloadBehavior::Download,
|
||||
);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::InitialLogicalSizeCalculation,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
@@ -2264,7 +2262,6 @@ impl Timeline {
|
||||
DownloadBehavior::Download,
|
||||
);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
@@ -4151,7 +4148,6 @@ impl Timeline {
|
||||
|
||||
let self_clone = Arc::clone(&self);
|
||||
let task_id = task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::DownloadAllRemoteLayers,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
|
||||
@@ -443,7 +443,6 @@ impl DeleteTimelineFlow {
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::TimelineDeletionWorker,
|
||||
Some(tenant_shard_id),
|
||||
Some(timeline_id),
|
||||
|
||||
@@ -28,7 +28,7 @@ use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
|
||||
use crate::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
@@ -56,7 +56,6 @@ impl Timeline {
|
||||
let self_clone = Arc::clone(self);
|
||||
let background_tasks_can_start = background_tasks_can_start.cloned();
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Eviction,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
|
||||
@@ -24,7 +24,7 @@ mod connection_manager;
|
||||
mod walreceiver_connection;
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::timeline::walreceiver::connection_manager::{
|
||||
connection_manager_loop_step, ConnectionManagerState,
|
||||
@@ -82,7 +82,6 @@ impl WalReceiver {
|
||||
let loop_status = Arc::new(std::sync::RwLock::new(None));
|
||||
let manager_status = Arc::clone(&loop_status);
|
||||
task_mgr::spawn(
|
||||
WALRECEIVER_RUNTIME.handle(),
|
||||
TaskKind::WalReceiverManager,
|
||||
Some(timeline.tenant_shard_id),
|
||||
Some(timeline_id),
|
||||
@@ -181,7 +180,7 @@ impl<E: Clone> TaskHandle<E> {
|
||||
let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
|
||||
|
||||
let cancellation_clone = cancellation.clone();
|
||||
let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
|
||||
let join_handle = tokio::spawn(async move {
|
||||
events_sender.send(TaskStateUpdate::Started).ok();
|
||||
task(events_sender, cancellation_clone).await
|
||||
// events_sender is dropped at some point during the .await above.
|
||||
|
||||
@@ -27,9 +27,7 @@ use super::TaskStateUpdate;
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
|
||||
task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::WALRECEIVER_RUNTIME,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
|
||||
walingest::WalIngest,
|
||||
walrecord::DecodedWALRecord,
|
||||
@@ -163,7 +161,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
);
|
||||
let connection_cancellation = cancellation.clone();
|
||||
task_mgr::spawn(
|
||||
WALRECEIVER_RUNTIME.handle(),
|
||||
TaskKind::WalReceiverConnectionPoller,
|
||||
Some(timeline.tenant_shard_id),
|
||||
Some(timeline.timeline_id),
|
||||
|
||||
Reference in New Issue
Block a user