Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-03 02:30:37 +00:00

Compare commits: conrad/pro...task_hiera (12 commits)
Commits in this comparison (SHA1):

- d87549696b
- 960a29a6fe
- d6f6e9a87b
- ddae6e2b0a
- e021298dec
- 9790a7c2e8
- 9660282c69
- 894cd3ddf7
- 735c9b3b70
- e76b24ccc5
- 6ff2c07cc8
- efd46e478a
@@ -370,13 +370,18 @@ fn start_pageserver(
     // Top-level cancellation token for the process
     let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
 
+    pageserver::PAGESERVER_SHUTDOWN_TOKEN
+        .set(shutdown_pageserver.clone())
+        .map_err(|_| ())
+        .expect("cannot be set already");
+
     // Set up remote storage client
     let remote_storage = create_remote_storage_client(conf)?;
 
     // Set up deletion queue
     let (deletion_queue, deletion_workers) = DeletionQueue::new(
         remote_storage.clone(),
-        ControlPlaneClient::new(conf, &shutdown_pageserver),
+        ControlPlaneClient::new(conf, shutdown_pageserver.child_token()),
         conf,
     );
     if let Some(deletion_workers) = deletion_workers {
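Handing `ControlPlaneClient` a `child_token()` rather than a clone of the root token is the pattern this whole series establishes: cancelling the root reaches every descendant, while a child can be cancelled or dropped without affecting its siblings. A minimal standalone sketch of that behaviour (not pageserver code):

```rust
use tokio_util::sync::CancellationToken;

fn main() {
    let root = CancellationToken::new();
    let deletion_queue = root.child_token();
    let http = root.child_token();

    // Cancelling one child leaves its sibling untouched...
    deletion_queue.cancel();
    assert!(deletion_queue.is_cancelled());
    assert!(!http.is_cancelled());

    // ...while cancelling the root reaches every descendant.
    root.cancel();
    assert!(http.is_cancelled());
}
```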
@@ -420,12 +425,12 @@ fn start_pageserver(
             deletion_queue_client,
         },
         order,
-        shutdown_pageserver.clone(),
+        shutdown_pageserver.child_token(),
     ))?;
     let tenant_manager = Arc::new(tenant_manager);
 
     BACKGROUND_RUNTIME.spawn({
-        let shutdown_pageserver = shutdown_pageserver.clone();
+        let shutdown_pageserver = shutdown_pageserver.child_token();
         let drive_init = async move {
             // NOTE: unlike many futures in pageserver, this one is cancellation-safe
             let guard = scopeguard::guard_on_success((), |_| {
@@ -516,6 +521,7 @@ fn start_pageserver(
             remote_storage.clone(),
             disk_usage_eviction_state.clone(),
             background_jobs_barrier.clone(),
+            shutdown_pageserver.child_token(),
         )?;
     }
 
@@ -536,13 +542,16 @@ fn start_pageserver(
         )
         .context("Failed to initialize router state")?,
     );
 
+    let cancel = shutdown_pageserver.child_token();
+
     let router = http::make_router(router_state, launch_ts, http_auth.clone())?
         .build()
         .map_err(|err| anyhow!(err))?;
     let service = utils::http::RouterService::new(router).unwrap();
     let server = hyper::Server::from_tcp(http_listener)?
         .serve(service)
-        .with_graceful_shutdown(task_mgr::shutdown_watcher());
+        .with_graceful_shutdown(cancel.clone().cancelled_owned());
 
     task_mgr::spawn(
         MGMT_REQUEST_RUNTIME.handle(),
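The graceful-shutdown signal for the HTTP server changes from `task_mgr::shutdown_watcher()` to `cancel.clone().cancelled_owned()`; `cancelled_owned()` consumes its token and returns a `'static` future, which is what `with_graceful_shutdown` needs without borrowing. A minimal sketch, assuming hyper 0.14 with the server features enabled (the handler and address are placeholders):

```rust
use std::convert::Infallible;
use std::net::SocketAddr;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), hyper::Error> {
    let cancel = CancellationToken::new();

    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(|_req: Request<Body>| async {
            Ok::<_, Infallible>(Response::new(Body::from("ok")))
        }))
    });

    let addr = SocketAddr::from(([127, 0, 0, 1], 0));
    let server = Server::bind(&addr)
        .serve(make_svc)
        // cancelled_owned() consumes a clone and yields a 'static future,
        // so the graceful-shutdown signal does not borrow `cancel`.
        .with_graceful_shutdown(cancel.clone().cancelled_owned());

    // Trigger shutdown from elsewhere; here, immediately for the example.
    cancel.cancel();
    server.await
}
```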
@@ -551,6 +560,7 @@ fn start_pageserver(
         None,
         "http endpoint listener",
         true,
+        cancel,
         async {
             server.await?;
             Ok(())
@@ -576,6 +586,7 @@ fn start_pageserver(
         None,
         "consumption metrics collection",
         true,
+        shutdown_pageserver.child_token(),
         async move {
             // first wait until background jobs are cleared to launch.
             //
@@ -624,6 +635,7 @@ fn start_pageserver(
         None,
         "libpq endpoint listener",
         true,
+        shutdown_pageserver.child_token(),
         async move {
             page_service::libpq_listener_main(
                 conf,
@@ -657,9 +669,8 @@ fn start_pageserver(
                 signal.name()
             );
 
-            // This cancels the `shutdown_pageserver` cancellation tree.
-            // Right now that tree doesn't reach very far, and `task_mgr` is used instead.
-            // The plan is to change that over time.
+            // This cancels the `shutdown_pageserver` cancellation tree and signals cancellation to
+            // all tasks in the system.
             shutdown_pageserver.take();
             let bg_remote_storage = remote_storage.clone();
             let bg_deletion_queue = deletion_queue.clone();
@@ -65,6 +65,7 @@ pub async fn collect_metrics(
         None,
         "synthetic size calculation",
         false,
+        cancel.child_token(),
         async move {
             calculate_synthetic_size_worker(
                 synthetic_size_calculation_interval,

@@ -40,7 +40,7 @@ pub trait ControlPlaneGenerationsApi {
 impl ControlPlaneClient {
     /// A None return value indicates that the input `conf` object does not have control
     /// plane API enabled.
-    pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
+    pub fn new(conf: &'static PageServerConf, cancel: CancellationToken) -> Option<Self> {
         let mut url = match conf.control_plane_api.as_ref() {
             Some(u) => u.clone(),
             None => return None,
@@ -67,7 +67,7 @@ impl ControlPlaneClient {
             http_client: client.build().expect("Failed to construct HTTP client"),
             base_url: url,
             node_id: conf.id,
-            cancel: cancel.clone(),
+            cancel,
         })
     }
 
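With `new` taking the token by value, the struct stores exactly the token the caller chose to hand over, typically a child token, and the `cancel: cancel.clone()` copy disappears. The same ownership pattern in a self-contained sketch (the `Client` type here is invented for illustration):

```rust
use tokio_util::sync::CancellationToken;

struct Client {
    cancel: CancellationToken,
}

impl Client {
    // Taking the token by value lets the caller decide whether to pass a
    // clone or a fresh child token; the struct just stores what it is given.
    fn new(cancel: CancellationToken) -> Self {
        Self { cancel }
    }

    async fn call(&self) -> Option<&'static str> {
        tokio::select! {
            _ = self.cancel.cancelled() => None,
            _ = tokio::time::sleep(std::time::Duration::from_millis(10)) => Some("response"),
        }
    }
}

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();
    let client = Client::new(shutdown.child_token());
    assert_eq!(client.call().await, Some("response"));

    shutdown.cancel();
    assert_eq!(client.call().await, None);
}
```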
@@ -87,6 +87,7 @@ pub fn launch_disk_usage_global_eviction_task(
     storage: GenericRemoteStorage,
     state: Arc<State>,
     background_jobs_barrier: completion::Barrier,
+    cancel: CancellationToken,
 ) -> anyhow::Result<()> {
     let Some(task_config) = &conf.disk_usage_based_eviction else {
         info!("disk usage based eviction task not configured");
@@ -102,6 +103,7 @@ pub fn launch_disk_usage_global_eviction_task(
         None,
         "disk usage based eviction",
         false,
+        cancel,
         async move {
             let cancel = task_mgr::shutdown_token();
 

@@ -49,11 +49,22 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
 
 static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
 
+/// The main cancellation token for the process.
+///
+/// Should only ever be used to create child tokens.
+pub static PAGESERVER_SHUTDOWN_TOKEN: std::sync::OnceLock<tokio_util::sync::CancellationToken> =
+    std::sync::OnceLock::new();
+
 pub use crate::metrics::preinitialize_metrics;
 
 #[tracing::instrument(skip_all, fields(%exit_code))]
 pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
     use std::time::Duration;
 
+    if let Some(token) = PAGESERVER_SHUTDOWN_TOKEN.get() {
+        token.cancel();
+    }
+
     // Shut down the libpq endpoint task. This prevents new connections from
     // being accepted.
     timed(
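The `OnceLock` static gives the rest of the crate a way to reach the process-wide token, and `shutdown_pageserver` cancels it first so cancellation fans out before the orderly per-subsystem shutdown begins. The `get().cloned().unwrap_or_default().child_token()` chain used at several call sites below falls back to a fresh, never-cancelled token when the static was never initialized, e.g. in unit tests. A standalone sketch of both halves:

```rust
use std::sync::OnceLock;
use tokio_util::sync::CancellationToken;

static SHUTDOWN: OnceLock<CancellationToken> = OnceLock::new();

// Set once at startup, mirroring PAGESERVER_SHUTDOWN_TOKEN.set(...) above.
fn init(root: CancellationToken) {
    SHUTDOWN
        .set(root)
        .map_err(|_| ())
        .expect("cannot be set already");
}

// Mirrors the `get().cloned().unwrap_or_default().child_token()` chain: if
// the static was never initialized (e.g. in tests), hand out a child of a
// fresh token that is simply never cancelled.
fn shutdown_child() -> CancellationToken {
    SHUTDOWN.get().cloned().unwrap_or_default().child_token()
}

fn main() {
    let root = CancellationToken::new();
    init(root.clone());

    let child = shutdown_child();
    assert!(!child.is_cancelled());

    root.cancel(); // what shutdown_pageserver() does first
    assert!(child.is_cancelled());
}
```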
@@ -166,6 +166,7 @@ pub async fn libpq_listener_main(
             None,
             "serving compute connection task",
             false,
+            cancel.child_token(),
             page_service_conn_main(
                 conf,
                 broker_client.clone(),

@@ -327,6 +327,7 @@ struct PageServerTask {
 /// Launch a new task
 /// Note: if shutdown_process_on_error is set to true failure
 /// of the task will lead to shutdown of entire process
+#[allow(clippy::too_many_arguments)]
 pub fn spawn<F>(
     runtime: &tokio::runtime::Handle,
     kind: TaskKind,
@@ -334,12 +335,13 @@ pub fn spawn<F>(
     timeline_id: Option<TimelineId>,
     name: &str,
     shutdown_process_on_error: bool,
+    cancel: CancellationToken,
     future: F,
 ) -> PageserverTaskId
 where
     F: Future<Output = anyhow::Result<()>> + Send + 'static,
 {
-    let cancel = CancellationToken::new();
+    // let cancel = CancellationToken::new();
     let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
     let task = Arc::new(PageServerTask {
         task_id: PageserverTaskId(task_id),
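`spawn` now receives its `CancellationToken` from the caller instead of minting a fresh, parentless one, which is what actually ties every task into the tree; the old line is kept commented out to mark the behaviour change. A sketch of a caller-supplied-token spawn helper (`spawn_with_cancel` and its signature are illustrative, not the real `task_mgr::spawn`):

```rust
use tokio_util::sync::CancellationToken;

fn spawn_with_cancel<F>(
    runtime: &tokio::runtime::Handle,
    name: &str,
    cancel: CancellationToken,
    future: F,
) -> tokio::task::JoinHandle<anyhow::Result<()>>
where
    F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let name = name.to_owned();
    runtime.spawn(async move {
        tokio::select! {
            // The task body runs under the caller-supplied token...
            res = future => res,
            // ...so a cancel anywhere up the token tree stops it.
            _ = cancel.cancelled() => {
                println!("task {name} cancelled");
                Ok(())
            }
        }
    })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let root = CancellationToken::new();
    let handle = spawn_with_cancel(
        &tokio::runtime::Handle::current(),
        "sleeper",
        root.child_token(),
        async {
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
            Ok(())
        },
    );
    root.cancel();
    handle.await??;
    Ok(())
}
```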
@@ -558,9 +560,14 @@ pub async fn shutdown_watcher() {
 /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
 /// `tokio::task::JoinSet::spawn`.
 pub fn shutdown_token() -> CancellationToken {
-    SHUTDOWN_TOKEN
-        .try_with(|t| t.clone())
-        .expect("shutdown_token() called in an unexpected task or thread")
+    let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
+
+    if cfg!(test) {
+        res.unwrap_or_default()
+    } else {
+        // tests need to call the same paths which need to use get the shutdown token
+        res.expect("shutdown_token() called in an unexpected task or thread")
+    }
 }
 
 /// Has the current task been requested to shut down?
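`shutdown_token()` reads a task-local token; outside a `task_mgr`-managed task that lookup fails, and the new `cfg!(test)` branch substitutes a default (never-cancelled) token so tests can exercise the same code paths. A self-contained sketch of the same mechanism:

```rust
use tokio_util::sync::CancellationToken;

tokio::task_local! {
    static SHUTDOWN_TOKEN: CancellationToken;
}

// Outside a managed task the task-local is unset and `try_with` fails; under
// cfg!(test) a default (never-cancelled) token is handed out instead so test
// code can run the same paths without setting up the task-local.
pub fn shutdown_token() -> CancellationToken {
    let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
    if cfg!(test) {
        res.unwrap_or_default()
    } else {
        res.expect("shutdown_token() called in an unexpected task or thread")
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    // Inside the scope the lookup succeeds, exactly like a managed task.
    SHUTDOWN_TOKEN
        .scope(token.clone(), async {
            assert!(!shutdown_token().is_cancelled());
            token.cancel();
            assert!(shutdown_token().is_cancelled());
        })
        .await;
}
```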
@@ -486,6 +486,7 @@ impl Tenant {
             ancestor.clone(),
             resources,
             CreateTimelineCause::Load,
+            self.cancel.child_token(),
         )?;
         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
         anyhow::ensure!(
@@ -503,7 +504,7 @@ impl Tenant {
                 .remote_client
                 .as_ref()
                 .unwrap()
-                .init_upload_queue(index_part)?;
+                .init_upload_queue(index_part, timeline.cancel.child_token())?;
         } else if self.remote_storage.is_some() {
             // No data on the remote storage, but we have local metadata file. We can end up
             // here with timeline_create being interrupted before finishing index part upload.
@@ -511,7 +512,7 @@ impl Tenant {
             // If control plane retries timeline creation in the meantime, the mgmt API handler
             // for timeline creation will coalesce on the upload we queue here.
             let rtc = timeline.remote_client.as_ref().unwrap();
-            rtc.init_upload_queue_for_empty_remote(&metadata)?;
+            rtc.init_upload_queue_for_empty_remote(&metadata, timeline.cancel.child_token())?;
             rtc.schedule_index_upload_for_metadata_update(&metadata)?;
         }
 
@@ -605,6 +606,12 @@ impl Tenant {
         let tenant_clone = Arc::clone(&tenant);
 
         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
+        let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
+            .get()
+            .cloned()
+            .unwrap_or_default()
+            .child_token();
+
         task_mgr::spawn(
             &tokio::runtime::Handle::current(),
             TaskKind::Attach,
@@ -612,6 +619,7 @@ impl Tenant {
             None,
             "attach tenant",
             false,
+            cancel,
             async move {
                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
                 let make_broken =
@@ -871,8 +879,10 @@ impl Tenant {
 
         // Walk through deleted timelines, resume deletion
         for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
+            let cancel = self.cancel.child_token();
+
             remote_timeline_client
-                .init_upload_queue_stopped_to_continue_deletion(&index_part)
+                .init_upload_queue_stopped_to_continue_deletion(&index_part, cancel.child_token())
                 .context("init queue stopped")
                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
 
@@ -882,6 +892,7 @@ impl Tenant {
                 &index_part.metadata,
                 Some(remote_timeline_client),
                 self.deletion_queue_client.clone(),
+                cancel,
             )
             .await
             .context("resume_deletion")
@@ -1215,7 +1226,7 @@ impl Tenant {
             timeline_id,
             self.generation,
         );
-        let cancel_clone = cancel.clone();
+        let cancel_clone = cancel.child_token();
         part_downloads.spawn(
             async move {
                 debug!("starting index part download");
@@ -1376,6 +1387,7 @@ impl Tenant {
             &local_metadata,
             None,
             self.deletion_queue_client.clone(),
+            self.cancel.child_token(),
         )
         .await
         .context("resume deletion")
@@ -2290,6 +2302,7 @@ impl Tenant {
         ancestor: Option<Arc<Timeline>>,
         resources: TimelineResources,
         cause: CreateTimelineCause,
+        cancel: CancellationToken,
     ) -> anyhow::Result<Arc<Timeline>> {
         let state = match cause {
             CreateTimelineCause::Load => {
@@ -2318,7 +2331,7 @@ impl Tenant {
             resources,
             pg_version,
             state,
-            self.cancel.child_token(),
+            cancel,
         );
 
         Ok(timeline)
@@ -2391,6 +2404,12 @@ impl Tenant {
             }
         });
 
+        let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
+            .get()
+            .cloned()
+            .unwrap_or_default()
+            .child_token();
+
         Tenant {
             tenant_shard_id,
             shard_identity,
@@ -2410,7 +2429,7 @@ impl Tenant {
             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
             delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
-            cancel: CancellationToken::default(),
+            cancel,
             gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
         }
     }
@@ -3154,8 +3173,11 @@ impl Tenant {
         let tenant_shard_id = self.tenant_shard_id;
 
         let resources = self.build_timeline_resources(new_timeline_id);
 
+        let cancel = self.cancel.child_token();
+
         if let Some(remote_client) = &resources.remote_client {
-            remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
+            remote_client.init_upload_queue_for_empty_remote(new_metadata, cancel.child_token())?;
         }
 
         let timeline_struct = self
@@ -3165,6 +3187,7 @@ impl Tenant {
             ancestor,
             resources,
             CreateTimelineCause::Load,
+            cancel,
         )
         .context("Failed to create timeline data structure")?;
 
@@ -460,6 +460,12 @@ impl DeleteTenantFlow {
     ) {
         let tenant_shard_id = tenant.tenant_shard_id;
 
+        let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
+            .get()
+            .cloned()
+            .unwrap_or_default()
+            .child_token();
+
         task_mgr::spawn(
             task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::TimelineDeletionWorker,
@@ -467,6 +473,7 @@ impl DeleteTenantFlow {
             None,
             "tenant_delete",
             false,
+            cancel,
             async move {
                 if let Err(err) =
                     Self::background(guard, conf, remote_storage, tenants, &tenant).await

@@ -283,7 +283,7 @@ async fn init_load_generations(
            "Emergency mode! Tenants will be attached unsafely using their last known generation"
         );
         emergency_generations(tenant_confs)
-    } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
+    } else if let Some(client) = ControlPlaneClient::new(conf, cancel.child_token()) {
         info!("Calling control plane API to re-attach tenants");
         // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
         match client.re_attach().await {
@@ -1352,6 +1352,11 @@ pub(crate) async fn detach_tenant(
     // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
     // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
     let task_tenant_id = None;
+    let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
+        .get()
+        .cloned()
+        .unwrap_or_default()
+        .child_token();
     task_mgr::spawn(
         task_mgr::BACKGROUND_RUNTIME.handle(),
         TaskKind::MgmtRequest,
@@ -1359,6 +1364,7 @@ pub(crate) async fn detach_tenant(
         None,
         "tenant_files_delete",
         false,
+        cancel,
         async move {
             fs::remove_dir_all(tmp_path.as_path())
                 .await
@@ -2086,6 +2092,7 @@ pub(crate) async fn immediate_gc(
         Some(timeline_id),
         &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
         false,
+        tenant.cancel.child_token(),
         async move {
             fail::fail_point!("immediate_gc_task_pre");
 
@@ -357,9 +357,13 @@ impl RemoteTimelineClient {
     /// Initialize the upload queue for a remote storage that already received
     /// an index file upload, i.e., it's not empty.
     /// The given `index_part` must be the one on the remote.
-    pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
+    pub fn init_upload_queue(
+        &self,
+        index_part: &IndexPart,
+        cancel: CancellationToken,
+    ) -> anyhow::Result<()> {
         let mut upload_queue = self.upload_queue.lock().unwrap();
-        upload_queue.initialize_with_current_remote_index_part(index_part)?;
+        upload_queue.initialize_with_current_remote_index_part(index_part, cancel)?;
         self.update_remote_physical_size_gauge(Some(index_part));
         info!(
             "initialized upload queue from remote index with {} layer files",
@@ -373,9 +377,10 @@ impl RemoteTimelineClient {
     pub fn init_upload_queue_for_empty_remote(
         &self,
         local_metadata: &TimelineMetadata,
+        cancel: CancellationToken,
     ) -> anyhow::Result<()> {
         let mut upload_queue = self.upload_queue.lock().unwrap();
-        upload_queue.initialize_empty_remote(local_metadata)?;
+        upload_queue.initialize_empty_remote(local_metadata, cancel)?;
         self.update_remote_physical_size_gauge(None);
         info!("initialized upload queue as empty");
         Ok(())
@@ -386,6 +391,7 @@ impl RemoteTimelineClient {
     pub fn init_upload_queue_stopped_to_continue_deletion(
         &self,
         index_part: &IndexPart,
+        cancel: CancellationToken,
     ) -> anyhow::Result<()> {
         // FIXME: consider newtype for DeletedIndexPart.
         let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
@@ -394,7 +400,7 @@ impl RemoteTimelineClient {
 
         {
             let mut upload_queue = self.upload_queue.lock().unwrap();
-            upload_queue.initialize_with_current_remote_index_part(index_part)?;
+            upload_queue.initialize_with_current_remote_index_part(index_part, cancel)?;
             self.update_remote_physical_size_gauge(Some(index_part));
         }
         // also locks upload queue, without dropping the guard above it will be a deadlock
@@ -1227,6 +1233,7 @@ impl RemoteTimelineClient {
             Some(self.timeline_id),
             "remote upload",
             false,
+            upload_queue.cancel.child_token(),
             async move {
                 self_rc.perform_upload_task(task).await;
                 Ok(())
@@ -1561,6 +1568,13 @@ impl RemoteTimelineClient {
             dangling_files: HashMap::default(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
+            // TODO: this is the only place where we cannot reasonably continue the
+            // tree
+            cancel: crate::PAGESERVER_SHUTDOWN_TOKEN
+                .get()
+                .cloned()
+                .unwrap_or_default()
+                .child_token(),
         };
 
         let upload_queue = std::mem::replace(

@@ -841,6 +841,7 @@ impl LayerInner {
             Some(self.desc.timeline_id),
             &task_name,
             false,
+            timeline.cancel.child_token(),
             async move {
 
                 let client = timeline
@@ -860,6 +861,21 @@ impl LayerInner {
                         Ok(())
                     }
                     Err(e) => {
+                        let consecutive_failures =
+                            this.consecutive_failures.fetch_add(1, Ordering::Relaxed);
+
+                        let backoff = utils::backoff::exponential_backoff_duration_seconds(
+                            consecutive_failures.min(u32::MAX as usize) as u32,
+                            1.5,
+                            60.0,
+                        );
+                        let backoff = std::time::Duration::from_secs_f64(backoff);
+
+                        tokio::select! {
+                            _ = tokio::time::sleep(backoff) => {},
+                            _ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
+                        };
+
                         Err(e)
                     }
                 };
@@ -907,24 +923,7 @@ impl LayerInner {
 
                 Ok(permit)
             }
-            Ok((Err(e), _permit)) => {
-                // FIXME: this should be with the spawned task and be cancellation sensitive
-                //
-                // while we should not need this, this backoff has turned out to be useful with
-                // a bug of unexpectedly deleted remote layer file (#5787).
-                let consecutive_failures =
-                    self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
-                tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
-                let backoff = utils::backoff::exponential_backoff_duration_seconds(
-                    consecutive_failures.min(u32::MAX as usize) as u32,
-                    1.5,
-                    60.0,
-                );
-                let backoff = std::time::Duration::from_secs_f64(backoff);
-
-                tokio::time::sleep(backoff).await;
-                Err(DownloadError::DownloadFailed)
-            }
+            Ok((Err(_), _permit)) => Err(DownloadError::DownloadFailed),
             Err(_gone) => Err(DownloadError::DownloadCancelled),
         }
     }
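The retry backoff moves into the spawned download task and becomes cancellation-sensitive: the sleep races a shutdown token inside `tokio::select!`, so a pending 60-second backoff no longer delays process shutdown. A sketch of the pattern; the backoff formula below is an approximation, not the actual `utils::backoff` implementation:

```rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

// Approximation of exponential_backoff_duration_seconds: base^n, capped.
fn backoff_seconds(n: u32, base: f64, max_seconds: f64) -> f64 {
    base.powi(n as i32).min(max_seconds)
}

async fn retry_delay(consecutive_failures: u32, cancel: &CancellationToken) {
    let secs = backoff_seconds(consecutive_failures, 1.5, 60.0);
    // The sleep races the shutdown token, so a long backoff cannot hold up
    // shutdown; this is why the backoff moved inside the spawned task.
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs_f64(secs)) => {}
        _ = cancel.cancelled() => {}
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    cancel.cancel();
    // Returns immediately despite the 60-second cap: the token is cancelled.
    retry_delay(10, &cancel).await;
}
```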
@@ -54,31 +54,21 @@ impl BackgroundLoopKind {
     }
 }
 
-pub(crate) enum RateLimitError {
-    Cancelled,
-}
-
-pub(crate) async fn concurrent_background_tasks_rate_limit(
+pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
     loop_kind: BackgroundLoopKind,
     _ctx: &RequestContext,
-    cancel: &CancellationToken,
-) -> Result<impl Drop, RateLimitError> {
+) -> impl Drop {
     crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT
         .with_label_values(&[loop_kind.as_static_str()])
         .inc();
 
     scopeguard::defer!(
         crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc();
     );
-    tokio::select! {
-        permit = CONCURRENT_BACKGROUND_TASKS.acquire() => {
-            match permit {
-                Ok(permit) => Ok(permit),
-                Err(_closed) => unreachable!("we never close the semaphore"),
-            }
-        },
-        _ = cancel.cancelled() => {
-            Err(RateLimitError::Cancelled)
-        }
+    match CONCURRENT_BACKGROUND_TASKS.acquire().await {
+        Ok(permit) => permit,
+        Err(_closed) => unreachable!("we never close the semaphore"),
     }
 }
 
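`concurrent_background_tasks_rate_limit_permit` now just awaits the global semaphore; the cancellation arm moves to each call site, which can select between the permit future and whichever tokens are relevant there. A standalone sketch of the new shape (the semaphore size of 4 is illustrative):

```rust
use std::sync::LazyLock;

use tokio::sync::{Semaphore, SemaphorePermit};
use tokio_util::sync::CancellationToken;

// Illustrative global limit, standing in for CONCURRENT_BACKGROUND_TASKS.
static BACKGROUND_TASKS: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(4));

// The permit function no longer knows about cancellation at all.
async fn rate_limit_permit() -> SemaphorePermit<'static> {
    match BACKGROUND_TASKS.acquire().await {
        Ok(permit) => permit,
        Err(_closed) => unreachable!("we never close the semaphore"),
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();

    // Cancellation handling lives at the call site, as in the diff above.
    let _permit = tokio::select! {
        permit = rate_limit_permit() => permit,
        _ = cancel.cancelled() => return,
    };
    // ...background work runs here while the permit is held...
}
```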
@@ -95,6 +85,7 @@ pub fn start_background_loops(
         None,
         &format!("compactor for tenant {tenant_shard_id}"),
         false,
+        tenant.cancel.child_token(),
         {
             let tenant = Arc::clone(tenant);
             let background_jobs_can_start = background_jobs_can_start.cloned();
@@ -118,6 +109,7 @@ pub fn start_background_loops(
         None,
         &format!("garbage collector for tenant {tenant_shard_id}"),
         false,
+        tenant.cancel.child_token(),
         {
             let tenant = Arc::clone(tenant);
             let background_jobs_can_start = background_jobs_can_start.cloned();
@@ -51,7 +51,7 @@ use crate::tenant::storage_layer::{
     LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
     ValueReconstructState,
 };
-use crate::tenant::tasks::{BackgroundLoopKind, RateLimitError};
+use crate::tenant::tasks::BackgroundLoopKind;
 use crate::tenant::timeline::logical_size::CurrentLogicalSize;
 use crate::tenant::{
     layer_map::{LayerMap, SearchResult},
@@ -708,19 +708,27 @@ impl Timeline {
         flags: EnumSet<CompactFlags>,
         ctx: &RequestContext,
     ) -> Result<(), CompactionError> {
-        let _g = self.compaction_lock.lock().await;
+        // most likely the cancellation token is from background task, but in tests it could be the
+        // request task as well.
+
+        let prepare = async move {
+            let guard = self.compaction_lock.lock().await;
+
+            let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
+                BackgroundLoopKind::Compaction,
+                ctx,
+            )
+            .await;
+
+            (guard, permit)
+        };
+
         // this wait probably never needs any "long time spent" logging, because we already nag if
         // compaction task goes over it's period (20s) which is quite often in production.
-        let _permit = match super::tasks::concurrent_background_tasks_rate_limit(
-            BackgroundLoopKind::Compaction,
-            ctx,
-            cancel,
-        )
-        .await
-        {
-            Ok(permit) => permit,
-            Err(RateLimitError::Cancelled) => return Ok(()),
+        let (_guard, _permit) = tokio::select! {
+            tuple = prepare => { tuple },
+            _ = self.cancel.cancelled() => return Ok(()),
+            _ = cancel.cancelled() => return Ok(()),
         };
 
         let last_record_lsn = self.get_last_record_lsn();
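The compaction entry point bundles every await that precedes the real work (the compaction lock, then the rate-limit permit) into a single `prepare` future and races it against both the timeline's own token and the caller's token; if a cancel arm wins, nothing was acquired, so nothing leaks. The shape of that pattern in isolation, with an invented `compact` function:

```rust
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

// `compact` is a stand-in: bundle the await-points that precede the real
// work into one `prepare` future, then race it against the tokens.
async fn compact(
    lock: &Mutex<()>,
    self_cancel: &CancellationToken,
    task_cancel: &CancellationToken,
) -> anyhow::Result<()> {
    let prepare = async {
        let guard = lock.lock().await;
        // ...a concurrency-limit permit would be acquired here too...
        guard
    };

    let _guard = tokio::select! {
        guard = prepare => guard,
        _ = self_cancel.cancelled() => return Ok(()),
        _ = task_cancel.cancelled() => return Ok(()),
    };

    // The real compaction would run here while holding the lock.
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let lock = Mutex::new(());
    let self_cancel = CancellationToken::new();
    let task_cancel = CancellationToken::new();
    self_cancel.cancel();
    // Returns Ok(()) promptly: either the cancel arm fires, or the idle lock
    // is taken and immediately released.
    compact(&lock, &self_cancel, &task_cancel).await
}
```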
@@ -1389,6 +1397,7 @@ impl Timeline {
             Some(self.timeline_id),
             "layer flush task",
             false,
+            self.cancel.child_token(),
             async move {
                 let _guard = guard;
                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
@@ -1740,6 +1749,7 @@ impl Timeline {
             Some(self.timeline_id),
             "initial size calculation",
             false,
+            self.cancel.child_token(),
             // NB: don't log errors here, task_mgr will do that.
             async move {
                 let cancel = task_mgr::shutdown_token();
@@ -1775,22 +1785,19 @@ impl Timeline {
             let skip_concurrency_limiter = &skip_concurrency_limiter;
             async move {
                 let cancel = task_mgr::shutdown_token();
-                let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit(
+                let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
                     BackgroundLoopKind::InitialLogicalSizeCalculation,
                     background_ctx,
-                    &cancel,
                 );
 
                 use crate::metrics::initial_logical_size::StartCircumstances;
                 let (_maybe_permit, circumstances) = tokio::select! {
-                    res = wait_for_permit => {
-                        match res {
-                            Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit),
-                            Err(RateLimitError::Cancelled) => {
-                                return Err(BackgroundCalculationError::Cancelled);
-                            }
-                        }
+                    permit = wait_for_permit => {
+                        (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
                     }
+                    _ = cancel.cancelled() => {
+                        return Err(BackgroundCalculationError::Cancelled);
+                    },
                     () = skip_concurrency_limiter.cancelled() => {
                         // Some action that is part of a end user interaction requested logical size
                         // => break out of the rate limit
@@ -1913,6 +1920,7 @@ impl Timeline {
             Some(self.timeline_id),
             "ondemand logical size calculation",
             false,
+            self.cancel.child_token(),
             async move {
                 let res = self_clone
                     .logical_size_calculation_task(lsn, cause, &ctx)
@@ -3796,7 +3804,14 @@ impl Timeline {
     /// within a layer file. We can only remove the whole file if it's fully
     /// obsolete.
     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
-        let _g = self.gc_lock.lock().await;
+        // this is most likely the background tasks, but it might be the spawned task from
+        // immediate_gc
+        let cancel = crate::task_mgr::shutdown_token();
+        let _g = tokio::select! {
+            guard = self.gc_lock.lock() => guard,
+            _ = self.cancel.cancelled() => return Ok(GcResult::default()),
+            _ = cancel.cancelled() => return Ok(GcResult::default()),
+        };
         let timer = self.metrics.garbage_collect_histo.start_timer();
 
         fail_point!("before-timeline-gc");
@@ -4138,6 +4153,7 @@ impl Timeline {
             Some(self.timeline_id),
             "download all remote layers task",
             false,
+            self.cancel.child_token(),
             async move {
                 self_clone.download_all_remote_layers(request).await;
                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
@@ -6,6 +6,7 @@ use std::{
 use anyhow::Context;
 use pageserver_api::{models::TimelineState, shard::TenantShardId};
 use tokio::sync::OwnedMutexGuard;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, instrument, warn, Instrument, Span};
 use utils::{crashsafe, fs_ext, id::TimelineId};
 
@@ -406,6 +407,7 @@ impl DeleteTimelineFlow {
         local_metadata: &TimelineMetadata,
         remote_client: Option<RemoteTimelineClient>,
         deletion_queue_client: DeletionQueueClient,
+        cancel: CancellationToken,
     ) -> anyhow::Result<()> {
         // Note: here we even skip populating layer map. Timeline is essentially uninitialized.
         // RemoteTimelineClient is the only functioning part.
@@ -421,6 +423,7 @@ impl DeleteTimelineFlow {
             // Important. We dont pass ancestor above because it can be missing.
             // Thus we need to skip the validation here.
             CreateTimelineCause::Delete,
+            cancel,
         )
         .context("create_timeline_struct")?;
 
@@ -532,6 +535,7 @@ impl DeleteTimelineFlow {
         Some(timeline_id),
         "timeline_delete",
         false,
+        tenant.cancel.child_token(),
         async move {
             if let Err(err) = Self::background(guard, conf, &tenant, &timeline).await {
                 error!("Error: {err:#}");

@@ -30,7 +30,7 @@ use crate::{
     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
     tenant::{
         config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
-        tasks::{BackgroundLoopKind, RateLimitError},
+        tasks::BackgroundLoopKind,
         timeline::EvictionError,
         LogicalSizeCalculationCause, Tenant,
     },
@@ -67,6 +67,7 @@ impl Timeline {
                 self.tenant_shard_id, self.timeline_id
             ),
             false,
+            self.cancel.child_token(),
             async move {
                 let cancel = task_mgr::shutdown_token();
                 tokio::select! {
@@ -158,15 +159,14 @@ impl Timeline {
     ) -> ControlFlow<()> {
         let now = SystemTime::now();
 
-        let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(
+        let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
             BackgroundLoopKind::Eviction,
             ctx,
-            cancel,
-        )
-        .await
-        {
-            Ok(permit) => permit,
-            Err(RateLimitError::Cancelled) => return ControlFlow::Break(()),
+        );
+
+        let _permit = tokio::select! {
+            permit = acquire_permit => permit,
+            _ = cancel.cancelled() => return ControlFlow::Break(()),
        };
 
         // If we evict layers but keep cached values derived from those layers, then

@@ -87,6 +87,7 @@ impl WalReceiver {
             Some(timeline_id),
             &format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"),
             false,
+            timeline.cancel.child_token(),
             async move {
                 debug_assert_current_span_has_tenant_and_timeline_id();
                 debug!("WAL receiver manager started, connecting to broker");

@@ -167,6 +167,7 @@ pub(super) async fn handle_walreceiver_connection(
         Some(timeline.timeline_id),
         "walreceiver connection",
         false,
+        cancellation.clone(),
         async move {
             debug_assert_current_span_has_tenant_and_timeline_id();
 
@@ -8,6 +8,7 @@ use std::fmt::Debug;
 
 use chrono::NaiveDateTime;
 use std::sync::Arc;
+use tokio_util::sync::CancellationToken;
 use tracing::info;
 use utils::lsn::AtomicLsn;
 
@@ -98,6 +99,8 @@ pub(crate) struct UploadQueueInitialized {
     /// wait on until one of them stops the queue. The semaphore is closed when
     /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
     pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
+
+    pub(crate) cancel: CancellationToken,
 }
 
 impl UploadQueueInitialized {
@@ -130,6 +133,7 @@ impl UploadQueue {
     pub(crate) fn initialize_empty_remote(
         &mut self,
         metadata: &TimelineMetadata,
+        cancel: CancellationToken,
     ) -> anyhow::Result<&mut UploadQueueInitialized> {
         match self {
             UploadQueue::Uninitialized => (),
@@ -158,6 +162,7 @@ impl UploadQueue {
             dangling_files: HashMap::new(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
+            cancel,
         };
 
         *self = UploadQueue::Initialized(state);
@@ -167,6 +172,7 @@ impl UploadQueue {
     pub(crate) fn initialize_with_current_remote_index_part(
         &mut self,
         index_part: &IndexPart,
+        cancel: CancellationToken,
     ) -> anyhow::Result<&mut UploadQueueInitialized> {
         match self {
             UploadQueue::Uninitialized => (),
@@ -207,6 +213,7 @@ impl UploadQueue {
             dangling_files: HashMap::new(),
             shutting_down: false,
             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
+            cancel,
         };
 
         *self = UploadQueue::Initialized(state);
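Storing a `CancellationToken` in `UploadQueueInitialized` means every queued upload can be spawned with a child of the token the queue was initialized with, so stopping a timeline stops its uploads. A sketch of that wiring with invented types:

```rust
use tokio_util::sync::CancellationToken;

// Invented stand-ins for the upload queue: the queue owns a token supplied at
// initialization, and every spawned operation gets a child of it.
struct UploadQueue {
    cancel: CancellationToken,
}

impl UploadQueue {
    fn initialize(cancel: CancellationToken) -> Self {
        Self { cancel }
    }

    fn launch_upload(&self) -> tokio::task::JoinHandle<()> {
        let cancel = self.cancel.child_token();
        tokio::spawn(async move {
            tokio::select! {
                _ = cancel.cancelled() => println!("upload aborted"),
                _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
                    println!("upload finished")
                }
            }
        })
    }
}

#[tokio::main]
async fn main() {
    let timeline_cancel = CancellationToken::new();
    let queue = UploadQueue::initialize(timeline_cancel.child_token());
    let task = queue.launch_upload();

    // Cancelling the timeline's token reaches the queue and its uploads.
    timeline_cancel.cancel();
    task.await.unwrap();
}
```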