Download SLRU segments on demand (#6151)

## Problem

See https://github.com/neondatabase/cloud/issues/8673

## Summary of changes


Download missing SLRU segments from the page server on demand

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2024-01-31 21:39:18 +02:00
committed by GitHub
parent 2bfc831c60
commit 9a9d9beaee
21 changed files with 442 additions and 30 deletions

View File

@@ -395,6 +395,11 @@ impl PageServerNode {
.transpose() .transpose()
.context("Failed to parse 'gc_feedback' as bool")?, .context("Failed to parse 'gc_feedback' as bool")?,
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
lazy_slru_download: settings
.remove("lazy_slru_download")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'lazy_slru_download' as bool")?,
}; };
if !settings.is_empty() { if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}") bail!("Unrecognized tenant settings: {settings:?}")
@@ -495,6 +500,11 @@ impl PageServerNode {
.transpose() .transpose()
.context("Failed to parse 'gc_feedback' as bool")?, .context("Failed to parse 'gc_feedback' as bool")?,
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
lazy_slru_download: settings
.remove("lazy_slru_download")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'lazy_slru_download' as bool")?,
} }
}; };

View File

@@ -8,6 +8,7 @@ use std::{
}; };
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use postgres_ffi::BLCKSZ;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::serde_as; use serde_with::serde_as;
use strum_macros; use strum_macros;
@@ -271,6 +272,7 @@ pub struct TenantConfig {
pub evictions_low_residence_duration_metric_threshold: Option<String>, pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>, pub gc_feedback: Option<bool>,
pub heatmap_period: Option<String>, pub heatmap_period: Option<String>,
pub lazy_slru_download: Option<bool>,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -646,6 +648,7 @@ pub enum PagestreamFeMessage {
Nblocks(PagestreamNblocksRequest), Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest), GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest), DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
} }
// Wrapped in libpq CopyData // Wrapped in libpq CopyData
@@ -656,6 +659,7 @@ pub enum PagestreamBeMessage {
GetPage(PagestreamGetPageResponse), GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse), Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse), DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
} }
// Keep in sync with `pagestore_client.h` // Keep in sync with `pagestore_client.h`
@@ -666,6 +670,7 @@ enum PagestreamBeMessageTag {
GetPage = 102, GetPage = 102,
Error = 103, Error = 103,
DbSize = 104, DbSize = 104,
GetSlruSegment = 105,
} }
impl TryFrom<u8> for PagestreamBeMessageTag { impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8; type Error = u8;
@@ -676,6 +681,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
102 => Ok(PagestreamBeMessageTag::GetPage), 102 => Ok(PagestreamBeMessageTag::GetPage),
103 => Ok(PagestreamBeMessageTag::Error), 103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize), 104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
_ => Err(value), _ => Err(value),
} }
} }
@@ -710,6 +716,14 @@ pub struct PagestreamDbSizeRequest {
pub dbnode: u32, pub dbnode: u32,
} }
/// Request for the contents of one whole SLRU segment.
///
/// On the wire the message tag byte `4` is followed by the fields in
/// declaration order: `latest` (one byte), `lsn` (big-endian u64),
/// `kind` (one byte) and `segno` (big-endian u32).
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetSlruSegmentRequest {
// Serialized as a single 0/1 byte; any non-zero byte parses as `true`.
pub latest: bool,
pub lsn: Lsn,
// Raw SLRU kind discriminant; the page service converts it with
// `SlruKind::from_repr` and rejects values that do not map to a variant.
pub kind: u8,
// Number of the requested segment within the SLRU.
pub segno: u32,
}
#[derive(Debug)] #[derive(Debug)]
pub struct PagestreamExistsResponse { pub struct PagestreamExistsResponse {
pub exists: bool, pub exists: bool,
@@ -725,6 +739,11 @@ pub struct PagestreamGetPageResponse {
pub page: Bytes, pub page: Bytes,
} }
/// Response carrying the contents of one SLRU segment.
///
/// Serialized as a big-endian u32 block count (`segment.len() / BLCKSZ`)
/// followed by the raw segment bytes.
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
// Concatenated page images of the segment.
pub segment: Bytes,
}
#[derive(Debug)] #[derive(Debug)]
pub struct PagestreamErrorResponse { pub struct PagestreamErrorResponse {
pub message: String, pub message: String,
@@ -788,6 +807,14 @@ impl PagestreamFeMessage {
bytes.put_u64(req.lsn.0); bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode); bytes.put_u32(req.dbnode);
} }
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
} }
bytes.into() bytes.into()
@@ -838,6 +865,14 @@ impl PagestreamFeMessage {
lsn: Lsn::from(body.read_u64::<BigEndian>()?), lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?, dbnode: body.read_u32::<BigEndian>()?,
})), })),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag), _ => bail!("unknown smgr message tag: {:?}", msg_tag),
} }
} }
@@ -873,6 +908,12 @@ impl PagestreamBeMessage {
bytes.put_u8(Tag::DbSize as u8); bytes.put_u8(Tag::DbSize as u8);
bytes.put_i64(resp.db_size); bytes.put_i64(resp.db_size);
} }
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
} }
bytes.into() bytes.into()
@@ -913,6 +954,14 @@ impl PagestreamBeMessage {
let db_size = buf.read_i64::<BigEndian>()?; let db_size = buf.read_i64::<BigEndian>()?;
Self::DbSize(PagestreamDbSizeResponse { db_size }) Self::DbSize(PagestreamDbSizeResponse { db_size })
} }
Tag::GetSlruSegment => {
let n_blocks = buf.read_u32::<BigEndian>()?;
let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
buf.read_exact(&mut segment)?;
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
segment: segment.into(),
})
}
}; };
let remaining = buf.into_inner(); let remaining = buf.into_inner();
if !remaining.is_empty() { if !remaining.is_empty() {
@@ -931,6 +980,7 @@ impl PagestreamBeMessage {
Self::GetPage(_) => "GetPage", Self::GetPage(_) => "GetPage",
Self::Error(_) => "Error", Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize", Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
} }
} }
} }

View File

@@ -123,9 +123,11 @@ impl RelTag {
PartialOrd, PartialOrd,
Ord, Ord,
strum_macros::EnumIter, strum_macros::EnumIter,
strum_macros::FromRepr,
)] )]
#[repr(u8)]
pub enum SlruKind { pub enum SlruKind {
Clog, Clog = 0,
MultiXactMembers, MultiXactMembers,
MultiXactOffsets, MultiXactOffsets,
} }

View File

@@ -156,7 +156,8 @@ impl PagestreamClient {
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::Exists(_) PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_) | PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_) => { | PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!( anyhow::bail!(
"unexpected be message kind in response to getpage request: {}", "unexpected be message kind in response to getpage request: {}",
msg.kind() msg.kind()

View File

@@ -222,6 +222,8 @@ where
async fn send_tarball(mut self) -> anyhow::Result<()> { async fn send_tarball(mut self) -> anyhow::Result<()> {
// TODO include checksum // TODO include checksum
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
// Create pgdata subdirs structure // Create pgdata subdirs structure
for dir in PGDATA_SUBDIRS.iter() { for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(dir)?; let header = new_tar_header_dir(dir)?;
@@ -248,29 +250,29 @@ where
.context("could not add config file to basebackup tarball")?; .context("could not add config file to basebackup tarball")?;
} }
} }
if !lazy_slru_download {
// Gather non-relational files from object storage pages. // Gather non-relational files from object storage pages.
let slru_partitions = self let slru_partitions = self
.timeline
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
.await?
.partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
for part in slru_partitions.parts {
let blocks = self
.timeline .timeline
.get_vectored(&part.ranges, self.lsn, self.ctx) .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
.await?; .await?
.partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
for (key, block) in blocks { let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
slru_builder.add_block(&key, block?).await?;
for part in slru_partitions.parts {
let blocks = self
.timeline
.get_vectored(&part.ranges, self.lsn, self.ctx)
.await?;
for (key, block) in blocks {
slru_builder.add_block(&key, block?).await?;
}
} }
slru_builder.finish().await?;
} }
slru_builder.finish().await?;
let mut min_restart_lsn: Lsn = Lsn::MAX; let mut min_restart_lsn: Lsn = Lsn::MAX;
// Create tablespace directories // Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in for ((spcnode, dbnode), has_relmap_file) in

View File

@@ -1043,6 +1043,7 @@ pub enum SmgrQueryType {
GetRelSize, GetRelSize,
GetPageAtLsn, GetPageAtLsn,
GetDbSize, GetDbSize,
GetSlruSegment,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -1159,11 +1160,12 @@ mod smgr_query_time_tests {
#[test] #[test]
fn op_label_name() { fn op_label_name() {
use super::SmgrQueryType::*; use super::SmgrQueryType::*;
let expect: [(super::SmgrQueryType, &'static str); 4] = [ let expect: [(super::SmgrQueryType, &'static str); 5] = [
(GetRelExists, "get_rel_exists"), (GetRelExists, "get_rel_exists"),
(GetRelSize, "get_rel_size"), (GetRelSize, "get_rel_size"),
(GetPageAtLsn, "get_page_at_lsn"), (GetPageAtLsn, "get_page_at_lsn"),
(GetDbSize, "get_db_size"), (GetDbSize, "get_db_size"),
(GetSlruSegment, "get_slru_segment"),
]; ];
for (op, expect) in expect { for (op, expect) in expect {
let actual: &'static str = op.into(); let actual: &'static str = op.into();

View File

@@ -22,7 +22,8 @@ use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse, PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse,
}; };
use pageserver_api::shard::ShardIndex; use pageserver_api::shard::ShardIndex;
use pageserver_api::shard::{ShardCount, ShardNumber}; use pageserver_api::shard::{ShardCount, ShardNumber};
@@ -74,8 +75,8 @@ use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError; use crate::tenant::PageReconstructError;
use crate::tenant::Timeline; use crate::tenant::Timeline;
use crate::trace::Tracer; use crate::trace::Tracer;
use pageserver_api::key::rel_block_to_key; use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
@@ -647,6 +648,15 @@ impl PageServerHandler {
span, span,
) )
} }
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
}; };
match response { match response {
@@ -1137,6 +1147,33 @@ impl PageServerHandler {
})) }))
} }
/// Serve a `GetSlruSegment` pagestream request.
///
/// Resolves the timeline through `get_timeline_shard_zero`, validates the
/// requested SLRU kind, determines the LSN to read at via
/// `wait_or_get_last_lsn`, and returns the whole segment in a
/// `PagestreamBeMessage::GetSlruSegment` response.
///
/// # Errors
///
/// Returns `PageStreamError::BadRequest` when `req.kind` does not correspond
/// to any `SlruKind` variant, and propagates errors from the timeline lookup,
/// the LSN wait and the segment read.
async fn handle_get_slru_segment_request(
    &mut self,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    req: &PagestreamGetSlruSegmentRequest,
    ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
    let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
    let _timer = timeline
        .query_metrics
        .start_timer(metrics::SmgrQueryType::GetSlruSegment);

    // Validate the cheap, purely syntactic part of the request before waiting
    // on the LSN: a request with a bogus kind can never succeed, so there is
    // no point in blocking on it. `ok_or_else` also avoids building the error
    // value on the success path.
    let kind = SlruKind::from_repr(req.kind)
        .ok_or_else(|| PageStreamError::BadRequest("invalid SLRU kind".into()))?;

    let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
    let lsn =
        Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
            .await?;

    let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;

    Ok(PagestreamBeMessage::GetSlruSegment(
        PagestreamGetSlruSegmentResponse { segment },
    ))
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))] #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
async fn handle_basebackup_request<IO>( async fn handle_basebackup_request<IO>(

View File

@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::repository::*; use crate::repository::*;
use crate::walrecord::NeonWalRecord; use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context}; use anyhow::{ensure, Context};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes, BytesMut};
use pageserver_api::key::{ use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key, dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -321,6 +321,27 @@ impl Timeline {
} }
} }
/// Read an entire SLRU segment at `lsn` and return its pages concatenated.
///
/// The segment's block count is looked up first; every block is then fetched
/// with `get_slru_page_at_lsn` and its first `BLCKSZ` bytes are appended to
/// the result in block order.
pub(crate) async fn get_slru_segment(
    &self,
    kind: SlruKind,
    segno: u32,
    lsn: Lsn,
    ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
    let page_size = BLCKSZ as usize;
    let block_count = self
        .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
        .await?;

    let mut buf = BytesMut::with_capacity(block_count as usize * page_size);
    for blkno in 0..block_count {
        let page = self
            .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
            .await?;
        buf.extend_from_slice(&page[..page_size]);
    }

    Ok(buf.freeze())
}
/// Look up given SLRU page version. /// Look up given SLRU page version.
pub(crate) async fn get_slru_page_at_lsn( pub(crate) async fn get_slru_page_at_lsn(
&self, &self,

View File

@@ -3903,6 +3903,7 @@ pub(crate) mod harness {
), ),
gc_feedback: Some(tenant_conf.gc_feedback), gc_feedback: Some(tenant_conf.gc_feedback),
heatmap_period: Some(tenant_conf.heatmap_period), heatmap_period: Some(tenant_conf.heatmap_period),
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
} }
} }
} }

View File

@@ -345,6 +345,9 @@ pub struct TenantConf {
/// may be disabled if a Tenant will not have secondary locations: only secondary /// may be disabled if a Tenant will not have secondary locations: only secondary
/// locations will use the heatmap uploaded by attached locations. /// locations will use the heatmap uploaded by attached locations.
pub heatmap_period: Duration, pub heatmap_period: Duration,
/// If true, SLRU segments are downloaded on demand; if false, SLRU segments are included in the basebackup.
pub lazy_slru_download: bool,
} }
/// Same as TenantConf, but this struct preserves the information about /// Same as TenantConf, but this struct preserves the information about
@@ -430,6 +433,10 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")] #[serde(with = "humantime_serde")]
#[serde(default)] #[serde(default)]
pub heatmap_period: Option<Duration>, pub heatmap_period: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub lazy_slru_download: Option<bool>,
} }
impl TenantConfOpt { impl TenantConfOpt {
@@ -475,6 +482,9 @@ impl TenantConfOpt {
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold), .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback), gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period), heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
lazy_slru_download: self
.lazy_slru_download
.unwrap_or(global_conf.lazy_slru_download),
} }
} }
} }
@@ -513,6 +523,7 @@ impl Default for TenantConf {
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"), .expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
gc_feedback: false, gc_feedback: false,
heatmap_period: Duration::ZERO, heatmap_period: Duration::ZERO,
lazy_slru_download: false,
} }
} }
} }
@@ -584,6 +595,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
.map(humantime), .map(humantime),
gc_feedback: value.gc_feedback, gc_feedback: value.gc_feedback,
heatmap_period: value.heatmap_period.map(humantime), heatmap_period: value.heatmap_period.map(humantime),
lazy_slru_download: value.lazy_slru_download,
} }
} }
} }

View File

@@ -1287,6 +1287,13 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions // Private functions
impl Timeline { impl Timeline {
/// Effective `lazy_slru_download` setting for this timeline: the tenant-level
/// override when one is set, otherwise the pageserver-wide default.
pub fn get_lazy_slru_download(&self) -> bool {
    let tenant_override = self
        .tenant_conf
        .read()
        .unwrap()
        .tenant_conf
        .lazy_slru_download;
    match tenant_override {
        Some(enabled) => enabled,
        None => self.conf.default_tenant_conf.lazy_slru_download,
    }
}
fn get_checkpoint_distance(&self) -> u64 { fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf tenant_conf

View File

@@ -15,6 +15,7 @@
#include "neon_pgversioncompat.h" #include "neon_pgversioncompat.h"
#include "access/slru.h"
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
#include RELFILEINFO_HDR #include RELFILEINFO_HDR
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@@ -34,6 +35,7 @@ typedef enum
T_NeonNblocksRequest, T_NeonNblocksRequest,
T_NeonGetPageRequest, T_NeonGetPageRequest,
T_NeonDbSizeRequest, T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
/* pagestore -> pagestore_client */ /* pagestore -> pagestore_client */
T_NeonExistsResponse = 100, T_NeonExistsResponse = 100,
@@ -41,6 +43,7 @@ typedef enum
T_NeonGetPageResponse, T_NeonGetPageResponse,
T_NeonErrorResponse, T_NeonErrorResponse,
T_NeonDbSizeResponse, T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
} NeonMessageTag; } NeonMessageTag;
/* base struct for c-style inheritance */ /* base struct for c-style inheritance */
@@ -59,6 +62,13 @@ typedef struct
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \ (errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
/*
 * SLRUs downloadable from page server.
 *
 * The numeric value is sent to the page server as the one-byte "kind" field
 * of NeonGetSlruSegmentRequest and decoded there into the Rust SlruKind enum
 * (Clog = 0, MultiXactMembers, MultiXactOffsets), so the order of the members
 * below must be kept in sync with that enum.
 */
typedef enum {
SLRU_CLOG,
SLRU_MULTIXACT_MEMBERS,
SLRU_MULTIXACT_OFFSETS
} SlruKind;
/* /*
* supertype of all the Neon*Request structs below * supertype of all the Neon*Request structs below
* *
@@ -101,6 +111,13 @@ typedef struct
BlockNumber blkno; BlockNumber blkno;
} NeonGetPageRequest; } NeonGetPageRequest;
/*
 * Request for the contents of one SLRU segment.
 *
 * nm_pack_request() sends it as: latest (1 byte), lsn (int64),
 * kind (1 byte), segno (int32).
 */
typedef struct
{
NeonRequest req;
/* which SLRU the segment belongs to */
SlruKind kind;
/* number of the requested segment */
int segno;
} NeonGetSlruSegmentRequest;
/* supertype of all the Neon*Response structs below */ /* supertype of all the Neon*Response structs below */
typedef struct typedef struct
{ {
@@ -140,6 +157,14 @@ typedef struct
* message */ * message */
} NeonErrorResponse; } NeonErrorResponse;
/*
 * Response carrying the pages of one SLRU segment.
 */
typedef struct
{
NeonMessageTag tag;
/* number of BLCKSZ-sized pages that nm_unpack_response() copied into data */
int n_blocks;
/* page images; sized for a full segment of SLRU_PAGES_PER_SEGMENT pages */
char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT];
} NeonGetSlruSegmentResponse;
extern StringInfoData nm_pack_request(NeonRequest *msg); extern StringInfoData nm_pack_request(NeonRequest *msg);
extern NeonResponse *nm_unpack_response(StringInfo s); extern NeonResponse *nm_unpack_response(StringInfo s);
extern char *nm_to_string(NeonMessage *msg); extern char *nm_to_string(NeonMessage *msg);

View File

@@ -1043,12 +1043,25 @@ nm_pack_request(NeonRequest *msg)
break; break;
} }
case T_NeonGetSlruSegmentRequest:
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);
break;
}
/* pagestore -> pagestore_client. We never need to create these. */ /* pagestore -> pagestore_client. We never need to create these. */
case T_NeonExistsResponse: case T_NeonExistsResponse:
case T_NeonNblocksResponse: case T_NeonNblocksResponse:
case T_NeonGetPageResponse: case T_NeonGetPageResponse:
case T_NeonErrorResponse: case T_NeonErrorResponse:
case T_NeonDbSizeResponse: case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default: default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag); neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
break; break;
@@ -1135,6 +1148,20 @@ nm_unpack_response(StringInfo s)
break; break;
} }
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse *msg_resp;
int n_blocks = pq_getmsgint(s, 4);
msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse));
msg_resp->tag = tag;
msg_resp->n_blocks = n_blocks;
memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ);
pq_getmsgend(s);
resp = (NeonResponse *) msg_resp;
break;
}
/* /*
* pagestore_client -> pagestore * pagestore_client -> pagestore
* *
@@ -1144,6 +1171,7 @@ nm_unpack_response(StringInfo s)
case T_NeonNblocksRequest: case T_NeonNblocksRequest:
case T_NeonGetPageRequest: case T_NeonGetPageRequest:
case T_NeonDbSizeRequest: case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default: default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag); neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
break; break;
@@ -1213,7 +1241,18 @@ nm_to_string(NeonMessage *msg)
appendStringInfoChar(&s, '}'); appendStringInfoChar(&s, '}');
break; break;
} }
case T_NeonGetSlruSegmentRequest:
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\"");
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfoChar(&s, '}');
break;
}
/* pagestore -> pagestore_client */ /* pagestore -> pagestore_client */
case T_NeonExistsResponse: case T_NeonExistsResponse:
{ {
@@ -1267,6 +1306,17 @@ nm_to_string(NeonMessage *msg)
msg_resp->db_size); msg_resp->db_size);
appendStringInfoChar(&s, '}'); appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\"");
appendStringInfo(&s, ", \"n_blocks\": %u}",
msg_resp->n_blocks);
appendStringInfoChar(&s, '}');
break; break;
} }
@@ -2739,6 +2789,74 @@ neon_end_unlogged_build(SMgrRelation reln)
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
} }
#define STRPREFIX(str, prefix) (strncmp(str, prefix, strlen(prefix)) == 0)

/*
 * Download one SLRU segment from the page server into 'buffer'.
 *
 * 'path' selects the SLRU by its directory prefix (pg_xact,
 * pg_multixact/members or pg_multixact/offsets) and 'segno' is the segment
 * number within it. 'reln' is not used.
 *
 * Returns the number of BLCKSZ-sized blocks copied into 'buffer', or -1 if
 * 'path' does not name an SLRU that can be downloaded. Raises ERROR if the
 * page server answers with an error, an unexpected message, or a block count
 * that does not fit into a segment.
 *
 * NOTE(review): 'buffer' is assumed to have room for a full segment
 * (SLRU_PAGES_PER_SEGMENT * BLCKSZ bytes) -- confirm against the caller.
 */
static int
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer)
{
	XLogRecPtr	request_lsn;
	SlruKind	kind;
	NeonResponse *resp;
	NeonGetSlruSegmentRequest request;
	int			n_blocks = 0;
	shardno_t	shard_no = 0;	/* All SLRUs are at shard 0 */

	/*
	 * GetRedoStartLsn() returns LSN of basebackup.
	 * We need to download SLRU segments only once after node startup,
	 * then SLRUs are maintained locally.
	 */
	request_lsn = GetRedoStartLsn();
	request_lsn = nm_adjust_lsn(request_lsn);

	if (STRPREFIX(path, "pg_xact"))
		kind = SLRU_CLOG;
	else if (STRPREFIX(path, "pg_multixact/members"))
		kind = SLRU_MULTIXACT_MEMBERS;
	else if (STRPREFIX(path, "pg_multixact/offsets"))
		kind = SLRU_MULTIXACT_OFFSETS;
	else
		return -1;

	/* Zero the whole request first so that fields not set below are 0. */
	memset(&request, 0, sizeof(request));
	request.req.tag = T_NeonGetSlruSegmentRequest;
	request.req.latest = false;
	request.req.lsn = request_lsn;
	request.kind = kind;
	request.segno = segno;

	/* Resend the request until a response is received. */
	do
	{
		while (!page_server->send(shard_no, &request.req) || !page_server->flush(shard_no));
		consume_prefetch_responses();
		resp = page_server->receive(shard_no);
	} while (resp == NULL);

	switch (resp->tag)
	{
		case T_NeonGetSlruSegmentResponse:
			{
				NeonGetSlruSegmentResponse *slru_resp = (NeonGetSlruSegmentResponse *) resp;

				n_blocks = slru_resp->n_blocks;

				/*
				 * The response's data[] holds at most SLRU_PAGES_PER_SEGMENT
				 * pages; never copy more than that (or a negative amount)
				 * out of it.
				 */
				if (n_blocks < 0 || n_blocks > SLRU_PAGES_PER_SEGMENT)
					neon_log(ERROR, "invalid number of blocks %d in SLRU segment response", n_blocks);

				memcpy(buffer, slru_resp->data, (size_t) n_blocks * BLCKSZ);
				break;
			}

		case T_NeonErrorResponse:
			ereport(ERROR,
					(errcode(ERRCODE_IO_ERROR),
					 errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X",
							kind,
							segno,
							LSN_FORMAT_ARGS(request_lsn)),
					 errdetail("page server returned error: %s",
							   ((NeonErrorResponse *) resp)->message)));
			break;

		default:
			neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
	}
	pfree(resp);

	return n_blocks;
}
static void static void
AtEOXact_neon(XactEvent event, void *arg) AtEOXact_neon(XactEvent event, void *arg)
{ {
@@ -2797,6 +2915,8 @@ static const struct f_smgr neon_smgr =
.smgr_start_unlogged_build = neon_start_unlogged_build, .smgr_start_unlogged_build = neon_start_unlogged_build,
.smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1, .smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1,
.smgr_end_unlogged_build = neon_end_unlogged_build, .smgr_end_unlogged_build = neon_end_unlogged_build,
.smgr_read_slru_segment = neon_read_slru_segment,
}; };
const f_smgr * const f_smgr *

View File

@@ -3980,8 +3980,17 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
# list files we're going to compare # list files we're going to compare
assert endpoint.pgdata_dir assert endpoint.pgdata_dir
pgdata_files = list_files_to_compare(Path(endpoint.pgdata_dir)) pgdata_files = list_files_to_compare(Path(endpoint.pgdata_dir))
restored_files = list_files_to_compare(restored_dir_path) restored_files = list_files_to_compare(restored_dir_path)
if pgdata_files != restored_files:
# filter pg_xact and multixact files which are downloaded on demand
pgdata_files = [
f
for f in pgdata_files
if not f.startswith("pg_xact") and not f.startswith("pg_multixact")
]
# check that file sets are equal # check that file sets are equal
assert pgdata_files == restored_files assert pgdata_files == restored_files

View File

@@ -0,0 +1,111 @@
import pytest
import requests
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder
# Start and measure duration with huge SLRU segments.
# This test is similar to test_startup_simple, but it creates a huge number of transactions
# and records containing these XIDs. Autovacuum is disabled for the table to prevent CLOG truncation.
#
# This test runs pretty quickly and can be informative when used in combination
# with emulated network delay. Some useful delay commands:
#
# 1. Add 2msec delay to all localhost traffic
# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec`
#
# 2. Test that it works (you should see 4ms ping)
# `ping localhost`
#
# 3. Revert back to normal
# `sudo tc qdisc del dev lo root netem`
#
# NOTE this test might not represent the real startup time because the basebackup
# for a large database might be larger if there's a lot of transaction metadata,
# or safekeepers might need more syncing, or there might be more operations to
# apply during config step, like more users, databases, or extensions. By default
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
# test we only load neon.
@pytest.mark.timeout(1000)
def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
# Benchmark endpoint startup for two tenants that differ only in the
# "lazy_slru_download" tenant setting ("true" vs "false").
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
lazy_tenant, _ = env.neon_cli.create_tenant(
conf={
"lazy_slru_download": "true",
}
)
eager_tenant, _ = env.neon_cli.create_tenant(
conf={
"lazy_slru_download": "false",
}
)
tenants = [lazy_tenant, eager_tenant]
# Prefix for the recorded metric names. It starts as "lazy" to match the first
# tenant in the list and is set to "eager" further down.
# NOTE(review): the reassignment presumably runs once per tenant iteration;
# indentation is not visible in this view -- confirm.
slru = "lazy"
for tenant in tenants:
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)")
endpoint.safe_psql("INSERT INTO t VALUES (1, 0)")
# The procedure below performs 10000000 single-row updates and commits after
# each of them.
endpoint.safe_psql(
"""
CREATE PROCEDURE updating() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..10000000 LOOP
UPDATE t SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
"""
)
endpoint.safe_psql("SET statement_timeout=0")
endpoint.safe_psql("call updating()")
endpoint.stop()
# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{slru}_{i}_start"):
endpoint.start()
with zenbenchmark.record_duration(f"{slru}_{i}_select"):
sum = endpoint.safe_psql("select sum(x) from t")[0][0]
# x started at 0 and was incremented once by each of the 10000000 updates.
assert sum == 10000000
# Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
# Maps a key of the endpoint's metrics JSON to the name it is recorded under.
durations = {
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
"sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
"basebackup_ms": f"{slru}_{i}_basebackup",
"start_postgres_ms": f"{slru}_{i}_start_postgres",
"config_ms": f"{slru}_{i}_config",
"total_startup_ms": f"{slru}_{i}_total_startup",
}
for key, name in durations.items():
value = metrics[key]
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
basebackup_bytes = metrics["basebackup_bytes"]
zenbenchmark.record(
f"{slru}_{i}_basebackup_bytes",
basebackup_bytes,
"bytes",
report=MetricReport.LOWER_IS_BETTER,
)
# Stop so we can restart
endpoint.stop()
# Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True)
slru = "eager"

View File

@@ -173,6 +173,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"image_creation_threshold": 7, "image_creation_threshold": 7,
"pitr_interval": "1m", "pitr_interval": "1m",
"lagging_wal_timeout": "23m", "lagging_wal_timeout": "23m",
"lazy_slru_download": True,
"max_lsn_wal_lag": 230000, "max_lsn_wal_lag": 230000,
"min_resident_size_override": 23, "min_resident_size_override": 23,
"trace_read_requests": True, "trace_read_requests": True,

View File

@@ -60,6 +60,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
match msg { match msg {
PagestreamFeMessage::Exists(_) => {} PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {} PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetSlruSegment(_) => {}
PagestreamFeMessage::GetPage(req) => { PagestreamFeMessage::GetPage(req) => {
total += 1; total += 1;

View File

@@ -1,5 +1,5 @@
{ {
"postgres-v16": "cf302768b2890569956641e0e5ba112ae1445351", "postgres-v16": "f7ea954989a2e7901f858779cff55259f203479a",
"postgres-v15": "b089a8a02c9f6f4379883fddb33cf10a3aa0b14f", "postgres-v15": "81e16cd537053f49e175d4a08ab7c8aec3d9b535",
"postgres-v14": "3de48ce3d9c1f4fac1cdc7029487f8db9e537eac" "postgres-v14": "be7a65fe67dc81d85bbcbebb13e00d94715f4b88"
} }