From 9a9d9beaeef393aa3ad8ba5b7700adfaab857126 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 31 Jan 2024 21:39:18 +0200 Subject: [PATCH] Download SLRU segments on demand (#6151) ## Problem See https://github.com/neondatabase/cloud/issues/8673 ## Summary of changes Download missed SLRU segments from page server ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --------- Co-authored-by: Konstantin Knizhnik Co-authored-by: Heikki Linnakangas --- control_plane/src/pageserver.rs | 10 ++ libs/pageserver_api/src/models.rs | 50 ++++++++ libs/pageserver_api/src/reltag.rs | 4 +- pageserver/client/src/page_service.rs | 3 +- pageserver/src/basebackup.rs | 38 +++--- pageserver/src/metrics.rs | 4 +- pageserver/src/page_service.rs | 41 +++++- pageserver/src/pgdatadir_mapping.rs | 23 +++- pageserver/src/tenant.rs | 1 + pageserver/src/tenant/config.rs | 12 ++ pageserver/src/tenant/timeline.rs | 7 + pgxn/neon/pagestore_client.h | 25 ++++ pgxn/neon/pagestore_smgr.c | 120 ++++++++++++++++++ test_runner/fixtures/neon_fixtures.py | 9 ++ test_runner/performance/test_lazy_startup.py | 111 ++++++++++++++++ .../regress/test_attach_tenant_config.py | 1 + trace/src/main.rs | 1 + vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/revisions.json | 6 +- 21 files changed, 442 insertions(+), 30 deletions(-) create mode 100644 test_runner/performance/test_lazy_startup.py diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 540d1185a2..a1b0ba4252 100644 --- a/control_plane/src/pageserver.rs +++ 
b/control_plane/src/pageserver.rs @@ -395,6 +395,11 @@ impl PageServerNode { .transpose() .context("Failed to parse 'gc_feedback' as bool")?, heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), + lazy_slru_download: settings + .remove("lazy_slru_download") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'lazy_slru_download' as bool")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") @@ -495,6 +500,11 @@ impl PageServerNode { .transpose() .context("Failed to parse 'gc_feedback' as bool")?, heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), + lazy_slru_download: settings + .remove("lazy_slru_download") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'lazy_slru_download' as bool")?, } }; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index d885553cc7..a7598f9fda 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -8,6 +8,7 @@ use std::{ }; use byteorder::{BigEndian, ReadBytesExt}; +use postgres_ffi::BLCKSZ; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use strum_macros; @@ -271,6 +272,7 @@ pub struct TenantConfig { pub evictions_low_residence_duration_metric_threshold: Option, pub gc_feedback: Option, pub heatmap_period: Option, + pub lazy_slru_download: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -646,6 +648,7 @@ pub enum PagestreamFeMessage { Nblocks(PagestreamNblocksRequest), GetPage(PagestreamGetPageRequest), DbSize(PagestreamDbSizeRequest), + GetSlruSegment(PagestreamGetSlruSegmentRequest), } // Wrapped in libpq CopyData @@ -656,6 +659,7 @@ pub enum PagestreamBeMessage { GetPage(PagestreamGetPageResponse), Error(PagestreamErrorResponse), DbSize(PagestreamDbSizeResponse), + GetSlruSegment(PagestreamGetSlruSegmentResponse), } // Keep in sync with `pagestore_client.h` @@ -666,6 +670,7 @@ enum PagestreamBeMessageTag { GetPage 
= 102, Error = 103, DbSize = 104, + GetSlruSegment = 105, } impl TryFrom for PagestreamBeMessageTag { type Error = u8; @@ -676,6 +681,7 @@ impl TryFrom for PagestreamBeMessageTag { 102 => Ok(PagestreamBeMessageTag::GetPage), 103 => Ok(PagestreamBeMessageTag::Error), 104 => Ok(PagestreamBeMessageTag::DbSize), + 105 => Ok(PagestreamBeMessageTag::GetSlruSegment), _ => Err(value), } } @@ -710,6 +716,14 @@ pub struct PagestreamDbSizeRequest { pub dbnode: u32, } +#[derive(Debug, PartialEq, Eq)] +pub struct PagestreamGetSlruSegmentRequest { + pub latest: bool, + pub lsn: Lsn, + pub kind: u8, + pub segno: u32, +} + #[derive(Debug)] pub struct PagestreamExistsResponse { pub exists: bool, @@ -725,6 +739,11 @@ pub struct PagestreamGetPageResponse { pub page: Bytes, } +#[derive(Debug)] +pub struct PagestreamGetSlruSegmentResponse { + pub segment: Bytes, +} + #[derive(Debug)] pub struct PagestreamErrorResponse { pub message: String, @@ -788,6 +807,14 @@ impl PagestreamFeMessage { bytes.put_u64(req.lsn.0); bytes.put_u32(req.dbnode); } + + Self::GetSlruSegment(req) => { + bytes.put_u8(4); + bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.lsn.0); + bytes.put_u8(req.kind); + bytes.put_u32(req.segno); + } } bytes.into() @@ -838,6 +865,14 @@ impl PagestreamFeMessage { lsn: Lsn::from(body.read_u64::()?), dbnode: body.read_u32::()?, })), + 4 => Ok(PagestreamFeMessage::GetSlruSegment( + PagestreamGetSlruSegmentRequest { + latest: body.read_u8()? 
!= 0, + lsn: Lsn::from(body.read_u64::()?), + kind: body.read_u8()?, + segno: body.read_u32::()?, + }, + )), _ => bail!("unknown smgr message tag: {:?}", msg_tag), } } @@ -873,6 +908,12 @@ impl PagestreamBeMessage { bytes.put_u8(Tag::DbSize as u8); bytes.put_i64(resp.db_size); } + + Self::GetSlruSegment(resp) => { + bytes.put_u8(Tag::GetSlruSegment as u8); + bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put(&resp.segment[..]); + } } bytes.into() @@ -913,6 +954,14 @@ impl PagestreamBeMessage { let db_size = buf.read_i64::()?; Self::DbSize(PagestreamDbSizeResponse { db_size }) } + Tag::GetSlruSegment => { + let n_blocks = buf.read_u32::()?; + let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize]; + buf.read_exact(&mut segment)?; + Self::GetSlruSegment(PagestreamGetSlruSegmentResponse { + segment: segment.into(), + }) + } }; let remaining = buf.into_inner(); if !remaining.is_empty() { @@ -931,6 +980,7 @@ impl PagestreamBeMessage { Self::GetPage(_) => "GetPage", Self::Error(_) => "Error", Self::DbSize(_) => "DbSize", + Self::GetSlruSegment(_) => "GetSlruSegment", } } } diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 3f37af600d..8eb848a514 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -123,9 +123,11 @@ impl RelTag { PartialOrd, Ord, strum_macros::EnumIter, + strum_macros::FromRepr, )] +#[repr(u8)] pub enum SlruKind { - Clog, + Clog = 0, MultiXactMembers, MultiXactOffsets, } diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index ff542670f1..49175b3b90 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -156,7 +156,8 @@ impl PagestreamClient { PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), PagestreamBeMessage::Exists(_) | PagestreamBeMessage::Nblocks(_) - | PagestreamBeMessage::DbSize(_) => { + | PagestreamBeMessage::DbSize(_) + | 
PagestreamBeMessage::GetSlruSegment(_) => { anyhow::bail!( "unexpected be message kind in response to getpage request: {}", msg.kind() diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 009deff0aa..7edfab75d4 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -222,6 +222,8 @@ where async fn send_tarball(mut self) -> anyhow::Result<()> { // TODO include checksum + let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup; + // Create pgdata subdirs structure for dir in PGDATA_SUBDIRS.iter() { let header = new_tar_header_dir(dir)?; @@ -248,29 +250,29 @@ where .context("could not add config file to basebackup tarball")?; } } - - // Gather non-relational files from object storage pages. - let slru_partitions = self - .timeline - .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx) - .await? - .partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64); - - let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar); - - for part in slru_partitions.parts { - let blocks = self + if !lazy_slru_download { + // Gather non-relational files from object storage pages. + let slru_partitions = self .timeline - .get_vectored(&part.ranges, self.lsn, self.ctx) - .await?; + .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx) + .await? 
+ .partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64); - for (key, block) in blocks { - slru_builder.add_block(&key, block?).await?; + let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar); + + for part in slru_partitions.parts { + let blocks = self + .timeline + .get_vectored(&part.ranges, self.lsn, self.ctx) + .await?; + + for (key, block) in blocks { + slru_builder.add_block(&key, block?).await?; + } } + slru_builder.finish().await?; } - slru_builder.finish().await?; - let mut min_restart_lsn: Lsn = Lsn::MAX; // Create tablespace directories for ((spcnode, dbnode), has_relmap_file) in diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 9b3679e3c2..ed204cb48c 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1043,6 +1043,7 @@ pub enum SmgrQueryType { GetRelSize, GetPageAtLsn, GetDbSize, + GetSlruSegment, } #[derive(Debug)] @@ -1159,11 +1160,12 @@ mod smgr_query_time_tests { #[test] fn op_label_name() { use super::SmgrQueryType::*; - let expect: [(super::SmgrQueryType, &'static str); 4] = [ + let expect: [(super::SmgrQueryType, &'static str); 5] = [ (GetRelExists, "get_rel_exists"), (GetRelSize, "get_rel_size"), (GetPageAtLsn, "get_page_at_lsn"), (GetDbSize, "get_db_size"), + (GetSlruSegment, "get_slru_segment"), ]; for (op, expect) in expect { let actual: &'static str = op.into(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 65191334a6..754c021c88 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -22,7 +22,8 @@ use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, - PagestreamNblocksRequest, PagestreamNblocksResponse, + PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, + 
PagestreamNblocksResponse, }; use pageserver_api::shard::ShardIndex; use pageserver_api::shard::{ShardCount, ShardNumber}; @@ -74,8 +75,8 @@ use crate::tenant::GetTimelineError; use crate::tenant::PageReconstructError; use crate::tenant::Timeline; use crate::trace::Tracer; - use pageserver_api::key::rel_block_to_key; +use pageserver_api::reltag::SlruKind; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; @@ -647,6 +648,15 @@ impl PageServerHandler { span, ) } + PagestreamFeMessage::GetSlruSegment(req) => { + let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn); + ( + self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) + } }; match response { @@ -1137,6 +1147,33 @@ impl PageServerHandler { })) } + async fn handle_get_slru_segment_request( + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, + req: &PagestreamGetSlruSegmentRequest, + ctx: &RequestContext, + ) -> Result { + let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; + + let _timer = timeline + .query_metrics + .start_timer(metrics::SmgrQueryType::GetSlruSegment); + + let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); + let lsn = + Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + .await?; + + let kind = SlruKind::from_repr(req.kind) + .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?; + let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?; + + Ok(PagestreamBeMessage::GetSlruSegment( + PagestreamGetSlruSegmentResponse { segment }, + )) + } + #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))] async fn handle_basebackup_request( diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index b65fe1eddd..a36785a69f 100644 --- 
a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::repository::*; use crate::walrecord::NeonWalRecord; use anyhow::{ensure, Context}; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use pageserver_api::key::{ dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key, @@ -321,6 +321,27 @@ impl Timeline { } } + /// Get the whole SLRU segment + pub(crate) async fn get_slru_segment( + &self, + kind: SlruKind, + segno: u32, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result { + let n_blocks = self + .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx) + .await?; + let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize); + for blkno in 0..n_blocks { + let block = self + .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx) + .await?; + segment.extend_from_slice(&block[..BLCKSZ as usize]); + } + Ok(segment.freeze()) + } + /// Look up given SLRU page version. pub(crate) async fn get_slru_page_at_lsn( &self, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 7a9fef43d2..681fd296ae 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3903,6 +3903,7 @@ pub(crate) mod harness { ), gc_feedback: Some(tenant_conf.gc_feedback), heatmap_period: Some(tenant_conf.heatmap_period), + lazy_slru_download: Some(tenant_conf.lazy_slru_download), } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index c44164c12d..63bd56cf5f 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -345,6 +345,9 @@ pub struct TenantConf { /// may be disabled if a Tenant will not have secondary locations: only secondary /// locations will use the heatmap uploaded by attached locations. 
pub heatmap_period: Duration, + + /// If true, SLRU segments are downloaded on demand; if false, SLRU segments are included in the basebackup + pub lazy_slru_download: bool, } /// Same as TenantConf, but this struct preserves the information about @@ -430,6 +433,10 @@ pub struct TenantConfOpt { #[serde(with = "humantime_serde")] #[serde(default)] pub heatmap_period: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub lazy_slru_download: Option, } impl TenantConfOpt { @@ -475,6 +482,9 @@ impl TenantConfOpt { .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold), gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback), heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period), + lazy_slru_download: self + .lazy_slru_download + .unwrap_or(global_conf.lazy_slru_download), } } } @@ -513,6 +523,7 @@ impl Default for TenantConf { .expect("cannot parse default evictions_low_residence_duration_metric_threshold"), gc_feedback: false, heatmap_period: Duration::ZERO, + lazy_slru_download: false, } } } @@ -584,6 +595,7 @@ impl From for models::TenantConfig { .map(humantime), gc_feedback: value.gc_feedback, heatmap_period: value.heatmap_period.map(humantime), + lazy_slru_download: value.lazy_slru_download, } } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 70c6ee2042..fc908ad299 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1287,6 +1287,13 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10; // Private functions impl Timeline { + pub fn get_lazy_slru_download(&self) -> bool { + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + tenant_conf + .lazy_slru_download + .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download) + } + fn get_checkpoint_distance(&self) -> u64 { let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; tenant_conf diff --git a/pgxn/neon/pagestore_client.h 
b/pgxn/neon/pagestore_client.h index 8c02f357bc..2889ffacae 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -15,6 +15,7 @@ #include "neon_pgversioncompat.h" +#include "access/slru.h" #include "access/xlogdefs.h" #include RELFILEINFO_HDR #include "lib/stringinfo.h" @@ -34,6 +35,7 @@ typedef enum T_NeonNblocksRequest, T_NeonGetPageRequest, T_NeonDbSizeRequest, + T_NeonGetSlruSegmentRequest, /* pagestore -> pagestore_client */ T_NeonExistsResponse = 100, @@ -41,6 +43,7 @@ typedef enum T_NeonGetPageResponse, T_NeonErrorResponse, T_NeonDbSizeResponse, + T_NeonGetSlruSegmentResponse, } NeonMessageTag; /* base struct for c-style inheritance */ @@ -59,6 +62,13 @@ typedef struct (errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \ errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) +/* SLRUs downloadable from page server */ +typedef enum { + SLRU_CLOG, + SLRU_MULTIXACT_MEMBERS, + SLRU_MULTIXACT_OFFSETS +} SlruKind; + /* * supertype of all the Neon*Request structs below * @@ -101,6 +111,13 @@ typedef struct BlockNumber blkno; } NeonGetPageRequest; +typedef struct +{ + NeonRequest req; + SlruKind kind; + int segno; +} NeonGetSlruSegmentRequest; + /* supertype of all the Neon*Response structs below */ typedef struct { @@ -140,6 +157,14 @@ typedef struct * message */ } NeonErrorResponse; +typedef struct +{ + NeonMessageTag tag; + int n_blocks; + char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT]; +} NeonGetSlruSegmentResponse; + + extern StringInfoData nm_pack_request(NeonRequest *msg); extern NeonResponse *nm_unpack_response(StringInfo s); extern char *nm_to_string(NeonMessage *msg); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 1fa802e6f4..63e8b8dc1f 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -1043,12 +1043,25 @@ nm_pack_request(NeonRequest *msg) break; } + case T_NeonGetSlruSegmentRequest: + { + NeonGetSlruSegmentRequest *msg_req = 
(NeonGetSlruSegmentRequest *) msg; + + pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.lsn); + pq_sendbyte(&s, msg_req->kind); + pq_sendint32(&s, msg_req->segno); + + break; + } + /* pagestore -> pagestore_client. We never need to create these. */ case T_NeonExistsResponse: case T_NeonNblocksResponse: case T_NeonGetPageResponse: case T_NeonErrorResponse: case T_NeonDbSizeResponse: + case T_NeonGetSlruSegmentResponse: default: neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag); break; @@ -1135,6 +1148,20 @@ nm_unpack_response(StringInfo s) break; } + case T_NeonGetSlruSegmentResponse: + { + NeonGetSlruSegmentResponse *msg_resp; + int n_blocks = pq_getmsgint(s, 4); + msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse)); + msg_resp->tag = tag; + msg_resp->n_blocks = n_blocks; + memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ); + pq_getmsgend(s); + + resp = (NeonResponse *) msg_resp; + break; + } + /* * pagestore_client -> pagestore * @@ -1144,6 +1171,7 @@ nm_unpack_response(StringInfo s) case T_NeonNblocksRequest: case T_NeonGetPageRequest: case T_NeonDbSizeRequest: + case T_NeonGetSlruSegmentRequest: default: neon_log(ERROR, "unexpected neon message tag 0x%02x", tag); break; @@ -1213,7 +1241,18 @@ nm_to_string(NeonMessage *msg) appendStringInfoChar(&s, '}'); break; } + case T_NeonGetSlruSegmentRequest: + { + NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg; + appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\""); + appendStringInfo(&s, ", \"kind\": %u", msg_req->kind); + appendStringInfo(&s, ", \"segno\": %u", msg_req->segno); + appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); + appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfoChar(&s, '}'); + break; + } /* pagestore -> pagestore_client */ case T_NeonExistsResponse: { @@ -1267,6 +1306,17 @@ nm_to_string(NeonMessage *msg) msg_resp->db_size); 
appendStringInfoChar(&s, '}'); + break; + } + case T_NeonGetSlruSegmentResponse: + { + NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg; + + appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\""); + appendStringInfo(&s, ", \"n_blocks\": %u", + msg_resp->n_blocks); + appendStringInfoChar(&s, '}'); /* the only closing brace of the JSON object */ + break; } @@ -2739,6 +2789,74 @@ neon_end_unlogged_build(SMgrRelation reln) unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; } +#define STRPREFIX(str, prefix) (strncmp(str, prefix, strlen(prefix)) == 0) + +static int +neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer) +{ + XLogRecPtr request_lsn; + /* + * GetRedoStartLsn() returns LSN of basebackup. + * We need to download SLRU segments only once after node startup, + * then SLRUs are maintained locally. + */ + request_lsn = GetRedoStartLsn(); + request_lsn = nm_adjust_lsn(request_lsn); + SlruKind kind; + + if (STRPREFIX(path, "pg_xact")) + kind = SLRU_CLOG; + else if (STRPREFIX(path, "pg_multixact/members")) + kind = SLRU_MULTIXACT_MEMBERS; + else if (STRPREFIX(path, "pg_multixact/offsets")) + kind = SLRU_MULTIXACT_OFFSETS; + else + return -1; + + NeonResponse *resp; + NeonGetSlruSegmentRequest request = { + .req.tag = T_NeonGetSlruSegmentRequest, + .req.latest = false, + .req.lsn = request_lsn, + + .kind = kind, + .segno = segno + }; + int n_blocks; + shardno_t shard_no = 0; /* All SLRUs are at shard 0 */ + do + { + while (!page_server->send(shard_no, &request.req) || !page_server->flush(shard_no)); + consume_prefetch_responses(); + resp = page_server->receive(shard_no); + } while (resp == NULL); + + switch (resp->tag) + { + case T_NeonGetSlruSegmentResponse: + n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks; + memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks*BLCKSZ); + break; + + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "could not read SLRU %d segment %d at 
lsn %X/%08X", + kind, + segno, + LSN_FORMAT_ARGS(request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; + + default: + neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag); + } + pfree(resp); + + return n_blocks; +} + static void AtEOXact_neon(XactEvent event, void *arg) { @@ -2797,6 +2915,8 @@ static const struct f_smgr neon_smgr = .smgr_start_unlogged_build = neon_start_unlogged_build, .smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1, .smgr_end_unlogged_build = neon_end_unlogged_build, + + .smgr_read_slru_segment = neon_read_slru_segment, }; const f_smgr * diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5be7551a1e..e2a2291dbc 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3980,8 +3980,17 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint # list files we're going to compare assert endpoint.pgdata_dir pgdata_files = list_files_to_compare(Path(endpoint.pgdata_dir)) + restored_files = list_files_to_compare(restored_dir_path) + if pgdata_files != restored_files: + # filter pg_xact and multixact files which are downloaded on demand + pgdata_files = [ + f + for f in pgdata_files + if not f.startswith("pg_xact") and not f.startswith("pg_multixact") + ] + # check that file sets are equal assert pgdata_files == restored_files diff --git a/test_runner/performance/test_lazy_startup.py b/test_runner/performance/test_lazy_startup.py new file mode 100644 index 0000000000..1a431e272e --- /dev/null +++ b/test_runner/performance/test_lazy_startup.py @@ -0,0 +1,111 @@ +import pytest +import requests +from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker +from fixtures.neon_fixtures import NeonEnvBuilder + + +# Start and measure duration with huge SLRU segments. 
+# This test is similar to test_startup_simple, but it creates a huge number of transactions +# and records containing these XIDs. Autovacuum is disabled for the table to prevent CLOG truncation. +# +# This test runs pretty quickly and can be informative when used in combination +# with emulated network delay. Some useful delay commands: +# +# 1. Add 2msec delay to all localhost traffic +# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec` +# +# 2. Test that it works (you should see 4ms ping) +# `ping localhost` +# +# 3. Revert back to normal +# `sudo tc qdisc del dev lo root netem` +# +# NOTE this test might not represent the real startup time because the basebackup +# for a large database might be larger if there's a lot of transaction metadata, +# or safekeepers might need more syncing, or there might be more operations to +# apply during config step, like more users, databases, or extensions. By default +# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this +# test we only load neon. 
+@pytest.mark.timeout(1000) +def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker): + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + lazy_tenant, _ = env.neon_cli.create_tenant( + conf={ + "lazy_slru_download": "true", + } + ) + eager_tenant, _ = env.neon_cli.create_tenant( + conf={ + "lazy_slru_download": "false", + } + ) + tenants = [lazy_tenant, eager_tenant] + slru = "lazy" + for tenant in tenants: + endpoint = env.endpoints.create_start("main", tenant_id=tenant) + endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)") + endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)") + endpoint.safe_psql("INSERT INTO t VALUES (1, 0)") + endpoint.safe_psql( + """ + CREATE PROCEDURE updating() as + $$ + DECLARE + i integer; + BEGIN + FOR i IN 1..10000000 LOOP + UPDATE t SET x = x + 1 WHERE pk=1; + COMMIT; + END LOOP; + END + $$ LANGUAGE plpgsql + """ + ) + endpoint.safe_psql("SET statement_timeout=0") + endpoint.safe_psql("call updating()") + + endpoint.stop() + + # We do two iterations so we can see if the second startup is faster. It should + # be because the compute node should already be configured with roles, databases, + # extensions, etc from the first run. 
+ for i in range(2): + # Start + with zenbenchmark.record_duration(f"{slru}_{i}_start"): + endpoint.start() + + with zenbenchmark.record_duration(f"{slru}_{i}_select"): + sum = endpoint.safe_psql("select sum(x) from t")[0][0] + assert sum == 10000000 + + # Get metrics + metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json() + durations = { + "wait_for_spec_ms": f"{slru}_{i}_wait_for_spec", + "sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers", + "sync_sk_check_ms": f"{slru}_{i}_sync_sk_check", + "basebackup_ms": f"{slru}_{i}_basebackup", + "start_postgres_ms": f"{slru}_{i}_start_postgres", + "config_ms": f"{slru}_{i}_config", + "total_startup_ms": f"{slru}_{i}_total_startup", + } + for key, name in durations.items(): + value = metrics[key] + zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER) + + basebackup_bytes = metrics["basebackup_bytes"] + zenbenchmark.record( + f"{slru}_{i}_basebackup_bytes", + basebackup_bytes, + "bytes", + report=MetricReport.LOWER_IS_BETTER, + ) + + # Stop so we can restart + endpoint.stop() + + # Imitate optimizations that console would do for the second start + endpoint.respec(skip_pg_catalog_updates=True) + slru = "eager" diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index ed389b1aa2..7cdc314658 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -173,6 +173,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "image_creation_threshold": 7, "pitr_interval": "1m", "lagging_wal_timeout": "23m", + "lazy_slru_download": True, "max_lsn_wal_lag": 230000, "min_resident_size_override": 23, "trace_read_requests": True, diff --git a/trace/src/main.rs b/trace/src/main.rs index ddd970e95d..4605c124e9 100644 --- a/trace/src/main.rs +++ b/trace/src/main.rs @@ -60,6 +60,7 @@ fn analyze_trace(mut reader: R) { match msg { PagestreamFeMessage::Exists(_) => {} 
PagestreamFeMessage::Nblocks(_) => {} + PagestreamFeMessage::GetSlruSegment(_) => {} PagestreamFeMessage::GetPage(req) => { total += 1; diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 3de48ce3d9..be7a65fe67 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 3de48ce3d9c1f4fac1cdc7029487f8db9e537eac +Subproject commit be7a65fe67dc81d85bbcbebb13e00d94715f4b88 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index b089a8a02c..81e16cd537 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit b089a8a02c9f6f4379883fddb33cf10a3aa0b14f +Subproject commit 81e16cd537053f49e175d4a08ab7c8aec3d9b535 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index cf302768b2..f7ea954989 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit cf302768b2890569956641e0e5ba112ae1445351 +Subproject commit f7ea954989a2e7901f858779cff55259f203479a diff --git a/vendor/revisions.json b/vendor/revisions.json index 1211155b7d..80699839ba 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "postgres-v16": "cf302768b2890569956641e0e5ba112ae1445351", - "postgres-v15": "b089a8a02c9f6f4379883fddb33cf10a3aa0b14f", - "postgres-v14": "3de48ce3d9c1f4fac1cdc7029487f8db9e537eac" + "postgres-v16": "f7ea954989a2e7901f858779cff55259f203479a", + "postgres-v15": "81e16cd537053f49e175d4a08ab7c8aec3d9b535", + "postgres-v14": "be7a65fe67dc81d85bbcbebb13e00d94715f4b88" }