mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 10:00:38 +00:00
pageserver: only store SLRUs & aux files on shard zero (#9786)
## Problem Since https://github.com/neondatabase/neon/pull/9423 the non-zero shards no longer need SLRU content in order to do GC. This data is now redundant on shards >0. One release cycle after merging that PR, we may merge this one, which also stops writing those pages to shards > 0, reaping the efficiency benefit. Closes: https://github.com/neondatabase/neon/issues/7512 Closes: https://github.com/neondatabase/neon/issues/9641 ## Summary of changes - Avoid storing SLRUs on non-zero shards - Bonus: avoid storing aux files on non-zero shards
This commit is contained in:
@@ -575,18 +575,24 @@ async fn import_file(
|
||||
} else if file_path.starts_with("pg_xact") {
|
||||
let slru = SlruKind::Clog;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported clog slru");
|
||||
if modification.tline.tenant_shard_id.is_shard_zero() {
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported clog slru");
|
||||
}
|
||||
} else if file_path.starts_with("pg_multixact/offsets") {
|
||||
let slru = SlruKind::MultiXactOffsets;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported multixact offsets slru");
|
||||
if modification.tline.tenant_shard_id.is_shard_zero() {
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported multixact offsets slru");
|
||||
}
|
||||
} else if file_path.starts_with("pg_multixact/members") {
|
||||
let slru = SlruKind::MultiXactMembers;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported multixact members slru");
|
||||
if modification.tline.tenant_shard_id.is_shard_zero() {
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported multixact members slru");
|
||||
}
|
||||
} else if file_path.starts_with("pg_twophase") {
|
||||
let bytes = read_all_bytes(reader).await?;
|
||||
|
||||
|
||||
@@ -530,6 +530,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
assert!(self.tenant_shard_id.is_shard_zero());
|
||||
let n_blocks = self
|
||||
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
@@ -552,6 +553,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
assert!(self.tenant_shard_id.is_shard_zero());
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
self.get(key, lsn, ctx).await
|
||||
}
|
||||
@@ -564,6 +566,7 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
assert!(self.tenant_shard_id.is_shard_zero());
|
||||
let key = slru_segment_size_to_key(kind, segno);
|
||||
let mut buf = version.get(self, key, ctx).await?;
|
||||
Ok(buf.get_u32_le())
|
||||
@@ -577,6 +580,7 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
assert!(self.tenant_shard_id.is_shard_zero());
|
||||
// fetch directory listing
|
||||
let key = slru_dir_to_key(kind);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
@@ -1047,26 +1051,28 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Iterate SLRUs next
|
||||
for kind in [
|
||||
SlruKind::Clog,
|
||||
SlruKind::MultiXactMembers,
|
||||
SlruKind::MultiXactOffsets,
|
||||
] {
|
||||
let slrudir_key = slru_dir_to_key(kind);
|
||||
result.add_key(slrudir_key);
|
||||
let buf = self.get(slrudir_key, lsn, ctx).await?;
|
||||
let dir = SlruSegmentDirectory::des(&buf)?;
|
||||
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
|
||||
segments.sort_unstable();
|
||||
for segno in segments {
|
||||
let segsize_key = slru_segment_size_to_key(kind, segno);
|
||||
let mut buf = self.get(segsize_key, lsn, ctx).await?;
|
||||
let segsize = buf.get_u32_le();
|
||||
if self.tenant_shard_id.is_shard_zero() {
|
||||
for kind in [
|
||||
SlruKind::Clog,
|
||||
SlruKind::MultiXactMembers,
|
||||
SlruKind::MultiXactOffsets,
|
||||
] {
|
||||
let slrudir_key = slru_dir_to_key(kind);
|
||||
result.add_key(slrudir_key);
|
||||
let buf = self.get(slrudir_key, lsn, ctx).await?;
|
||||
let dir = SlruSegmentDirectory::des(&buf)?;
|
||||
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
|
||||
segments.sort_unstable();
|
||||
for segno in segments {
|
||||
let segsize_key = slru_segment_size_to_key(kind, segno);
|
||||
let mut buf = self.get(segsize_key, lsn, ctx).await?;
|
||||
let segsize = buf.get_u32_le();
|
||||
|
||||
result.add_range(
|
||||
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
|
||||
);
|
||||
result.add_key(segsize_key);
|
||||
result.add_range(
|
||||
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
|
||||
);
|
||||
result.add_key(segsize_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1468,6 +1474,10 @@ impl<'a> DatadirModification<'a> {
|
||||
blknum: BlockNumber,
|
||||
rec: NeonWalRecord,
|
||||
) -> anyhow::Result<()> {
|
||||
if !self.tline.tenant_shard_id.is_shard_zero() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.put(
|
||||
slru_block_to_key(kind, segno, blknum),
|
||||
Value::WalRecord(rec),
|
||||
@@ -1501,6 +1511,8 @@ impl<'a> DatadirModification<'a> {
|
||||
blknum: BlockNumber,
|
||||
img: Bytes,
|
||||
) -> anyhow::Result<()> {
|
||||
assert!(self.tline.tenant_shard_id.is_shard_zero());
|
||||
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
@@ -1542,6 +1554,7 @@ impl<'a> DatadirModification<'a> {
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
) -> anyhow::Result<()> {
|
||||
assert!(self.tline.tenant_shard_id.is_shard_zero());
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
@@ -1853,6 +1866,8 @@ impl<'a> DatadirModification<'a> {
|
||||
nblocks: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
assert!(self.tline.tenant_shard_id.is_shard_zero());
|
||||
|
||||
// Add it to the directory entry
|
||||
let dir_key = slru_dir_to_key(kind);
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
@@ -1885,6 +1900,8 @@ impl<'a> DatadirModification<'a> {
|
||||
segno: u32,
|
||||
nblocks: BlockNumber,
|
||||
) -> anyhow::Result<()> {
|
||||
assert!(self.tline.tenant_shard_id.is_shard_zero());
|
||||
|
||||
// Put size
|
||||
let size_key = slru_segment_size_to_key(kind, segno);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
|
||||
@@ -129,22 +129,23 @@ impl Flow {
|
||||
}
|
||||
|
||||
// Import SLRUs
|
||||
|
||||
// pg_xact (01:00 keyspace)
|
||||
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
|
||||
if self.timeline.tenant_shard_id.is_shard_zero() {
|
||||
// pg_xact (01:00 keyspace)
|
||||
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
|
||||
.await?;
|
||||
// pg_multixact/members (01:01 keyspace)
|
||||
self.import_slru(
|
||||
SlruKind::MultiXactMembers,
|
||||
&self.storage.pgdata().join("pg_multixact/members"),
|
||||
)
|
||||
.await?;
|
||||
// pg_multixact/members (01:01 keyspace)
|
||||
self.import_slru(
|
||||
SlruKind::MultiXactMembers,
|
||||
&self.storage.pgdata().join("pg_multixact/members"),
|
||||
)
|
||||
.await?;
|
||||
// pg_multixact/offsets (01:02 keyspace)
|
||||
self.import_slru(
|
||||
SlruKind::MultiXactOffsets,
|
||||
&self.storage.pgdata().join("pg_multixact/offsets"),
|
||||
)
|
||||
.await?;
|
||||
// pg_multixact/offsets (01:02 keyspace)
|
||||
self.import_slru(
|
||||
SlruKind::MultiXactOffsets,
|
||||
&self.storage.pgdata().join("pg_multixact/offsets"),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Import pg_twophase.
|
||||
// TODO: as empty
|
||||
@@ -302,6 +303,8 @@ impl Flow {
|
||||
}
|
||||
|
||||
async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
|
||||
assert!(self.timeline.tenant_shard_id.is_shard_zero());
|
||||
|
||||
let segments = self.storage.listfilesindir(path).await?;
|
||||
let segments: Vec<(String, u32, usize)> = segments
|
||||
.into_iter()
|
||||
@@ -337,7 +340,6 @@ impl Flow {
|
||||
debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
|
||||
self.tasks
|
||||
.push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
|
||||
*self.timeline.get_shard_identity(),
|
||||
start_key..end_key,
|
||||
&p,
|
||||
self.storage.clone(),
|
||||
@@ -631,21 +633,14 @@ impl ImportTask for ImportRelBlocksTask {
|
||||
}
|
||||
|
||||
struct ImportSlruBlocksTask {
|
||||
shard_identity: ShardIdentity,
|
||||
key_range: Range<Key>,
|
||||
path: RemotePath,
|
||||
storage: RemoteStorageWrapper,
|
||||
}
|
||||
|
||||
impl ImportSlruBlocksTask {
|
||||
fn new(
|
||||
shard_identity: ShardIdentity,
|
||||
key_range: Range<Key>,
|
||||
path: &RemotePath,
|
||||
storage: RemoteStorageWrapper,
|
||||
) -> Self {
|
||||
fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
|
||||
ImportSlruBlocksTask {
|
||||
shard_identity,
|
||||
key_range,
|
||||
path: path.clone(),
|
||||
storage,
|
||||
@@ -673,17 +668,13 @@ impl ImportTask for ImportSlruBlocksTask {
|
||||
let mut file_offset = 0;
|
||||
while blknum < end_blk {
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
assert!(
|
||||
!self.shard_identity.is_key_disposable(&key),
|
||||
"SLRU keys need to go into every shard"
|
||||
);
|
||||
let buf = &buf[file_offset..(file_offset + 8192)];
|
||||
file_offset += 8192;
|
||||
layer_writer
|
||||
.put_image(key, Bytes::copy_from_slice(buf), ctx)
|
||||
.await?;
|
||||
blknum += 1;
|
||||
nimages += 1;
|
||||
blknum += 1;
|
||||
}
|
||||
Ok(nimages)
|
||||
}
|
||||
|
||||
@@ -1392,6 +1392,10 @@ impl WalIngest {
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
if !self.shard.is_shard_zero() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.handle_slru_extend(modification, kind, segno, blknum, ctx)
|
||||
.await?;
|
||||
modification.put_slru_page_image(kind, segno, blknum, img)?;
|
||||
|
||||
Reference in New Issue
Block a user