mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
[4/4] the fix: do not leak spawn_blocking() tasks from logical size calculation code
- Refactor logical_size_calculation_task, moving the pieces that are
specific to try_spawn_size_init_task into that function.
This allows us to spawn additional size calculation tasks that are not
init size calculation tasks.
- As part of this refactoring, stop logging cancellations as errors.
They are part of regular operations.
Logging them as errors was inadvertently introduced in earlier commit
427c1b2e9661161439e65aabc173d695cfc03ab4
initial logical size calculation: if it fails, retry on next call
- Change tenant size model request code to spawn task_mgr tasks using
the refactored logical_size_calculation_task function.
Using a task_mgr task ensures that the calculation cannot outlive
the timeline.
- There are presumably still some subtle race conditions if a size
requests comes in at exactly the same time as a detach / delete
request.
- But that's the concern of diferent area of the code (e.g., tenant_mgr)
and requires holistic solutions, such as the proposed TenantGuard.
- Make size calculation cancellable using CancellationToken.
This is more of a cherry on top.
NB: the test code doesn't use this because we _must_ return from
the failpoint, because the failpoint lib doesn't allow to just
continue execution in combination with executing the closure.
This commit fixes the tests introduced earlier in this patch series.
This commit is contained in:
committed by
Christian Schwarz
parent
38ebd6e7a0
commit
7db018e147
@@ -4,6 +4,7 @@ use anyhow::{anyhow, Context, Result};
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
use super::models::{
|
||||
@@ -86,8 +87,14 @@ fn build_timeline_info(
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
let mut info = build_timeline_info_common(timeline)?;
|
||||
if include_non_incremental_logical_size {
|
||||
// XXX we should be using spawn_ondemand_logical_size_calculation here.
|
||||
// Otherwise, if someone deletes the timeline / detaches the tenant while
|
||||
// we're executing this function, we will outlive the timeline on-disk state.
|
||||
info.current_logical_size_non_incremental =
|
||||
Some(timeline.get_current_logical_size_non_incremental(info.last_record_lsn)?);
|
||||
Some(timeline.get_current_logical_size_non_incremental(
|
||||
info.last_record_lsn,
|
||||
CancellationToken::new(),
|
||||
)?);
|
||||
}
|
||||
if include_non_incremental_physical_size {
|
||||
info.current_physical_size_non_incremental =
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::repository::*;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{bail, ensure, Result};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
@@ -19,6 +19,7 @@ use postgres_ffi::{Oid, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::ops::Range;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
@@ -33,6 +34,14 @@ pub enum LsnForTimestamp {
|
||||
NoData(Lsn),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum CalculateLogicalSizeError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
///
|
||||
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
|
||||
/// and other special kinds of files, in a versioned key-value store. The
|
||||
@@ -376,14 +385,21 @@ impl Timeline {
|
||||
///
|
||||
/// Only relation blocks are counted currently. That excludes metadata,
|
||||
/// SLRUs, twophase files etc.
|
||||
pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<u64> {
|
||||
pub fn get_current_logical_size_non_incremental(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
cancel: CancellationToken,
|
||||
) -> std::result::Result<u64, CalculateLogicalSizeError> {
|
||||
// Fetch list of database dirs and iterate them
|
||||
let buf = self.get(DBDIR_KEY, lsn)?;
|
||||
let dbdir = DbDirectory::des(&buf)?;
|
||||
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
|
||||
|
||||
let mut total_size: u64 = 0;
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
for rel in self.list_rels(*spcnode, *dbnode, lsn)? {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CalculateLogicalSizeError::Cancelled);
|
||||
}
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
let mut buf = self.get(relsize_key, lsn)?;
|
||||
let relsize = buf.get_u32_le();
|
||||
|
||||
@@ -3,8 +3,11 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use tokio::sync::oneshot::error::RecvError;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
|
||||
use super::Tenant;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -212,11 +215,30 @@ pub(super) async fn gather_inputs(
|
||||
let mut have_any_error = false;
|
||||
|
||||
while let Some(res) = joinset.join_next().await {
|
||||
// each of these come with Result<Result<_, JoinError>, JoinError>
|
||||
// each of these come with Result<anyhow::Result<_>, JoinError>
|
||||
// because of spawn + spawn_blocking
|
||||
let res = res.and_then(|inner| inner);
|
||||
match res {
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size))) => {
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("we are not cancelling any of the futures, nor should be");
|
||||
}
|
||||
Err(join_error) => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
|
||||
have_any_error = true;
|
||||
}
|
||||
Ok(Err(recv_result_error)) => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
error!("failed to receive logical size query result: {recv_result_error:#}");
|
||||
have_any_error = true;
|
||||
}
|
||||
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
|
||||
warn!(
|
||||
timeline_id=%timeline.timeline_id,
|
||||
"failed to calculate logical size at {lsn}: {error:#}"
|
||||
);
|
||||
have_any_error = true;
|
||||
}
|
||||
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
|
||||
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
|
||||
|
||||
logical_size_cache.insert((timeline.timeline_id, lsn), size);
|
||||
@@ -228,21 +250,6 @@ pub(super) async fn gather_inputs(
|
||||
command: Command::Update(size),
|
||||
});
|
||||
}
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error))) => {
|
||||
warn!(
|
||||
timeline_id=%timeline.timeline_id,
|
||||
"failed to calculate logical size at {lsn}: {error:#}"
|
||||
);
|
||||
have_any_error = true;
|
||||
}
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("we are not cancelling any of the futures, nor should be");
|
||||
}
|
||||
Err(join_error) => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
error!("logical size query panicked: {join_error:#}");
|
||||
have_any_error = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,7 +358,7 @@ enum LsnKind {
|
||||
struct TimelineAtLsnSizeResult(
|
||||
Arc<crate::tenant::Timeline>,
|
||||
utils::lsn::Lsn,
|
||||
anyhow::Result<u64>,
|
||||
Result<u64, CalculateLogicalSizeError>,
|
||||
);
|
||||
|
||||
#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
|
||||
@@ -359,17 +366,15 @@ async fn calculate_logical_size(
|
||||
limit: Arc<tokio::sync::Semaphore>,
|
||||
timeline: Arc<crate::tenant::Timeline>,
|
||||
lsn: utils::lsn::Lsn,
|
||||
) -> Result<TimelineAtLsnSizeResult, tokio::task::JoinError> {
|
||||
let permit = tokio::sync::Semaphore::acquire_owned(limit)
|
||||
) -> Result<TimelineAtLsnSizeResult, RecvError> {
|
||||
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
|
||||
.await
|
||||
.expect("global semaphore should not had been closed");
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _permit = permit;
|
||||
let size_res = timeline.calculate_logical_size(lsn);
|
||||
TimelineAtLsnSizeResult(timeline, lsn, size_res)
|
||||
})
|
||||
.await
|
||||
let size_res = timeline
|
||||
.spawn_ondemand_logical_size_calculation(lsn)
|
||||
.await?;
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -6,8 +6,9 @@ use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
use std::cmp::{max, min, Ordering};
|
||||
@@ -36,9 +37,9 @@ use crate::tenant::{
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::metrics::TimelineMetrics;
|
||||
use crate::pgdatadir_mapping::BlockNumber;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
|
||||
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use pageserver_api::reltag::RelTag;
|
||||
|
||||
@@ -176,7 +177,6 @@ pub struct Timeline {
|
||||
|
||||
/// Current logical size of the "datadir", at the last LSN.
|
||||
current_logical_size: LogicalSize,
|
||||
initial_size_computation_state: Mutex<InitialLogicalSizeComputationState>,
|
||||
|
||||
/// Information about the last processed message by the WAL receiver,
|
||||
/// or None if WAL receiver has not received anything for this timeline
|
||||
@@ -189,14 +189,6 @@ pub struct Timeline {
|
||||
state: watch::Sender<TimelineState>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
enum InitialLogicalSizeComputationState {
|
||||
NotStarted,
|
||||
Running,
|
||||
FailedWillRetryNextTime,
|
||||
Success,
|
||||
}
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
/// Calculation consists of two parts:
|
||||
/// 1. Initial size calculation. That might take a long time, because it requires
|
||||
@@ -210,6 +202,8 @@ struct LogicalSize {
|
||||
///
|
||||
/// NOTE: initial size is not a constant and will change between restarts.
|
||||
initial_logical_size: OnceCell<u64>,
|
||||
/// Semaphore to track ongoing calculation of `initial_logical_size`.
|
||||
initial_size_computation: Arc<tokio::sync::Semaphore>,
|
||||
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
|
||||
initial_part_end: Option<Lsn>,
|
||||
/// All other size changes after startup, combined together.
|
||||
@@ -260,6 +254,8 @@ impl LogicalSize {
|
||||
fn empty_initial() -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::with_value(0),
|
||||
// initial_logical_size already computed, so, don't admit any calculations
|
||||
initial_size_computation: Arc::new(Semaphore::new(0)),
|
||||
initial_part_end: None,
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
@@ -268,6 +264,7 @@ impl LogicalSize {
|
||||
fn deferred_initial(compute_to: Lsn) -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::new(),
|
||||
initial_size_computation: Arc::new(Semaphore::new(1)),
|
||||
initial_part_end: Some(compute_to),
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
@@ -812,9 +809,6 @@ impl Timeline {
|
||||
// initial logical size is 0.
|
||||
LogicalSize::empty_initial()
|
||||
},
|
||||
initial_size_computation_state: Mutex::new(
|
||||
InitialLogicalSizeComputationState::NotStarted,
|
||||
),
|
||||
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
|
||||
repartition_threshold: 0,
|
||||
|
||||
@@ -1231,13 +1225,21 @@ impl Timeline {
|
||||
}
|
||||
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
|
||||
use InitialLogicalSizeComputationState::*;
|
||||
let mut guard = self.initial_size_computation_state.lock().unwrap();
|
||||
match *guard {
|
||||
Running | Success => return,
|
||||
NotStarted | FailedWillRetryNextTime => *guard = Running,
|
||||
}
|
||||
drop(guard);
|
||||
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
|
||||
.try_acquire_owned()
|
||||
{
|
||||
Ok(permit) => permit,
|
||||
Err(TryAcquireError::NoPermits) => {
|
||||
// computation already ongoing or finished with success
|
||||
return;
|
||||
}
|
||||
Err(TryAcquireError::Closed) => unreachable!("we never call close"),
|
||||
};
|
||||
debug_assert!(self
|
||||
.current_logical_size
|
||||
.initial_logical_size
|
||||
.get()
|
||||
.is_none());
|
||||
// We need to start the computation task.
|
||||
let self_clone = Arc::clone(self);
|
||||
task_mgr::spawn(
|
||||
@@ -1247,79 +1249,131 @@ impl Timeline {
|
||||
Some(self.timeline_id),
|
||||
"initial size calculation",
|
||||
false,
|
||||
// NB: don't log errors here, task_mgr will do that.
|
||||
async move {
|
||||
let res = self_clone
|
||||
.initial_logical_size_calculation_task(init_lsn)
|
||||
.await;
|
||||
// task_mgr will log the result
|
||||
let new_state = match res {
|
||||
Ok(_) => Success,
|
||||
Err(_) => FailedWillRetryNextTime,
|
||||
let calculated_size = match self_clone.logical_size_calculation_task(init_lsn).await
|
||||
{
|
||||
Ok(s) => s,
|
||||
Err(CalculateLogicalSizeError::Cancelled) => {
|
||||
// Don't make noise, this is a common task.
|
||||
// In the unlikely case that there ihs another call to this function, we'll retry
|
||||
// because initial_logical_size is still None.
|
||||
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
|
||||
return Ok(());
|
||||
}
|
||||
x @ Err(_) => x.context("Failed to calculate logical size")?,
|
||||
};
|
||||
let mut state = self_clone.initial_size_computation_state.lock().unwrap();
|
||||
if *state != Running {
|
||||
// Should be unreachable, but no reason to crash the pageserver. Don't touch anything.
|
||||
error!("expecting initial size computation task to be in state {Running:?}, got {state:?}")
|
||||
} else {
|
||||
*state = new_state;
|
||||
}
|
||||
res
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
async fn initial_logical_size_calculation_task(
|
||||
self: &Arc<Self>,
|
||||
init_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let self_calculation = Arc::clone(self);
|
||||
tokio::select! {
|
||||
calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => {
|
||||
let calculated_size = calculation_result
|
||||
.context("Failed to spawn calculation result task")?
|
||||
.context("Failed to calculate logical size")?;
|
||||
match self.current_logical_size.initial_logical_size.set(calculated_size) {
|
||||
match self_clone
|
||||
.current_logical_size
|
||||
.initial_logical_size
|
||||
.set(calculated_size)
|
||||
{
|
||||
Ok(()) => (),
|
||||
Err(existing_size) => {
|
||||
// This shouldn't happen because we use self.initial_size_computation_running to ensure exlusivity here.
|
||||
// This shouldn't happen because the semaphore is initialized with 1.
|
||||
// But if it happens, just complain & report success so there are no further retries.
|
||||
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
|
||||
}
|
||||
}
|
||||
// now that `initial_logical_size.is_some()`, reduce permit count to 0
|
||||
// so that we prevent future callers from spawning this task
|
||||
permit.forget();
|
||||
Ok(())
|
||||
},
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
anyhow::bail!("aborted because task_mgr shutdown requested");
|
||||
}
|
||||
new_event = async {
|
||||
loop {
|
||||
match timeline_state_updates.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = *timeline_state_updates.borrow();
|
||||
match new_state {
|
||||
// we're running this job for active timelines only
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken | TimelineState::Stopping | TimelineState::Suspended => return Some(new_state),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn spawn_ondemand_logical_size_calculation(
|
||||
self: &Arc<Self>,
|
||||
lsn: Lsn,
|
||||
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
let self_clone = Arc::clone(self);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::InitialLogicalSizeCalculation,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
"ondemand logical size calculation",
|
||||
false,
|
||||
async move {
|
||||
let res = self_clone.logical_size_calculation_task(lsn).await;
|
||||
let _ = sender.send(res).ok();
|
||||
Ok(()) // Receiver is responsible for handling errors
|
||||
},
|
||||
);
|
||||
receiver
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
async fn logical_size_calculation_task(
|
||||
self: &Arc<Self>,
|
||||
init_lsn: Lsn,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let self_calculation = Arc::clone(self);
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let calculation = async {
|
||||
let cancel = cancel.child_token();
|
||||
spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn, cancel))
|
||||
.await
|
||||
.context("Failed to spawn calculation result task")?
|
||||
};
|
||||
let timeline_state_cancellation = async {
|
||||
loop {
|
||||
match timeline_state_updates.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = *timeline_state_updates.borrow();
|
||||
match new_state {
|
||||
// we're running this job for active timelines only
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken
|
||||
| TimelineState::Stopping
|
||||
| TimelineState::Suspended => {
|
||||
break format!("aborted because timeline became inactive (new state: {new_state:?})")
|
||||
}
|
||||
}
|
||||
Err(_sender_dropped_error) => return None,
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
// can't happen, the sender is not dropped as long as the Timeline exists
|
||||
break "aborted because state watch was dropped".to_string();
|
||||
}
|
||||
}
|
||||
} => {
|
||||
match new_event {
|
||||
Some(new_state) => anyhow::bail!("aborted because timeline became inactive (new state: {new_state:?})"),
|
||||
None => anyhow::bail!("aborted because state watch was dropped"), // can't happen, the sender is not dropped as long as the Timeline exists
|
||||
}
|
||||
};
|
||||
|
||||
let taskmgr_shutdown_cancellation = async {
|
||||
task_mgr::shutdown_watcher().await;
|
||||
"aborted because task_mgr shutdown requested".to_string()
|
||||
};
|
||||
|
||||
tokio::pin!(calculation);
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = &mut calculation => { return res }
|
||||
reason = timeline_state_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
return calculation.await;
|
||||
}
|
||||
},
|
||||
reason = taskmgr_shutdown_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
return calculation.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate the logical size of the database at the latest LSN.
|
||||
///
|
||||
/// NOTE: counted incrementally, includes ancestors, this can be a slow operation.
|
||||
pub fn calculate_logical_size(&self, up_to_lsn: Lsn) -> anyhow::Result<u64> {
|
||||
pub fn calculate_logical_size(
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
info!(
|
||||
"Calculating logical size for timeline {} at {}",
|
||||
self.timeline_id, up_to_lsn
|
||||
@@ -1360,7 +1414,7 @@ impl Timeline {
|
||||
} else {
|
||||
self.metrics.logical_size_histo.start_timer()
|
||||
};
|
||||
let logical_size = self.get_current_logical_size_non_incremental(up_to_lsn)?;
|
||||
let logical_size = self.get_current_logical_size_non_incremental(up_to_lsn, cancel)?;
|
||||
debug!("calculated logical size: {logical_size}");
|
||||
timer.stop_and_record();
|
||||
Ok(logical_size)
|
||||
|
||||
@@ -265,10 +265,6 @@ def test_timeline_initial_logical_size_calculation_cancellation(
|
||||
log.info(
|
||||
f"try to delete the timeline using {deletion_method}, this should cancel size computation tasks and wait for them to finish"
|
||||
)
|
||||
if deletion_method == "timeline_delete":
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*initial size calculation.*{tenant_id}.*{timeline_id}.*aborted because task_mgr shutdown requested"
|
||||
)
|
||||
delete_timeline_success: queue.Queue[bool] = queue.Queue(maxsize=1)
|
||||
|
||||
def delete_timeline_thread_fn():
|
||||
|
||||
Reference in New Issue
Block a user