mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 05:00:38 +00:00
Compare commits
11 Commits
release-co
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1155945a3e | ||
|
|
42922cebe0 | ||
|
|
69e0c65393 | ||
|
|
2f0677be26 | ||
|
|
062c7b9a76 | ||
|
|
c66444ea15 | ||
|
|
88f01c1ca1 | ||
|
|
a6937a3281 | ||
|
|
3c8565a194 | ||
|
|
979fa0682b | ||
|
|
8884865bca |
@@ -13,7 +13,9 @@ use pageserver_api::controller_api::{
|
||||
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
|
||||
TenantCreateResponse, TenantLocateResponse,
|
||||
};
|
||||
use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, TimelineInfo};
|
||||
use pageserver_api::models::{
|
||||
TenantConfig, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
|
||||
use postgres_backend::AuthType;
|
||||
@@ -82,7 +84,8 @@ impl NeonStorageControllerStopArgs {
|
||||
pub struct AttachHookRequest {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub node_id: Option<NodeId>,
|
||||
pub generation_override: Option<i32>,
|
||||
pub generation_override: Option<i32>, // only new tenants
|
||||
pub config: Option<TenantConfig>, // only new tenants
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -805,6 +808,7 @@ impl StorageController {
|
||||
tenant_shard_id,
|
||||
node_id: Some(pageserver_id),
|
||||
generation_override: None,
|
||||
config: None,
|
||||
};
|
||||
|
||||
let response = self
|
||||
|
||||
@@ -7,7 +7,8 @@ use std::time::{Duration, Instant};
|
||||
/// API (`/control/v1` prefix). Implemented by the server
|
||||
/// in [`storage_controller::http`]
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::{NodeId, TenantId};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
|
||||
use crate::shard::{ShardStripeSize, TenantShardId};
|
||||
@@ -499,6 +500,15 @@ pub struct SafekeeperSchedulingPolicyRequest {
|
||||
pub scheduling_policy: SkSchedulingPolicy,
|
||||
}
|
||||
|
||||
/// Import request for safekeeper timelines.
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineImportRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub start_lsn: Lsn,
|
||||
pub sk_set: Vec<NodeId>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use serde_json;
|
||||
|
||||
@@ -927,7 +927,7 @@ impl Key {
|
||||
|
||||
/// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`.
|
||||
#[inline(always)]
|
||||
pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> {
|
||||
pub fn to_rel_block(self) -> Result<(RelTag, BlockNumber), ToRelBlockError> {
|
||||
Ok(match self.field1 {
|
||||
0x00 => (
|
||||
RelTag {
|
||||
@@ -938,7 +938,7 @@ impl Key {
|
||||
},
|
||||
self.field6,
|
||||
),
|
||||
_ => anyhow::bail!("unexpected value kind 0x{:02x}", self.field1),
|
||||
_ => return Err(ToRelBlockError(self.field1)),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -951,6 +951,17 @@ impl std::str::FromStr for Key {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ToRelBlockError(u8);
|
||||
|
||||
impl fmt::Display for ToRelBlockError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "unexpected value kind 0x{:02x}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ToRelBlockError {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -126,7 +126,7 @@ async fn ingest(
|
||||
max_concurrency: NonZeroUsize::new(1).unwrap(),
|
||||
});
|
||||
let (_desc, path) = layer
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner())
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
|
||||
.await?
|
||||
.unwrap();
|
||||
tokio::fs::remove_file(path).await?;
|
||||
|
||||
@@ -3253,7 +3253,7 @@ async fn ingest_aux_files(
|
||||
modification
|
||||
.put_file(&fname, content.as_bytes(), &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
}
|
||||
modification
|
||||
.commit(&ctx)
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::context::RequestContext;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walingest::WalIngest;
|
||||
use crate::walingest::{WalIngest, WalIngestErrorKind};
|
||||
|
||||
// Returns checkpoint LSN from controlfile
|
||||
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
|
||||
@@ -157,9 +157,9 @@ async fn import_rel(
|
||||
.put_rel_creation(rel, nblocks as u32, ctx)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
RelationError::AlreadyExists => {
|
||||
debug!("Relation {} already exist. We must be extending it.", rel)
|
||||
match e.kind {
|
||||
WalIngestErrorKind::RelationAlreadyExists(rel) => {
|
||||
debug!("Relation {rel} already exists. We must be extending it.")
|
||||
}
|
||||
_ => return Err(e.into()),
|
||||
}
|
||||
|
||||
@@ -9,8 +9,9 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||
use std::ops::{ControlFlow, Range};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use anyhow::{Context, ensure};
|
||||
use crate::walingest::{WalIngestError, WalIngestErrorKind};
|
||||
use crate::{PERF_TRACE_TARGET, ensure_walingest};
|
||||
use anyhow::Context;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
@@ -136,12 +137,8 @@ impl From<PageReconstructError> for CalculateLogicalSizeError {
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum RelationError {
|
||||
#[error("Relation Already Exists")]
|
||||
AlreadyExists,
|
||||
#[error("invalid relnode")]
|
||||
InvalidRelnode,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
///
|
||||
@@ -1478,8 +1475,8 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
ensure!(
|
||||
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
|
||||
ensure_walingest!(
|
||||
lsn >= self.lsn,
|
||||
"setting an older lsn {} than {} is not allowed",
|
||||
lsn,
|
||||
@@ -1578,7 +1575,7 @@ impl DatadirModification<'_> {
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u32, PageReconstructError> {
|
||||
) -> Result<u32, WalIngestError> {
|
||||
// Get current size and put rel creation if rel doesn't exist
|
||||
//
|
||||
// NOTE: we check the cache first even though get_rel_exists and get_rel_size would
|
||||
@@ -1593,14 +1590,13 @@ impl DatadirModification<'_> {
|
||||
.await?
|
||||
{
|
||||
// create it with 0 size initially, the logic below will extend it
|
||||
self.put_rel_creation(rel, 0, ctx)
|
||||
.await
|
||||
.context("Relation Error")?;
|
||||
self.put_rel_creation(rel, 0, ctx).await?;
|
||||
Ok(0)
|
||||
} else {
|
||||
self.tline
|
||||
Ok(self
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(self), ctx)
|
||||
.await
|
||||
.await?)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1637,11 +1633,14 @@ impl DatadirModification<'_> {
|
||||
// TODO(vlad): remove this argument and replace the shard check with is_key_local
|
||||
shard: &ShardIdentity,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let mut gaps_at_lsns = Vec::default();
|
||||
|
||||
for meta in batch.metadata.iter() {
|
||||
let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
|
||||
let key = Key::from_compact(meta.key());
|
||||
let (rel, blkno) = key
|
||||
.to_rel_block()
|
||||
.map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
|
||||
let new_nblocks = blkno + 1;
|
||||
|
||||
let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
|
||||
@@ -1683,8 +1682,8 @@ impl DatadirModification<'_> {
|
||||
rel: RelTag,
|
||||
blknum: BlockNumber,
|
||||
rec: NeonWalRecord,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
) -> Result<(), WalIngestError> {
|
||||
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
|
||||
Ok(())
|
||||
}
|
||||
@@ -1696,7 +1695,7 @@ impl DatadirModification<'_> {
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
rec: NeonWalRecord,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
if !self.tline.tenant_shard_id.is_shard_zero() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -1714,14 +1713,11 @@ impl DatadirModification<'_> {
|
||||
rel: RelTag,
|
||||
blknum: BlockNumber,
|
||||
img: Bytes,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
) -> Result<(), WalIngestError> {
|
||||
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
let key = rel_block_to_key(rel, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"the request contains data not supported by pageserver at {}",
|
||||
key
|
||||
);
|
||||
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
|
||||
}
|
||||
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
||||
Ok(())
|
||||
@@ -1733,15 +1729,12 @@ impl DatadirModification<'_> {
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
img: Bytes,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
assert!(self.tline.tenant_shard_id.is_shard_zero());
|
||||
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"the request contains data not supported by pageserver at {}",
|
||||
key
|
||||
);
|
||||
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
|
||||
}
|
||||
self.put(key, Value::Image(img));
|
||||
Ok(())
|
||||
@@ -1751,15 +1744,11 @@ impl DatadirModification<'_> {
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
blknum: BlockNumber,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
) -> Result<(), WalIngestError> {
|
||||
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
let key = rel_block_to_key(rel, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"the request contains data not supported by pageserver: {} @ {}",
|
||||
key,
|
||||
self.lsn
|
||||
);
|
||||
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
|
||||
}
|
||||
|
||||
let batch = self
|
||||
@@ -1776,15 +1765,11 @@ impl DatadirModification<'_> {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
assert!(self.tline.tenant_shard_id.is_shard_zero());
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"the request contains data not supported by pageserver: {} @ {}",
|
||||
key,
|
||||
self.lsn
|
||||
);
|
||||
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
|
||||
}
|
||||
|
||||
let batch = self
|
||||
@@ -1832,8 +1817,10 @@ impl DatadirModification<'_> {
|
||||
dbnode: Oid,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let v2_enabled = self.maybe_enable_rel_size_v2()?;
|
||||
) -> Result<(), WalIngestError> {
|
||||
let v2_enabled = self
|
||||
.maybe_enable_rel_size_v2()
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
|
||||
// Add it to the directory (if it doesn't exist already)
|
||||
let buf = self.get(DBDIR_KEY, ctx).await?;
|
||||
@@ -1874,13 +1861,13 @@ impl DatadirModification<'_> {
|
||||
xid: u64,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
// Add it to the directory entry
|
||||
let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
|
||||
let newdirbuf = if self.tline.pg_version >= 17 {
|
||||
let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
|
||||
if !dir.xids.insert(xid) {
|
||||
anyhow::bail!("twophase file for xid {} already exists", xid);
|
||||
Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::TwoPhase,
|
||||
@@ -1891,7 +1878,7 @@ impl DatadirModification<'_> {
|
||||
let xid = xid as u32;
|
||||
let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
|
||||
if !dir.xids.insert(xid) {
|
||||
anyhow::bail!("twophase file for xid {} already exists", xid);
|
||||
Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::TwoPhase,
|
||||
@@ -1909,22 +1896,22 @@ impl DatadirModification<'_> {
|
||||
&mut self,
|
||||
origin_id: RepOriginId,
|
||||
origin_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let key = repl_origin_key(origin_id);
|
||||
self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
|
||||
pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
|
||||
self.set_replorigin(origin_id, Lsn::INVALID).await
|
||||
}
|
||||
|
||||
pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
|
||||
pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
|
||||
self.put(CONTROLFILE_KEY, Value::Image(img));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
|
||||
pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
|
||||
self.put(CHECKPOINT_KEY, Value::Image(img));
|
||||
Ok(())
|
||||
}
|
||||
@@ -1934,7 +1921,7 @@ impl DatadirModification<'_> {
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let total_blocks = self
|
||||
.tline
|
||||
.get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
|
||||
@@ -1973,20 +1960,21 @@ impl DatadirModification<'_> {
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), RelationError> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
if rel.relnode == 0 {
|
||||
return Err(RelationError::InvalidRelnode);
|
||||
Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
|
||||
"invalid relnode"
|
||||
)))?;
|
||||
}
|
||||
// It's possible that this is the first rel for this db in this
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?;
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
|
||||
|
||||
let dbdir_exists =
|
||||
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
|
||||
let buf = DbDirectory::ser(&dbdir)?;
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::Db,
|
||||
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
|
||||
@@ -2003,27 +1991,25 @@ impl DatadirModification<'_> {
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
|
||||
};
|
||||
|
||||
let v2_enabled = self.maybe_enable_rel_size_v2()?;
|
||||
let v2_enabled = self
|
||||
.maybe_enable_rel_size_v2()
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
|
||||
if v2_enabled {
|
||||
if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
|
||||
}
|
||||
let sparse_rel_dir_key =
|
||||
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
|
||||
// check if the rel_dir_key exists in v2
|
||||
let val = self
|
||||
.sparse_get(sparse_rel_dir_key, ctx)
|
||||
.await
|
||||
.map_err(|e| RelationError::Other(e.into()))?;
|
||||
let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
|
||||
let val = RelDirExists::decode_option(val)
|
||||
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
|
||||
.map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
|
||||
if val == RelDirExists::Exists {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
|
||||
}
|
||||
self.put(
|
||||
sparse_rel_dir_key,
|
||||
@@ -2039,9 +2025,7 @@ impl DatadirModification<'_> {
|
||||
// will be key not found errors if we don't create an empty one for rel_size_v2.
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
|
||||
)),
|
||||
Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
|
||||
);
|
||||
}
|
||||
self.pending_directory_entries
|
||||
@@ -2049,7 +2033,7 @@ impl DatadirModification<'_> {
|
||||
} else {
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
|
||||
}
|
||||
if !dbdir_exists {
|
||||
self.pending_directory_entries
|
||||
@@ -2059,9 +2043,7 @@ impl DatadirModification<'_> {
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&rel_dir).context("serialize")?,
|
||||
)),
|
||||
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2086,8 +2068,8 @@ impl DatadirModification<'_> {
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
) -> Result<(), WalIngestError> {
|
||||
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
if self
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(self), ctx)
|
||||
@@ -2117,8 +2099,8 @@ impl DatadirModification<'_> {
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
) -> Result<(), WalIngestError> {
|
||||
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
@@ -2142,8 +2124,10 @@ impl DatadirModification<'_> {
|
||||
&mut self,
|
||||
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let v2_enabled = self.maybe_enable_rel_size_v2()?;
|
||||
) -> Result<(), WalIngestError> {
|
||||
let v2_enabled = self
|
||||
.maybe_enable_rel_size_v2()
|
||||
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
|
||||
for ((spc_node, db_node), rel_tags) in drop_relations {
|
||||
let dir_key = rel_dir_to_key(spc_node, db_node);
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
@@ -2163,7 +2147,7 @@ impl DatadirModification<'_> {
|
||||
let key =
|
||||
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
|
||||
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
|
||||
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
|
||||
.map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
|
||||
if val == RelDirExists::Exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
|
||||
@@ -2206,7 +2190,7 @@ impl DatadirModification<'_> {
|
||||
segno: u32,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
assert!(self.tline.tenant_shard_id.is_shard_zero());
|
||||
|
||||
// Add it to the directory entry
|
||||
@@ -2215,7 +2199,7 @@ impl DatadirModification<'_> {
|
||||
let mut dir = SlruSegmentDirectory::des(&buf)?;
|
||||
|
||||
if !dir.segments.insert(segno) {
|
||||
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
|
||||
Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::SlruSegment(kind),
|
||||
@@ -2242,7 +2226,7 @@ impl DatadirModification<'_> {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
nblocks: BlockNumber,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
assert!(self.tline.tenant_shard_id.is_shard_zero());
|
||||
|
||||
// Put size
|
||||
@@ -2258,7 +2242,7 @@ impl DatadirModification<'_> {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
// Remove it from the directory entry
|
||||
let dir_key = slru_dir_to_key(kind);
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
@@ -2283,7 +2267,7 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
|
||||
/// Drop a relmapper file (pg_filenode.map)
|
||||
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
|
||||
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
|
||||
// TODO
|
||||
Ok(())
|
||||
}
|
||||
@@ -2293,7 +2277,7 @@ impl DatadirModification<'_> {
|
||||
&mut self,
|
||||
xid: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
// Remove it from the directory entry
|
||||
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
|
||||
let newdirbuf = if self.tline.pg_version >= 17 {
|
||||
@@ -2308,7 +2292,8 @@ impl DatadirModification<'_> {
|
||||
));
|
||||
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
|
||||
} else {
|
||||
let xid: u32 = u32::try_from(xid)?;
|
||||
let xid: u32 = u32::try_from(xid)
|
||||
.map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
|
||||
let mut dir = TwoPhaseDirectory::des(&buf)?;
|
||||
|
||||
if !dir.xids.remove(&xid) {
|
||||
@@ -2333,7 +2318,7 @@ impl DatadirModification<'_> {
|
||||
path: &str,
|
||||
content: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let key = aux_file::encode_aux_file_key(path);
|
||||
// retrieve the key from the engine
|
||||
let old_val = match self.get(key, ctx).await {
|
||||
@@ -2342,7 +2327,7 @@ impl DatadirModification<'_> {
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
|
||||
aux_file::decode_file_value(old_val)?
|
||||
aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
@@ -2387,7 +2372,8 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
(None, true) => warn!("removing non-existing aux file: {}", path),
|
||||
}
|
||||
let new_val = aux_file::encode_file_value(&new_files)?;
|
||||
let new_val = aux_file::encode_file_value(&new_files)
|
||||
.map_err(WalIngestErrorKind::EncodeAuxFileError)?;
|
||||
self.put(key, Value::Image(new_val.into()));
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
@@ -169,7 +170,13 @@ pub struct BlobWriter<const BUFFERED: bool> {
|
||||
}
|
||||
|
||||
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
|
||||
pub fn new(
|
||||
inner: VirtualFile,
|
||||
start_offset: u64,
|
||||
_gate: &utils::sync::gate::Gate,
|
||||
_cancel: CancellationToken,
|
||||
_ctx: &RequestContext,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
offset: start_offset,
|
||||
@@ -432,12 +439,14 @@ pub(crate) mod tests {
|
||||
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
|
||||
let temp_dir = camino_tempfile::tempdir()?;
|
||||
let pathbuf = temp_dir.path().join("file");
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Write part (in block to drop the file)
|
||||
let mut offsets = Vec::new();
|
||||
{
|
||||
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
|
||||
@@ -28,6 +28,11 @@ pub struct EphemeralFile {
|
||||
_timeline_id: TimelineId,
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
bytes_written: u64,
|
||||
// Always Some except during Drop
|
||||
inner: Option<Inner>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
|
||||
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
@@ -44,9 +49,9 @@ impl EphemeralFile {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<EphemeralFile> {
|
||||
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let filename = conf
|
||||
.timeline_path(&tenant_shard_id, &timeline_id)
|
||||
@@ -73,34 +78,68 @@ impl EphemeralFile {
|
||||
_timeline_id: timeline_id,
|
||||
page_cache_file_id,
|
||||
bytes_written: 0,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
|
||||
file,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
cancel.child_token(),
|
||||
ctx,
|
||||
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
|
||||
),
|
||||
_gate_guard: gate.enter()?,
|
||||
inner: Some(Inner {
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
|
||||
file,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
cancel.child_token(),
|
||||
ctx,
|
||||
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
|
||||
),
|
||||
_gate_guard: gate.enter()?,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
fn buffered_writer(
|
||||
&self,
|
||||
) -> &owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile> {
|
||||
&self
|
||||
.inner
|
||||
.as_ref()
|
||||
.expect("we never take out except during drop")
|
||||
.buffered_writer
|
||||
}
|
||||
fn buffered_writer_mut(
|
||||
&mut self,
|
||||
) -> &mut owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile> {
|
||||
&mut self
|
||||
.inner
|
||||
.as_mut()
|
||||
.expect("we never take out except during drop")
|
||||
.buffered_writer
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// unlink the file
|
||||
// we are clear to do this, because we have entered a gate
|
||||
let path = self.buffered_writer.as_inner().path();
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!("could not remove ephemeral file '{path}': {e}");
|
||||
let inner = self.inner.take().expect("we never take out except here");
|
||||
|
||||
tokio::spawn(async move {
|
||||
let Inner {
|
||||
buffered_writer,
|
||||
_gate_guard,
|
||||
} = inner;
|
||||
|
||||
// XXX kinda ugly that we have this Arc here, would like to call VirtualFile::remove()
|
||||
let virtual_file: Arc<VirtualFile> = buffered_writer.into_inner_no_flush().await;
|
||||
let path = virtual_file.path();
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// TODO: can we retry?
|
||||
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!("could not remove ephemeral file '{path}': {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(_gate_guard);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,7 +207,7 @@ impl EphemeralFile {
|
||||
|
||||
// Write the payload
|
||||
let (nwritten, control) = self
|
||||
.buffered_writer
|
||||
.buffered_writer_mut()
|
||||
.write_buffered_borrowed_controlled(srcbuf, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
@@ -193,9 +232,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst: tokio_epoll_uring::Slice<B>,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
|
||||
let submitted_offset = self.buffered_writer.bytes_submitted();
|
||||
let submitted_offset = self.buffered_writer().bytes_submitted();
|
||||
|
||||
let mutable = match self.buffered_writer.inspect_mutable() {
|
||||
let mutable = match self.buffered_writer().inspect_mutable() {
|
||||
Some(mutable) => &mutable[0..mutable.pending()],
|
||||
None => {
|
||||
// Timeline::cancel and hence buffered writer flush was cancelled.
|
||||
@@ -204,7 +243,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
}
|
||||
};
|
||||
|
||||
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
|
||||
let maybe_flushed = self.buffered_writer().inspect_maybe_flushed();
|
||||
|
||||
let dst_cap = dst.bytes_total().into_u64();
|
||||
let end = {
|
||||
@@ -262,7 +301,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
|
||||
|
||||
let dst = if written_range.len() > 0 {
|
||||
let file: &VirtualFile = self.buffered_writer.as_inner();
|
||||
let file: &VirtualFile = self.buffered_writer().as_inner();
|
||||
let bounds = dst.bounds();
|
||||
let slice = file
|
||||
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
|
||||
@@ -419,7 +458,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.mutable();
|
||||
let mutable = file.buffered_writer().mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
|
||||
@@ -456,13 +495,13 @@ mod tests {
|
||||
assert_eq!(&buf, &content[range]);
|
||||
}
|
||||
|
||||
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
|
||||
let file_contents = std::fs::read(file.buffered_writer().as_inner().path()).unwrap();
|
||||
assert!(file_contents == content[0..cap * 2]);
|
||||
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer().inspect_maybe_flushed().unwrap();
|
||||
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
|
||||
|
||||
let mutable_buffer_contents = file.buffered_writer.mutable();
|
||||
let mutable_buffer_contents = file.buffered_writer().mutable();
|
||||
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
|
||||
}
|
||||
|
||||
@@ -477,7 +516,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
|
||||
let cap = file.buffered_writer.mutable().capacity();
|
||||
let cap = file.buffered_writer().mutable().capacity();
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
@@ -489,18 +528,18 @@ mod tests {
|
||||
// assert the state is as this test expects it to be
|
||||
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
|
||||
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
|
||||
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
|
||||
let md = file.buffered_writer().as_inner().path().metadata().unwrap();
|
||||
assert_eq!(
|
||||
md.len(),
|
||||
2 * cap.into_u64(),
|
||||
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&file.buffered_writer().inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&content[cap..cap * 2]
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.mutable()[0..cap / 2],
|
||||
&file.buffered_writer().mutable()[0..cap / 2],
|
||||
&content[cap * 2..cap * 2 + cap / 2]
|
||||
);
|
||||
}
|
||||
@@ -522,7 +561,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.mutable();
|
||||
let mutable = file.buffered_writer().mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::value::Value;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
|
||||
|
||||
/// An image writer that takes images and produces multiple image layers.
|
||||
#[must_use]
|
||||
pub struct SplitImageLayerWriter {
|
||||
pub struct SplitImageLayerWriter<'a> {
|
||||
inner: ImageLayerWriter,
|
||||
target_layer_size: u64,
|
||||
lsn: Lsn,
|
||||
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
|
||||
tenant_shard_id: TenantShardId,
|
||||
batches: BatchLayerWriter,
|
||||
start_key: Key,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl SplitImageLayerWriter {
|
||||
impl<'a> SplitImageLayerWriter<'a> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
|
||||
start_key: Key,
|
||||
lsn: Lsn,
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
|
||||
tenant_shard_id,
|
||||
&(start_key..Key::MAX),
|
||||
lsn,
|
||||
gate,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
lsn,
|
||||
start_key,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
&(key..Key::MAX),
|
||||
self.lsn,
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
|
||||
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
|
||||
/// will split them into multiple files based on size.
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
pub struct SplitDeltaLayerWriter<'a> {
|
||||
inner: Option<(Key, DeltaLayerWriter)>,
|
||||
target_layer_size: u64,
|
||||
conf: &'static PageServerConf,
|
||||
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
|
||||
lsn_range: Range<Lsn>,
|
||||
last_key_written: Key,
|
||||
batches: BatchLayerWriter,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
|
||||
lsn_range,
|
||||
last_key_written: Key::MIN,
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -469,6 +491,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -480,6 +504,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -546,6 +572,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -556,6 +584,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -643,6 +673,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -654,6 +686,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -730,6 +764,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -34,6 +34,7 @@ use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -45,11 +46,10 @@ use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::value::Value;
|
||||
use rand::Rng;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -287,19 +287,19 @@ impl DeltaLayer {
|
||||
key_start: Key,
|
||||
lsn_range: &Range<Lsn>,
|
||||
) -> Utf8PathBuf {
|
||||
let rand_string: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
// Never reuse a filename in the lifetime of a pageserver process so that we need
|
||||
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
conf.timeline_path(tenant_shard_id, timeline_id)
|
||||
.join(format!(
|
||||
"{}-XXX__{:016X}-{:016X}.{}.{}",
|
||||
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
|
||||
key_start,
|
||||
u64::from(lsn_range.start),
|
||||
u64::from(lsn_range.end),
|
||||
rand_string,
|
||||
filename_disambiguator,
|
||||
TEMP_FILE_SUFFIX,
|
||||
))
|
||||
}
|
||||
@@ -394,18 +394,23 @@ struct DeltaLayerWriterInner {
|
||||
|
||||
// Number of key-lsns in the layer.
|
||||
num_keys: usize,
|
||||
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
impl DeltaLayerWriterInner {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename. We don't know
|
||||
@@ -420,7 +425,7 @@ impl DeltaLayerWriterInner {
|
||||
let mut file = VirtualFile::create(&path, ctx).await?;
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -435,6 +440,7 @@ impl DeltaLayerWriterInner {
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
num_keys: 0,
|
||||
_gate_guard: gate.enter()?,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -628,12 +634,15 @@ impl DeltaLayerWriter {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
@@ -644,6 +653,8 @@ impl DeltaLayerWriter {
|
||||
tenant_shard_id,
|
||||
key_start,
|
||||
lsn_range,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -719,12 +730,22 @@ impl DeltaLayerWriter {
|
||||
|
||||
impl Drop for DeltaLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
// We want to remove the virtual file here, so it's fine to not
|
||||
// having completely flushed unwritten data.
|
||||
let vfile = inner.blob_writer.into_inner_no_flush();
|
||||
let Some(inner) = self.inner.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let DeltaLayerWriterInner {
|
||||
blob_writer,
|
||||
_gate_guard,
|
||||
..
|
||||
} = inner;
|
||||
|
||||
let vfile = blob_writer.into_inner_no_flush();
|
||||
vfile.remove();
|
||||
}
|
||||
|
||||
drop(_gate_guard);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1600,8 +1621,8 @@ pub(crate) mod test {
|
||||
use bytes::Bytes;
|
||||
use itertools::MinMaxResult;
|
||||
use pageserver_api::value::Value;
|
||||
use rand::RngCore;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use rand::{Rng, RngCore};
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
@@ -1885,6 +1906,8 @@ pub(crate) mod test {
|
||||
harness.tenant_shard_id,
|
||||
entries_meta.key_range.start,
|
||||
entries_meta.lsn_range.clone(),
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -2079,6 +2102,8 @@ pub(crate) mod test {
|
||||
tenant.tenant_shard_id,
|
||||
Key::MIN,
|
||||
Lsn(0x11)..truncate_at,
|
||||
&branch.gate,
|
||||
branch.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -2213,6 +2238,8 @@ pub(crate) mod test {
|
||||
tenant.tenant_shard_id,
|
||||
*key_start,
|
||||
(*lsn_min)..lsn_end,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -32,6 +32,7 @@ use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use bytes::Bytes;
|
||||
@@ -43,11 +44,10 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
use rand::Rng;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -251,14 +251,17 @@ impl ImageLayer {
|
||||
tenant_shard_id: TenantShardId,
|
||||
fname: &ImageLayerName,
|
||||
) -> Utf8PathBuf {
|
||||
let rand_string: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
// Never reuse a filename in the lifetime of a pageserver process so that we need
|
||||
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
conf.timeline_path(&tenant_shard_id, &timeline_id)
|
||||
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
|
||||
.join(format!(
|
||||
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
|
||||
filename_disambiguator
|
||||
))
|
||||
}
|
||||
|
||||
///
|
||||
@@ -742,18 +745,23 @@ struct ImageLayerWriterInner {
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
last_written_key: Key,
|
||||
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename.
|
||||
@@ -780,7 +788,7 @@ impl ImageLayerWriterInner {
|
||||
};
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -801,6 +809,7 @@ impl ImageLayerWriterInner {
|
||||
num_keys: 0,
|
||||
#[cfg(feature = "testing")]
|
||||
last_written_key: Key::MIN,
|
||||
_gate_guard: gate.enter()?,
|
||||
};
|
||||
|
||||
Ok(writer)
|
||||
@@ -988,18 +997,30 @@ impl ImageLayerWriter {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
Ok(Self {
|
||||
inner: Some(
|
||||
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
|
||||
.await?,
|
||||
ImageLayerWriterInner::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
),
|
||||
})
|
||||
}
|
||||
@@ -1050,9 +1071,22 @@ impl ImageLayerWriter {
|
||||
|
||||
impl Drop for ImageLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
inner.blob_writer.into_inner().remove();
|
||||
}
|
||||
let Some(inner) = self.inner.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let ImageLayerWriterInner {
|
||||
blob_writer,
|
||||
_gate_guard,
|
||||
..
|
||||
} = inner;
|
||||
|
||||
let vfile = blob_writer.into_inner();
|
||||
vfile.remove();
|
||||
|
||||
drop(_gate_guard);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1203,6 +1237,8 @@ mod test {
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1268,6 +1304,8 @@ mod test {
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1346,6 +1384,8 @@ mod test {
|
||||
tenant.tenant_shard_id,
|
||||
&key_range,
|
||||
lsn,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -719,6 +719,8 @@ impl InMemoryLayer {
|
||||
ctx: &RequestContext,
|
||||
key_range: Option<Range<Key>>,
|
||||
l0_flush_global_state: &l0_flush::Inner,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
@@ -759,6 +761,8 @@ impl InMemoryLayer {
|
||||
self.tenant_shard_id,
|
||||
Key::MIN,
|
||||
self.start_lsn..end_lsn,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -4805,7 +4805,13 @@ impl Timeline {
|
||||
let ctx = ctx.attached_child();
|
||||
let work = async move {
|
||||
let Some((desc, path)) = frozen_layer
|
||||
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
|
||||
.write_to_disk(
|
||||
&ctx,
|
||||
key_range,
|
||||
self_clone.l0_flush_global_state.inner(),
|
||||
&self_clone.gate,
|
||||
self_clone.cancel.clone(),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
@@ -5343,6 +5349,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6707,6 +6715,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&(min_key..end_key),
|
||||
lsn,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6768,6 +6778,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
deltas.key_range.start,
|
||||
deltas.lsn_range,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -747,8 +747,8 @@ impl KeyHistoryRetention {
|
||||
async fn pipe_to(
|
||||
self,
|
||||
key: Key,
|
||||
delta_writer: &mut SplitDeltaLayerWriter,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter>,
|
||||
delta_writer: &mut SplitDeltaLayerWriter<'_>,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
|
||||
stat: &mut CompactionStatistics,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -1140,6 +1140,7 @@ impl Timeline {
|
||||
) -> Result<(), CompactionError> {
|
||||
let mut drop_layers = Vec::new();
|
||||
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
|
||||
let mut rewrite_max_exceeded: bool = false;
|
||||
|
||||
// We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
|
||||
// layer is behind this Lsn, it indicates that the layer is being retained beyond the
|
||||
@@ -1148,12 +1149,7 @@ impl Timeline {
|
||||
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
|
||||
// are rewriting layers.
|
||||
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
|
||||
|
||||
tracing::info!(
|
||||
"starting shard ancestor compaction, latest_gc_cutoff: {}, pitr cutoff {}",
|
||||
*latest_gc_cutoff,
|
||||
self.gc_info.read().unwrap().cutoffs.time
|
||||
);
|
||||
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
|
||||
|
||||
let layers = self.layers.read().await;
|
||||
for layer_desc in layers.layer_map()?.iter_historic_layers() {
|
||||
@@ -1171,8 +1167,8 @@ impl Timeline {
|
||||
// This ancestral layer only covers keys that belong to other shards.
|
||||
// We include the full metadata in the log: if we had some critical bug that caused
|
||||
// us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
|
||||
info!(%layer, old_metadata=?layer.metadata(),
|
||||
"dropping layer after shard split, contains no keys for this shard.",
|
||||
debug!(%layer, old_metadata=?layer.metadata(),
|
||||
"dropping layer after shard split, contains no keys for this shard",
|
||||
);
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
@@ -1234,9 +1230,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if layers_to_rewrite.len() >= rewrite_max {
|
||||
tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
|
||||
debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
|
||||
layers_to_rewrite.len()
|
||||
);
|
||||
rewrite_max_exceeded = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -1244,9 +1241,24 @@ impl Timeline {
|
||||
layers_to_rewrite.push(layer);
|
||||
}
|
||||
|
||||
// Drop read lock on layer map before we start doing time-consuming I/O
|
||||
// Drop read lock on layer map before we start doing time-consuming I/O.
|
||||
drop(layers);
|
||||
|
||||
// Drop out early if there's nothing to do.
|
||||
if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
|
||||
(latest_gc_cutoff={} pitr_cutoff={})",
|
||||
layers_to_rewrite.len(),
|
||||
drop_layers.len(),
|
||||
*latest_gc_cutoff,
|
||||
pitr_cutoff,
|
||||
);
|
||||
let started = Instant::now();
|
||||
|
||||
let mut replace_image_layers = Vec::new();
|
||||
|
||||
for layer in layers_to_rewrite {
|
||||
@@ -1254,13 +1266,15 @@ impl Timeline {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
|
||||
info!(layer=%layer, "rewriting layer after shard split");
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&layer.layer_desc().key_range,
|
||||
layer.layer_desc().image_layer_lsn(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1292,7 +1306,7 @@ impl Timeline {
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
|
||||
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
|
||||
layer.metadata().file_size,
|
||||
new_layer.metadata().file_size);
|
||||
|
||||
@@ -1304,6 +1318,12 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
for layer in &drop_layers {
|
||||
info!(%layer, old_metadata=?layer.metadata(),
|
||||
"dropping layer after shard split (no keys for this shard)",
|
||||
);
|
||||
}
|
||||
|
||||
// At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
|
||||
// metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
|
||||
// to remote index) and be removed. This is inefficient but safe.
|
||||
@@ -1319,6 +1339,7 @@ impl Timeline {
|
||||
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
|
||||
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
|
||||
// load.
|
||||
info!("shard ancestor compaction waiting for uploads");
|
||||
match self.remote_client.wait_completion().await {
|
||||
Ok(()) => (),
|
||||
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
|
||||
@@ -1327,6 +1348,15 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"shard ancestor compaction done in {:.3}s{}",
|
||||
started.elapsed().as_secs_f64(),
|
||||
match rewrite_max_exceeded {
|
||||
true => format!(", more work pending due to rewrite_max={rewrite_max}"),
|
||||
false => String::new(),
|
||||
}
|
||||
);
|
||||
|
||||
fail::fail_point!("compact-shard-ancestors-persistent");
|
||||
|
||||
Ok(())
|
||||
@@ -1861,6 +1891,8 @@ impl Timeline {
|
||||
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
|
||||
lsn_range.clone()
|
||||
},
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3055,6 +3087,8 @@ impl Timeline {
|
||||
job_desc.compaction_key_range.start,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3071,6 +3105,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
lowest_retain_lsn..end_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.context("failed to create delta layer writer")
|
||||
@@ -3167,6 +3203,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
desc.key_range.start,
|
||||
desc.lsn_range.clone(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3184,6 +3222,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
job_desc.compaction_key_range.end,
|
||||
desc.lsn_range.clone(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3753,6 +3793,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range.start,
|
||||
lsn_range.clone(),
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -3828,6 +3870,8 @@ impl TimelineAdaptor {
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -231,6 +231,8 @@ async fn generate_tombstone_image_layer(
|
||||
detached.tenant_shard_id,
|
||||
&key_range,
|
||||
image_lsn,
|
||||
&detached.gate,
|
||||
detached.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -779,6 +781,8 @@ async fn copy_lsn_prefix(
|
||||
target_timeline.tenant_shard_id,
|
||||
layer.layer_desc().key_range.start,
|
||||
layer.layer_desc().lsn_range.start..end_lsn,
|
||||
&target_timeline.gate,
|
||||
target_timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
|
||||
self.timeline.tenant_shard_id,
|
||||
&self.range,
|
||||
self.pgdata_lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -580,6 +580,7 @@ impl ConnectionManagerState {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
WalReceiverError::Cancelled => Ok(()),
|
||||
WalReceiverError::Other(e) => {
|
||||
// give out an error to have task_mgr give it a really verbose logging
|
||||
if cancellation.is_cancelled() {
|
||||
|
||||
@@ -73,6 +73,7 @@ pub(super) enum WalReceiverError {
|
||||
/// Generic error
|
||||
Other(anyhow::Error),
|
||||
ClosedGate,
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl From<tokio_postgres::Error> for WalReceiverError {
|
||||
@@ -200,6 +201,9 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// with a similar error.
|
||||
},
|
||||
WalReceiverError::SuccessfulCompletion(_) => {}
|
||||
WalReceiverError::Cancelled => {
|
||||
debug!("Connection cancelled")
|
||||
}
|
||||
WalReceiverError::ClosedGate => {
|
||||
// doesn't happen at runtime
|
||||
}
|
||||
@@ -273,7 +277,12 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
|
||||
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
|
||||
.await
|
||||
.map_err(|e| match e.kind {
|
||||
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
|
||||
_ => WalReceiverError::Other(e.into()),
|
||||
})?;
|
||||
|
||||
let shard = vec![*timeline.get_shard_identity()];
|
||||
|
||||
|
||||
@@ -134,6 +134,20 @@ where
|
||||
Ok((bytes_amount, writer))
|
||||
}
|
||||
|
||||
pub async fn into_inner_no_flush(self) -> Arc<W> {
|
||||
let Self {
|
||||
mutable: buf,
|
||||
maybe_flushed: _,
|
||||
writer,
|
||||
mut flush_handle,
|
||||
bytes_submitted: _,
|
||||
} = self;
|
||||
// If the flush task panicked, that's fine.
|
||||
let _ = flush_handle.shutdown().await;
|
||||
assert!(buf.is_some());
|
||||
writer
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn mutable(&self) -> &B {
|
||||
self.mutable.as_ref().expect("must not use after an error")
|
||||
|
||||
@@ -20,6 +20,11 @@ pub struct FlushHandleInner<Buf, W> {
|
||||
/// A bi-directional channel that sends (buffer, offset) for writes,
|
||||
/// and receives recyled buffer.
|
||||
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
|
||||
/// The flush task is sometimes sensitive to channel disconnection
|
||||
/// (i.e. when we drop [`Self::channel`]), other times sensitive to
|
||||
/// [`FlushBackgroundTask::cancel`], but never both.
|
||||
/// So, also store this drop guard.
|
||||
set_flush_task_cancelled: tokio_util::sync::DropGuard,
|
||||
/// Join handle for the background flush task.
|
||||
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
|
||||
}
|
||||
@@ -134,8 +139,10 @@ where
|
||||
back.try_send(buf.flush())
|
||||
.expect("we just created it with capacity 1");
|
||||
|
||||
let cancel = cancel.child_token();
|
||||
|
||||
let join_handle = tokio::spawn(
|
||||
FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
|
||||
FlushBackgroundTask::new(back, file, gate_guard, cancel.clone(), ctx)
|
||||
.run()
|
||||
.instrument(span),
|
||||
);
|
||||
@@ -143,6 +150,7 @@ where
|
||||
FlushHandle {
|
||||
inner: Some(FlushHandleInner {
|
||||
channel: front,
|
||||
set_flush_task_cancelled: cancel.drop_guard(),
|
||||
join_handle,
|
||||
}),
|
||||
}
|
||||
@@ -189,6 +197,7 @@ where
|
||||
.take()
|
||||
.expect("must not use after we returned an error");
|
||||
drop(handle.channel.tx);
|
||||
drop(handle.set_flush_task_cancelled);
|
||||
handle.join_handle.await.unwrap()
|
||||
}
|
||||
|
||||
|
||||
@@ -21,13 +21,13 @@
|
||||
//! redo Postgres process, but some records it can handle directly with
|
||||
//! bespoken Rust code.
|
||||
|
||||
use std::backtrace::Backtrace;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use anyhow::{Result, bail};
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::key::{Key, rel_block_to_key};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
@@ -38,7 +38,7 @@ use postgres_ffi::{
|
||||
fsm_logical_to_physical, pg_constants,
|
||||
};
|
||||
use tracing::*;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::bin_ser::{DeserializeError, SerializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::{critical, failpoint_support};
|
||||
@@ -104,12 +104,101 @@ struct WarnIngestLag {
|
||||
timestamp_invalid_msg_ratelimit: RateLimit,
|
||||
}
|
||||
|
||||
pub struct WalIngestError {
|
||||
pub backtrace: std::backtrace::Backtrace,
|
||||
pub kind: WalIngestErrorKind,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum WalIngestErrorKind {
|
||||
#[error(transparent)]
|
||||
#[allow(private_interfaces)]
|
||||
PageReconstructError(#[from] PageReconstructError),
|
||||
#[error(transparent)]
|
||||
DeserializationFailure(#[from] DeserializeError),
|
||||
#[error(transparent)]
|
||||
SerializationFailure(#[from] SerializeError),
|
||||
#[error("the request contains data not supported by pageserver: {0} @ {1}")]
|
||||
InvalidKey(Key, Lsn),
|
||||
#[error("twophase file for xid {0} already exists")]
|
||||
FileAlreadyExists(u64),
|
||||
#[error("slru segment {0:?}/{1} already exists")]
|
||||
SlruAlreadyExists(SlruKind, u32),
|
||||
#[error("relation already exists")]
|
||||
RelationAlreadyExists(RelTag),
|
||||
#[error("invalid reldir key {0}")]
|
||||
InvalidRelDirKey(Key),
|
||||
|
||||
#[error(transparent)]
|
||||
LogicalError(anyhow::Error),
|
||||
#[error(transparent)]
|
||||
EncodeAuxFileError(anyhow::Error),
|
||||
#[error(transparent)]
|
||||
MaybeRelSizeV2Error(anyhow::Error),
|
||||
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl<T> From<T> for WalIngestError
|
||||
where
|
||||
WalIngestErrorKind: From<T>,
|
||||
{
|
||||
fn from(value: T) -> Self {
|
||||
WalIngestError {
|
||||
backtrace: Backtrace::capture(),
|
||||
kind: WalIngestErrorKind::from(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for WalIngestError {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
self.kind.source()
|
||||
}
|
||||
}
|
||||
|
||||
impl core::fmt::Display for WalIngestError {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
self.kind.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl core::fmt::Debug for WalIngestError {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
if f.alternate() {
|
||||
f.debug_map()
|
||||
.key(&"backtrace")
|
||||
.value(&self.backtrace)
|
||||
.key(&"kind")
|
||||
.value(&self.kind)
|
||||
.finish()
|
||||
} else {
|
||||
writeln!(f, "Error: {:?}", self.kind)?;
|
||||
if self.backtrace.status() == std::backtrace::BacktraceStatus::Captured {
|
||||
writeln!(f, "Stack backtrace: {:?}", self.backtrace)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! ensure_walingest {
|
||||
($($t:tt)*) => {
|
||||
_ = || -> Result<(), anyhow::Error> {
|
||||
anyhow::ensure!($($t)*);
|
||||
Ok(())
|
||||
}().map_err(WalIngestErrorKind::LogicalError)?;
|
||||
};
|
||||
}
|
||||
|
||||
impl WalIngest {
|
||||
pub async fn new(
|
||||
timeline: &Timeline,
|
||||
startpoint: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<WalIngest> {
|
||||
) -> Result<WalIngest, WalIngestError> {
|
||||
// Fetch the latest checkpoint into memory, so that we can compare with it
|
||||
// quickly in `ingest_record` and update it when it changes.
|
||||
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
|
||||
@@ -145,7 +234,7 @@ impl WalIngest {
|
||||
interpreted: InterpretedWalRecord,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<bool> {
|
||||
) -> Result<bool, WalIngestError> {
|
||||
WAL_INGEST.records_received.inc();
|
||||
let prev_len = modification.len();
|
||||
|
||||
@@ -288,7 +377,7 @@ impl WalIngest {
|
||||
}
|
||||
|
||||
/// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
|
||||
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
|
||||
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64, WalIngestError> {
|
||||
let next_full_xid =
|
||||
enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
|
||||
|
||||
@@ -298,9 +387,9 @@ impl WalIngest {
|
||||
if xid > next_xid {
|
||||
// Wraparound occurred, must be from a prev epoch.
|
||||
if epoch == 0 {
|
||||
bail!(
|
||||
Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
|
||||
"apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
|
||||
);
|
||||
)))?;
|
||||
}
|
||||
epoch -= 1;
|
||||
}
|
||||
@@ -313,7 +402,7 @@ impl WalIngest {
|
||||
clear_vm_bits: ClearVmBits,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let ClearVmBits {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
@@ -402,7 +491,7 @@ impl WalIngest {
|
||||
create: DbaseCreate,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let DbaseCreate {
|
||||
db_id,
|
||||
tablespace_id,
|
||||
@@ -505,7 +594,7 @@ impl WalIngest {
|
||||
dbase_drop: DbaseDrop,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let DbaseDrop {
|
||||
db_id,
|
||||
tablespace_ids,
|
||||
@@ -523,7 +612,7 @@ impl WalIngest {
|
||||
create: SmgrCreate,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let SmgrCreate { rel } = create;
|
||||
self.put_rel_creation(modification, rel, ctx).await?;
|
||||
Ok(())
|
||||
@@ -537,7 +626,7 @@ impl WalIngest {
|
||||
truncate: XlSmgrTruncate,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let XlSmgrTruncate {
|
||||
blkno,
|
||||
rnode,
|
||||
@@ -689,7 +778,7 @@ impl WalIngest {
|
||||
record: XactRecord,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let (xact_common, is_commit, is_prepared) = match record {
|
||||
XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
|
||||
let xid: u64 = if modification.tline.pg_version >= 17 {
|
||||
@@ -813,7 +902,7 @@ impl WalIngest {
|
||||
truncate: ClogTruncate,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let ClogTruncate {
|
||||
pageno,
|
||||
oldest_xid,
|
||||
@@ -889,7 +978,7 @@ impl WalIngest {
|
||||
zero_page: ClogZeroPage,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let ClogZeroPage { segno, rpageno } = zero_page;
|
||||
|
||||
self.put_slru_page_image(
|
||||
@@ -907,7 +996,7 @@ impl WalIngest {
|
||||
&mut self,
|
||||
modification: &mut DatadirModification,
|
||||
xlrec: &XlMultiXactCreate,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
// Create WAL record for updating the multixact-offsets page
|
||||
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
@@ -1010,7 +1099,7 @@ impl WalIngest {
|
||||
modification: &mut DatadirModification<'_>,
|
||||
xlrec: &XlMultiXactTruncate,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let (maxsegment, startsegment, endsegment) =
|
||||
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
|
||||
cp.oldestMulti = xlrec.end_trunc_off;
|
||||
@@ -1058,7 +1147,7 @@ impl WalIngest {
|
||||
zero_page: MultiXactZeroPage,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let MultiXactZeroPage {
|
||||
slru_kind,
|
||||
segno,
|
||||
@@ -1080,7 +1169,7 @@ impl WalIngest {
|
||||
update: RelmapUpdate,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let RelmapUpdate { update, buf } = update;
|
||||
|
||||
modification
|
||||
@@ -1093,7 +1182,7 @@ impl WalIngest {
|
||||
raw_record: RawXlogRecord,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let RawXlogRecord { info, lsn, mut buf } = raw_record;
|
||||
let pg_version = modification.tline.pg_version;
|
||||
|
||||
@@ -1235,12 +1324,12 @@ impl WalIngest {
|
||||
put: PutLogicalMessage,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let PutLogicalMessage { path, buf } = put;
|
||||
modification.put_file(path.as_str(), &buf, ctx).await
|
||||
}
|
||||
|
||||
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
|
||||
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<(), WalIngestError> {
|
||||
match record {
|
||||
StandbyRecord::RunningXacts(running_xacts) => {
|
||||
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
|
||||
@@ -1258,7 +1347,7 @@ impl WalIngest {
|
||||
&mut self,
|
||||
record: ReploriginRecord,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
match record {
|
||||
ReploriginRecord::Set(set) => {
|
||||
modification
|
||||
@@ -1278,7 +1367,7 @@ impl WalIngest {
|
||||
modification: &mut DatadirModification<'_>,
|
||||
rel: RelTag,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
modification.put_rel_creation(rel, 0, ctx).await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1291,7 +1380,7 @@ impl WalIngest {
|
||||
blknum: BlockNumber,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PageReconstructError> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
self.handle_rel_extend(modification, rel, blknum, ctx)
|
||||
.await?;
|
||||
modification.put_rel_page_image(rel, blknum, img)?;
|
||||
@@ -1305,7 +1394,7 @@ impl WalIngest {
|
||||
blknum: BlockNumber,
|
||||
rec: NeonWalRecord,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
self.handle_rel_extend(modification, rel, blknum, ctx)
|
||||
.await?;
|
||||
modification.put_rel_wal_record(rel, blknum, rec)?;
|
||||
@@ -1318,7 +1407,7 @@ impl WalIngest {
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
modification.put_rel_truncation(rel, nblocks, ctx).await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1329,7 +1418,7 @@ impl WalIngest {
|
||||
rel: RelTag,
|
||||
blknum: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PageReconstructError> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
let new_nblocks = blknum + 1;
|
||||
// Check if the relation exists. We implicitly create relations on first
|
||||
// record.
|
||||
@@ -1423,7 +1512,7 @@ impl WalIngest {
|
||||
blknum: BlockNumber,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
if !self.shard.is_shard_zero() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -1441,7 +1530,7 @@ impl WalIngest {
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), WalIngestError> {
|
||||
// we don't use a cache for this like we do for relations. SLRUS are explcitly
|
||||
// extended with ZEROPAGE records, not with commit records, so it happens
|
||||
// a lot less frequently.
|
||||
@@ -1509,6 +1598,7 @@ async fn get_relsize(
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use anyhow::Result;
|
||||
use postgres_ffi::RELSEG_SIZE;
|
||||
|
||||
use super::*;
|
||||
@@ -1530,7 +1620,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
|
||||
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
|
||||
for i in 14..=16 {
|
||||
dispatch_pgversion!(i, {
|
||||
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
|
||||
|
||||
@@ -22,6 +22,7 @@ use pageserver_api::controller_api::{
|
||||
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
|
||||
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
|
||||
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
|
||||
TimelineImportRequest,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
|
||||
@@ -1286,6 +1287,37 @@ async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiE
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
maybe_rate_limit(&req, tenant_id).await;
|
||||
|
||||
let mut req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
}
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
let import_req = json_request::<TimelineImportRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req);
|
||||
|
||||
if import_req.tenant_id != tenant_id || import_req.timeline_id != timeline_id {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"tenant id or timeline id mismatch: url={tenant_id}/{timeline_id}, body={}/{}",
|
||||
import_req.tenant_id,
|
||||
import_req.timeline_id
|
||||
)));
|
||||
}
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
state.service.timeline_import(import_req).await?,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
@@ -1959,6 +1991,16 @@ pub fn make_router(
|
||||
RequestName("debug_v1_tenant_locate"),
|
||||
)
|
||||
})
|
||||
.post(
|
||||
"/debug/v1/tenant/:tenant_id/timeline/:timeline_id/import",
|
||||
|r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_timeline_import,
|
||||
RequestName("debug_v1_timeline_import"),
|
||||
)
|
||||
},
|
||||
)
|
||||
.get("/debug/v1/scheduler", |r| {
|
||||
named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
|
||||
})
|
||||
|
||||
@@ -1852,6 +1852,7 @@ impl Service {
|
||||
};
|
||||
|
||||
if insert {
|
||||
let config = attach_req.config.clone().unwrap_or_default();
|
||||
let tsp = TenantShardPersistence {
|
||||
tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
|
||||
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
|
||||
@@ -1860,7 +1861,7 @@ impl Service {
|
||||
generation: attach_req.generation_override.or(Some(0)),
|
||||
generation_pageserver: None,
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::Attached(0)).unwrap(),
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
config: serde_json::to_string(&config).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
|
||||
.unwrap(),
|
||||
@@ -1883,16 +1884,16 @@ impl Service {
|
||||
Ok(()) => {
|
||||
tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.tenants.insert(
|
||||
let mut shard = TenantShard::new(
|
||||
attach_req.tenant_shard_id,
|
||||
TenantShard::new(
|
||||
attach_req.tenant_shard_id,
|
||||
ShardIdentity::unsharded(),
|
||||
PlacementPolicy::Attached(0),
|
||||
None,
|
||||
),
|
||||
ShardIdentity::unsharded(),
|
||||
PlacementPolicy::Attached(0),
|
||||
None,
|
||||
);
|
||||
shard.config = config;
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.tenants.insert(attach_req.tenant_shard_id, shard);
|
||||
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
|
||||
}
|
||||
}
|
||||
@@ -1977,11 +1978,12 @@ impl Service {
|
||||
.set_attached(scheduler, attach_req.node_id);
|
||||
|
||||
tracing::info!(
|
||||
"attach_hook: tenant {} set generation {:?}, pageserver {}",
|
||||
"attach_hook: tenant {} set generation {:?}, pageserver {}, config {:?}",
|
||||
attach_req.tenant_shard_id,
|
||||
tenant_shard.generation,
|
||||
// TODO: this is an odd number of 0xf's
|
||||
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
|
||||
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff)),
|
||||
attach_req.config,
|
||||
);
|
||||
|
||||
// Trick the reconciler into not doing anything for this tenant: this helps
|
||||
|
||||
@@ -12,13 +12,16 @@ use crate::persistence::{
|
||||
use crate::safekeeper::Safekeeper;
|
||||
use anyhow::Context;
|
||||
use http_utils::error::ApiError;
|
||||
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
|
||||
use pageserver_api::controller_api::{
|
||||
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
|
||||
};
|
||||
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
|
||||
use safekeeper_api::membership::{MemberSet, SafekeeperId};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::logging::SecretString;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use super::Service;
|
||||
|
||||
@@ -298,6 +301,31 @@ impl Service {
|
||||
timeline_id,
|
||||
})
|
||||
}
|
||||
|
||||
/// Directly insert the timeline into the database without reconciling it with safekeepers.
|
||||
///
|
||||
/// Useful if the timeline already exists on the specified safekeepers,
|
||||
/// but we want to make it storage controller managed.
|
||||
pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
|
||||
let persistence = TimelinePersistence {
|
||||
tenant_id: req.tenant_id.to_string(),
|
||||
timeline_id: req.timeline_id.to_string(),
|
||||
start_lsn: Lsn::INVALID.into(),
|
||||
generation: 1,
|
||||
sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
|
||||
new_sk_set: None,
|
||||
cplane_notified_generation: 1,
|
||||
deleted_at: None,
|
||||
};
|
||||
let inserted = self.persistence.insert_timeline(persistence).await?;
|
||||
if inserted {
|
||||
tracing::info!("imported timeline into db");
|
||||
} else {
|
||||
tracing::info!("didn't import timeline into db, as it is already present in db");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
|
||||
pub(super) async fn tenant_timeline_delete_safekeepers(
|
||||
self: &Arc<Self>,
|
||||
|
||||
@@ -1986,10 +1986,13 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
tenant_shard_id: TenantId | TenantShardId,
|
||||
pageserver_id: int,
|
||||
generation_override: int | None = None,
|
||||
config: None | dict[str, Any] = None,
|
||||
) -> int:
|
||||
body = {"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id}
|
||||
if generation_override is not None:
|
||||
body["generation_override"] = generation_override
|
||||
if config is not None:
|
||||
body["config"] = config
|
||||
|
||||
response = self.request(
|
||||
"POST",
|
||||
@@ -2884,13 +2887,14 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
self,
|
||||
immediate: bool = False,
|
||||
timeout_in_seconds: int | None = None,
|
||||
extra_env_vars: dict[str, str] | None = None,
|
||||
):
|
||||
"""
|
||||
High level wrapper for restart: restarts the process, and waits for
|
||||
tenant state to stabilize.
|
||||
"""
|
||||
self.stop(immediate=immediate)
|
||||
self.start(timeout_in_seconds=timeout_in_seconds)
|
||||
self.start(timeout_in_seconds=timeout_in_seconds, extra_env_vars=extra_env_vars)
|
||||
self.quiesce_tenants()
|
||||
|
||||
def quiesce_tenants(self):
|
||||
@@ -2979,11 +2983,12 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
to call into the pageserver HTTP client.
|
||||
"""
|
||||
client = self.http_client()
|
||||
if generation is None:
|
||||
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
|
||||
elif override_storage_controller_generation:
|
||||
if generation is None or override_storage_controller_generation:
|
||||
generation = self.env.storage_controller.attach_hook_issue(
|
||||
tenant_id, self.id, generation
|
||||
tenant_id,
|
||||
self.id,
|
||||
generation_override=generation if override_storage_controller_generation else None,
|
||||
config=config,
|
||||
)
|
||||
return client.tenant_attach(
|
||||
tenant_id,
|
||||
|
||||
@@ -65,13 +65,11 @@ def single_timeline(
|
||||
assert ps_http.tenant_list() == []
|
||||
|
||||
def attach(tenant):
|
||||
# NB: create the new tenant in the storage controller with the correct tenant config. This
|
||||
# will pick up the existing tenant data from remote storage. If we just attach it to the
|
||||
# Pageserver, the storage controller will reset the tenant config to the default.
|
||||
env.create_tenant(
|
||||
tenant_id=tenant,
|
||||
timeline_id=template_timeline,
|
||||
conf=template_config,
|
||||
env.pageserver.tenant_attach(
|
||||
tenant,
|
||||
config=template_config,
|
||||
generation=100,
|
||||
override_storage_controller_generation=True,
|
||||
)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=22) as executor:
|
||||
|
||||
@@ -66,11 +66,11 @@ def test_basebackup_with_high_slru_count(
|
||||
|
||||
n_txns = 500000
|
||||
|
||||
def setup_wrapper(env: NeonEnv):
|
||||
return setup_tenant_template(env, n_txns)
|
||||
|
||||
env = setup_pageserver_with_tenants(
|
||||
neon_env_builder, f"large_slru_count-{n_tenants}-{n_txns}", n_tenants, setup_wrapper
|
||||
neon_env_builder,
|
||||
f"large_slru_count-{n_tenants}-{n_txns}",
|
||||
n_tenants,
|
||||
lambda env: setup_tenant_template(env, n_txns),
|
||||
)
|
||||
run_benchmark(env, pg_bin, record, duration)
|
||||
|
||||
@@ -80,10 +80,6 @@ def setup_tenant_template(env: NeonEnv, n_txns: int):
|
||||
"gc_period": "0s", # disable periodic gc
|
||||
"checkpoint_timeout": "10 years",
|
||||
"compaction_period": "0s", # disable periodic compaction
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
|
||||
template_tenant, template_timeline = env.create_tenant(set_default=True)
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import concurrent.futures
|
||||
import dataclasses
|
||||
import json
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
@@ -31,15 +34,15 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
|
||||
mode: str = "pipelined"
|
||||
|
||||
|
||||
EXECUTION = ["concurrent-futures", "tasks"]
|
||||
EXECUTION = ["concurrent-futures"]
|
||||
|
||||
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
|
||||
for max_batch_size in [1, 32]:
|
||||
for execution in EXECUTION:
|
||||
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
|
||||
|
||||
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
|
||||
for max_batch_size in [1, 2, 4, 8, 16, 32]:
|
||||
BATCHABLE: list[PageServicePipeliningConfig] = []
|
||||
for max_batch_size in [32]:
|
||||
for execution in EXECUTION:
|
||||
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
|
||||
|
||||
@@ -47,19 +50,6 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
|
||||
@pytest.mark.parametrize(
|
||||
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
|
||||
[
|
||||
# non-batchable workloads
|
||||
# (A separate benchmark will consider latency).
|
||||
*[
|
||||
(
|
||||
50,
|
||||
config,
|
||||
TARGET_RUNTIME,
|
||||
1,
|
||||
128,
|
||||
f"not batchable {dataclasses.asdict(config)}",
|
||||
)
|
||||
for config in NON_BATCHABLE
|
||||
],
|
||||
# batchable workloads should show throughput and CPU efficiency improvements
|
||||
*[
|
||||
(
|
||||
@@ -137,7 +127,14 @@ def test_throughput(
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
# minimal lfc & small shared buffers to force requests to pageserver
|
||||
"neon.max_file_cache_size=1MB",
|
||||
"shared_buffers=10MB",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
@@ -155,7 +152,6 @@ def test_throughput(
|
||||
tablesize = tablesize_mib * 1024 * 1024
|
||||
npages = tablesize // (8 * 1024)
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
|
||||
# TODO: can we force postgres to do sequential scans?
|
||||
|
||||
#
|
||||
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
|
||||
@@ -211,31 +207,73 @@ def test_throughput(
|
||||
).value,
|
||||
)
|
||||
|
||||
def workload() -> Metrics:
|
||||
def workload(disruptor_started: threading.Event) -> Metrics:
|
||||
disruptor_started.wait()
|
||||
start = time.time()
|
||||
iters = 0
|
||||
while time.time() - start < target_runtime or iters < 2:
|
||||
log.info("Seqscan %d", iters)
|
||||
if iters == 1:
|
||||
# round zero for warming up
|
||||
before = get_metrics()
|
||||
cur.execute(
|
||||
"select clear_buffer_cache()"
|
||||
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
|
||||
cur.execute("select sum(data::bigint) from t")
|
||||
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
|
||||
iters += 1
|
||||
after = get_metrics()
|
||||
return (after - before).normalize(iters - 1)
|
||||
|
||||
def disruptor(disruptor_started: threading.Event, stop_disruptor: threading.Event):
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
iters = 0
|
||||
while True:
|
||||
cur.execute("SELECT pg_logical_emit_message(true, 'test', 'advancelsn')")
|
||||
if stop_disruptor.is_set():
|
||||
break
|
||||
disruptor_started.set()
|
||||
iters += 1
|
||||
time.sleep(0.001)
|
||||
return iters
|
||||
|
||||
env.pageserver.patch_config_toml_nonrecursive(
|
||||
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
|
||||
)
|
||||
env.pageserver.restart()
|
||||
metrics = workload()
|
||||
|
||||
# set trace for log analysis below
|
||||
env.pageserver.restart(extra_env_vars={"RUST_LOG": "info,pageserver::page_service=trace"})
|
||||
|
||||
log.info("Starting workload")
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
disruptor_started = threading.Event()
|
||||
stop_disruptor = threading.Event()
|
||||
disruptor_fut = executor.submit(disruptor, disruptor_started, stop_disruptor)
|
||||
workload_fut = executor.submit(workload, disruptor_started)
|
||||
metrics = workload_fut.result()
|
||||
stop_disruptor.set()
|
||||
ndisruptions = disruptor_fut.result()
|
||||
log.info("Disruptor issued %d disrupting requests", ndisruptions)
|
||||
|
||||
log.info("Results: %s", metrics)
|
||||
|
||||
since_last_start: list[str] = []
|
||||
for line in env.pageserver.logfile.read_text().splitlines():
|
||||
if "git:" in line:
|
||||
since_last_start = []
|
||||
since_last_start.append(line)
|
||||
|
||||
stopping_batching_because_re = re.compile(
|
||||
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
|
||||
)
|
||||
reasons_for_stopping_batching = {}
|
||||
for line in since_last_start:
|
||||
match = stopping_batching_because_re.search(line)
|
||||
if match:
|
||||
if match.group(1) not in reasons_for_stopping_batching:
|
||||
reasons_for_stopping_batching[match.group(1)] = 0
|
||||
reasons_for_stopping_batching[match.group(1)] += 1
|
||||
|
||||
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
|
||||
|
||||
#
|
||||
# Sanity-checks on the collected data
|
||||
#
|
||||
|
||||
@@ -15,7 +15,6 @@ if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
|
||||
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/11395")
|
||||
def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -96,17 +95,12 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
|
||||
_, marker_offset = wait_until(lambda: env.pageserver.assert_log_contains(marker, offset=None))
|
||||
|
||||
log.info("run pagebench")
|
||||
duration_secs = 10
|
||||
duration_secs = 20
|
||||
actual_ncompleted = run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs)
|
||||
|
||||
log.info("validate the client is capped at the configured rps limit")
|
||||
expect_ncompleted = duration_secs * rate_limit_rps
|
||||
delta_abs = abs(expect_ncompleted - actual_ncompleted)
|
||||
threshold = 0.05 * expect_ncompleted
|
||||
assert threshold / rate_limit_rps < 0.1 * duration_secs, (
|
||||
"test self-test: unrealistic expecations regarding precision in this test"
|
||||
)
|
||||
assert delta_abs < 0.05 * expect_ncompleted, (
|
||||
assert pytest.approx(expect_ncompleted, 0.05) == actual_ncompleted, (
|
||||
"the throttling deviates more than 5percent from the expectation"
|
||||
)
|
||||
|
||||
@@ -120,6 +114,7 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
|
||||
timeout=compaction_period,
|
||||
)
|
||||
|
||||
log.info("validate the metrics")
|
||||
smgr_query_seconds_post = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query)
|
||||
assert smgr_query_seconds_post is not None
|
||||
throttled_usecs_post = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query)
|
||||
@@ -128,12 +123,13 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
|
||||
actual_throttled_usecs = throttled_usecs_post - throttled_usecs_pre
|
||||
actual_throttled_secs = actual_throttled_usecs / 1_000_000
|
||||
|
||||
log.info("validate that the metric doesn't include throttle wait time")
|
||||
assert duration_secs >= 10 * actual_smgr_query_seconds, (
|
||||
"smgr metrics should not include throttle wait time"
|
||||
assert pytest.approx(actual_throttled_secs + actual_smgr_query_seconds, 0.1) == duration_secs, (
|
||||
"throttling and processing latency = total request time; this assert validates thi holds on average"
|
||||
)
|
||||
|
||||
log.info("validate that the throttling wait time metrics is correct")
|
||||
assert pytest.approx(actual_throttled_secs + actual_smgr_query_seconds, 0.1) == duration_secs, (
|
||||
"most of the time in this test is spent throttled because the rate-limit's contribution to latency dominates"
|
||||
# without this assertion, the test would pass even if the throttling was completely broken
|
||||
# but the request processing is so slow that it makes up for the latency that a correct throttling
|
||||
# implementation would add
|
||||
assert actual_smgr_query_seconds < 0.66 * duration_secs, (
|
||||
"test self-test: request processing is consuming most of the wall clock time; this risks that we're not actually testing throttling"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user