Compare commits

..

11 Commits

Author SHA1 Message Date
Christian Schwarz
1155945a3e Revert "revert the cancellation sensitivity change for the flush task, doesn't work because flush task can't disambiguate orderly shutdown"
This reverts commit 42922cebe0.
2025-04-11 18:39:33 +02:00
Christian Schwarz
42922cebe0 revert the cancellation sensitivity change for the flush task, doesn't work because flush task can't disambiguate orderly shutdown 2025-04-11 18:36:06 +02:00
Christian Schwarz
69e0c65393 also do this async cleanup after drop for EphemeralFile, it already uses BufferedWriter 2025-04-11 18:26:15 +02:00
Christian Schwarz
2f0677be26 refactor delta&image writers to perform cleanup on Drop in the background
In #10063 we will switch BlobWriter, which underlies delta and image
layer writers, to use the owned buffers IO buffered writer.

That buffered writer implements double-buffering by virtue of a background task
that performs the flushing -- it owns the VirtualFile and both
DeltaLayerWriter and ImageLayerWriter are mere clients to it.

The implication is that it's no longer true that dropping these client
objects guarantees that all IO activity is complete. We must wait for the
flush task to exit.

In preparation for that new world, this PR moves the cleanup to a short-lived
task that is spawned from the Drop impl, and adds appropriate gate guard
holdings to hook it into the Timeline lifecycle.

We must (theoretically) worry that there will be a retry inbetween Drop
completing and the spawned task completing. It could collide on the
randomly generated temporary file name. We avoid this by switching to a
global monotonic counter.

Refs
- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
2025-04-11 17:40:42 +02:00
Christian Schwarz
062c7b9a76 refactor: plumb gate and cancellation down to to blob_io::BlobWriter
In #10063 we will switch BlobWriter to use the owned buffers IO buffered
writer, which implements double-buffering by virtue of a background task
that performs the flushing.

That task's lifecylce must be contained within the Timeline lifecycle,
so, it must hold the timeline gate open and respect Timeline::cancel.

This PR does the noisy plumbing to reduce the #10063 diff.

Refs
- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
2025-04-11 17:00:00 +02:00
Arpad Müller
c66444ea15 Add timeline_import http endpoint (#11484)
The added `timleine_import` endpoint allows us to migrate safekeeper
timelines from control plane managed to storcon managed.
 
Part of #9011
2025-04-11 14:10:27 +00:00
Arpad Müller
88f01c1ca1 Introduce WalIngestError (#11506)
Introduces a `WalIngestError` struct together with a
`WalIngestErrorKind` enum, to be used for walingest related failures and
errors.

* the enum captures backtraces, so we don't regress in comparison to
`anyhow::Error`s (backtraces might be a bit shorter if we use one of the
`anyhow::Error` wrappers)
* it explicitly lists most/all of the potential cases that can occur.

I've originally been inspired to do this in #11496, but it's a
longer-term TODO.
2025-04-11 14:08:46 +00:00
Erik Grinaker
a6937a3281 pageserver: improve shard ancestor compaction logging (#11535)
## Problem

Shard ancestor compaction always logs "starting shard ancestor
compaction", even if there is no work to do. This is very spammy (every
20 seconds for every shard). It also has limited progress logging.

## Summary of changes

* Only log "starting shard ancestor compaction" when there's work to do.
* Include details about the amount of work.
* Log progress messages for each layer, and when waiting for uploads.
* Log when compaction is completed, with elapsed duration and whether
there is more work for a later iteration.
2025-04-11 12:14:08 +00:00
Erik Grinaker
3c8565a194 test_runner: propagate config via attach_hook for test fix (#11529)
## Problem

The `pagebench` benchmarks set up an initial dataset by creating a
template tenant, copying the remote storage to a bunch of new tenants,
and attaching them to Pageservers.

In #11420, we found that
`test_pageserver_characterize_throughput_with_n_tenants` had degraded
performance because it set a custom tenant config in Pageservers that
was then replaced with the default tenant config by the storage
controller.

The initial fix was to register the tenants directly in the storage
controller, but this created the tenants with generation 1. This broke
`test_basebackup_with_high_slru_count`, where the template tenant was at
generation 2, leading to all layer files at generation 2 being ignored.

Resolves #11485.
Touches #11381.

## Summary of changes

This patch addresses both test issues by modifying `attach_hook` to also
take a custom tenant config. This allows attaching tenants to
Pageservers from pre-existing remote storage, specifying both the
generation and tenant config when registering them in the storage
controller.
2025-04-11 11:31:12 +00:00
Christian Schwarz
979fa0682b tests: update batching perf test workload to include scattered LSNs (#11391)
The batching perf test workload is currently read-only sequential scans.
However, realistic workloads have concurrent writes (to other pages)
going on.

This PR simulates concurrent writes to other pages by emitting logical
replication messages.

These degrade the achieved batching factor, for the reason see
- https://github.com/neondatabase/neon/issues/10765

PR 
- https://github.com/neondatabase/neon/pull/11494

will fix this problem and get batching factor back up.

---------

Co-authored-by: Vlad Lazar <vlad@neon.tech>
2025-04-11 09:55:49 +00:00
Christian Schwarz
8884865bca tests: make test_pageserver_getpage_throttle less flaky (#11482)
# Refs

- fixes https://github.com/neondatabase/neon/issues/11395

# Problem

Since 2025-03-10, we have observed increased flakiness of
`test_pageserver_getpage_throttle`.

The test is timing-dependent by nature, and was hitting the

```
 assert duration_secs >= 10 * actual_smgr_query_seconds, (
        "smgr metrics should not include throttle wait time"
    )
```

quite frequently.

# Analysis

These failures are not reproducible.

In this PR's history is a commit that reran the test 100 times without
requiring a single retry.

In https://github.com/neondatabase/neon/issues/11395 there is a link to
a query to the test results database.
It shows that the flakiness was not constant, but rather episodic:
2025-03-{10,11,12,13} 2025-03-{19,20,21} 2025-03-31 and 2025-04-01.

To me, this suggests variability in available CPU.

# Solution

The point of the offending assertion is to ensure that most of the
request latency is spent on throttling, because testing of the
throttling mechanism is the point of the test.
The `10` magic number means at most 10% of mean latency may be spent on
request processing.

Ideally we would control the passage of time (virtual clock source) to
make this test deterministic.

But I don't see that happening in our regression test setup.

So, this PR de-flakes the test as follows:
- allot up to 66% of mean latency for request processing
- increase duration from 10s to 20s, hoping to get better protection
from momentary CPU spikes in noisy neighbor tests or VMs on the runner
host

As a drive-by, switch to `pytest.approx` and remove one self-test
assertion I can't make sense of anymore.
2025-04-11 09:38:05 +00:00
30 changed files with 752 additions and 296 deletions

View File

@@ -13,7 +13,9 @@ use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse,
};
use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, TimelineInfo};
use pageserver_api::models::{
TenantConfig, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
@@ -82,7 +84,8 @@ impl NeonStorageControllerStopArgs {
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
pub node_id: Option<NodeId>,
pub generation_override: Option<i32>,
pub generation_override: Option<i32>, // only new tenants
pub config: Option<TenantConfig>, // only new tenants
}
#[derive(Serialize, Deserialize)]
@@ -805,6 +808,7 @@ impl StorageController {
tenant_shard_id,
node_id: Some(pageserver_id),
generation_override: None,
config: None,
};
let response = self

View File

@@ -7,7 +7,8 @@ use std::time::{Duration, Instant};
/// API (`/control/v1` prefix). Implemented by the server
/// in [`storage_controller::http`]
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
use crate::shard::{ShardStripeSize, TenantShardId};
@@ -499,6 +500,15 @@ pub struct SafekeeperSchedulingPolicyRequest {
pub scheduling_policy: SkSchedulingPolicy,
}
/// Import request for safekeeper timelines.
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineImportRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub start_lsn: Lsn,
pub sk_set: Vec<NodeId>,
}
#[cfg(test)]
mod test {
use serde_json;

View File

@@ -927,7 +927,7 @@ impl Key {
/// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`.
#[inline(always)]
pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> {
pub fn to_rel_block(self) -> Result<(RelTag, BlockNumber), ToRelBlockError> {
Ok(match self.field1 {
0x00 => (
RelTag {
@@ -938,7 +938,7 @@ impl Key {
},
self.field6,
),
_ => anyhow::bail!("unexpected value kind 0x{:02x}", self.field1),
_ => return Err(ToRelBlockError(self.field1)),
})
}
}
@@ -951,6 +951,17 @@ impl std::str::FromStr for Key {
}
}
#[derive(Debug)]
pub struct ToRelBlockError(u8);
impl fmt::Display for ToRelBlockError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "unexpected value kind 0x{:02x}", self.0)
}
}
impl std::error::Error for ToRelBlockError {}
#[cfg(test)]
mod tests {
use std::str::FromStr;

View File

@@ -126,7 +126,7 @@ async fn ingest(
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;

View File

@@ -3253,7 +3253,7 @@ async fn ingest_aux_files(
modification
.put_file(&fname, content.as_bytes(), &ctx)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| ApiError::InternalServerError(e.into()))?;
}
modification
.commit(&ctx)

View File

@@ -27,7 +27,7 @@ use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walingest::{WalIngest, WalIngestErrorKind};
// Returns checkpoint LSN from controlfile
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
@@ -157,9 +157,9 @@ async fn import_rel(
.put_rel_creation(rel, nblocks as u32, ctx)
.await
{
match e {
RelationError::AlreadyExists => {
debug!("Relation {} already exist. We must be extending it.", rel)
match e.kind {
WalIngestErrorKind::RelationAlreadyExists(rel) => {
debug!("Relation {rel} already exists. We must be extending it.")
}
_ => return Err(e.into()),
}

View File

@@ -9,8 +9,9 @@
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, ensure};
use crate::walingest::{WalIngestError, WalIngestErrorKind};
use crate::{PERF_TRACE_TARGET, ensure_walingest};
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
@@ -136,12 +137,8 @@ impl From<PageReconstructError> for CalculateLogicalSizeError {
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
#[error("Relation Already Exists")]
AlreadyExists,
#[error("invalid relnode")]
InvalidRelnode,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
@@ -1478,8 +1475,8 @@ impl DatadirModification<'_> {
}
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
ensure_walingest!(
lsn >= self.lsn,
"setting an older lsn {} than {} is not allowed",
lsn,
@@ -1578,7 +1575,7 @@ impl DatadirModification<'_> {
&mut self,
rel: RelTag,
ctx: &RequestContext,
) -> Result<u32, PageReconstructError> {
) -> Result<u32, WalIngestError> {
// Get current size and put rel creation if rel doesn't exist
//
// NOTE: we check the cache first even though get_rel_exists and get_rel_size would
@@ -1593,14 +1590,13 @@ impl DatadirModification<'_> {
.await?
{
// create it with 0 size initially, the logic below will extend it
self.put_rel_creation(rel, 0, ctx)
.await
.context("Relation Error")?;
self.put_rel_creation(rel, 0, ctx).await?;
Ok(0)
} else {
self.tline
Ok(self
.tline
.get_rel_size(rel, Version::Modified(self), ctx)
.await
.await?)
}
}
@@ -1637,11 +1633,14 @@ impl DatadirModification<'_> {
// TODO(vlad): remove this argument and replace the shard check with is_key_local
shard: &ShardIdentity,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let mut gaps_at_lsns = Vec::default();
for meta in batch.metadata.iter() {
let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
let key = Key::from_compact(meta.key());
let (rel, blkno) = key
.to_rel_block()
.map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
let new_nblocks = blkno + 1;
let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
@@ -1683,8 +1682,8 @@ impl DatadirModification<'_> {
rel: RelTag,
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
Ok(())
}
@@ -1696,7 +1695,7 @@ impl DatadirModification<'_> {
segno: u32,
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}
@@ -1714,14 +1713,11 @@ impl DatadirModification<'_> {
rel: RelTag,
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
@@ -1733,15 +1729,12 @@ impl DatadirModification<'_> {
segno: u32,
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
self.put(key, Value::Image(img));
Ok(())
@@ -1751,15 +1744,11 @@ impl DatadirModification<'_> {
&mut self,
rel: RelTag,
blknum: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
let batch = self
@@ -1776,15 +1765,11 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
let batch = self
@@ -1832,8 +1817,10 @@ impl DatadirModification<'_> {
dbnode: Oid,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let v2_enabled = self.maybe_enable_rel_size_v2()?;
) -> Result<(), WalIngestError> {
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
// Add it to the directory (if it doesn't exist already)
let buf = self.get(DBDIR_KEY, ctx).await?;
@@ -1874,13 +1861,13 @@ impl DatadirModification<'_> {
xid: u64,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Add it to the directory entry
let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
@@ -1891,7 +1878,7 @@ impl DatadirModification<'_> {
let xid = xid as u32;
let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
@@ -1909,22 +1896,22 @@ impl DatadirModification<'_> {
&mut self,
origin_id: RepOriginId,
origin_lsn: Lsn,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let key = repl_origin_key(origin_id);
self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
Ok(())
}
pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
self.set_replorigin(origin_id, Lsn::INVALID).await
}
pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
self.put(CONTROLFILE_KEY, Value::Image(img));
Ok(())
}
pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
self.put(CHECKPOINT_KEY, Value::Image(img));
Ok(())
}
@@ -1934,7 +1921,7 @@ impl DatadirModification<'_> {
spcnode: Oid,
dbnode: Oid,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
@@ -1973,20 +1960,21 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> Result<(), RelationError> {
) -> Result<(), WalIngestError> {
if rel.relnode == 0 {
return Err(RelationError::InvalidRelnode);
Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
"invalid relnode"
)))?;
}
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
let dbdir_exists =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
let buf = DbDirectory::ser(&dbdir)?;
self.pending_directory_entries.push((
DirectoryKind::Db,
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
@@ -2003,27 +1991,25 @@ impl DatadirModification<'_> {
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
};
let v2_enabled = self.maybe_enable_rel_size_v2()?;
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
if v2_enabled {
if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
let sparse_rel_dir_key =
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists in v2
let val = self
.sparse_get(sparse_rel_dir_key, ctx)
.await
.map_err(|e| RelationError::Other(e.into()))?;
let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
let val = RelDirExists::decode_option(val)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
.map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
if val == RelDirExists::Exists {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
self.put(
sparse_rel_dir_key,
@@ -2039,9 +2025,7 @@ impl DatadirModification<'_> {
// will be key not found errors if we don't create an empty one for rel_size_v2.
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
)),
Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
);
}
self.pending_directory_entries
@@ -2049,7 +2033,7 @@ impl DatadirModification<'_> {
} else {
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
if !dbdir_exists {
self.pending_directory_entries
@@ -2059,9 +2043,7 @@ impl DatadirModification<'_> {
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
);
}
@@ -2086,8 +2068,8 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
.tline
.get_rel_exists(rel, Version::Modified(self), ctx)
@@ -2117,8 +2099,8 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
// Put size
let size_key = rel_size_to_key(rel);
@@ -2142,8 +2124,10 @@ impl DatadirModification<'_> {
&mut self,
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let v2_enabled = self.maybe_enable_rel_size_v2()?;
) -> Result<(), WalIngestError> {
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
@@ -2163,7 +2147,7 @@ impl DatadirModification<'_> {
let key =
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
.map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
if val == RelDirExists::Exists {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
@@ -2206,7 +2190,7 @@ impl DatadirModification<'_> {
segno: u32,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
// Add it to the directory entry
@@ -2215,7 +2199,7 @@ impl DatadirModification<'_> {
let mut dir = SlruSegmentDirectory::des(&buf)?;
if !dir.segments.insert(segno) {
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
}
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(kind),
@@ -2242,7 +2226,7 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
// Put size
@@ -2258,7 +2242,7 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Remove it from the directory entry
let dir_key = slru_dir_to_key(kind);
let buf = self.get(dir_key, ctx).await?;
@@ -2283,7 +2267,7 @@ impl DatadirModification<'_> {
}
/// Drop a relmapper file (pg_filenode.map)
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
// TODO
Ok(())
}
@@ -2293,7 +2277,7 @@ impl DatadirModification<'_> {
&mut self,
xid: u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
@@ -2308,7 +2292,8 @@ impl DatadirModification<'_> {
));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid: u32 = u32::try_from(xid)?;
let xid: u32 = u32::try_from(xid)
.map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.remove(&xid) {
@@ -2333,7 +2318,7 @@ impl DatadirModification<'_> {
path: &str,
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
@@ -2342,7 +2327,7 @@ impl DatadirModification<'_> {
Err(e) => return Err(e.into()),
};
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
} else {
Vec::new()
};
@@ -2387,7 +2372,8 @@ impl DatadirModification<'_> {
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
let new_val = aux_file::encode_file_value(&new_files)
.map_err(WalIngestErrorKind::EncodeAuxFileError)?;
self.put(key, Value::Image(new_val.into()));
Ok(())

View File

@@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crate::context::RequestContext;
@@ -169,7 +170,13 @@ pub struct BlobWriter<const BUFFERED: bool> {
}
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
pub fn new(
inner: VirtualFile,
start_offset: u64,
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
_ctx: &RequestContext,
) -> Self {
Self {
inner,
offset: start_offset,
@@ -432,12 +439,14 @@ pub(crate) mod tests {
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr

View File

@@ -28,6 +28,11 @@ pub struct EphemeralFile {
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
// Always Some except during Drop
inner: Option<Inner>,
}
struct Inner {
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
@@ -44,9 +49,9 @@ impl EphemeralFile {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<EphemeralFile> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let filename = conf
.timeline_path(&tenant_shard_id, &timeline_id)
@@ -73,34 +78,68 @@ impl EphemeralFile {
_timeline_id: timeline_id,
page_cache_file_id,
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.child_token(),
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
_gate_guard: gate.enter()?,
inner: Some(Inner {
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.child_token(),
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
_gate_guard: gate.enter()?,
}),
})
}
fn buffered_writer(
&self,
) -> &owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile> {
&self
.inner
.as_ref()
.expect("we never take out except during drop")
.buffered_writer
}
fn buffered_writer_mut(
&mut self,
) -> &mut owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile> {
&mut self
.inner
.as_mut()
.expect("we never take out except during drop")
.buffered_writer
}
}
impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = self.buffered_writer.as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
let inner = self.inner.take().expect("we never take out except here");
tokio::spawn(async move {
let Inner {
buffered_writer,
_gate_guard,
} = inner;
// XXX kinda ugly that we have this Arc here, would like to call VirtualFile::remove()
let virtual_file: Arc<VirtualFile> = buffered_writer.into_inner_no_flush().await;
let path = virtual_file.path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// TODO: can we retry?
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
}
}
}
drop(_gate_guard);
});
}
}
@@ -168,7 +207,7 @@ impl EphemeralFile {
// Write the payload
let (nwritten, control) = self
.buffered_writer
.buffered_writer_mut()
.write_buffered_borrowed_controlled(srcbuf, ctx)
.await
.map_err(|e| match e {
@@ -193,9 +232,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
dst: tokio_epoll_uring::Slice<B>,
ctx: &RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let submitted_offset = self.buffered_writer.bytes_submitted();
let submitted_offset = self.buffered_writer().bytes_submitted();
let mutable = match self.buffered_writer.inspect_mutable() {
let mutable = match self.buffered_writer().inspect_mutable() {
Some(mutable) => &mutable[0..mutable.pending()],
None => {
// Timeline::cancel and hence buffered writer flush was cancelled.
@@ -204,7 +243,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
}
};
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
let maybe_flushed = self.buffered_writer().inspect_maybe_flushed();
let dst_cap = dst.bytes_total().into_u64();
let end = {
@@ -262,7 +301,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
let dst = if written_range.len() > 0 {
let file: &VirtualFile = self.buffered_writer.as_inner();
let file: &VirtualFile = self.buffered_writer().as_inner();
let bounds = dst.bounds();
let slice = file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
@@ -419,7 +458,7 @@ mod tests {
.await
.unwrap();
let mutable = file.buffered_writer.mutable();
let mutable = file.buffered_writer().mutable();
let cap = mutable.capacity();
let align = mutable.align();
@@ -456,13 +495,13 @@ mod tests {
assert_eq!(&buf, &content[range]);
}
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
let file_contents = std::fs::read(file.buffered_writer().as_inner().path()).unwrap();
assert!(file_contents == content[0..cap * 2]);
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
let maybe_flushed_buffer_contents = file.buffered_writer().inspect_maybe_flushed().unwrap();
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
let mutable_buffer_contents = file.buffered_writer.mutable();
let mutable_buffer_contents = file.buffered_writer().mutable();
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
}
@@ -477,7 +516,7 @@ mod tests {
.unwrap();
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
let cap = file.buffered_writer.mutable().capacity();
let cap = file.buffered_writer().mutable().capacity();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
@@ -489,18 +528,18 @@ mod tests {
// assert the state is as this test expects it to be
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
let md = file.buffered_writer().as_inner().path().metadata().unwrap();
assert_eq!(
md.len(),
2 * cap.into_u64(),
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
);
assert_eq!(
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
&file.buffered_writer().inspect_maybe_flushed().unwrap()[0..cap],
&content[cap..cap * 2]
);
assert_eq!(
&file.buffered_writer.mutable()[0..cap / 2],
&file.buffered_writer().mutable()[0..cap / 2],
&content[cap * 2..cap * 2 + cap / 2]
);
}
@@ -522,7 +561,7 @@ mod tests {
.await
.unwrap();
let mutable = file.buffered_writer.mutable();
let mutable = file.buffered_writer().mutable();
let cap = mutable.capacity();
let align = mutable.align();
let content: Vec<u8> = rand::thread_rng()

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use bytes::Bytes;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
/// An image writer that takes images and produces multiple image layers.
#[must_use]
pub struct SplitImageLayerWriter {
pub struct SplitImageLayerWriter<'a> {
inner: ImageLayerWriter,
target_layer_size: u64,
lsn: Lsn,
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
tenant_shard_id: TenantShardId,
batches: BatchLayerWriter,
start_key: Key,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitImageLayerWriter {
impl<'a> SplitImageLayerWriter<'a> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
gate,
cancel.clone(),
ctx,
)
.await?,
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
batches: BatchLayerWriter::new(conf).await?,
lsn,
start_key,
gate,
cancel,
})
}
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
pub struct SplitDeltaLayerWriter<'a> {
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
conf: &'static PageServerConf,
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
lsn_range: Range<Lsn>,
last_key_written: Key,
batches: BatchLayerWriter,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitDeltaLayerWriter {
impl<'a> SplitDeltaLayerWriter<'a> {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
gate,
cancel,
})
}
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
self.gate,
self.cancel.clone(),
ctx,
)
.await?,
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -469,6 +491,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -480,6 +504,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -546,6 +572,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -556,6 +584,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -643,6 +673,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -654,6 +686,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -730,6 +764,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();

View File

@@ -34,6 +34,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
@@ -45,11 +46,10 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -287,19 +287,19 @@ impl DeltaLayer {
key_start: Key,
lsn_range: &Range<Lsn>,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// Never reuse a filename in the lifetime of a pageserver process so that we need
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(tenant_shard_id, timeline_id)
.join(format!(
"{}-XXX__{:016X}-{:016X}.{}.{}",
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end),
rand_string,
filename_disambiguator,
TEMP_FILE_SUFFIX,
))
}
@@ -394,18 +394,23 @@ struct DeltaLayerWriterInner {
// Number of key-lsns in the layer.
num_keys: usize,
_gate_guard: utils::sync::gate::GateGuard,
}
impl DeltaLayerWriterInner {
///
/// Start building a new delta layer.
///
#[allow(clippy::too_many_arguments)]
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
@@ -420,7 +425,7 @@ impl DeltaLayerWriterInner {
let mut file = VirtualFile::create(&path, ctx).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -435,6 +440,7 @@ impl DeltaLayerWriterInner {
tree: tree_builder,
blob_writer,
num_keys: 0,
_gate_guard: gate.enter()?,
})
}
@@ -628,12 +634,15 @@ impl DeltaLayerWriter {
///
/// Start building a new delta layer.
///
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -644,6 +653,8 @@ impl DeltaLayerWriter {
tenant_shard_id,
key_start,
lsn_range,
gate,
cancel,
ctx,
)
.await?,
@@ -719,12 +730,22 @@ impl DeltaLayerWriter {
impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
// We want to remove the virtual file here, so it's fine to not
// having completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
let Some(inner) = self.inner.take() else {
return;
};
tokio::spawn(async move {
let DeltaLayerWriterInner {
blob_writer,
_gate_guard,
..
} = inner;
let vfile = blob_writer.into_inner_no_flush();
vfile.remove();
}
drop(_gate_guard);
});
}
}
@@ -1600,8 +1621,8 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use pageserver_api::value::Value;
use rand::RngCore;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
use super::*;
use crate::DEFAULT_PG_VERSION;
@@ -1885,6 +1906,8 @@ pub(crate) mod test {
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await?;
@@ -2079,6 +2102,8 @@ pub(crate) mod test {
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
&branch.gate,
branch.cancel.clone(),
ctx,
)
.await
@@ -2213,6 +2238,8 @@ pub(crate) mod test {
tenant.tenant_shard_id,
*key_start,
(*lsn_min)..lsn_end,
&tline.gate,
tline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -32,6 +32,7 @@ use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
@@ -43,11 +44,10 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -251,14 +251,17 @@ impl ImageLayer {
tenant_shard_id: TenantShardId,
fname: &ImageLayerName,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// Never reuse a filename in the lifetime of a pageserver process so that we need
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
.join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
))
}
///
@@ -742,18 +745,23 @@ struct ImageLayerWriterInner {
#[cfg(feature = "testing")]
last_written_key: Key,
_gate_guard: utils::sync::gate::GateGuard,
}
impl ImageLayerWriterInner {
///
/// Start building a new image layer.
///
#[allow(clippy::too_many_arguments)]
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
@@ -780,7 +788,7 @@ impl ImageLayerWriterInner {
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -801,6 +809,7 @@ impl ImageLayerWriterInner {
num_keys: 0,
#[cfg(feature = "testing")]
last_written_key: Key::MIN,
_gate_guard: gate.enter()?,
};
Ok(writer)
@@ -988,18 +997,30 @@ impl ImageLayerWriter {
///
/// Start building a new image layer.
///
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
.await?,
ImageLayerWriterInner::new(
conf,
timeline_id,
tenant_shard_id,
key_range,
lsn,
gate,
cancel,
ctx,
)
.await?,
),
})
}
@@ -1050,9 +1071,22 @@ impl ImageLayerWriter {
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
}
let Some(inner) = self.inner.take() else {
return;
};
tokio::spawn(async move {
let ImageLayerWriterInner {
blob_writer,
_gate_guard,
..
} = inner;
let vfile = blob_writer.into_inner();
vfile.remove();
drop(_gate_guard);
});
}
}
@@ -1203,6 +1237,8 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await
@@ -1268,6 +1304,8 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await
@@ -1346,6 +1384,8 @@ mod test {
tenant.tenant_shard_id,
&key_range,
lsn,
&tline.gate,
tline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -719,6 +719,8 @@ impl InMemoryLayer {
ctx: &RequestContext,
key_range: Option<Range<Key>>,
l0_flush_global_state: &l0_flush::Inner,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
@@ -759,6 +761,8 @@ impl InMemoryLayer {
self.tenant_shard_id,
Key::MIN,
self.start_lsn..end_lsn,
gate,
cancel,
ctx,
)
.await?;

View File

@@ -4805,7 +4805,13 @@ impl Timeline {
let ctx = ctx.attached_child();
let work = async move {
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
.write_to_disk(
&ctx,
key_range,
self_clone.l0_flush_global_state.inner(),
&self_clone.gate,
self_clone.cancel.clone(),
)
.await?
else {
return Ok(None);
@@ -5343,6 +5349,8 @@ impl Timeline {
self.tenant_shard_id,
&img_range,
lsn,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -6707,6 +6715,8 @@ impl Timeline {
self.tenant_shard_id,
&(min_key..end_key),
lsn,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -6768,6 +6778,8 @@ impl Timeline {
self.tenant_shard_id,
deltas.key_range.start,
deltas.lsn_range,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;

View File

@@ -747,8 +747,8 @@ impl KeyHistoryRetention {
async fn pipe_to(
self,
key: Key,
delta_writer: &mut SplitDeltaLayerWriter,
mut image_writer: Option<&mut SplitImageLayerWriter>,
delta_writer: &mut SplitDeltaLayerWriter<'_>,
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
stat: &mut CompactionStatistics,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -1140,6 +1140,7 @@ impl Timeline {
) -> Result<(), CompactionError> {
let mut drop_layers = Vec::new();
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
let mut rewrite_max_exceeded: bool = false;
// We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
// layer is behind this Lsn, it indicates that the layer is being retained beyond the
@@ -1148,12 +1149,7 @@ impl Timeline {
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
// are rewriting layers.
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
tracing::info!(
"starting shard ancestor compaction, latest_gc_cutoff: {}, pitr cutoff {}",
*latest_gc_cutoff,
self.gc_info.read().unwrap().cutoffs.time
);
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
let layers = self.layers.read().await;
for layer_desc in layers.layer_map()?.iter_historic_layers() {
@@ -1171,8 +1167,8 @@ impl Timeline {
// This ancestral layer only covers keys that belong to other shards.
// We include the full metadata in the log: if we had some critical bug that caused
// us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
info!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split, contains no keys for this shard.",
debug!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split, contains no keys for this shard",
);
if cfg!(debug_assertions) {
@@ -1234,9 +1230,10 @@ impl Timeline {
}
if layers_to_rewrite.len() >= rewrite_max {
tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
layers_to_rewrite.len()
);
rewrite_max_exceeded = true;
continue;
}
@@ -1244,9 +1241,24 @@ impl Timeline {
layers_to_rewrite.push(layer);
}
// Drop read lock on layer map before we start doing time-consuming I/O
// Drop read lock on layer map before we start doing time-consuming I/O.
drop(layers);
// Drop out early if there's nothing to do.
if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
return Ok(());
}
info!(
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
(latest_gc_cutoff={} pitr_cutoff={})",
layers_to_rewrite.len(),
drop_layers.len(),
*latest_gc_cutoff,
pitr_cutoff,
);
let started = Instant::now();
let mut replace_image_layers = Vec::new();
for layer in layers_to_rewrite {
@@ -1254,13 +1266,15 @@ impl Timeline {
return Err(CompactionError::ShuttingDown);
}
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
info!(layer=%layer, "rewriting layer after shard split");
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&layer.layer_desc().key_range,
layer.layer_desc().image_layer_lsn(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -1292,7 +1306,7 @@ impl Timeline {
.map_err(CompactionError::Other)?;
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
layer.metadata().file_size,
new_layer.metadata().file_size);
@@ -1304,6 +1318,12 @@ impl Timeline {
}
}
for layer in &drop_layers {
info!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split (no keys for this shard)",
);
}
// At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
// metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
// to remote index) and be removed. This is inefficient but safe.
@@ -1319,6 +1339,7 @@ impl Timeline {
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
info!("shard ancestor compaction waiting for uploads");
match self.remote_client.wait_completion().await {
Ok(()) => (),
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
@@ -1327,6 +1348,15 @@ impl Timeline {
}
}
info!(
"shard ancestor compaction done in {:.3}s{}",
started.elapsed().as_secs_f64(),
match rewrite_max_exceeded {
true => format!(", more work pending due to rewrite_max={rewrite_max}"),
false => String::new(),
}
);
fail::fail_point!("compact-shard-ancestors-persistent");
Ok(())
@@ -1861,6 +1891,8 @@ impl Timeline {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3055,6 +3087,8 @@ impl Timeline {
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3071,6 +3105,8 @@ impl Timeline {
self.tenant_shard_id,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
)
.await
.context("failed to create delta layer writer")
@@ -3167,6 +3203,8 @@ impl Timeline {
self.tenant_shard_id,
desc.key_range.start,
desc.lsn_range.clone(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3184,6 +3222,8 @@ impl Timeline {
self.tenant_shard_id,
job_desc.compaction_key_range.end,
desc.lsn_range.clone(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3753,6 +3793,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;
@@ -3828,6 +3870,8 @@ impl TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range,
lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -231,6 +231,8 @@ async fn generate_tombstone_image_layer(
detached.tenant_shard_id,
&key_range,
image_lsn,
&detached.gate,
detached.cancel.clone(),
ctx,
)
.await
@@ -779,6 +781,8 @@ async fn copy_lsn_prefix(
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
&target_timeline.gate,
target_timeline.cancel.clone(),
ctx,
)
.await

View File

@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
self.timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -580,6 +580,7 @@ impl ConnectionManagerState {
);
Ok(())
}
WalReceiverError::Cancelled => Ok(()),
WalReceiverError::Other(e) => {
// give out an error to have task_mgr give it a really verbose logging
if cancellation.is_cancelled() {

View File

@@ -73,6 +73,7 @@ pub(super) enum WalReceiverError {
/// Generic error
Other(anyhow::Error),
ClosedGate,
Cancelled,
}
impl From<tokio_postgres::Error> for WalReceiverError {
@@ -200,6 +201,9 @@ pub(super) async fn handle_walreceiver_connection(
// with a similar error.
},
WalReceiverError::SuccessfulCompletion(_) => {}
WalReceiverError::Cancelled => {
debug!("Connection cancelled")
}
WalReceiverError::ClosedGate => {
// doesn't happen at runtime
}
@@ -273,7 +277,12 @@ pub(super) async fn handle_walreceiver_connection(
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let shard = vec![*timeline.get_shard_identity()];

View File

@@ -134,6 +134,20 @@ where
Ok((bytes_amount, writer))
}
pub async fn into_inner_no_flush(self) -> Arc<W> {
let Self {
mutable: buf,
maybe_flushed: _,
writer,
mut flush_handle,
bytes_submitted: _,
} = self;
// If the flush task panicked, that's fine.
let _ = flush_handle.shutdown().await;
assert!(buf.is_some());
writer
}
#[cfg(test)]
pub(crate) fn mutable(&self) -> &B {
self.mutable.as_ref().expect("must not use after an error")

View File

@@ -20,6 +20,11 @@ pub struct FlushHandleInner<Buf, W> {
/// A bi-directional channel that sends (buffer, offset) for writes,
/// and receives recyled buffer.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// The flush task is sometimes sensitive to channel disconnection
/// (i.e. when we drop [`Self::channel`]), other times sensitive to
/// [`FlushBackgroundTask::cancel`], but never both.
/// So, also store this drop guard.
set_flush_task_cancelled: tokio_util::sync::DropGuard,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
}
@@ -134,8 +139,10 @@ where
back.try_send(buf.flush())
.expect("we just created it with capacity 1");
let cancel = cancel.child_token();
let join_handle = tokio::spawn(
FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
FlushBackgroundTask::new(back, file, gate_guard, cancel.clone(), ctx)
.run()
.instrument(span),
);
@@ -143,6 +150,7 @@ where
FlushHandle {
inner: Some(FlushHandleInner {
channel: front,
set_flush_task_cancelled: cancel.drop_guard(),
join_handle,
}),
}
@@ -189,6 +197,7 @@ where
.take()
.expect("must not use after we returned an error");
drop(handle.channel.tx);
drop(handle.set_flush_task_cancelled);
handle.join_handle.await.unwrap()
}

View File

@@ -21,13 +21,13 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoken Rust code.
use std::backtrace::Backtrace;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant, SystemTime};
use anyhow::{Result, bail};
use bytes::{Buf, Bytes};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
@@ -38,7 +38,7 @@ use postgres_ffi::{
fsm_logical_to_physical, pg_constants,
};
use tracing::*;
use utils::bin_ser::SerializeError;
use utils::bin_ser::{DeserializeError, SerializeError};
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical, failpoint_support};
@@ -104,12 +104,101 @@ struct WarnIngestLag {
timestamp_invalid_msg_ratelimit: RateLimit,
}
pub struct WalIngestError {
pub backtrace: std::backtrace::Backtrace,
pub kind: WalIngestErrorKind,
}
#[derive(thiserror::Error, Debug)]
pub enum WalIngestErrorKind {
#[error(transparent)]
#[allow(private_interfaces)]
PageReconstructError(#[from] PageReconstructError),
#[error(transparent)]
DeserializationFailure(#[from] DeserializeError),
#[error(transparent)]
SerializationFailure(#[from] SerializeError),
#[error("the request contains data not supported by pageserver: {0} @ {1}")]
InvalidKey(Key, Lsn),
#[error("twophase file for xid {0} already exists")]
FileAlreadyExists(u64),
#[error("slru segment {0:?}/{1} already exists")]
SlruAlreadyExists(SlruKind, u32),
#[error("relation already exists")]
RelationAlreadyExists(RelTag),
#[error("invalid reldir key {0}")]
InvalidRelDirKey(Key),
#[error(transparent)]
LogicalError(anyhow::Error),
#[error(transparent)]
EncodeAuxFileError(anyhow::Error),
#[error(transparent)]
MaybeRelSizeV2Error(anyhow::Error),
#[error("timeline shutting down")]
Cancelled,
}
impl<T> From<T> for WalIngestError
where
WalIngestErrorKind: From<T>,
{
fn from(value: T) -> Self {
WalIngestError {
backtrace: Backtrace::capture(),
kind: WalIngestErrorKind::from(value),
}
}
}
impl std::error::Error for WalIngestError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind.source()
}
}
impl core::fmt::Display for WalIngestError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.kind.fmt(f)
}
}
impl core::fmt::Debug for WalIngestError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
if f.alternate() {
f.debug_map()
.key(&"backtrace")
.value(&self.backtrace)
.key(&"kind")
.value(&self.kind)
.finish()
} else {
writeln!(f, "Error: {:?}", self.kind)?;
if self.backtrace.status() == std::backtrace::BacktraceStatus::Captured {
writeln!(f, "Stack backtrace: {:?}", self.backtrace)?;
}
Ok(())
}
}
}
#[macro_export]
macro_rules! ensure_walingest {
($($t:tt)*) => {
_ = || -> Result<(), anyhow::Error> {
anyhow::ensure!($($t)*);
Ok(())
}().map_err(WalIngestErrorKind::LogicalError)?;
};
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
startpoint: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
) -> Result<WalIngest, WalIngestError> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
@@ -145,7 +234,7 @@ impl WalIngest {
interpreted: InterpretedWalRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<bool> {
) -> Result<bool, WalIngestError> {
WAL_INGEST.records_received.inc();
let prev_len = modification.len();
@@ -288,7 +377,7 @@ impl WalIngest {
}
/// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64, WalIngestError> {
let next_full_xid =
enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
@@ -298,9 +387,9 @@ impl WalIngest {
if xid > next_xid {
// Wraparound occurred, must be from a prev epoch.
if epoch == 0 {
bail!(
Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
"apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
);
)))?;
}
epoch -= 1;
}
@@ -313,7 +402,7 @@ impl WalIngest {
clear_vm_bits: ClearVmBits,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClearVmBits {
new_heap_blkno,
old_heap_blkno,
@@ -402,7 +491,7 @@ impl WalIngest {
create: DbaseCreate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let DbaseCreate {
db_id,
tablespace_id,
@@ -505,7 +594,7 @@ impl WalIngest {
dbase_drop: DbaseDrop,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let DbaseDrop {
db_id,
tablespace_ids,
@@ -523,7 +612,7 @@ impl WalIngest {
create: SmgrCreate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let SmgrCreate { rel } = create;
self.put_rel_creation(modification, rel, ctx).await?;
Ok(())
@@ -537,7 +626,7 @@ impl WalIngest {
truncate: XlSmgrTruncate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let XlSmgrTruncate {
blkno,
rnode,
@@ -689,7 +778,7 @@ impl WalIngest {
record: XactRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let (xact_common, is_commit, is_prepared) = match record {
XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
let xid: u64 = if modification.tline.pg_version >= 17 {
@@ -813,7 +902,7 @@ impl WalIngest {
truncate: ClogTruncate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClogTruncate {
pageno,
oldest_xid,
@@ -889,7 +978,7 @@ impl WalIngest {
zero_page: ClogZeroPage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClogZeroPage { segno, rpageno } = zero_page;
self.put_slru_page_image(
@@ -907,7 +996,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification,
xlrec: &XlMultiXactCreate,
) -> Result<()> {
) -> Result<(), WalIngestError> {
// Create WAL record for updating the multixact-offsets page
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
@@ -1010,7 +1099,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
xlrec: &XlMultiXactTruncate,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let (maxsegment, startsegment, endsegment) =
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestMulti = xlrec.end_trunc_off;
@@ -1058,7 +1147,7 @@ impl WalIngest {
zero_page: MultiXactZeroPage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let MultiXactZeroPage {
slru_kind,
segno,
@@ -1080,7 +1169,7 @@ impl WalIngest {
update: RelmapUpdate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let RelmapUpdate { update, buf } = update;
modification
@@ -1093,7 +1182,7 @@ impl WalIngest {
raw_record: RawXlogRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let RawXlogRecord { info, lsn, mut buf } = raw_record;
let pg_version = modification.tline.pg_version;
@@ -1235,12 +1324,12 @@ impl WalIngest {
put: PutLogicalMessage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let PutLogicalMessage { path, buf } = put;
modification.put_file(path.as_str(), &buf, ctx).await
}
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<(), WalIngestError> {
match record {
StandbyRecord::RunningXacts(running_xacts) => {
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
@@ -1258,7 +1347,7 @@ impl WalIngest {
&mut self,
record: ReploriginRecord,
modification: &mut DatadirModification<'_>,
) -> Result<()> {
) -> Result<(), WalIngestError> {
match record {
ReploriginRecord::Set(set) => {
modification
@@ -1278,7 +1367,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
modification.put_rel_creation(rel, 0, ctx).await?;
Ok(())
}
@@ -1291,7 +1380,7 @@ impl WalIngest {
blknum: BlockNumber,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<(), WalIngestError> {
self.handle_rel_extend(modification, rel, blknum, ctx)
.await?;
modification.put_rel_page_image(rel, blknum, img)?;
@@ -1305,7 +1394,7 @@ impl WalIngest {
blknum: BlockNumber,
rec: NeonWalRecord,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
self.handle_rel_extend(modification, rel, blknum, ctx)
.await?;
modification.put_rel_wal_record(rel, blknum, rec)?;
@@ -1318,7 +1407,7 @@ impl WalIngest {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
modification.put_rel_truncation(rel, nblocks, ctx).await?;
Ok(())
}
@@ -1329,7 +1418,7 @@ impl WalIngest {
rel: RelTag,
blknum: BlockNumber,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<(), WalIngestError> {
let new_nblocks = blknum + 1;
// Check if the relation exists. We implicitly create relations on first
// record.
@@ -1423,7 +1512,7 @@ impl WalIngest {
blknum: BlockNumber,
img: Bytes,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
if !self.shard.is_shard_zero() {
return Ok(());
}
@@ -1441,7 +1530,7 @@ impl WalIngest {
segno: u32,
blknum: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// we don't use a cache for this like we do for relations. SLRUS are explcitly
// extended with ZEROPAGE records, not with commit records, so it happens
// a lot less frequently.
@@ -1509,6 +1598,7 @@ async fn get_relsize(
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use anyhow::Result;
use postgres_ffi::RELSEG_SIZE;
use super::*;
@@ -1530,7 +1620,7 @@ mod tests {
}
#[tokio::test]
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
for i in 14..=16 {
dispatch_pgversion!(i, {
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;

View File

@@ -22,6 +22,7 @@ use pageserver_api::controller_api::{
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
TimelineImportRequest,
};
use pageserver_api::models::{
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
@@ -1286,6 +1287,37 @@ async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiE
)
}
async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let import_req = json_request::<TimelineImportRequest>(&mut req).await?;
let state = get_state(&req);
if import_req.tenant_id != tenant_id || import_req.timeline_id != timeline_id {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"tenant id or timeline id mismatch: url={tenant_id}/{timeline_id}, body={}/{}",
import_req.tenant_id,
import_req.timeline_id
)));
}
json_response(
StatusCode::OK,
state.service.timeline_import(import_req).await?,
)
}
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -1959,6 +1991,16 @@ pub fn make_router(
RequestName("debug_v1_tenant_locate"),
)
})
.post(
"/debug/v1/tenant/:tenant_id/timeline/:timeline_id/import",
|r| {
named_request_span(
r,
handle_timeline_import,
RequestName("debug_v1_timeline_import"),
)
},
)
.get("/debug/v1/scheduler", |r| {
named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
})

View File

@@ -1852,6 +1852,7 @@ impl Service {
};
if insert {
let config = attach_req.config.clone().unwrap_or_default();
let tsp = TenantShardPersistence {
tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
@@ -1860,7 +1861,7 @@ impl Service {
generation: attach_req.generation_override.or(Some(0)),
generation_pageserver: None,
placement_policy: serde_json::to_string(&PlacementPolicy::Attached(0)).unwrap(),
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
config: serde_json::to_string(&config).unwrap(),
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
@@ -1883,16 +1884,16 @@ impl Service {
Ok(()) => {
tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(
let mut shard = TenantShard::new(
attach_req.tenant_shard_id,
TenantShard::new(
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
),
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
);
shard.config = config;
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(attach_req.tenant_shard_id, shard);
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
}
}
@@ -1977,11 +1978,12 @@ impl Service {
.set_attached(scheduler, attach_req.node_id);
tracing::info!(
"attach_hook: tenant {} set generation {:?}, pageserver {}",
"attach_hook: tenant {} set generation {:?}, pageserver {}, config {:?}",
attach_req.tenant_shard_id,
tenant_shard.generation,
// TODO: this is an odd number of 0xf's
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff)),
attach_req.config,
);
// Trick the reconciler into not doing anything for this tenant: this helps

View File

@@ -12,13 +12,16 @@ use crate::persistence::{
use crate::safekeeper::Safekeeper;
use anyhow::Context;
use http_utils::error::ApiError;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::lsn::Lsn;
use super::Service;
@@ -298,6 +301,31 @@ impl Service {
timeline_id,
})
}
/// Directly insert the timeline into the database without reconciling it with safekeepers.
///
/// Useful if the timeline already exists on the specified safekeepers,
/// but we want to make it storage controller managed.
pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
let persistence = TimelinePersistence {
tenant_id: req.tenant_id.to_string(),
timeline_id: req.timeline_id.to_string(),
start_lsn: Lsn::INVALID.into(),
generation: 1,
sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
new_sk_set: None,
cplane_notified_generation: 1,
deleted_at: None,
};
let inserted = self.persistence.insert_timeline(persistence).await?;
if inserted {
tracing::info!("imported timeline into db");
} else {
tracing::info!("didn't import timeline into db, as it is already present in db");
}
Ok(())
}
/// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
pub(super) async fn tenant_timeline_delete_safekeepers(
self: &Arc<Self>,

View File

@@ -1986,10 +1986,13 @@ class NeonStorageController(MetricsGetter, LogUtils):
tenant_shard_id: TenantId | TenantShardId,
pageserver_id: int,
generation_override: int | None = None,
config: None | dict[str, Any] = None,
) -> int:
body = {"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id}
if generation_override is not None:
body["generation_override"] = generation_override
if config is not None:
body["config"] = config
response = self.request(
"POST",
@@ -2884,13 +2887,14 @@ class NeonPageserver(PgProtocol, LogUtils):
self,
immediate: bool = False,
timeout_in_seconds: int | None = None,
extra_env_vars: dict[str, str] | None = None,
):
"""
High level wrapper for restart: restarts the process, and waits for
tenant state to stabilize.
"""
self.stop(immediate=immediate)
self.start(timeout_in_seconds=timeout_in_seconds)
self.start(timeout_in_seconds=timeout_in_seconds, extra_env_vars=extra_env_vars)
self.quiesce_tenants()
def quiesce_tenants(self):
@@ -2979,11 +2983,12 @@ class NeonPageserver(PgProtocol, LogUtils):
to call into the pageserver HTTP client.
"""
client = self.http_client()
if generation is None:
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
elif override_storage_controller_generation:
if generation is None or override_storage_controller_generation:
generation = self.env.storage_controller.attach_hook_issue(
tenant_id, self.id, generation
tenant_id,
self.id,
generation_override=generation if override_storage_controller_generation else None,
config=config,
)
return client.tenant_attach(
tenant_id,

View File

@@ -65,13 +65,11 @@ def single_timeline(
assert ps_http.tenant_list() == []
def attach(tenant):
# NB: create the new tenant in the storage controller with the correct tenant config. This
# will pick up the existing tenant data from remote storage. If we just attach it to the
# Pageserver, the storage controller will reset the tenant config to the default.
env.create_tenant(
tenant_id=tenant,
timeline_id=template_timeline,
conf=template_config,
env.pageserver.tenant_attach(
tenant,
config=template_config,
generation=100,
override_storage_controller_generation=True,
)
with concurrent.futures.ThreadPoolExecutor(max_workers=22) as executor:

View File

@@ -66,11 +66,11 @@ def test_basebackup_with_high_slru_count(
n_txns = 500000
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, n_txns)
env = setup_pageserver_with_tenants(
neon_env_builder, f"large_slru_count-{n_tenants}-{n_txns}", n_tenants, setup_wrapper
neon_env_builder,
f"large_slru_count-{n_tenants}-{n_txns}",
n_tenants,
lambda env: setup_tenant_template(env, n_txns),
)
run_benchmark(env, pg_bin, record, duration)
@@ -80,10 +80,6 @@ def setup_tenant_template(env: NeonEnv, n_txns: int):
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.create_tenant(set_default=True)

View File

@@ -1,5 +1,8 @@
import concurrent.futures
import dataclasses
import json
import re
import threading
import time
from dataclasses import dataclass
from pathlib import Path
@@ -31,15 +34,15 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
mode: str = "pipelined"
EXECUTION = ["concurrent-futures", "tasks"]
EXECUTION = ["concurrent-futures"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
BATCHABLE: list[PageServicePipeliningConfig] = []
for max_batch_size in [32]:
for execution in EXECUTION:
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@@ -47,19 +50,6 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
@pytest.mark.parametrize(
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# non-batchable workloads
# (A separate benchmark will consider latency).
*[
(
50,
config,
TARGET_RUNTIME,
1,
128,
f"not batchable {dataclasses.asdict(config)}",
)
for config in NON_BATCHABLE
],
# batchable workloads should show throughput and CPU efficiency improvements
*[
(
@@ -137,7 +127,14 @@ def test_throughput(
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
# minimal lfc & small shared buffers to force requests to pageserver
"neon.max_file_cache_size=1MB",
"shared_buffers=10MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -155,7 +152,6 @@ def test_throughput(
tablesize = tablesize_mib * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?
#
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
@@ -211,31 +207,73 @@ def test_throughput(
).value,
)
def workload() -> Metrics:
def workload(disruptor_started: threading.Event) -> Metrics:
disruptor_started.wait()
start = time.time()
iters = 0
while time.time() - start < target_runtime or iters < 2:
log.info("Seqscan %d", iters)
if iters == 1:
# round zero for warming up
before = get_metrics()
cur.execute(
"select clear_buffer_cache()"
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
cur.execute("select sum(data::bigint) from t")
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
iters += 1
after = get_metrics()
return (after - before).normalize(iters - 1)
def disruptor(disruptor_started: threading.Event, stop_disruptor: threading.Event):
conn = endpoint.connect()
cur = conn.cursor()
iters = 0
while True:
cur.execute("SELECT pg_logical_emit_message(true, 'test', 'advancelsn')")
if stop_disruptor.is_set():
break
disruptor_started.set()
iters += 1
time.sleep(0.001)
return iters
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
)
env.pageserver.restart()
metrics = workload()
# set trace for log analysis below
env.pageserver.restart(extra_env_vars={"RUST_LOG": "info,pageserver::page_service=trace"})
log.info("Starting workload")
with concurrent.futures.ThreadPoolExecutor() as executor:
disruptor_started = threading.Event()
stop_disruptor = threading.Event()
disruptor_fut = executor.submit(disruptor, disruptor_started, stop_disruptor)
workload_fut = executor.submit(workload, disruptor_started)
metrics = workload_fut.result()
stop_disruptor.set()
ndisruptions = disruptor_fut.result()
log.info("Disruptor issued %d disrupting requests", ndisruptions)
log.info("Results: %s", metrics)
since_last_start: list[str] = []
for line in env.pageserver.logfile.read_text().splitlines():
if "git:" in line:
since_last_start = []
since_last_start.append(line)
stopping_batching_because_re = re.compile(
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
)
reasons_for_stopping_batching = {}
for line in since_last_start:
match = stopping_batching_because_re.search(line)
if match:
if match.group(1) not in reasons_for_stopping_batching:
reasons_for_stopping_batching[match.group(1)] = 0
reasons_for_stopping_batching[match.group(1)] += 1
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
#
# Sanity-checks on the collected data
#

View File

@@ -15,7 +15,6 @@ if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/11395")
def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
@@ -96,17 +95,12 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
_, marker_offset = wait_until(lambda: env.pageserver.assert_log_contains(marker, offset=None))
log.info("run pagebench")
duration_secs = 10
duration_secs = 20
actual_ncompleted = run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs)
log.info("validate the client is capped at the configured rps limit")
expect_ncompleted = duration_secs * rate_limit_rps
delta_abs = abs(expect_ncompleted - actual_ncompleted)
threshold = 0.05 * expect_ncompleted
assert threshold / rate_limit_rps < 0.1 * duration_secs, (
"test self-test: unrealistic expecations regarding precision in this test"
)
assert delta_abs < 0.05 * expect_ncompleted, (
assert pytest.approx(expect_ncompleted, 0.05) == actual_ncompleted, (
"the throttling deviates more than 5percent from the expectation"
)
@@ -120,6 +114,7 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
timeout=compaction_period,
)
log.info("validate the metrics")
smgr_query_seconds_post = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query)
assert smgr_query_seconds_post is not None
throttled_usecs_post = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query)
@@ -128,12 +123,13 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
actual_throttled_usecs = throttled_usecs_post - throttled_usecs_pre
actual_throttled_secs = actual_throttled_usecs / 1_000_000
log.info("validate that the metric doesn't include throttle wait time")
assert duration_secs >= 10 * actual_smgr_query_seconds, (
"smgr metrics should not include throttle wait time"
assert pytest.approx(actual_throttled_secs + actual_smgr_query_seconds, 0.1) == duration_secs, (
"throttling and processing latency = total request time; this assert validates thi holds on average"
)
log.info("validate that the throttling wait time metrics is correct")
assert pytest.approx(actual_throttled_secs + actual_smgr_query_seconds, 0.1) == duration_secs, (
"most of the time in this test is spent throttled because the rate-limit's contribution to latency dominates"
# without this assertion, the test would pass even if the throttling was completely broken
# but the request processing is so slow that it makes up for the latency that a correct throttling
# implementation would add
assert actual_smgr_query_seconds < 0.66 * duration_secs, (
"test self-test: request processing is consuming most of the wall clock time; this risks that we're not actually testing throttling"
)