make current_thread mode work

We need to have &'static Runtime, not &'static Handle, because
&'static Handle doesn't drive IO/timers on current_thread RT.
This commit is contained in:
Christian Schwarz
2024-04-05 17:51:04 +00:00
parent 70fb7e3580
commit edd7f69c2d
12 changed files with 27 additions and 23 deletions

View File

@@ -391,7 +391,7 @@ fn start_pageserver(
conf,
);
if let Some(deletion_workers) = deletion_workers {
deletion_workers.spawn_with(*BACKGROUND_RUNTIME);
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
}
// Up to this point no significant I/O has been done: this should have been fast. Record
@@ -569,7 +569,7 @@ fn start_pageserver(
.with_graceful_shutdown(task_mgr::shutdown_watcher());
task_mgr::spawn(
*MGMT_REQUEST_RUNTIME,
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::HttpEndpointListener,
None,
None,
@@ -594,7 +594,7 @@ fn start_pageserver(
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
task_mgr::spawn(
*crate::BACKGROUND_RUNTIME,
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
@@ -647,7 +647,7 @@ fn start_pageserver(
DownloadBehavior::Error,
);
task_mgr::spawn(
*COMPUTE_REQUEST_RUNTIME,
COMPUTE_REQUEST_RUNTIME.handle(),
TaskKind::LibpqEndpointListener,
None,
None,
@@ -682,6 +682,10 @@ fn start_pageserver(
.expect("forever() never returns None unless explicitly closed");
});
let signal = BACKGROUND_RUNTIME
// NB: in `NEON_PAGESERVER_USE_ONE_RUNTIME=current_thread`, this
// is where the executor is actually driven. In multi-threaded runtime
// modes, the executor threads are spawned internally, so, async execution
// is driven even before we reach here.
.block_on(signal_handler)
.expect("join error");
match signal {

View File

@@ -64,7 +64,7 @@ pub async fn collect_metrics(
let worker_ctx =
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
task_mgr::spawn(
*BACKGROUND_RUNTIME,
BACKGROUND_RUNTIME.handle(),
TaskKind::CalculateSyntheticSize,
None,
None,

View File

@@ -201,7 +201,7 @@ pub fn launch_disk_usage_global_eviction_task(
info!("launching disk usage based eviction task");
task_mgr::spawn(
*BACKGROUND_RUNTIME,
BACKGROUND_RUNTIME.handle(),
TaskKind::DiskUsageEviction,
None,
None,

View File

@@ -147,7 +147,7 @@ impl FromStr for TokioRuntimeMode {
}
static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {
let thread_name = "pageserver worker";
let thread_name = "tokio-executor";
let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else {
// If the env var is not set, leave this static as None.
set_tokio_runtime_setup(
@@ -190,9 +190,9 @@ static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {
/// otherwise.
macro_rules! pageserver_runtime {
($varname:ident, $name:literal) => {
pub static $varname: Lazy<&'static tokio::runtime::Handle> = Lazy::new(|| {
pub static $varname: Lazy<&'static tokio::runtime::Runtime> = Lazy::new(|| {
if let Some(runtime) = &*ONE_RUNTIME {
return runtime.handle();
return runtime;
}
static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
@@ -202,7 +202,7 @@ macro_rules! pageserver_runtime {
.build()
.expect(std::concat!("Failed to create runtime ", $name))
});
RUNTIME.handle()
&*RUNTIME
});
};
}

View File

@@ -485,7 +485,7 @@ impl DeleteTenantFlow {
let tenant_shard_id = tenant.tenant_shard_id;
task_mgr::spawn(
*task_mgr::BACKGROUND_RUNTIME,
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
Some(tenant_shard_id),
None,

View File

@@ -1849,7 +1849,7 @@ impl TenantManager {
let task_tenant_id = None;
task_mgr::spawn(
*task_mgr::BACKGROUND_RUNTIME,
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest,
task_tenant_id,
None,

View File

@@ -341,7 +341,7 @@ impl RemoteTimelineClient {
// remote_timeline_client.rs tests rely on current-thread runtime
tokio::runtime::Handle::current()
} else {
BACKGROUND_RUNTIME.clone()
BACKGROUND_RUNTIME.handle().clone()
},
tenant_shard_id,
timeline_id,

View File

@@ -317,7 +317,7 @@ pub fn spawn_tasks(
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
task_mgr::spawn(
*BACKGROUND_RUNTIME,
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
@@ -338,7 +338,7 @@ pub fn spawn_tasks(
);
task_mgr::spawn(
*BACKGROUND_RUNTIME,
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryUploads,
None,
None,

View File

@@ -86,7 +86,7 @@ pub fn start_background_loops(
) {
let tenant_shard_id = tenant.tenant_shard_id;
task_mgr::spawn(
*BACKGROUND_RUNTIME,
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
Some(tenant_shard_id),
None,
@@ -110,7 +110,7 @@ pub fn start_background_loops(
},
);
task_mgr::spawn(
*BACKGROUND_RUNTIME,
BACKGROUND_RUNTIME.handle(),
TaskKind::GarbageCollector,
Some(tenant_shard_id),
None,

View File

@@ -1962,7 +1962,7 @@ impl Timeline {
initdb_optimization_count: 0,
};
task_mgr::spawn(
*task_mgr::BACKGROUND_RUNTIME,
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_shard_id),
Some(self.timeline_id),
@@ -2324,7 +2324,7 @@ impl Timeline {
DownloadBehavior::Download,
);
task_mgr::spawn(
*task_mgr::BACKGROUND_RUNTIME,
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
Some(self.tenant_shard_id),
Some(self.timeline_id),
@@ -2502,7 +2502,7 @@ impl Timeline {
DownloadBehavior::Download,
);
task_mgr::spawn(
*task_mgr::BACKGROUND_RUNTIME,
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
Some(self.tenant_shard_id),
Some(self.timeline_id),
@@ -4484,7 +4484,7 @@ impl Timeline {
let self_clone = Arc::clone(&self);
let task_id = task_mgr::spawn(
*task_mgr::BACKGROUND_RUNTIME,
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::DownloadAllRemoteLayers,
Some(self.tenant_shard_id),
Some(self.timeline_id),

View File

@@ -383,7 +383,7 @@ impl DeleteTimelineFlow {
let timeline_id = timeline.timeline_id;
task_mgr::spawn(
*task_mgr::BACKGROUND_RUNTIME,
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
Some(tenant_shard_id),
Some(timeline_id),

View File

@@ -57,7 +57,7 @@ impl Timeline {
let self_clone = Arc::clone(self);
let background_tasks_can_start = background_tasks_can_start.cloned();
task_mgr::spawn(
*BACKGROUND_RUNTIME,
BACKGROUND_RUNTIME.handle(),
TaskKind::Eviction,
Some(self.tenant_shard_id),
Some(self.timeline_id),