mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
Merge branch 'tenant-tasks' of github.com:neondatabase/neon into tenant-tasks
This commit is contained in:
@@ -1,15 +1,17 @@
|
||||
//! This module contains functions to serve per-tenant background processes,
|
||||
//! such as compaction and GC
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::ops::ControlFlow;
|
||||
|
||||
use crate::repository::Repository;
|
||||
use crate::tenant_mgr::TenantState;
|
||||
use crate::thread_mgr::ThreadKind;
|
||||
use crate::{tenant_mgr, thread_mgr};
|
||||
use anyhow;
|
||||
use anyhow::{self, Context};
|
||||
use once_cell::sync::OnceCell;
|
||||
use tokio::sync::mpsc::{self, Sender};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::*;
|
||||
use utils::zid::ZTenantId;
|
||||
|
||||
@@ -75,9 +77,10 @@ static START_COMPACTION_LOOP: OnceCell<Sender<ZTenantId>> = OnceCell::new();
|
||||
pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
START_GC_LOOP
|
||||
.get()
|
||||
.expect("failed to get START_GC_LOOP")
|
||||
.context("Failed to get START_GC_LOOP")?
|
||||
.blocking_send(tenantid)
|
||||
.expect("Failed to send to START_GC_LOOP channel");
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
.context("Failed to send to START_GC_LOOP channel")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -87,9 +90,10 @@ pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
START_COMPACTION_LOOP
|
||||
.get()
|
||||
.expect("failed to get START_COMPACTION_LOOP")
|
||||
.context("failed to get START_COMPACTION_LOOP")?
|
||||
.blocking_send(tenantid)
|
||||
.expect("failed to send to START_COMPACTION_LOOP");
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
.context("failed to send to START_COMPACTION_LOOP")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -113,6 +117,10 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
|
||||
.set(compaction_send)
|
||||
.expect("Failed to set START_COMPACTION_LOOP");
|
||||
|
||||
// TODO this is getting repetitive
|
||||
let mut gc_loops = HashMap::<ZTenantId, JoinHandle<Result<(), anyhow::Error>>>::new();
|
||||
let mut compaction_loops = HashMap::<ZTenantId, JoinHandle<Result<(), anyhow::Error>>>::new();
|
||||
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::TenantTaskManager,
|
||||
None,
|
||||
@@ -124,17 +132,29 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = thread_mgr::shutdown_watcher() => {
|
||||
// TODO cancel all running tasks
|
||||
break
|
||||
for (_, handle) in gc_loops.drain() {
|
||||
handle.abort();
|
||||
}
|
||||
for (_, handle) in compaction_loops.drain() {
|
||||
handle.abort();
|
||||
}
|
||||
break;
|
||||
},
|
||||
// TODO don't spawn if already running
|
||||
tenantid = gc_recv.recv() => {
|
||||
let tenantid = tenantid.expect("Gc task channel closed unexpectedly");
|
||||
tokio::spawn(gc_loop(tenantid));
|
||||
let new_handle = tokio::spawn(gc_loop(tenantid));
|
||||
if let Some(old_handle) = gc_loops.insert(tenantid, new_handle) {
|
||||
// TODO use non-blocking cancel chan instead
|
||||
old_handle.abort();
|
||||
}
|
||||
},
|
||||
tenantid = compaction_recv.recv() => {
|
||||
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
|
||||
tokio::spawn(compaction_loop(tenantid));
|
||||
let new_handle = tokio::spawn(compaction_loop(tenantid));
|
||||
if let Some(old_handle) = compaction_loops.insert(tenantid, new_handle) {
|
||||
// TODO use non-blocking cancel chan instead
|
||||
old_handle.abort();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user