Introduce RequestContexts.

RequestContext is used to track each "operation" or "task" in a way
that's not tied to tokio tasks. It provides support for fine-grained
cancellation of individual operations, or all tasks working on an
active tenant or timeline. Most async functions now take a
RequestContext argument.
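
As a rough illustration (not a verbatim excerpt from this patch), a
background loop that used to poll task_mgr::shutdown_watcher() now
takes a context and polls its cancellation token instead:

    async fn tick_until_cancelled(ctx: &RequestContext) {
        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(10));
        loop {
            tokio::select! {
                _ = ctx.cancelled() => {
                    // The context (or one of its ancestors) was cancelled;
                    // exit gracefully.
                    return;
                }
                _ = ticker.tick() => {
                    // ... do one unit of periodic work here ...
                }
            }
        }
    }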

RequestContexts form a hierarchy, so that you have a top-level context
e.g.  for a TCP listener task, a child context for each task handling
a connection, and perhaps a grandchild context for each individual
client request. In addition to the hierarchy, each RequestContext can
be associated with a Tenant or Timeline object. This is used to
prevent a Tenant or Timeline from being deleted or detached while
there are still tasks accessing it. This fixes long-standing race
conditions between GC/compaction and deletion (see issue #2914 and
related issues). The registration is not enforced by the compiler in
any way, but functions like `get_active_timeline` make it easy to do
the right thing.
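
As a sketch of the intended usage pattern (mirroring the mgmt API
handlers in this patch; error handling elided, tenant_id and
timeline_id assumed to be in scope):

    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
    // get_active_tenant returns the Tenant together with a TenantRequestContext
    // that is registered with it, so the tenant can't be detached or deleted
    // while we hold that context.
    let (tenant, tenant_ctx) = mgr::get_active_tenant(tenant_id, &ctx).await?;
    // Likewise for the timeline.
    let (timeline, timeline_ctx) = tenant.get_active_timeline(timeline_id, &tenant_ctx)?;
    timeline.compact(&timeline_ctx).await?;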

This replaces most of the machinery in `task_mgr.rs`. We don't track
running tasks as such anymore, only RequestContexts. In practice,
every task holds onto a RequestContext.
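
The new task_mgr::spawn is just a thin wrapper around tokio::spawn,
and a spawned task typically captures its RequestContext in the future
it runs. Roughly (the task body here is only an illustration):

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        "example task",
        false, // don't shut down the whole process if this task panics
        async move {
            // 'ctx' is a RequestContext created by the caller and moved in here.
            ctx.cancelled().await;
            info!("example task shutting down");
        },
    );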

In addition to supporting cancellation, the RequestContext specifies
the desired behavior if a remote layer is needed for the operation.
This replaces the `with_ondemand_download_sync` and
`no_ondemand_download` macros. The on-demand download now happens deep
in the call stack, in get_reconstruct_data(), and the caller is no
longer involved in the download, except by passing a RequestContext
that specifies whether to do on-demand download or not. The
PageReconstructResult type is gone but the
PageReconstructError::NeedsDownload variant remains. It's now returned
if the context specified "don't do on-demand download", and a layer
is missing.
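
For example, a caller whose context was created with
DownloadBehavior::Error sees the missing layer as an error it can
handle itself (a sketch; timeline, key, lsn and timeline_ctx are
assumed to be in scope):

    match timeline.get(key, lsn, &timeline_ctx).await {
        Ok(page) => {
            // Page image reconstructed from locally available layers.
            let _ = page;
        }
        Err(PageReconstructError::NeedsDownload(..)) => {
            // The data lives in a layer that is only in remote storage, and
            // this context forbids on-demand download; skip or report it.
        }
        Err(err) => return Err(err.into()),
    }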

TODO:
- Enforce better that you hold a RequestContext associated with a Tenant
  or Timeline.
- All the fields in RequestContext are currently 'pub', but things will
  break if you modify the tenant/timeline fields directly. Make that more
  safe.
- When you create a subcontext, should it inherit the Tenant / Timeline
  of its parent?
- Can the walreceiver::TaskHandle stuff be replaced with this?
- Extract smaller patches:
  - What else could we extract?
Heikki Linnakangas
2023-01-04 09:24:36 +02:00
parent 6a53b8fac6
commit 5aaa5302eb
24 changed files with 1609 additions and 1280 deletions

View File

@@ -10,7 +10,7 @@
//! This module is responsible for creation of such tarball
//! from data stored in object storage.
//!
use anyhow::{anyhow, bail, ensure, Context};
use anyhow::{anyhow, ensure, Context, Result};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use std::fmt::Write as FmtWrite;
@@ -27,7 +27,8 @@ use tracing::*;
///
use tokio_tar::{Builder, EntryType, Header};
use crate::tenant::{Timeline, TimelineRequestContext};
use crate::tenant::TimelineRequestContext;
use crate::tenant::{PageReconstructError, Timeline};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
@@ -53,7 +54,7 @@ pub async fn send_basebackup_tarball<'a, W>(
prev_lsn: Option<Lsn>,
full_backup: bool,
ctx: &'a TimelineRequestContext,
) -> anyhow::Result<()>
) -> Result<(), PageReconstructError>
where
W: AsyncWrite + Send + Sync + Unpin,
{
@@ -92,8 +93,10 @@ where
// Consolidate the derived and the provided prev_lsn values
let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
if backup_prev != Lsn(0) {
ensure!(backup_prev == provided_prev_lsn);
if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
return Err(PageReconstructError::Other(anyhow!(
"prev LSN doesn't match"
)));
}
provided_prev_lsn
} else {
@@ -138,7 +141,7 @@ impl<'a, W> Basebackup<'a, W>
where
W: AsyncWrite + Send + Sync + Unpin,
{
async fn send_tarball(mut self) -> anyhow::Result<()> {
async fn send_tarball(mut self) -> Result<(), PageReconstructError> {
// TODO include checksum
// Create pgdata subdirs structure
@@ -209,17 +212,19 @@ where
}
fail_point!("basebackup-before-control-file", |_| {
bail!("failpoint basebackup-before-control-file")
Err(PageReconstructError::from(anyhow!(
"failpoint basebackup-before-control-file"
)))
});
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
self.ar.finish().await?;
self.ar.finish().await.context("could not finish tarball")?;
debug!("all tarred up!");
Ok(())
}
async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
async fn add_rel(&mut self, tag: RelTag) -> Result<(), PageReconstructError> {
let nblocks = self
.timeline
.get_rel_size(tag, self.lsn, false, self.ctx)
@@ -229,7 +234,10 @@ where
if nblocks == 0 {
let file_name = tag.to_segfile_name(0);
let header = new_tar_header(&file_name, 0)?;
self.ar.append(&header, &mut io::empty()).await?;
self.ar
.append(&header, &mut io::empty())
.await
.context("could not write empty relfile to tar stream")?;
return Ok(());
}
@@ -249,7 +257,10 @@ where
let file_name = tag.to_segfile_name(seg as u32);
let header = new_tar_header(&file_name, segment_data.len() as u64)?;
self.ar.append(&header, segment_data.as_slice()).await?;
self.ar
.append(&header, segment_data.as_slice())
.await
.context("could not write relfile segment to tar stream")?;
seg += 1;
startblk = endblk;

View File

@@ -13,8 +13,8 @@ use tracing::*;
use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
context::{DownloadBehavior, RequestContext, TaskKind},
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
},
@@ -303,62 +303,79 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
{
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
let mgmt_ctx = RequestContext::new(TaskKind::HttpEndpointListener, DownloadBehavior::Error);
let cancellation_token = Box::leak(Box::new(mgmt_ctx.cancellation_token().clone()));
let router = http::make_router(conf, auth.clone(), remote_storage)?
.build()
.map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?
.serve(service)
.with_graceful_shutdown(task_mgr::shutdown_watcher());
.with_graceful_shutdown(cancellation_token.cancelled());
task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::HttpEndpointListener,
None,
None,
"http endpoint listener",
true,
async {
server.await?;
Ok(())
match server.await {
Ok(()) => info!("HTTP endpoint listener shut down"),
Err(err) => error!("HTTP endpoint listener shut down with error: {err:?}"),
}
},
);
}
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
"consumption metrics collection",
true,
async move {
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.id,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
},
);
}
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let metrics_ctx = RequestContext::new(
TaskKind::MetricsCollection,
DownloadBehavior::Error, // metrics collector shouldn't be downloading anything
);
task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
"consumption metrics collection",
true,
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.id,
metrics_ctx,
)
.instrument(info_span!("metrics_collection")),
);
}
// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
task_mgr::spawn(
COMPUTE_REQUEST_RUNTIME.handle(),
TaskKind::LibpqEndpointListener,
None,
None,
"libpq endpoint listener",
true,
async move {
page_service::libpq_listener_main(conf, auth, pageserver_listener, conf.auth_type).await
},
);
{
let libpq_ctx = RequestContext::new(
TaskKind::LibpqEndpointListener,
// listener task shouldn't need to download anything. (We will
// create separate sub-contexts for each connection, each with its
// own download behavior. This context is used only to listen and
// accept connections.)
DownloadBehavior::Error,
);
task_mgr::spawn(
COMPUTE_REQUEST_RUNTIME.handle(),
"libpq endpoint listener",
true,
async move {
match page_service::libpq_listener_main(
conf,
auth,
pageserver_listener,
conf.auth_type,
libpq_ctx,
)
.await
{
Ok(()) => info!("libpq endpoint listener shut down"),
Err(err) => error!("libpq endpoint listener shut down with error: {err:?}"),
}
},
);
}
// All started up! Now just sit and wait for shutdown signal.
signals.handle(|signal| match signal {
@@ -375,7 +392,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
);
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
BACKGROUND_RUNTIME.block_on(task_mgr::shutdown_pageserver(0));
unreachable!()
}
})

View File

@@ -9,9 +9,8 @@ use tracing::*;
use utils::id::NodeId;
use utils::id::TimelineId;
use crate::task_mgr;
use crate::context::RequestContext;
use crate::tenant::mgr;
use pageserver_api::models::TenantState;
use utils::id::TenantId;
use serde::{Deserialize, Serialize};
@@ -138,12 +137,13 @@ struct EventChunk<'a> {
events: &'a [ConsumptionMetric],
}
/// Main thread that serves metrics collection
/// Main task that serves metrics collection
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_interval: Duration,
node_id: NodeId,
) -> anyhow::Result<()> {
metrics_ctx: RequestContext,
) {
let mut ticker = tokio::time::interval(metric_collection_interval);
info!("starting collect_metrics");
@@ -154,12 +154,15 @@ pub async fn collect_metrics(
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = metrics_ctx.cancelled() => {
info!("collect_metrics received cancellation request");
return Ok(());
return;
},
_ = ticker.tick() => {
collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await?;
if let Err(err) = collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &metrics_ctx).await {
// Log the error and continue
error!("metrics collection failed: {err:?}");
}
}
}
}
@@ -174,6 +177,7 @@ pub async fn collect_metrics_task(
cached_metrics: &mut HashMap<ConsumptionMetricsKey, u64>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut current_metrics: Vec<(ConsumptionMetricsKey, u64)> = Vec::new();
trace!(
@@ -186,19 +190,27 @@ pub async fn collect_metrics_task(
// iterate through list of Active tenants and collect metrics
for (tenant_id, tenant_state) in tenants {
if tenant_state != TenantState::Active {
if ctx.is_cancelled() {
continue;
}
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id).await?;
// If the tenant was shut down while we were looking elsewhere, skip it.
let tenant_ctx = match tenant.get_context(ctx) {
Ok(ctx) => ctx,
Err(_state) => {
debug!(
"skipping metrics collection for tenant {tenant_id} because it is not active"
);
continue;
}
};
let mut tenant_resident_size = 0;
// iterate through list of timelines in tenant
for timeline in tenant.list_timelines().iter() {
// collect per-timeline metrics only for active timelines
let timeline_ctx = timeline.get_context();
if timeline.is_active() {
if let Ok(timeline_ctx) = timeline.get_context(&tenant_ctx) {
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
current_metrics.push((

View File

@@ -1,12 +1,341 @@
//!
//! Most async functions throughout the pageserver take a `ctx: &RequestContext`
//! argument. Currently, it's just a placeholder, but in upcoming commit, it
//! will be used for cancellation, and to ensure that a Tenant or Timeline isn't
//! removed while there are still tasks operating on it.
//! argument. It is used to control desired behaviour of the operation, and to
//! allow cancelling the operation gracefully.
//!
//! # Context hierarchy
//!
//! RequestContexts form a hierarchy. For example:
//!
//! listener context (LibpqEndpointListener)
//! connection context (PageRequestHandler)
//! per-request context (PageRequestHandler)
//!
//! The top "listener context" is created at pageserver startup. The tokio
//! task that listens on the libpq protocol TCP port holds that context. When
//! it accepts a connection, it spawns a new task to handle that connection
//! and creates a new per-connection context for it. The mgmt API listener,
//! background jobs, and other things form separate but similar hierarchies.
//!
//! Usually, each tokio task has its own context, but it's not a strict
//! requirement and some tasks can hold multiple contexts, and conversely,
//! some contexts are shared by multiple tasks that work together to perform
//! some operation.
//!
//! The hierarchy is not explicitly tracked in the RequestContext struct
//! itself, but only by their cancellation tokens. It's entirely possible for
//! the parent context to be dropped before its children.
//!
//! # Tenant and Timeline registration
//!
//! Most operations are performed on a particular Tenant or Timeline. When
//! operating on a Tenant or Timeline, it's important that the Tenant/Timeline
//! isn't detached or deleted while there are tasks working on it. To ensure
//! that, a RequestContext can be registered with a Tenant or Timeline. See
//! `Tenant::register_context` and `Timeline::register_context`. When
//! shutting down a Tenant or Timeline, the shutdown routine cancels all the
//! registered contexts, and waits for them to be dropped before completing
//! the shutdown.
//!
//! To enforce that you hold a registered context when operating on a Tenant
//! or Timeline, most functions take a TimelineRequestContext or
//! TenantRequestContext reference as argument.
//!
//! NOTE: The Tenant / Timeline registration is separate from the context
//! hierarchy. You can create a new RequestContext with TimelineRequestContext
//! as the parent, and register it with a different timeline, for example.
//!
//! # Notes
//!
//! All RequestContexts in the system have a unique ID, and are also tracked
//! in a global hash table, CONTEXTS.
//!
//! - Futures are normally not assumed to be async cancellation-safe. Pass a
//! RequestContext as an argument and use cancel() on it instead.
//!
//! - If you perform an operation that depends on some external actor or the
//! network, use the cancellation token to check for cancellation
//!
//! - By convention, the appropriate context for the current operation is carried in
//! a variable called 'ctx'. If a function handles multiple contexts, it's
//! best to *not* have a variable called 'ctx', to force you to think which
//! one to use in each call.
//!
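//! # Example
//!
//! A rough sketch of the parent/child relationship (illustrative, not an
//! excerpt from real callers):
//!
//! ```ignore
//! let listener_ctx =
//!     RequestContext::new(TaskKind::LibpqEndpointListener, DownloadBehavior::Error);
//! // One child context per accepted connection:
//! let connection_ctx = RequestContext::with_parent(
//!     TaskKind::PageRequestHandler,
//!     DownloadBehavior::Download,
//!     &listener_ctx,
//! );
//! // Cancelling the parent token also cancels the child's token.
//! listener_ctx.cancellation_token().cancel();
//! assert!(connection_ctx.is_cancelled());
//! ```
//!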
//! # TODO
//! - include a unique request ID for tracing
//!
pub struct RequestContext {}
use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
impl RequestContext {
pub fn new() -> Self {
RequestContext {}
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
/// Each RequestContext has a unique context ID. It's just an increasing
/// number that we assign.
static NEXT_CONTEXT_ID: AtomicU64 = AtomicU64::new(1);
/// Global registry of contexts
static CONTEXTS: Lazy<Mutex<HashMap<RequestContextId, (TaskKind, CancellationToken)>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct RequestContextId(u64);
///
pub struct RequestContext {
context_id: RequestContextId,
task_kind: TaskKind,
download_behavior: DownloadBehavior,
cancellation_token: CancellationToken,
}
/// DownloadBehavior option specifies the behavior if completing the operation
/// would require downloading a layer file from remote storage.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum DownloadBehavior {
/// Download the layer file. It can take a while.
Download,
/// Download the layer file, but print a warning to the log. This should be used
/// in code where the layer file is expected to already exist locally.
Warn,
/// Return a PageReconstructError::NeedsDownload error
Error,
}
///
/// There are many kinds of tasks in the system. Some are associated with a particular
/// tenant or timeline, while others are global.
///
/// Note that we don't try to limit how many task of a certain kind can be running
/// at the same time.
///
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum TaskKind {
// libpq listener task. It just accepts connections and spawns a
// PageRequestHandler task for each connection.
LibpqEndpointListener,
// HTTP endpoint listener.
HttpEndpointListener,
// Task that handles a single connection. A PageRequestHandler task
// starts detached from any particular tenant or timeline, but it can be
// associated with one later, after receiving a command from the client.
PageRequestHandler,
// Context for one management API request
MgmtRequest,
// Manages the WAL receiver connection for one timeline. It subscribes to
// events from storage_broker, decides which safekeeper to connect to. It spawns a
// separate WalReceiverConnection task to handle each connection.
WalReceiverManager,
// Handles a connection to a safekeeper, to stream WAL to a timeline.
WalReceiverConnection,
// Garbage collection worker. One per tenant
GarbageCollector,
// Compaction. One per tenant.
Compaction,
// Initial logical size calculation
InitialLogicalSizeCalculation,
// Task that flushes frozen in-memory layers to disk
LayerFlush,
// Task that uploads a file to remote storage
RemoteUploadTask,
// Task that downloads a file from remote storage
RemoteDownloadTask,
// task that handles the initial downloading of all tenants
InitialLoad,
// task that handles attaching a tenant
Attach,
// task that handles metrics collection
MetricsCollection,
// task that drives downloading layers
DownloadAllRemoteLayers,
// Only used in unit tests
UnitTest,
}
impl Drop for RequestContext {
fn drop(&mut self) {
CONTEXTS
.lock()
.unwrap()
.remove(&self.context_id)
.expect("context is not in global registry");
}
}
impl RequestContext {
/// Create a new RequestContext
pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
let cancellation_token = CancellationToken::new();
let context_id = RequestContextId(NEXT_CONTEXT_ID.fetch_add(1, Ordering::Relaxed));
CONTEXTS
.lock()
.unwrap()
.insert(context_id, (task_kind, cancellation_token.clone()));
RequestContext {
task_kind,
context_id,
download_behavior,
cancellation_token,
}
}
/// Create a new RequestContext, as a child of 'parent'.
pub fn with_parent(
task_kind: TaskKind,
download_behavior: DownloadBehavior,
parent: &RequestContext,
) -> Self {
let cancellation_token = parent.cancellation_token.child_token();
let context_id = RequestContextId(NEXT_CONTEXT_ID.fetch_add(1, Ordering::Relaxed));
CONTEXTS
.lock()
.unwrap()
.insert(context_id, (task_kind, cancellation_token.clone()));
RequestContext {
task_kind,
context_id,
download_behavior,
cancellation_token,
}
}
pub fn context_id(&self) -> RequestContextId {
self.context_id
}
pub fn task_kind(&self) -> TaskKind {
self.task_kind
}
pub fn download_behavior(&self) -> DownloadBehavior {
self.download_behavior
}
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}
pub fn is_cancelled(&self) -> bool {
self.cancellation_token.is_cancelled()
}
pub async fn cancelled(&self) {
self.cancellation_token.cancelled().await
}
}
///
/// Cancel all the contexts in 'context_ids' and wait for them to finish.
///
/// Whenever we notice that one of the contexts has finished, it is removed
/// from 'context_ids'. On return, it is empty.
///
pub async fn cancel_and_wait(context_ids: &mut Vec<RequestContextId>) {
{
let contexts = CONTEXTS.lock().unwrap();
context_ids.retain(|context_id| {
if let Some((task_kind, cancellation_token)) = contexts.get(context_id) {
info!("cancelling task {task_kind:?} with ID {context_id:?}");
cancellation_token.cancel();
true
} else {
// Already gone
false
}
});
}
wait_contexts_to_finish(context_ids).await
}
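// Illustrative usage (a sketch, not part of this commit): a Tenant or
// Timeline shutdown routine that has collected the IDs of the contexts
// registered with it could do:
//
//     let mut ids: Vec<RequestContextId> = registered_context_ids();
//     cancel_and_wait(&mut ids).await;
//     assert!(ids.is_empty());
//
// where `registered_context_ids()` stands in for whatever bookkeeping the
// caller uses to track its registered contexts.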
async fn wait_contexts_to_finish(context_ids: &mut Vec<RequestContextId>) {
let mut n = 0;
while !context_ids.is_empty() {
{
let contexts = CONTEXTS.lock().unwrap();
while let Some(context_id) = context_ids.last() {
if let Some((task_kind, _cancellation_token)) = contexts.get(context_id) {
info!("waiting for task {task_kind:?} with ID {context_id:?} to finish");
break;
} else {
context_ids.pop();
}
}
}
if !context_ids.is_empty() {
crate::exponential_backoff(
n,
crate::DEFAULT_BASE_BACKOFF_SECONDS,
crate::DEFAULT_MAX_BACKOFF_SECONDS,
)
.await;
n += 1;
}
}
}
/// Cancel and wait for all tasks of given 'kind' to finish
pub async fn shutdown_tasks(kind: TaskKind) {
let mut context_ids = Vec::new();
{
let contexts = CONTEXTS.lock().unwrap();
for (&context_id, (task_kind, cancellation_token)) in contexts.iter() {
if *task_kind == kind {
cancellation_token.cancel();
context_ids.push(context_id);
}
}
}
wait_contexts_to_finish(&mut context_ids).await
}
/// Cancel all remaining contexts.
///
/// This is used as part of pageserver shutdown. We have already shut down all
/// tasks / contexts, this is just a backstop or sanity check to make sure we
/// didn't miss anything. Hence, also print a warning for any remaining tasks.
pub async fn shutdown_all_tasks() {
loop {
let mut context_ids = Vec::new();
{
let contexts = CONTEXTS.lock().unwrap();
if contexts.is_empty() {
return;
}
for (&context_id, (task_kind, cancellation_token)) in contexts.iter() {
cancellation_token.cancel();
context_ids.push(context_id);
warn!(
"unexpected task of kind {:?} with ID {:?} still running",
*task_kind, context_id
);
}
}
wait_contexts_to_finish(&mut context_ids).await
}
}

View File

@@ -4,14 +4,13 @@ use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::*;
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineInfo,
};
use crate::context::RequestContext;
use crate::context::{DownloadBehavior, RequestContext, TaskKind};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{PageReconstructError, Timeline, TimelineRequestContext};
@@ -81,6 +80,16 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
fn apierror_from_prerror(err: PageReconstructError) -> ApiError {
match err {
PageReconstructError::Other(err) => ApiError::InternalServerError(err),
PageReconstructError::NeedsDownload(_, _) => {
// This shouldn't happen, because we use a RequestContext that requests to
// download any missing layer files on-demand.
ApiError::InternalServerError(anyhow::anyhow!(
"would need to download remote layer file"
))
}
PageReconstructError::Cancelled => {
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
}
PageReconstructError::WalRedo(err) => {
ApiError::InternalServerError(anyhow::Error::new(err))
}
@@ -91,29 +100,26 @@ fn apierror_from_prerror(err: PageReconstructError) -> ApiError {
async fn build_timeline_info(
timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool,
ctx: &TimelineRequestContext,
ctx: Option<&TimelineRequestContext>,
) -> anyhow::Result<TimelineInfo> {
let mut info = build_timeline_info_common(timeline, ctx)?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
// we're executing this function, we will outlive the timeline on-disk state.
info.current_logical_size_non_incremental = Some(
timeline
.get_current_logical_size_non_incremental(
info.last_record_lsn,
CancellationToken::new(),
ctx,
)
.await?,
);
if let Some(ctx) = ctx {
info.current_logical_size_non_incremental = Some(
timeline
.get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
.await?,
);
} else {
info!("could not calculate non-incremental size for timeline because it is not active");
}
}
Ok(info)
}
fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &TimelineRequestContext,
ctx: Option<&TimelineRequestContext>,
) -> anyhow::Result<TimelineInfo> {
let last_record_lsn = timeline.get_last_record_lsn();
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
@@ -134,12 +140,16 @@ fn build_timeline_info_common(
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
};
let current_logical_size = match timeline.get_current_logical_size(ctx) {
Ok((size, _)) => Some(size),
Err(err) => {
error!("Timeline info creation failed to get current logical size: {err:?}");
None
let current_logical_size = if let Some(ctx) = ctx {
match timeline.get_current_logical_size(ctx) {
Ok((size, _)) => Some(size),
Err(err) => {
error!("Timeline info creation failed to get current logical size: {err:?}");
None
}
}
} else {
None
};
let current_physical_size = Some(timeline.layer_size_sum().approximate_is_ok());
let state = timeline.current_state();
@@ -185,10 +195,11 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
.new_timeline_id
.unwrap_or_else(TimelineId::generate);
let tenant = mgr::get_tenant(tenant_id, true)
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let (tenant, tenant_ctx) = mgr::get_active_tenant(tenant_id, &ctx)
.await
.map_err(ApiError::NotFound)?;
let tenant_ctx = tenant.get_context();
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -198,10 +209,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
)
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await {
Ok(Some(new_timeline)) => {
Ok(Some((new_timeline, timeline_ctx))) => {
// Created. Construct a TimelineInfo for it.
let timeline_ctx = new_timeline.get_context();
let timeline_info = build_timeline_info_common(&new_timeline, &timeline_ctx)
let timeline_info = build_timeline_info_common(&new_timeline, Some(&timeline_ctx))
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -216,19 +226,21 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
query_param_present(&request, "include-non-incremental-logical-size");
check_permission(&request, Some(tenant_id))?;
let top_ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true)
let (tenant, tenant_ctx) = mgr::get_active_tenant(tenant_id, &top_ctx)
.await
.map_err(ApiError::NotFound)?;
let timelines = tenant.list_timelines();
let mut response_data = Vec::with_capacity(timelines.len());
for timeline in timelines {
let timeline_ctx = timeline.get_context();
let timeline_ctx = timeline.get_context(&tenant_ctx).ok();
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size,
&timeline_ctx,
timeline_ctx.as_ref(),
)
.await
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
@@ -281,20 +293,22 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
query_param_present(&request, "include-non-incremental-logical-size");
check_permission(&request, Some(tenant_id))?;
let top_ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline_info = async {
let tenant = mgr::get_tenant(tenant_id, true)
let (tenant, tenant_ctx) = mgr::get_active_tenant(tenant_id, &top_ctx)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, false)
.get_timeline(timeline_id)
.map_err(ApiError::NotFound)?;
let timeline_ctx = timeline.get_context();
let timeline_ctx = timeline.get_context(&tenant_ctx).ok();
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size,
&timeline_ctx,
timeline_ctx.as_ref(),
)
.await
.context("Failed to get local timeline info: {e:#}")
@@ -319,13 +333,17 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let timeline = mgr::get_tenant(tenant_id, true)
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let (tenant, ctx) = mgr::get_active_tenant(tenant_id, &ctx)
.await
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
.map_err(ApiError::NotFound)?;
let timeline_ctx = timeline.get_context();
let (timeline, ctx) = tenant
.get_active_timeline(timeline_id, &ctx)
.map_err(ApiError::NotFound)?;
let result = timeline
.find_lsn_for_timestamp(timestamp_pg, &timeline_ctx)
.find_lsn_for_timestamp(timestamp_pg, &ctx)
.await
.map_err(apierror_from_prerror)?;
@@ -367,7 +385,8 @@ async fn timeline_delete_handler(request: Request<Body>) -> Result<Response<Body
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new();
// deleting shouldn't require downloading anything
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
mgr::delete_timeline(tenant_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", tenant = %tenant_id, timeline = %timeline_id))
@@ -447,8 +466,10 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let mut _req_ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant_info = async {
let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id).await?;
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
@@ -475,10 +496,11 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let (tenant, ctx) = mgr::get_active_tenant(tenant_id, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
let ctx = tenant.get_context();
// this can be long operation, it currently is not backed by any request coalescing or similar
let inputs = tenant
@@ -525,7 +547,7 @@ fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn()
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let ctx = RequestContext::new();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let request_data: TenantCreateRequest = json_request(&mut request).await?;
@@ -615,7 +637,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
Some(tenant) => {
// We created the tenant. Existing API semantics are that the tenant
// is Active when this function returns.
if let res @ Err(_) = tenant.wait_to_become_active(&ctx).await {
if let res @ Err(_) = tenant.wait_to_become_active(ctx).await {
// This shouldn't happen because we just created the tenant directory
// in tenant::mgr::create_tenant, and there aren't any remote timelines
// to load, so, nothing can really fail during load.
@@ -639,7 +661,7 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let mut tenant_conf: TenantConfOpt = Default::default();
if let Some(gc_period) = request_data.gc_period {
@@ -755,11 +777,21 @@ async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req).await?;
let gc_result = wait_task_done
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let (tenant, ctx) = mgr::get_active_tenant(tenant_id, &ctx)
.await
.map_err(ApiError::NotFound)?;
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
fail::fail_point!("immediate_gc_task_pre");
let gc_result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &ctx)
.instrument(info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id))
.await
.context("wait for gc task")
.map_err(ApiError::InternalServerError)?
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, gc_result)
@@ -772,16 +804,18 @@ async fn timeline_compact_handler(request: Request<Body>) -> Result<Response<Bod
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let result_receiver = mgr::immediate_compact(tenant_id, timeline_id)
.await
.context("spawn compaction task")
.map_err(ApiError::InternalServerError)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let result: anyhow::Result<()> = result_receiver
let (tenant, ctx) = mgr::get_active_tenant(tenant_id, &ctx)
.await
.map_err(ApiError::NotFound)?;
let (timeline, ctx) = tenant
.get_active_timeline(timeline_id, &ctx)
.map_err(ApiError::NotFound)?;
timeline
.compact(&ctx)
.await
.context("receive compaction result")
.map_err(ApiError::InternalServerError)?;
result.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
@@ -793,13 +827,14 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let (tenant, ctx) = mgr::get_active_tenant(tenant_id, &ctx)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
let (timeline, ctx) = tenant
.get_active_timeline(timeline_id, &ctx)
.map_err(ApiError::NotFound)?;
let ctx = timeline.get_context();
timeline
.freeze_and_flush()
.await
@@ -819,13 +854,14 @@ async fn timeline_download_remote_layers_handler_post(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let (tenant, ctx) = mgr::get_active_tenant(tenant_id, &ctx)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
let (timeline, ctx) = tenant
.get_active_timeline(timeline_id, &ctx)
.map_err(ApiError::NotFound)?;
let ctx = timeline.get_context();
match timeline.spawn_download_all_remote_layers(&ctx).await {
Ok(st) => json_response(StatusCode::ACCEPTED, st),
Err(st) => json_response(StatusCode::CONFLICT, st),
@@ -839,11 +875,13 @@ async fn timeline_download_remote_layers_handler_get(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let (tenant, ctx) = mgr::get_active_tenant(tenant_id, &ctx)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
let (timeline, _ctx) = tenant
.get_active_timeline(timeline_id, &ctx)
.map_err(ApiError::NotFound)?;
let info = timeline
.get_download_all_remote_layers_task_info()

View File

@@ -261,7 +261,6 @@ async fn import_wal(
endpoint: Lsn,
ctx: &TimelineRequestContext,
) -> anyhow::Result<()> {
use std::io::Read;
let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
@@ -291,6 +290,7 @@ async fn import_wal(
file.seek(std::io::SeekFrom::Start(offset as u64))?;
}
use std::io::Read;
let nread = file.read_to_end(&mut buf)?;
if nread != WAL_SEGMENT_SIZE - offset {
// Maybe allow this for .partial files?

View File

@@ -22,7 +22,6 @@ pub mod walredo;
use std::path::Path;
use crate::task_mgr::TaskKind;
use tracing::info;
/// Current storage format version
@@ -42,35 +41,6 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None).await;
// Shut down any page service tasks.
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None).await;
// Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC tasks.
tenant::mgr::shutdown_all_tenants().await;
// Stop syncing with remote storage.
//
// FIXME: Does this wait for the sync tasks to finish syncing what's queued up?
// Should it?
task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), None, None).await;
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
// FIXME: We should probably stop accepting commands like attach/detach earlier.
task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None).await;
// There should be nothing left, but let's be sure
task_mgr::shutdown_tasks(None, None, None).await;
info!("Shut down successfully completed");
std::process::exit(exit_code);
}
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;

View File

@@ -43,13 +43,12 @@ use utils::{
use crate::auth::check_permission;
use crate::basebackup;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::context::{DownloadBehavior, RequestContext, TaskKind};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::mgr;
use crate::tenant::{Tenant, Timeline, TimelineRequestContext};
use crate::tenant::{Tenant, TenantRequestContext, Timeline, TimelineRequestContext};
use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
@@ -64,7 +63,7 @@ fn copyin_stream<'a>(
let msg = tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
_ = ctx.cancelled() => {
// We were requested to shut down.
let msg = format!("pageserver is shutting down");
let _ = pgb.write_message(&BeMessage::ErrorResponse(&msg, None));
@@ -127,6 +126,7 @@ pub async fn libpq_listener_main(
auth: Option<Arc<JwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
listener_ctx: RequestContext,
) -> anyhow::Result<()> {
listener.set_nonblocking(true)?;
let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
@@ -135,8 +135,9 @@ pub async fn libpq_listener_main(
while let Some(res) = tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
_ = listener_ctx.cancelled() => {
// We were requested to shut down.
info!("libpq listener shutting down");
None
}
@@ -150,7 +151,11 @@ pub async fn libpq_listener_main(
debug!("accepted connection from {}", peer_addr);
let local_auth = auth.clone();
let connection_ctx = RequestContext::new();
let connection_ctx = RequestContext::with_parent(
TaskKind::PageRequestHandler,
DownloadBehavior::Download,
&listener_ctx,
);
// PageRequestHandler tasks are not associated with any particular
// timeline in the task manager. In practice most connections will
@@ -158,12 +163,21 @@ pub async fn libpq_listener_main(
// yet.
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::PageRequestHandler,
None,
None,
"serving compute connection task",
false,
page_service_conn_main(conf, local_auth, socket, auth_type, connection_ctx),
async move {
if let Err(err) = page_service_conn_main(
conf,
local_auth,
socket,
auth_type,
connection_ctx,
)
.await
{
error!("connection handler exited with error: {err:?}");
}
},
);
}
Err(err) => {
@@ -198,11 +212,13 @@ async fn page_service_conn_main(
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let cancellation_token = connection_ctx.cancellation_token().clone();
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
let pgbackend = PostgresBackend::new(socket, auth_type, None)?;
let result = pgbackend
.run(&mut conn_handler, task_mgr::shutdown_watcher)
.run(&mut conn_handler, || cancellation_token.cancelled())
.await;
match result {
Ok(()) => {
@@ -287,12 +303,9 @@ impl PageServerHandler {
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let (tenant, ctx) = get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
// Make request tracer if needed
let tenant = get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
let mut tracer = if tenant.get_trace_read_requests() {
let connection_id = ConnectionId::generate();
let path = tenant
@@ -304,8 +317,7 @@ impl PageServerHandler {
};
// Check that the timeline exists
let timeline = tenant.get_timeline(timeline_id, true)?;
let ctx = timeline.get_context();
let (timeline, ctx) = tenant.get_active_timeline(timeline_id, &ctx)?;
// switch client to COPYBOTH
pgb.write_message(&BeMessage::CopyBothResponse)?;
@@ -317,7 +329,7 @@ impl PageServerHandler {
let msg = tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
_ = ctx.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
break;
@@ -344,6 +356,9 @@ impl PageServerHandler {
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// TODO: We could create a new per-request context here, with unique ID.
// Currently we use the same per-timeline context for all requests
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_exists.start_timer();
@@ -382,7 +397,7 @@ impl PageServerHandler {
#[instrument(skip(self, pgb))]
async fn handle_import_basebackup(
&self,
&mut self,
pgb: &mut PostgresBackend,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -390,11 +405,10 @@ impl PageServerHandler {
_end_lsn: Lsn,
pg_version: u32,
) -> Result<(), QueryError> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
let tenant_ctx = tenant.get_context();
let (tenant, tenant_ctx) =
get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
let (timeline, ctx) =
tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &tenant_ctx)?;
@@ -439,18 +453,15 @@ impl PageServerHandler {
#[instrument(skip(self, pgb))]
async fn handle_import_wal(
&self,
&mut self,
pgb: &mut PostgresBackend,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
) -> Result<(), QueryError> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline =
let (timeline, ctx) =
get_active_timeline_with_timeout(tenant_id, timeline_id, &self.connection_ctx).await?;
let ctx = timeline.get_context();
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn != start_lsn {
return Err(QueryError::Other(
@@ -659,9 +670,8 @@ impl PageServerHandler {
full_backup: bool,
) -> anyhow::Result<()> {
// check that the timeline exists
let timeline =
let (timeline, ctx) =
get_active_timeline_with_timeout(tenant_id, timeline_id, &self.connection_ctx).await?;
let ctx = timeline.get_context();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
@@ -826,7 +836,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
self.check_permission(Some(tenant_id))?;
let timeline =
let (timeline, _ctx) =
get_active_timeline_with_timeout(tenant_id, timeline_id, &self.connection_ctx)
.await?;
@@ -988,7 +998,8 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;
let tenant = get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
let (tenant, _ctx) =
get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
pgb.write_message(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),
@@ -1042,17 +1053,22 @@ impl postgres_backend_async::Handler for PageServerHandler {
async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
parent_ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
let tenant = mgr::get_tenant(tenant_id, false).await?;
) -> anyhow::Result<(Arc<Tenant>, TenantRequestContext)> {
let child_ctx = RequestContext::with_parent(
parent_ctx.task_kind(),
parent_ctx.download_behavior(),
parent_ctx,
);
let tenant = mgr::get_tenant(tenant_id).await?;
match tokio::time::timeout(
Duration::from_secs(30),
tenant.wait_to_become_active(parent_ctx),
tenant.wait_to_become_active(child_ctx),
)
.await
{
Ok(wait_result) => wait_result
// no .context(), the error message is good enough and some tests depend on it
.map(move |()| tenant),
Ok(Ok(ctx)) => Ok((tenant, ctx)),
Ok(Err(err)) => Err(err),
Err(_) => anyhow::bail!("Timeout waiting for tenant {tenant_id} to become Active"),
}
}
@@ -1062,8 +1078,8 @@ async fn get_active_timeline_with_timeout(
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
) -> anyhow::Result<(Arc<Timeline>, TimelineRequestContext)> {
get_active_tenant_with_timeout(tenant_id, ctx)
.await
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
.and_then(|(tenant, ctx)| tenant.get_active_timeline(timeline_id, &ctx))
}

View File

@@ -19,7 +19,6 @@ use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::Range;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -34,14 +33,6 @@ pub enum LsnForTimestamp {
NoData(Lsn),
}
#[derive(Debug, thiserror::Error)]
pub enum CalculateLogicalSizeError {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
@@ -429,7 +420,8 @@ impl Timeline {
) -> Result<Bytes, PageReconstructError> {
let key = relmap_file_key(spcnode, dbnode);
self.get(key, lsn, ctx).await
let buf = self.get(key, lsn, ctx).await?;
Ok(buf)
}
pub async fn list_dbdirs(
@@ -495,28 +487,20 @@ impl Timeline {
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
cancel: CancellationToken,
ctx: &TimelineRequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
) -> Result<u64, PageReconstructError> {
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, lsn, ctx)
.await
.context("list rels")?
{
if cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
if ctx.is_cancelled() {
return Err(PageReconstructError::Cancelled);
}
let relsize_key = rel_size_to_key(rel);
let mut buf = self
.get(relsize_key, lsn, ctx)
.await
.context("read relation size of {rel:?}")?;
let mut buf = self.get(relsize_key, lsn, ctx).await?;
let relsize = buf.get_u32_le();
total_size += relsize as u64;
@@ -553,7 +537,8 @@ impl Timeline {
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, lsn, ctx)
.await?
.into_iter()
.iter()
.cloned()
.collect();
rels.sort_unstable();
for rel in rels {

View File

@@ -1,59 +1,21 @@
//!
//! This module provides centralized handling of tokio tasks in the Page Server.
//! This module provides some helpers for spawning tokio tasks in the pageserver.
//!
//! We provide a few basic facilities:
//! - A global registry of tasks that lists what kind of tasks they are, and
//! which tenant or timeline they are working on
//!
//! - The ability to request a task to shut down.
//!
//!
//! # How it works?
//!
//! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
//! task is spawned, a PageServerTask entry is added there, and when a
//! task dies, it removes itself from the hashmap. If you want to kill a
//! task, you can scan the hashmap to find it.
//!
//! # Task shutdown
//!
//! To kill a task, we rely on co-operation from the victim. Each task is
//! expected to periodically call the `is_shutdown_requested()` function, and
//! if it returns true, exit gracefully. In addition to that, when waiting for
//! the network or other long-running operation, you can use
//! `shutdown_watcher()` function to get a Future that will become ready if
//! the current task has been requested to shut down. You can use that with
//! Tokio select!().
//!
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on what task panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
//! Mostly just a wrapper around tokio::spawn, with some code to handle panics.
//!
// Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
// Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
#![allow(clippy::declare_interior_mutable_const)]
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::panic::{resume_unwind, AssertUnwindSafe};
use futures::FutureExt;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};
use once_cell::sync::Lazy;
use utils::id::{TenantId, TimelineId};
use crate::shutdown_pageserver;
use crate::context::{self, TaskKind};
//
// There are four runtimes:
@@ -92,10 +54,6 @@ use crate::shutdown_pageserver;
// runtime. If a GetPage request comes in before the load of a tenant has finished, the
// GetPage request will wait for the tenant load to finish.
//
// The core Timeline code is synchronous, and uses a bunch of std Mutexes and RWLocks to
// protect data structures. Let's keep it that way. Synchronous code is easier to debug
// and analyze, and there's a lot of hairy, low-level, performance critical code there.
//
// It's nice to have different runtimes, so that you can quickly eyeball how much CPU
// time each class of operations is taking, with 'top -H' or similar.
//
@@ -135,355 +93,81 @@ pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.expect("Failed to create background op runtime")
});
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);
impl fmt::Display for PageserverTaskId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
/// Each task that we track is associated with a "task ID". It's just an
/// increasing number that we assign. Note that it is different from tokio::task::Id.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
/// Global registry of tasks
static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
task_local! {
// This is a cancellation token which will be cancelled when a task needs to shut down. The
// root token is kept in the global registry, so that anyone can send the signal to request
// task shutdown.
static SHUTDOWN_TOKEN: CancellationToken;
// Each task holds reference to its own PageServerTask here.
static CURRENT_TASK: Arc<PageServerTask>;
}
///
/// There are many kinds of tasks in the system. Some are associated with a particular
/// tenant or timeline, while others are global.
///
/// Note that we don't try to limit how many task of a certain kind can be running
/// at the same time.
///
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum TaskKind {
// libpq listener task. It just accepts connection and spawns a
// PageRequestHandler task for each connection.
LibpqEndpointListener,
// HTTP endpoint listener.
HttpEndpointListener,
// Task that handles a single connection. A PageRequestHandler task
// starts detached from any particular tenant or timeline, but it can be
// associated with one later, after receiving a command from the client.
PageRequestHandler,
// Manages the WAL receiver connection for one timeline. It subscribes to
// events from storage_broker, decides which safekeeper to connect to. It spawns a
// separate WalReceiverConnection task to handle each connection.
WalReceiverManager,
// Handles a connection to a safekeeper, to stream WAL to a timeline.
WalReceiverConnection,
// Garbage collection worker. One per tenant
GarbageCollector,
// Compaction. One per tenant.
Compaction,
// Initial logical size calculation
InitialLogicalSizeCalculation,
// Task that flushes frozen in-memory layers to disk
LayerFlushTask,
// Task that uploads a file to remote storage
RemoteUploadTask,
// Task that downloads a file from remote storage
RemoteDownloadTask,
// task that handles the initial downloading of all tenants
InitialLoad,
// task that handles attaching a tenant
Attach,
// task that handhes metrics collection
MetricsCollection,
// task that drives downloading layers
DownloadAllRemoteLayers,
}
#[derive(Default)]
struct MutableTaskState {
/// Tenant and timeline that this task is associated with.
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
/// Handle for waiting for the task to exit. It can be None, if the
/// the task has already exited.
join_handle: Option<JoinHandle<()>>,
}
struct PageServerTask {
#[allow(dead_code)] // unused currently
task_id: PageserverTaskId,
kind: TaskKind,
name: String,
// To request task shutdown, just cancel this token.
cancel: CancellationToken,
mutable: Mutex<MutableTaskState>,
}
/// Launch a new task
/// Note: if shutdown_process_on_error is set to true failure
/// of the task will lead to shutdown of entire process
///
/// This is a wrapper around tokio::spawn. One difference is that the Future
/// is marked to return nothing to avoid silently swallowing errors. This
/// forces the future to handle errors by itself. If you need the return
/// value, you could create another function that passes it through, but we
/// don't have a need for that currently.
///
/// If shutdown_process_on_panic is set to true, panic of the task will lead
/// to shutdown of entire process. Otherwise we log the panic and continue.
pub fn spawn<F>(
runtime: &tokio::runtime::Handle,
kind: TaskKind,
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
name: &str,
shutdown_process_on_error: bool,
shutdown_process_on_panic: bool,
future: F,
) -> PageserverTaskId
) -> JoinHandle<F::Output>
where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
let cancel = CancellationToken::new();
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
let task = Arc::new(PageServerTask {
task_id: PageserverTaskId(task_id),
kind,
name: name.to_string(),
cancel: cancel.clone(),
mutable: Mutex::new(MutableTaskState {
tenant_id,
timeline_id,
join_handle: None,
}),
});
TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
let mut task_mut = task.mutable.lock().unwrap();
let task_name = name.to_string();
let task_cloned = Arc::clone(&task);
let join_handle = runtime.spawn(task_wrapper(
task_name,
task_id,
task_cloned,
cancel,
shutdown_process_on_error,
future,
));
task_mut.join_handle = Some(join_handle);
drop(task_mut);
// The task is now running. Nothing more to do here
PageserverTaskId(task_id)
runtime.spawn(task_wrapper(task_name, shutdown_process_on_panic, future))
}
/// This wrapper function runs in a newly-spawned task. It initializes the
/// task-local variables and calls the payload function.
async fn task_wrapper<F>(
task_name: String,
task_id: u64,
task: Arc<PageServerTask>,
shutdown_token: CancellationToken,
shutdown_process_on_error: bool,
future: F,
) where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
/// This wrapper function runs in a newly-spawned task, mainly to handle panics.
async fn task_wrapper<F, R>(task_name: String, shutdown_process_on_panic: bool, future: F) -> R
where
F: Future<Output = R> + Send + 'static,
{
debug!("Starting task '{}'", task_name);
let result = SHUTDOWN_TOKEN
.scope(
shutdown_token,
CURRENT_TASK.scope(task, {
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
// unwinding that would expose us to unwind-unsafe behavior.
AssertUnwindSafe(future).catch_unwind()
}),
)
.await;
task_finish(result, task_name, task_id, shutdown_process_on_error).await;
}
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
// unwinding that would expose us to unwind-unsafe behavior.
let result = AssertUnwindSafe(future).catch_unwind().await;
async fn task_finish(
result: std::result::Result<
anyhow::Result<()>,
std::boxed::Box<dyn std::any::Any + std::marker::Send>,
>,
task_name: String,
task_id: u64,
shutdown_process_on_error: bool,
) {
// Remove our entry from the global hashmap.
let task = TASKS
.lock()
.unwrap()
.remove(&task_id)
.expect("no task in registry");
let mut shutdown_process = false;
{
let task_mut = task.mutable.lock().unwrap();
match result {
Ok(Ok(())) => {
debug!("Task '{}' exited normally", task_name);
}
Ok(Err(err)) => {
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task_mut.tenant_id, task_mut.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task_mut.tenant_id, task_mut.timeline_id, err
);
}
}
Err(err) => {
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task_mut.tenant_id, task_mut.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task_mut.tenant_id, task_mut.timeline_id, err
);
}
// Handle panics
match result {
Ok(result) => {
debug!("Task '{}' exited normally", task_name);
result
}
Err(err) => {
if shutdown_process_on_panic {
error!("Shutting down: task '{}' panicked: {:?}", task_name, err);
shutdown_pageserver(1).await;
unreachable!();
} else {
error!("Task '{}' panicked: {:?}", task_name, err);
resume_unwind(err);
}
}
}
if shutdown_process {
shutdown_pageserver(1).await;
}
}
// Expected to be called from the task with the given id.
pub fn associate_with(tenant_id: Option<TenantId>, timeline_id: Option<TimelineId>) {
CURRENT_TASK.with(|ct| {
let mut task_mut = ct.mutable.lock().unwrap();
task_mut.tenant_id = tenant_id;
task_mut.timeline_id = timeline_id;
});
}
/// Is there a task running that matches the criteria?
/// Signal and wait for tasks to shut down.
///
/// Perform pageserver shutdown. This is called on receiving a signal,
/// or if one of the tasks marked as 'shutdown_process_on_error' dies.
///
/// The arguments are used to select the tasks to kill. Any None arguments are
/// ignored. For example, to shut down all WalReceiver tasks:
///
/// shutdown_tasks(Some(TaskKind::WalReceiver), None, None)
///
/// Or to shut down all tasks for given timeline:
///
/// shutdown_tasks(None, Some(tenant_id), Some(timeline_id))
///
pub async fn shutdown_tasks(
kind: Option<TaskKind>,
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
) {
let mut victim_tasks = Vec::new();
/// This never returns.
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
context::shutdown_tasks(TaskKind::LibpqEndpointListener).await;
{
let tasks = TASKS.lock().unwrap();
for task in tasks.values() {
let task_mut = task.mutable.lock().unwrap();
if (kind.is_none() || Some(task.kind) == kind)
&& (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
&& (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
{
task.cancel.cancel();
victim_tasks.push(Arc::clone(task));
}
}
}
// Shut down all tenants gracefully
crate::tenant::mgr::shutdown_all_tenants().await;
for task in victim_tasks {
let join_handle = {
let mut task_mut = task.mutable.lock().unwrap();
info!("waiting for {} to shut down", task.name);
let join_handle = task_mut.join_handle.take();
drop(task_mut);
join_handle
};
if let Some(join_handle) = join_handle {
let _ = join_handle.await;
} else {
// Possibly one of:
// * The task had not even fully started yet.
// * It was shut down concurrently and already exited
}
}
}
pub fn current_task_kind() -> Option<TaskKind> {
CURRENT_TASK.try_with(|ct| ct.kind).ok()
}
pub fn current_task_id() -> Option<PageserverTaskId> {
CURRENT_TASK.try_with(|ct| ct.task_id).ok()
}
/// A Future that can be used to check if the current task has been requested to
/// shut down.
pub async fn shutdown_watcher() {
let token = SHUTDOWN_TOKEN
.try_with(|t| t.clone())
.expect("shutdown_requested() called in an unexpected task or thread");
token.cancelled().await;
}
/// Clone the current task's cancellation token, which can be moved across tasks.
///
/// When the task which is currently executing is shutdown, the cancellation token will be
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
/// `tokio::task::JoinSet::spawn`.
pub fn shutdown_token() -> CancellationToken {
SHUTDOWN_TOKEN
.try_with(|t| t.clone())
.expect("shutdown_token() called in an unexpected task or thread")
}
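// Illustrative sketch, not part of this patch: moving a clone of the current task's
// cancellation token into blocking work, as the doc comment above suggests.
// `example_blocking_with_shutdown` and `do_blocking_chunk` are hypothetical stand-ins.
#[allow(dead_code)]
fn example_blocking_with_shutdown() {
    fn do_blocking_chunk() { /* stand-in for a bounded piece of synchronous work */ }
    let token = shutdown_token();
    tokio::task::spawn_blocking(move || {
        // The token can be polled from a plain thread, outside the async task.
        while !token.is_cancelled() {
            do_blocking_chunk();
        }
    });
}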
/// Has the current task been requested to shut down?
pub fn is_shutdown_requested() -> bool {
if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) {
cancel.is_cancelled()
} else {
if !cfg!(test) {
warn!("is_shutdown_requested() called in an unexpected task or thread");
}
false
}
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
// FIXME: We should probably stop accepting commands like attach/detach earlier.
context::shutdown_tasks(TaskKind::HttpEndpointListener).await;
// There should be nothing left, but let's be sure
context::shutdown_all_tasks().await;
info!("Shut down successfully completed");
std::process::exit(exit_code);
}

File diff suppressed because it is too large.


@@ -8,6 +8,8 @@ use std::sync::Arc;
use tokio::fs;
use anyhow::Context;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tracing::*;
@@ -17,9 +19,8 @@ use utils::crashsafe;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{Tenant, TenantState};
use crate::tenant::{Tenant, TenantRequestContext, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::fs_ext::PathExt;
@@ -182,25 +183,11 @@ pub async fn shutdown_all_tenants() {
tenants_to_shut_down
};
// Shut down all existing walreceiver connections and stop accepting the new ones.
task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
// Ok, no background tasks running anymore. Flush any remaining data in
// memory to disk.
//
// We assume that any incoming connections that might request pages from
// the tenant have already been terminated by the caller, so there
// should be no more activity in any of the repositories.
//
// On error, log it but continue with the shutdown for other tenants.
for tenant in tenants_to_shut_down {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
let mut shutdown_futures: FuturesUnordered<_> = FuturesUnordered::new();
for tenant in tenants_to_shut_down.iter() {
shutdown_futures.push(tenant.graceful_shutdown(true));
}
while let Some(_result) = shutdown_futures.next().await {}
}
pub async fn create_tenant(
@@ -238,28 +225,34 @@ pub async fn update_tenant_config(
ctx: &RequestContext,
) -> anyhow::Result<()> {
info!("configuring tenant {tenant_id}");
get_tenant(tenant_id, true)
.await?
.update_tenant_config(tenant_conf);
let (tenant, _ctx) = get_active_tenant(tenant_id, ctx).await?;
tenant.update_tenant_config(tenant_conf);
Tenant::persist_tenant_config(&conf.tenant_config_path(tenant_id), tenant_conf, false)?;
Ok(())
}
/// Gets the tenant from the in-memory data, erroring if it's absent or does not match the query.
/// `active_only = true` restricts the query to tenants that are ready for operations, erroring on other kinds of tenants.
pub async fn get_tenant(tenant_id: TenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
pub async fn get_active_tenant(
tenant_id: TenantId,
parent_ctx: &RequestContext,
) -> anyhow::Result<(Arc<Tenant>, TenantRequestContext)> {
let tenant = get_tenant(tenant_id).await?;
let tenant_ctx = match tenant.get_context(parent_ctx) {
Ok(ctx) => ctx,
Err(state) => anyhow::bail!("Tenant {} is not active, state: {:?}", tenant_id, state,),
};
Ok((tenant, tenant_ctx))
}
pub async fn get_tenant(tenant_id: TenantId) -> anyhow::Result<Arc<Tenant>> {
let m = TENANTS.read().await;
let tenant = m
.get(&tenant_id)
.with_context(|| format!("Tenant {tenant_id} not found in the local state"))?;
if active_only && !tenant.is_active() {
anyhow::bail!(
"Tenant {tenant_id} is not active. Current state: {:?}",
tenant.current_state()
)
} else {
Ok(Arc::clone(tenant))
}
Ok(Arc::clone(tenant))
}
pub async fn delete_timeline(
@@ -267,10 +260,9 @@ pub async fn delete_timeline(
timeline_id: TimelineId,
ctx: &RequestContext,
) -> anyhow::Result<()> {
match get_tenant(tenant_id, true).await {
Ok(tenant) => {
let tenant_ctx = tenant.get_context();
tenant.delete_timeline(timeline_id, &tenant_ctx).await?;
match get_active_tenant(tenant_id, ctx).await {
Ok((tenant, ctx)) => {
tenant.delete_timeline(timeline_id, &ctx).await?;
}
Err(e) => anyhow::bail!("Cannot access tenant {tenant_id} in local tenant state: {e:?}"),
}
@@ -402,27 +394,31 @@ where
// The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
// tenant-wide cleanup operations may take some time (removing the entire tenant directory), so we want to
// avoid holding the lock for the entire process.
{
let tenant = {
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Broken
| TenantState::Active => tenant.set_stopping(),
| TenantState::Active => {
tenant.set_stopping();
Arc::clone(tenant)
}
TenantState::Stopping => {
anyhow::bail!("Tenant {tenant_id} is stopping already")
}
},
None => anyhow::bail!("Tenant not found for id {tenant_id}"),
}
}
};
// shutdown all tenant and timeline tasks: gc, compaction, page service)
// No new tasks will be started for this tenant because it's in `Stopping` state.
// Hence, once we're done here, the `tenant_cleanup` callback can mutate tenant on-disk state freely.
task_mgr::shutdown_tasks(None, Some(tenant_id), None).await;
// Shut down all tenant and timeline tasks.
tenant.graceful_shutdown(true).await;
// All tasks that operated on the tenant or any of its timelines have now finished,
// and they are in Stopped state so that new ones cannot appear anymore. Proceed
// with the cleanup.
match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
@@ -444,110 +440,3 @@ where
}
}
}
#[cfg(feature = "testing")]
use {
crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
utils::http::error::ApiError,
};
#[cfg(feature = "testing")]
pub async fn immediate_gc(
tenant_id: TenantId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
let guard = TENANTS.read().await;
let tenant = guard
.get(&tenant_id)
.map(Arc::clone)
.with_context(|| format!("Tenant {tenant_id} not found"))
.map_err(ApiError::NotFound)?;
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
// Run in task_mgr to avoid race with detach operation
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::GarbageCollector,
Some(tenant_id),
Some(timeline_id),
&format!("timeline_gc_handler garbage collection run for tenant {tenant_id} timeline {timeline_id}"),
false,
async move {
fail::fail_point!("immediate_gc_task_pre");
let tenant_ctx = tenant.get_context();
let result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &tenant_ctx)
.instrument(info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id))
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
match task_done.send(result) {
Ok(_) => (),
Err(result) => error!("failed to send gc result: {result:?}"),
}
Ok(())
}
);
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
drop(guard);
Ok(wait_task_done)
}
#[cfg(feature = "testing")]
pub async fn immediate_compact(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<tokio::sync::oneshot::Receiver<anyhow::Result<()>>, ApiError> {
let guard = TENANTS.read().await;
let tenant = guard
.get(&tenant_id)
.map(Arc::clone)
.with_context(|| format!("Tenant {tenant_id} not found"))
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
let timeline_ctx = timeline.get_context();
// Run in task_mgr to avoid race with detach operation
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Compaction,
Some(tenant_id),
Some(timeline_id),
&format!(
"timeline_compact_handler compaction run for tenant {tenant_id} timeline {timeline_id}"
),
false,
async move {
let result = timeline
.compact(&timeline_ctx)
.instrument(
info_span!("manual_compact", tenant = %tenant_id, timeline = %timeline_id),
)
.await;
match task_done.send(result) {
Ok(_) => (),
Err(result) => error!("failed to send compaction result: {result:?}"),
}
Ok(())
},
);
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
drop(guard);
Ok(wait_task_done)
}


@@ -214,6 +214,7 @@ use anyhow::ensure;
use remote_storage::{DownloadError, GenericRemoteStorage};
use std::ops::DerefMut;
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;
@@ -225,12 +226,12 @@ use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::{
config::PageServerConf,
task_mgr,
task_mgr::TaskKind,
task_mgr::BACKGROUND_RUNTIME,
tenant::metadata::TimelineMetadata,
tenant::upload_queue::{
UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
},
tenant::TimelineRequestContext,
{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS},
};
@@ -313,25 +314,50 @@ impl RemoteTimelineClient {
/// Initialize the upload queue for a remote storage that already received
/// an index file upload, i.e., it's not empty.
/// The given `index_part` must be the one on the remote.
pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
pub fn init_upload_queue(
self: &Arc<Self>,
index_part: &IndexPart,
upload_ctx: TimelineRequestContext,
) -> anyhow::Result<()> {
let cancellation_token = upload_ctx.cancellation_token().clone();
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
upload_queue.initialize_with_current_remote_index_part(index_part, upload_ctx)?;
self.update_remote_physical_size_gauge(Some(index_part));
self.spawn_cancellation_watch(cancellation_token);
Ok(())
}
/// Initialize the upload queue for the case where the remote storage is empty,
/// i.e., it doesn't have an `IndexPart`.
pub fn init_upload_queue_for_empty_remote(
&self,
self: &Arc<Self>,
local_metadata: &TimelineMetadata,
upload_ctx: TimelineRequestContext,
) -> anyhow::Result<()> {
let cancellation_token = upload_ctx.cancellation_token().clone();
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata)?;
upload_queue.initialize_empty_remote(local_metadata, upload_ctx)?;
self.update_remote_physical_size_gauge(None);
self.spawn_cancellation_watch(cancellation_token);
Ok(())
}
/// Spawn a task that calls `stop` on cancellation. It's important that we
/// stop the upload queue promptly, because it holds onto the RequestContext,
/// which in turn prevents the Timeline from shutting down.
fn spawn_cancellation_watch(self: &Arc<Self>, cancellation_token: CancellationToken) {
let self_rc = Arc::clone(self);
task_mgr::spawn(
self.runtime.handle(),
"remote upload queue cancellation watch",
false,
async move {
cancellation_token.cancelled().await;
self_rc.stop();
},
);
}
pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
match &*self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
@@ -625,7 +651,10 @@ impl RemoteTimelineClient {
///
/// Wait for all previously scheduled uploads/deletions to complete
///
pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
pub async fn wait_completion(
self: &Arc<Self>,
ctx: &TimelineRequestContext,
) -> anyhow::Result<()> {
let (sender, mut receiver) = tokio::sync::watch::channel(());
let barrier_op = UploadOp::Barrier(sender);
@@ -639,9 +668,16 @@ impl RemoteTimelineClient {
self.launch_queued_tasks(upload_queue);
}
if receiver.changed().await.is_err() {
anyhow::bail!("wait_completion aborted because upload queue was stopped");
}
tokio::select! {
result = receiver.changed() => {
if result.is_err() {
anyhow::bail!("wait_completion aborted because upload queue was stopped");
}
},
_ = ctx.cancelled() => {
anyhow::bail!("request cancelled while waiting on uploads to finish");
},
};
Ok(())
}
@@ -719,16 +755,15 @@ impl RemoteTimelineClient {
// Spawn task to perform the task
let self_rc = Arc::clone(self);
let cancellation_token = upload_queue.upload_ctx.cancellation_token().clone();
task_mgr::spawn(
self.runtime.handle(),
TaskKind::RemoteUploadTask,
Some(self.tenant_id),
Some(self.timeline_id),
"remote upload",
false,
async move {
self_rc.perform_upload_task(task).await;
Ok(())
self_rc.perform_upload_task(task, cancellation_token).await;
}
.instrument(info_span!(parent: None, "remote_upload", tenant = %self.tenant_id, timeline = %self.timeline_id, upload_task_id = %task_id)),
);
@@ -748,7 +783,11 @@ impl RemoteTimelineClient {
/// The task can be shut down, however. That leads to stopping the whole
/// queue.
///
async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
async fn perform_upload_task(
self: &Arc<Self>,
task: Arc<UploadTask>,
cancellation_token: CancellationToken,
) {
// Loop to retry until it completes.
loop {
// If we're requested to shut down, close up shop and exit.
@@ -760,7 +799,7 @@ impl RemoteTimelineClient {
// the Future, but we're not 100% sure if the remote storage library
// is cancellation safe, so we don't dare to do that. Hopefully, the
// upload finishes or times out soon enough.
if task_mgr::is_shutdown_requested() {
if cancellation_token.is_cancelled() {
info!("upload task cancelled by shutdown request");
self.calls_unfinished_metric_end(&task.op);
self.stop();
@@ -858,7 +897,7 @@ impl RemoteTimelineClient {
// sleep until it's time to retry, or we're cancelled
tokio::select! {
_ = task_mgr::shutdown_watcher() => { },
_ = cancellation_token.cancelled() => { },
_ = exponential_backoff(
retries,
DEFAULT_BASE_BACKOFF_SECONDS,
@@ -1010,7 +1049,9 @@ impl RemoteTimelineClient {
#[cfg(test)]
mod tests {
use super::*;
use crate::context::{DownloadBehavior, RequestContext, TaskKind};
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use crate::DEFAULT_PG_VERSION;
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
use std::{collections::HashSet, path::Path};
use utils::lsn::Lsn;
@@ -1029,7 +1070,7 @@ mod tests {
Lsn(0),
// Any version will do
// but it should be consistent with the one in the tests
crate::DEFAULT_PG_VERSION,
DEFAULT_PG_VERSION,
);
// go through serialize + deserialize to fix the header, including checksum
@@ -1064,9 +1105,19 @@ mod tests {
// Test scheduling
#[test]
fn upload_scheduling() -> anyhow::Result<()> {
// Use a current-thread runtime in the test
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let _entered = runtime.enter();
let harness = TenantHarness::create("upload_scheduling")?;
let (tenant, tenant_ctx) = runtime.block_on(harness.load());
let (_timeline, timeline_ctx) =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &tenant_ctx)?;
let timeline_path = harness.timeline_path(&TIMELINE_ID);
std::fs::create_dir_all(&timeline_path)?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
@@ -1084,14 +1135,6 @@ mod tests {
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
// Use a current-thread runtime in the test
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let _entered = runtime.enter();
// Test outline:
//
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
@@ -1127,7 +1170,11 @@ mod tests {
println!("remote_timeline_dir: {}", remote_timeline_dir.display());
let metadata = dummy_metadata(Lsn(0x10));
client.init_upload_queue_for_empty_remote(&metadata)?;
let upload_ctx = timeline_ctx.register_another(RequestContext::new(
TaskKind::RemoteUploadTask,
DownloadBehavior::Error,
));
client.init_upload_queue_for_empty_remote(&metadata, upload_ctx)?;
// Create a couple of dummy files, schedule upload for them
let content_foo = dummy_contents("foo");
@@ -1167,7 +1214,7 @@ mod tests {
}
// Wait for the uploads to finish
runtime.block_on(client.wait_completion())?;
runtime.block_on(client.wait_completion(&timeline_ctx))?;
{
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
@@ -1204,7 +1251,7 @@ mod tests {
assert_remote_files(&["foo", "bar", "index_part.json"], &remote_timeline_dir);
// Finish them
runtime.block_on(client.wait_completion())?;
runtime.block_on(client.wait_completion(&timeline_ctx))?;
assert_remote_files(&["bar", "baz", "index_part.json"], &remote_timeline_dir);


@@ -3,11 +3,9 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::Context;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::{TenantRequestContext, TimelineRequestContext};
use crate::tenant::{PageReconstructError, TenantRequestContext, TimelineRequestContext};
use super::Tenant;
use utils::id::TimelineId;
@@ -99,10 +97,19 @@ pub(super) async fn gather_inputs(
// used to determine the `retention_period` for the size model
let mut max_cutoff_distance = None;
let mut ctx_dropguards: Vec<tokio_util::sync::DropGuard> = Vec::new();
for timeline in timelines {
let last_record_lsn = timeline.get_last_record_lsn();
let ctx = timeline.get_context();
let ctx = match timeline.get_context(tenant_ctx) {
Ok(ctx) => ctx,
Err(state) => {
info!("skipping tenant size calculation for timeline because it is in {state:?} state");
continue;
}
};
ctx_dropguards.push(ctx.cancellation_token().clone().drop_guard());
let ctx = Arc::new(ctx);
let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = {
@@ -366,7 +373,7 @@ enum LsnKind {
struct TimelineAtLsnSizeResult(
Arc<crate::tenant::Timeline>,
utils::lsn::Lsn,
Result<u64, CalculateLogicalSizeError>,
Result<u64, PageReconstructError>,
);
#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
@@ -375,14 +382,12 @@ async fn calculate_logical_size(
timeline: Arc<crate::tenant::Timeline>,
lsn: utils::lsn::Lsn,
ctx: &TimelineRequestContext,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
) -> Result<TimelineAtLsnSizeResult, PageReconstructError> {
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
.await
.expect("global semaphore should not have been closed");
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn, ctx)
.await?;
let size_res = timeline.calculate_logical_size(lsn, ctx).await;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}


@@ -1,45 +1,39 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use crate::context::{DownloadBehavior, RequestContext, TaskKind};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use crate::tenant::{Tenant, TenantState};
use crate::task_mgr::BACKGROUND_RUNTIME;
use crate::tenant::Tenant;
use tracing::*;
use utils::id::TenantId;
pub fn start_background_loops(tenant_id: TenantId) {
pub fn start_background_loops(tenant: &Arc<Tenant>) {
let tenant_id = tenant.tenant_id;
let tenant_clone = Arc::clone(tenant);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
Some(tenant_id),
None,
&format!("compactor for tenant {tenant_id}"),
false,
async move {
compaction_loop(tenant_id)
compaction_loop(&tenant_clone)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
},
);
let tenant_clone = Arc::clone(tenant);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::GarbageCollector,
Some(tenant_id),
None,
&format!("garbage collector for tenant {tenant_id}"),
false,
async move {
gc_loop(tenant_id)
gc_loop(&tenant_clone)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
},
);
}
@@ -47,26 +41,27 @@ pub fn start_background_loops(tenant_id: TenantId) {
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant_id: TenantId) {
async fn compaction_loop(tenant: &Arc<Tenant>) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let top_ctx = RequestContext::new(TaskKind::Compaction, DownloadBehavior::Download);
let tenant_ctx = match tenant.get_context(&top_ctx) {
Ok(ctx) => ctx,
Err(state) => {
// This could happen if the tenant is detached or the pageserver is shut
// down immediately after loading or attaching completed and the tenant
// was activated. That seems unlikely enough in practice that we log it as an error,
// since it could also be a bug.
error!("Not running compaction loop, tenant is not active: {state:?}");
return;
}
};
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(tenant) => tenant,
},
};
let tenant_ctx = tenant.get_context();
let mut sleep_duration = tenant.get_compaction_period();
if sleep_duration == Duration::ZERO {
info!("automatic compaction is disabled");
@@ -82,7 +77,7 @@ async fn compaction_loop(tenant_id: TenantId) {
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = tenant_ctx.cancelled() => {
info!("received cancellation request during idling");
break;
},
@@ -99,26 +94,28 @@ async fn compaction_loop(tenant_id: TenantId) {
///
/// GC task's main loop
///
async fn gc_loop(tenant_id: TenantId) {
async fn gc_loop(tenant: &Arc<Tenant>) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as a time.
let top_ctx = RequestContext::new(TaskKind::GarbageCollector, DownloadBehavior::Download);
let tenant_ctx = match tenant.get_context(&top_ctx) {
Ok(ctx) => ctx,
Err(state) => {
// This could happen if the tenant is detached or the pageserver is shut
// down immediately after loading or attaching completed and the tenant
// was activated. That seems unlikely enough in practice that we log it as an error,
// since it could also be a bug.
error!("Not running GC loop, tenant is not active: {state:?}");
return;
}
};
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(tenant) => tenant,
},
};
let tenant_ctx = tenant.get_context();
let gc_period = tenant.get_gc_period();
let gc_horizon = tenant.get_gc_horizon();
let mut sleep_duration = gc_period;
@@ -129,6 +126,7 @@ async fn gc_loop(tenant_id: TenantId) {
} else {
// Run gc
if gc_horizon > 0 {
// Run one GC iteration
if let Err(e) = tenant
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &tenant_ctx)
.await
@@ -141,7 +139,7 @@ async fn gc_loop(tenant_id: TenantId) {
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = tenant_ctx.cancelled() => {
info!("received cancellation request during idling");
break;
},
@@ -153,46 +151,3 @@ async fn gc_loop(tenant_id: TenantId) {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
trace!("GC loop stopped.");
}
async fn wait_for_active_tenant(
tenant_id: TenantId,
wait: Duration,
) -> ControlFlow<(), Arc<Tenant>> {
let tenant = loop {
match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => break tenant,
Err(e) => {
error!("Failed to get a tenant {tenant_id}: {e:#}");
tokio::time::sleep(wait).await;
}
}
};
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(tenant)
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
loop {
match tenant_state_updates.changed().await {
Ok(()) => {
let new_state = *tenant_state_updates.borrow();
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(tenant);
}
state => {
debug!("Not running the task loop, tenant is not active: {state:?}");
continue;
}
}
}
Err(_sender_dropped_error) => {
info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
return ControlFlow::Break(());
}
}
}
}
}


@@ -10,8 +10,7 @@ use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskState, TimelineState,
};
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
use tokio::sync::{watch, Semaphore, TryAcquireError};
use tracing::*;
use std::cmp::{max, min, Ordering};
@@ -23,7 +22,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::context::RequestContext;
use crate::context::{DownloadBehavior, RequestContext, RequestContextId, TaskKind};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName,
@@ -41,9 +40,9 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::TimelineMetrics;
use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
use crate::tenant::config::TenantConfOpt;
use pageserver_api::reltag::RelTag;
@@ -59,7 +58,6 @@ use utils::{
use crate::page_cache;
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walreceiver::{is_broker_client_initialized, spawn_connection_manager_task};
use crate::walredo::WalRedoManager;
use crate::METADATA_FILE_NAME;
@@ -192,6 +190,10 @@ pub struct Timeline {
download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
state: watch::Sender<TimelineState>,
/// RequestContexts associated with this timeline. Used on
/// shutdown, to cancel and wait for operations to finish.
active_contexts: Mutex<HashMap<RequestContextId, TaskKind>>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -379,6 +381,12 @@ pub enum PageReconstructError {
#[error(transparent)]
Other(#[from] anyhow::Error), // source and Display delegate to anyhow::Error
/// The operation would require downloading a layer that is missing locally.
NeedsDownload(Weak<Timeline>, Weak<RemoteLayer>),
/// The operation was cancelled
Cancelled,
/// An error happened replaying WAL records
#[error(transparent)]
WalRedo(#[from] crate::walredo::WalRedoError),
@@ -388,6 +396,19 @@ impl std::fmt::Debug for PageReconstructError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::Other(err) => err.fmt(f),
Self::NeedsDownload(_tli, _layer) => write!(f, "needs download"),
Self::Cancelled => write!(f, "cancelled"),
Self::WalRedo(err) => err.fmt(f),
}
}
}
impl std::fmt::Display for PageReconstructError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::Other(err) => err.fmt(f),
Self::NeedsDownload(_tli, _layer) => write!(f, "needs download"),
Self::Cancelled => write!(f, "cancelled"),
Self::WalRedo(err) => err.fmt(f),
}
}
@@ -412,15 +433,76 @@ impl Timeline {
self.latest_gc_cutoff_lsn.read()
}
/// Register a context with this Timeline, returning a TimelineRequestContext.
/// Similar to Tenant::register_context.
pub fn register_context(
&self,
tenant_ctx: TenantRequestContext,
) -> Result<TimelineRequestContext, TimelineState> {
let state_ref = self.state.borrow();
let state = *state_ref;
if state == TimelineState::Active || state == TimelineState::Suspended {
if self
.active_contexts
.lock()
.unwrap()
.insert(tenant_ctx.ctx.context_id(), tenant_ctx.ctx.task_kind())
.is_some()
{
panic!("active_contexts out of sync");
}
let timeline_ctx = TimelineRequestContext {
ctx: tenant_ctx,
timeline: self.myself.upgrade().unwrap(),
};
Ok(timeline_ctx)
} else {
Err(state)
}
}
pub fn get_context(
&self,
tenant_ctx: &TenantRequestContext,
) -> Result<TimelineRequestContext, TimelineState> {
self.register_context(tenant_ctx.register_another(RequestContext::with_parent(
tenant_ctx.task_kind(),
tenant_ctx.download_behavior(),
&tenant_ctx.ctx,
)))
}
fn deregister_context(&self, context_id: RequestContextId) {
if self
.active_contexts
.lock()
.unwrap()
.remove(&context_id)
.is_none()
{
panic!("active_contexts out of sync");
}
}
///
/// XXX: This is a placeholder. In the next commit, this will
/// check that the timeline is still active.
pub fn get_context(&self) -> TimelineRequestContext {
TimelineRequestContext {
ctx: TenantRequestContext {
ctx: RequestContext {},
},
/// Wait until all RequestContexts registered with the Timeline have been dropped.
///
/// This should be called only after setting the state to Stopping. Otherwise
/// new contexts can appear at any time.
///
pub async fn wait_no_more_active_contexts(&self) {
let mut retries = 0;
loop {
if self.active_contexts.lock().unwrap().is_empty() {
return;
}
crate::exponential_backoff(
retries,
crate::DEFAULT_BASE_BACKOFF_SECONDS,
crate::DEFAULT_MAX_BACKOFF_SECONDS,
)
.await;
retries += 1;
}
}
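// Illustrative sketch, not part of this patch: the intended calling order per the doc
// comment above. Shutdown code moves the timeline out of the Active state first and
// only then drains contexts; otherwise new contexts could still be registered. The
// direct write to the `state` watch channel is an assumption made for illustration.
#[allow(dead_code)]
async fn example_drain_contexts(&self) {
    let _ = self.state.send_replace(TimelineState::Stopping);
    self.wait_no_more_active_contexts().await;
    // From here on, no RequestContext is registered against this timeline.
}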
@@ -532,13 +614,14 @@ impl Timeline {
/// You should call this before any of the other get_* or list_* functions. Calling
/// those functions with an LSN that has not been processed yet is an error.
///
/// TODO: also return if 'ctx' is cancelled
pub async fn wait_lsn(&self, lsn: Lsn, ctx: &TimelineRequestContext) -> anyhow::Result<()> {
anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
// This should never be called from the WAL receiver, because that could lead
// to a deadlock.
anyhow::ensure!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnection),
ctx.task_kind() != TaskKind::WalReceiverConnection,
"wait_lsn cannot be called in WAL receiver"
);
@@ -905,12 +988,80 @@ impl Timeline {
download_all_remote_layers_task_info: RwLock::new(None),
state,
active_contexts: Mutex::new(HashMap::new()),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
})
}
async fn graceful_shutdown_phase(&self, phase: u32) {
let mut contexts_to_kill = Vec::new();
{
let active_contexts = self.active_contexts.lock().unwrap();
for (&context_id, &task_kind) in active_contexts.iter() {
let this_phase = match task_kind {
TaskKind::UnitTest => 1,
// First shut down all client connections and
// management requests.
// Also, if we were still doing the initial load or
// attach operation, cancel that.
// Also stop compaction and GC background tasks.
//
// Also stop WAL receiver immediately. A client
// request could be waiting for new WAL to arrive,
// but we're cancelling all such requests too.
TaskKind::LibpqEndpointListener
| TaskKind::HttpEndpointListener
| TaskKind::PageRequestHandler
| TaskKind::MgmtRequest
| TaskKind::GarbageCollector
| TaskKind::Compaction
| TaskKind::InitialLogicalSizeCalculation
| TaskKind::InitialLoad
| TaskKind::Attach
| TaskKind::DownloadAllRemoteLayers
| TaskKind::RemoteDownloadTask
| TaskKind::WalReceiverConnection
| TaskKind::WalReceiverManager => 1,
// There is no more incoming WAL.
TaskKind::LayerFlush => 2,
// FIXME: should we wait for in-progress uploads to finish?
// With a timeout?
TaskKind::RemoteUploadTask => 2,
TaskKind::MetricsCollection => 3,
};
if this_phase == phase {
contexts_to_kill.push(context_id);
}
}
}
crate::context::cancel_and_wait(&mut contexts_to_kill).await;
}
pub(super) async fn graceful_shutdown(&self, flush: bool) {
let state = *self.state.borrow();
assert!(
state == TimelineState::Stopping || state == TimelineState::Suspended,
"graceful_shutdown called on timeline in state {state:?}"
);
self.graceful_shutdown_phase(1).await;
if flush {
if let Err(err) = self.freeze_and_flush().await {
error!("error flushing in-memory data during shutdown: {err:?}");
// Continue with the shutdown anyway, it's the best we can do
}
}
self.graceful_shutdown_phase(2).await;
self.graceful_shutdown_phase(3).await;
}
pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>, ctx: &TimelineRequestContext) {
let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
match *flush_loop_state {
@@ -933,22 +1084,25 @@ impl Timeline {
let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
let self_clone = Arc::clone(self);
let background_ctx = ctx.register_another(RequestContext::new(
TaskKind::LayerFlush,
DownloadBehavior::Error,
));
info!("spawning flush loop");
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move {
self_clone.flush_loop(layer_flush_start_rx).await;
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
assert_eq!(*flush_loop_state, FlushLoopState::Running);
*flush_loop_state = FlushLoopState::Exited;
Ok(()) }
.instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
);
task_mgr::BACKGROUND_RUNTIME.handle(),
"layer flush task",
false,
async move {
self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
assert_eq!(*flush_loop_state, FlushLoopState::Running);
*flush_loop_state = FlushLoopState::Exited;
}
.instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
);
*flush_loop_state = FlushLoopState::Running;
}
@@ -963,7 +1117,10 @@ impl Timeline {
}
}
let background_ctx = self.get_context();
let background_ctx = ctx.register_another(RequestContext::new(
TaskKind::WalReceiverManager,
DownloadBehavior::Error,
));
info!(
"launching WAL receiver for timeline {} of tenant {}",
@@ -1270,19 +1427,24 @@ impl Timeline {
.map(|l| (l.filename(), l))
.collect::<HashMap<_, _>>();
let upload_ctx = ctx.register_another(RequestContext::new(
TaskKind::RemoteUploadTask,
DownloadBehavior::Error,
));
let local_only_layers = match index_part {
Some(index_part) => {
info!(
"initializing upload queue from remote index with {} layer files",
index_part.timeline_layers.len()
);
remote_client.init_upload_queue(index_part)?;
remote_client.init_upload_queue(index_part, upload_ctx)?;
self.create_remote_layers(index_part, local_layers, disk_consistent_lsn)
.await?
}
None => {
info!("initializing upload queue as empty");
remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?;
remote_client
.init_upload_queue_for_empty_remote(up_to_date_metadata, upload_ctx)?;
local_layers
}
};
@@ -1326,26 +1488,35 @@ impl Timeline {
.is_none());
// We need to start the computation task.
let self_clone = Arc::clone(self);
let background_ctx = ctx.register_another(RequestContext::new(
TaskKind::InitialLogicalSizeCalculation,
DownloadBehavior::Download,
));
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
Some(self.tenant_id),
Some(self.timeline_id),
"initial size calculation",
false,
// NB: don't log errors here, task_mgr will do that.
async move {
let calculated_size = match self_clone.logical_size_calculation_task(init_lsn).await
let calculated_size = match self_clone
.calculate_logical_size(init_lsn, &background_ctx)
.await
{
Ok(s) => s,
Err(CalculateLogicalSizeError::Cancelled) => {
Err(PageReconstructError::Cancelled) => {
// Don't make noise, this is a common task.
// In the unlikely case that there is another call to this function, we'll retry
// because initial_logical_size is still None.
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
return Ok(());
return;
}
Err(err) => {
error!(
"initial size calculation for {}/{} failed: {:?}",
self_clone.tenant_id, self_clone.timeline_id, err
);
return;
}
x @ Err(_) => x.context("Failed to calculate logical size")?,
};
match self_clone
.current_logical_size
@@ -1362,112 +1533,19 @@ impl Timeline {
// now that `initial_logical_size.is_some()`, reduce permit count to 0
// so that we prevent future callers from spawning this task
permit.forget();
Ok(())
},
);
}
pub fn spawn_ondemand_logical_size_calculation(
self: &Arc<Self>,
lsn: Lsn,
ctx: &TimelineRequestContext,
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
let (sender, receiver) = oneshot::channel();
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
Some(self.tenant_id),
Some(self.timeline_id),
"ondemand logical size calculation",
false,
async move {
let res = self_clone.logical_size_calculation_task(lsn).await;
let _ = sender.send(res).ok();
Ok(()) // Receiver is responsible for handling errors
},
);
receiver
}
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
async fn logical_size_calculation_task(
self: &Arc<Self>,
init_lsn: Lsn,
) -> Result<u64, CalculateLogicalSizeError> {
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
let cancel = CancellationToken::new();
let calculation = async {
let cancel = cancel.child_token();
tokio::task::spawn_blocking(move || {
// Run in a separate thread since this can do a lot of
// synchronous file IO without .await inbetween
// if there are no RemoteLayers that would require downloading.
let h = tokio::runtime::Handle::current();
let ctx = self_calculation.get_context();
h.block_on(self_calculation.calculate_logical_size(init_lsn, cancel, &ctx))
})
.await
.context("Failed to spawn calculation result task")?
};
let timeline_state_cancellation = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken
| TimelineState::Stopping
| TimelineState::Suspended => {
break format!("aborted because timeline became inactive (new state: {new_state:?})")
}
}
}
Err(_sender_dropped_error) => {
// can't happen, the sender is not dropped as long as the Timeline exists
break "aborted because state watch was dropped".to_string();
}
}
}
};
let taskmgr_shutdown_cancellation = async {
task_mgr::shutdown_watcher().await;
"aborted because task_mgr shutdown requested".to_string()
};
tokio::pin!(calculation);
loop {
tokio::select! {
res = &mut calculation => { return res }
reason = timeline_state_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
return calculation.await;
}
reason = taskmgr_shutdown_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
return calculation.await;
}
}
}
}
/// Calculate the logical size of the database at the latest LSN.
///
/// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
/// especially if we need to download remote layers.
async fn calculate_logical_size(
pub async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
cancel: CancellationToken,
ctx: &TimelineRequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
) -> Result<u64, PageReconstructError> {
info!(
"Calculating logical size for timeline {} at {}",
self.timeline_id, up_to_lsn
@@ -1509,7 +1587,7 @@ impl Timeline {
self.metrics.logical_size_histo.start_timer()
};
let logical_size = self
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
.get_current_logical_size_non_incremental(up_to_lsn, ctx)
.await?;
debug!("calculated logical size: {logical_size}");
timer.stop_and_record();
@@ -1779,9 +1857,24 @@ impl Timeline {
// The next layer doesn't exist locally. Need to download it.
// (The control flow is a bit complicated here because we must drop the 'layers'
// lock before awaiting on the Future.)
info!("on-demand downloading remote layer {id}");
timeline.download_remote_layer(remote_layer).await?;
continue 'layer_map_search;
match ctx.download_behavior() {
DownloadBehavior::Download => {
info!("on-demand downloading remote layer {id}");
timeline.download_remote_layer(remote_layer).await?;
continue 'layer_map_search;
}
DownloadBehavior::Warn => {
warn!("unexpectedly on-demand downloading remote layer {id}");
timeline.download_remote_layer(remote_layer).await?;
continue 'layer_map_search;
}
DownloadBehavior::Error => {
return Err(PageReconstructError::NeedsDownload(
timeline.myself.clone(),
Arc::downgrade(&remote_layer),
))
}
}
}
}
}
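// Illustrative sketch, not part of this patch: the branch taken above is decided by
// whoever created the RequestContext, not by this code. A background task that may
// download layers is given DownloadBehavior::Download, while a path that must fail
// fast is given DownloadBehavior::Error and then observes
// PageReconstructError::NeedsDownload. The task kinds below are just examples.
#[allow(dead_code)]
fn example_download_behaviors() -> (RequestContext, RequestContext) {
    let may_download = RequestContext::new(TaskKind::Compaction, DownloadBehavior::Download);
    let fail_fast = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
    (may_download, fail_fast)
}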
@@ -1903,12 +1996,15 @@ impl Timeline {
}
/// Layer flusher task's main loop.
async fn flush_loop(&self, mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>) {
async fn flush_loop(
&self,
mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
ctx: &TimelineRequestContext,
) {
info!("started flush loop");
let ctx = self.get_context();
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = ctx.cancelled() => {
info!("shutting down layer flush task");
break;
},
@@ -1925,7 +2021,7 @@ impl Timeline {
// drop 'layers' lock to allow concurrent reads and writes
};
if let Some(layer_to_flush) = layer_to_flush {
if let Err(err) = self.flush_frozen_layer(layer_to_flush, &ctx).await {
if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
error!("could not flush frozen layer: {err:?}");
break Err(err);
}
@@ -2003,6 +2099,8 @@ impl Timeline {
let lsn_range = frozen_layer.get_lsn_range();
let layer_paths_to_upload =
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let (partitioning, _lsn) = self
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
.await?;
@@ -2613,7 +2711,7 @@ impl Timeline {
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client.wait_completion().await?;
remote_client.wait_completion(ctx).await?;
}
let mut layers = self.layers.write().unwrap();
@@ -2838,7 +2936,7 @@ impl Timeline {
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client.wait_completion().await?;
remote_client.wait_completion(ctx).await?;
}
let mut layers_to_remove = Vec::new();
@@ -3097,9 +3195,6 @@ impl Timeline {
let self_clone = self.myself.upgrade().expect("timeline is gone");
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::RemoteDownloadTask,
Some(self.tenant_id),
Some(self.timeline_id),
&format!("download layer {}", remote_layer.short_id()),
false,
async move {
@@ -3145,8 +3240,6 @@ impl Timeline {
// XXX: This resets the exponential backoff because it's a new call to
// download_layer file.
drop(permit);
Ok(())
},
);
@@ -3170,14 +3263,15 @@ impl Timeline {
}
}
let child_ctx = self.get_context();
let child_ctx = ctx.register_another(RequestContext::new(
TaskKind::DownloadAllRemoteLayers,
DownloadBehavior::Download,
));
let task_id = child_ctx.context_id();
let self_clone = Arc::clone(&self);
let task_id = task_mgr::spawn(
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::DownloadAllRemoteLayers,
Some(self.tenant_id),
Some(self.timeline_id),
"download all remote layers task",
false,
async move {
@@ -3188,7 +3282,7 @@ impl Timeline {
warn!("tasks status is supposed to be Some(), since we are running");
}
Some(st) => {
let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
let exp_task_id = format!("{:?}", child_ctx.context_id());
if st.task_id != exp_task_id {
warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
} else {
@@ -3196,13 +3290,12 @@ impl Timeline {
}
}
};
Ok(())
}
.instrument(info_span!(parent: None, "download_all_remote_layers", tenant = %self.tenant_id, timeline = %self.timeline_id))
);
let initial_info = DownloadRemoteLayersTaskInfo {
task_id: format!("{task_id}"),
task_id: format!("{task_id:?}"),
state: DownloadRemoteLayersTaskState::Running,
total_layer_count: 0,
successful_download_count: 0,
@@ -3229,13 +3322,7 @@ impl Timeline {
let st = st
.as_mut()
.expect("this function is only called after the task has been spawned");
assert_eq!(
st.task_id,
format!(
"{}",
task_mgr::current_task_id().expect("we run inside a task_mgr task")
)
);
assert_eq!(st.task_id, format!("{:?}", ctx.context_id()));
let $st = st;
};
}
@@ -3259,9 +3346,9 @@ impl Timeline {
}
}
}
_ = task_mgr::shutdown_watcher() => {
_ = ctx.cancelled() => {
// Kind of pointless to watch for shutdowns here,
// as download_remote_layer spawns other task_mgr tasks internally.
// as download_remote_layer spawns other tasks internally.
lock_status!(st);
st.state = DownloadRemoteLayersTaskState::ShutDown;
}
@@ -3378,9 +3465,18 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
bail!("couldn't find an unused backup number for {:?}", path)
}
/// XXX: Placeholder
///
/// TimelineRequestContext is a RequestContext that has a reference to a particular
/// Timeline in a Tenant.
///
/// Like TenantRequestContext, holding a TimelineRequestContext prevents the Timeline
/// from being deleted.
///
/// Use Timeline::get_context() to get a TimelineRequestContext.
///
pub struct TimelineRequestContext {
ctx: TenantRequestContext,
pub ctx: TenantRequestContext,
timeline: Arc<Timeline>,
}
impl Deref for TimelineRequestContext {
@@ -3390,3 +3486,30 @@ impl Deref for TimelineRequestContext {
&self.ctx.ctx
}
}
impl Drop for TimelineRequestContext {
fn drop(&mut self) {
self.timeline.deregister_context(self.context_id())
}
}
impl TimelineRequestContext {
pub fn register_another(&self, ctx: RequestContext) -> TimelineRequestContext {
let ctx = self.ctx.register_another(ctx);
if self
.timeline
.active_contexts
.lock()
.unwrap()
.insert(ctx.ctx.context_id(), ctx.ctx.task_kind())
.is_some()
{
panic!("active_contexts out of sync");
}
TimelineRequestContext {
ctx,
timeline: Arc::clone(&self.timeline),
}
}
}
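// Illustrative sketch, not part of this patch: the intended calling pattern for the
// context hierarchy. `example_timeline_context` is hypothetical, error handling is
// simplified, and the exact signatures may differ elsewhere in the patch.
#[allow(dead_code)]
async fn example_timeline_context(
    tenant_id: utils::id::TenantId,
    timeline_id: utils::id::TimelineId,
    parent_ctx: &RequestContext,
) -> anyhow::Result<TimelineRequestContext> {
    // Looking up an active tenant also yields a TenantRequestContext that pins it.
    let (tenant, tenant_ctx) = crate::tenant::mgr::get_active_tenant(tenant_id, parent_ctx).await?;
    let timeline = tenant.get_timeline(timeline_id, true)?;
    // Deriving a TimelineRequestContext registers it with the timeline. While it is
    // alive the timeline cannot be detached or deleted; dropping it deregisters the
    // context again (see the Drop impl above).
    let timeline_ctx = timeline
        .get_context(&tenant_ctx)
        .map_err(|state| anyhow::anyhow!("timeline is not active, state: {state:?}"))?;
    Ok(timeline_ctx)
}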


@@ -4,6 +4,7 @@ use super::storage_layer::LayerFileName;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::TimelineRequestContext;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
@@ -73,6 +74,13 @@ pub(crate) struct UploadQueueInitialized {
/// tasks to finish. For example, metadata upload cannot be performed before all
/// preceding layer file uploads have completed.
pub(crate) queued_operations: VecDeque<UploadOp>,
/// Context used for the upload tasks. Note that this is associated with the
/// Timeline, so this prevents the Timeline from being shut down. To ensure quick
/// shutdown, RemoteTimelineClient spawns a task to wait for cancellation on the
/// context and stop the queue. Otherwise we wouldn't notice the cancellation
/// until the next upload attempt.
pub(crate) upload_ctx: TimelineRequestContext,
}
pub(crate) struct UploadQueueStopped {
@@ -83,6 +91,7 @@ impl UploadQueue {
pub(crate) fn initialize_empty_remote(
&mut self,
metadata: &TimelineMetadata,
upload_ctx: TimelineRequestContext,
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
@@ -108,6 +117,7 @@ impl UploadQueue {
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
upload_ctx,
};
*self = UploadQueue::Initialized(state);
@@ -117,6 +127,7 @@ impl UploadQueue {
pub(crate) fn initialize_with_current_remote_index_part(
&mut self,
index_part: &IndexPart,
upload_ctx: TimelineRequestContext,
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
@@ -153,6 +164,7 @@ impl UploadQueue {
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
upload_ctx,
};
*self = UploadQueue::Initialized(state);


@@ -25,7 +25,7 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::Result;
use anyhow::{Context, Result};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
@@ -85,7 +85,7 @@ impl<'a> WalIngest<'a> {
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
ctx: &TimelineRequestContext,
) -> anyhow::Result<()> {
) -> Result<(), PageReconstructError> {
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
@@ -288,7 +288,8 @@ impl<'a> WalIngest<'a> {
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
let xlog_checkpoint =
CheckPoint::decode(&checkpoint_bytes).context("error decoding checkpoint")?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
@@ -315,7 +316,10 @@ impl<'a> WalIngest<'a> {
// If checkpoint data was updated, store the new version in the repository
if self.checkpoint_modified {
let new_checkpoint_bytes = self.checkpoint.encode()?;
let new_checkpoint_bytes = self
.checkpoint
.encode()
.context("error encoding checkpoint")?;
modification.put_checkpoint(new_checkpoint_bytes)?;
self.checkpoint_modified = false;
@@ -1198,8 +1202,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let tenant = TenantHarness::create("test_relsize")?.load().await;
let tenant_ctx = tenant.get_context();
let (tenant, tenant_ctx) = TenantHarness::create("test_relsize")?.load().await;
let (tline, ctx) =
create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &tenant_ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1419,8 +1422,7 @@ mod tests {
// and then created it again within the same layer.
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let tenant = TenantHarness::create("test_drop_extend")?.load().await;
let tenant_ctx = tenant.get_context();
let (tenant, tenant_ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let (tline, ctx) =
create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &tenant_ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1490,8 +1492,7 @@ mod tests {
// and then extended it again within the same layer.
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let tenant = TenantHarness::create("test_truncate_extend")?.load().await;
let tenant_ctx = tenant.get_context();
let (tenant, tenant_ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let (tline, ctx) =
create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &tenant_ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1632,8 +1633,7 @@ mod tests {
/// split into multiple 1 GB segments in Postgres.
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let tenant = TenantHarness::create("test_large_rel")?.load().await;
let tenant_ctx = tenant.get_context();
let (tenant, tenant_ctx) = TenantHarness::create("test_large_rel")?.load().await;
let (tline, ctx) =
create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &tenant_ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;

View File

@@ -103,19 +103,18 @@ pub enum TaskStateUpdate<E> {
impl<E: Clone> TaskHandle<E> {
/// Initializes the task, starting it immediately after the creation.
pub fn spawn<Fut>(
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>) -> Fut + Send + 'static,
cancellation: CancellationToken,
) -> Self
where
Fut: Future<Output = anyhow::Result<()>> + Send,
E: Send + Sync + 'static,
{
let cancellation = CancellationToken::new();
let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
let cancellation_clone = cancellation.clone();
let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
events_sender.send(TaskStateUpdate::Started).ok();
task(events_sender, cancellation_clone).await
task(events_sender).await
});
TaskHandle {
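
(Illustration, not from the patch: with the new signature the caller creates the CancellationToken and passes it in, so whoever owns the token can cancel the task from outside. The task body below is a placeholder.)

    use tokio_util::sync::CancellationToken;

    let token = CancellationToken::new();
    let handle = TaskHandle::spawn(
        move |events_sender| async move {
            // ... do the work, optionally reporting TaskStateUpdate::Progress(..)
            // through `events_sender` ...
            Ok(())
        },
        token.clone(),
    );
    // The holder of the token can later request shutdown:
    token.cancel();
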

View File

@@ -11,7 +11,7 @@
use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
use crate::task_mgr::TaskKind;
use crate::context::{DownloadBehavior, RequestContext, TaskKind};
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::tenant::{Timeline, TimelineRequestContext};
use crate::{task_mgr, walreceiver::TaskStateUpdate};
@@ -55,9 +55,6 @@ pub fn spawn_connection_manager_task(
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
@@ -71,10 +68,10 @@ pub fn spawn_connection_manager_task(
);
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
_ = ctx.cancelled() => {
info!("WAL receiver shutdown requested, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
return;
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
@@ -85,7 +82,7 @@ pub fn spawn_connection_manager_task(
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
return;
}
},
}
@@ -400,22 +397,31 @@ impl WalreceiverState {
let id = self.id;
let connect_timeout = self.wal_connect_timeout;
let timeline = Arc::clone(&self.timeline);
let child_ctx = timeline.get_context();
let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
async move {
super::walreceiver_connection::handle_walreceiver_connection(
timeline,
new_wal_source_connconf,
events_sender,
cancellation,
connect_timeout,
child_ctx,
)
.await
.context("walreceiver connection handling failure")
}
.instrument(info_span!("walreceiver_connection", id = %id, node_id = %new_sk_id))
});
let child_ctx = ctx.register_another(RequestContext::with_parent(
TaskKind::WalReceiverConnection,
DownloadBehavior::Download,
ctx,
));
let cancellation_token = child_ctx.cancellation_token().clone();
let connection_handle = TaskHandle::spawn(
move |events_sender| {
async move {
super::walreceiver_connection::handle_walreceiver_connection(
timeline,
new_wal_source_connconf,
events_sender,
connect_timeout,
child_ctx,
)
.await
.context("walreceiver connection handling failure")
}
.instrument(info_span!("walreceiver_connection", id = %id, node_id = %new_sk_id))
},
cancellation_token,
);
let now = Utc::now().naive_utc();
self.wal_connection = Some(WalConnection {
@@ -827,6 +833,7 @@ fn wal_stream_connection_config(
mod tests {
use super::*;
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use tokio_util::sync::CancellationToken;
use url::Host;
fn dummy_broker_sk_timeline(
@@ -907,12 +914,15 @@ mod tests {
started_at: now,
sk_id: connected_sk_id,
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
}),
connection_task: TaskHandle::spawn(
move |sender| async move {
sender
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
},
CancellationToken::new(),
),
discovered_new_wal: None,
});
state.wal_stream_candidates = HashMap::from([
@@ -1069,12 +1079,15 @@ mod tests {
started_at: now,
sk_id: connected_sk_id,
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
}),
connection_task: TaskHandle::spawn(
move |sender| async move {
sender
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
},
CancellationToken::new(),
),
discovered_new_wal: None,
});
state.wal_stream_candidates = HashMap::from([
@@ -1134,12 +1147,15 @@ mod tests {
started_at: now,
sk_id: NodeId(1),
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
}),
connection_task: TaskHandle::spawn(
move |sender| async move {
sender
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
},
CancellationToken::new(),
),
discovered_new_wal: None,
});
state.wal_stream_candidates = HashMap::from([(
@@ -1196,7 +1212,10 @@ mod tests {
started_at: now,
sk_id: NodeId(1),
status: connection_status,
connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
connection_task: TaskHandle::spawn(
move |_| async move { Ok(()) },
CancellationToken::new(),
),
discovered_new_wal: Some(NewCommittedWAL {
discovered_at: time_over_threshold,
lsn: new_lsn,
@@ -1240,12 +1259,12 @@ mod tests {
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
async fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState {
let tenant = harness.load().await;
let tenant_ctx = tenant.get_context();
let (tenant, tenant_ctx) = harness.load().await;
let (timeline, timeline_ctx) = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &tenant_ctx)
.expect("Failed to create an empty timeline for dummy wal connection manager");
let timeline = timeline.initialize(&timeline_ctx).unwrap();
WalreceiverState {
id: TenantTimelineId {
tenant_id: harness.tenant_id,

View File

@@ -19,13 +19,11 @@ use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
use crate::{
task_mgr,
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
tenant::{Timeline, TimelineRequestContext, WalReceiverInfo},
walingest::WalIngest,
@@ -60,7 +58,6 @@ pub async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
wal_source_connconf: PgConnectionConfig,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
cancellation: CancellationToken,
connect_timeout: Duration,
ctx: TimelineRequestContext,
) -> anyhow::Result<()> {
@@ -100,12 +97,9 @@ pub async fn handle_walreceiver_connection(
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
let connection_cancellation = cancellation.clone();
let cancellation_token = ctx.cancellation_token().clone();
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverConnection,
Some(timeline.tenant_id),
Some(timeline.timeline_id),
"walreceiver connection",
false,
async move {
@@ -119,9 +113,8 @@ pub async fn handle_walreceiver_connection(
}
},
_ = connection_cancellation.cancelled() => info!("Connection cancelled"),
_ = cancellation_token.cancelled() => info!("Connection cancelled"),
}
Ok(())
},
);
@@ -183,6 +176,8 @@ pub async fn handle_walreceiver_connection(
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
let cancellation = ctx.cancellation_token().clone();
while let Some(replication_message) = {
select! {
_ = cancellation.cancelled() => {
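
(Sketch, not part of the patch: the hunk above is truncated, but the pattern it sets up is to race the cancellation token cloned from the context against the replication stream; `replication_stream.next()` stands in for the actual message handling.)

    let cancellation = ctx.cancellation_token().clone();
    while let Some(replication_message) = {
        select! {
            _ = cancellation.cancelled() => {
                info!("walreceiver connection cancelled");
                None
            }
            msg = replication_stream.next() => msg,
        }
    } {
        // ... decode and ingest the WAL carried by `replication_message` ...
    }
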

View File

@@ -1914,9 +1914,9 @@ class NeonPageserver(PgProtocol):
".*Shutdown task error: walreceiver connection handling failure.*",
".*wal_connection_manager.*tcp connect error: Connection refused.*",
".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres connection error.*",
".*serving compute connection task.*exited with error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres query error.*",
".*connection handler exited with error: Postgres connection error.*",
".*connection handler exited with error: Connection reset by peer.*",
".*connection handler exited with error: Postgres query error.*",
".*Connection aborted: connection error: error communicating with the server: Broken pipe.*",
".*Connection aborted: connection error: error communicating with the server: Transport endpoint is not connected.*",
".*Connection aborted: connection error: error communicating with the server: Connection reset by peer.*",

View File

@@ -371,7 +371,7 @@ def test_download_remote_layers_api(
env.pageserver.allowed_errors.extend(
[
f".*download_all_remote_layers.*{tenant_id}.*{timeline_id}.*layer download failed.*remote-storage-download-pre-rename failpoint",
f".*initial size calculation.*{tenant_id}.*{timeline_id}.*Failed to calculate logical size",
f".*initial size calculation for {tenant_id}/{timeline_id} failed.*remote-storage-download-pre-rename failpoint",
]
)

View File

@@ -543,6 +543,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
)
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(".*request cancelled while waiting on uploads to finish.*")
    # create tenant with config that will deterministically allow
# compaction and gc