On demand downloading of SLRU segments

This commit is contained in:
Konstantin Knizhnik
2023-12-15 16:16:50 +02:00
parent ac17c2f69c
commit 1cae50eacc
13 changed files with 255 additions and 23 deletions

View File

@@ -5,6 +5,7 @@ use std::{
};
use byteorder::{BigEndian, ReadBytesExt};
use postgres_ffi::BLCKSZ;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use strum_macros;
@@ -570,6 +571,7 @@ pub enum PagestreamFeMessage {
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
}
// Wrapped in libpq CopyData
@@ -579,6 +581,7 @@ pub enum PagestreamBeMessage {
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
}
#[derive(Debug, PartialEq, Eq)]
@@ -610,6 +613,14 @@ pub struct PagestreamDbSizeRequest {
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetSlruSegmentRequest {
pub latest: bool,
pub lsn: Lsn,
pub kind: u8,
pub segno: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
@@ -625,6 +636,11 @@ pub struct PagestreamGetPageResponse {
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
pub segment: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
@@ -677,6 +693,14 @@ impl PagestreamFeMessage {
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
}
bytes.into()
@@ -727,6 +751,14 @@ impl PagestreamFeMessage {
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -761,6 +793,12 @@ impl PagestreamBeMessage {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(105); /* tag from pagestore_client.h */
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
}
bytes.into()

View File

@@ -108,9 +108,22 @@ impl RelTag {
/// These files are divided into segments, which are divided into
/// pages of the same BLCKSZ as used for relation files.
///
#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[derive(
Debug,
Clone,
Copy,
strum_macros::FromRepr,
Hash,
Serialize,
Deserialize,
PartialEq,
Eq,
PartialOrd,
Ord,
)]
#[repr(u8)]
pub enum SlruKind {
Clog,
Clog = 0,
MultiXactMembers,
MultiXactOffsets,
}

View File

@@ -139,6 +139,8 @@ where
async fn send_tarball(mut self) -> anyhow::Result<()> {
// TODO include checksum
let on_demand_slru_download = true; // TODO: should it be feature flag, config parameter or whatever else ?
// Create pgdata subdirs structure
for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(dir)?;
@@ -165,19 +167,20 @@ where
.context("could not add config file to basebackup tarball")?;
}
}
// Gather non-relational files from object storage pages.
for kind in [
SlruKind::Clog,
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
self.add_slru_segment(kind, segno).await?;
if !on_demand_slru_download {
// Gather non-relational files from object storage pages.
for kind in [
SlruKind::Clog,
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
self.add_slru_segment(kind, segno).await?;
}
}
}

View File

@@ -961,6 +961,7 @@ pub enum SmgrQueryType {
GetRelSize,
GetPageAtLsn,
GetDbSize,
GetSlruSegment,
}
#[derive(Debug)]
@@ -1030,6 +1031,7 @@ mod smgr_query_time_tests {
(GetRelSize, "get_rel_size"),
(GetPageAtLsn, "get_page_at_lsn"),
(GetDbSize, "get_db_size"),
(GetSlruSegment, "get_slru_segment"),
];
for (op, expect) in expect {
let actual: &'static str = op.into();

View File

@@ -19,7 +19,8 @@ use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse,
};
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
@@ -64,6 +65,7 @@ use crate::tenant::mgr::ShardSelector;
use crate::tenant::Timeline;
use crate::trace::Tracer;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -518,6 +520,16 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetSlruSegment);
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
(
self.handle_get_slru_segment_request(&timeline, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
};
if let Err(e) = &response {
@@ -862,6 +874,25 @@ impl PageServerHandler {
}))
}
async fn handle_get_slru_segment_request(
&self,
timeline: &Timeline,
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
) -> anyhow::Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let kind = SlruKind::from_repr(req.kind).ok_or(anyhow::anyhow!("invalid SLRU kind"))?;
let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
Ok(PagestreamBeMessage::GetSlruSegment(
PagestreamGetSlruSegmentResponse { segment },
))
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
async fn handle_basebackup_request<IO>(

View File

@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use pageserver_api::key::is_rel_block_key;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
@@ -314,6 +314,25 @@ impl Timeline {
}
}
/// Get the whole SLRU segment
pub async fn get_slru_segment(
&self,
kind: SlruKind,
segno: u32,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
let n_blocks = self.get_slru_segment_size(kind, segno, lsn, ctx).await?;
let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
for blkno in 0..n_blocks {
let block = self
.get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
.await?;
segment.extend_from_slice(&block[..BLCKSZ as usize]);
}
Ok(segment.freeze())
}
/// Look up given SLRU page version.
pub async fn get_slru_page_at_lsn(
&self,

View File

@@ -16,6 +16,7 @@
#include "postgres.h"
#include "neon_pgversioncompat.h"
#include "access/slru.h"
#include "access/xlogdefs.h"
#include RELFILEINFO_HDR
#include "storage/block.h"
@@ -37,6 +38,7 @@ typedef enum
T_NeonNblocksRequest,
T_NeonGetPageRequest,
T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,
@@ -44,6 +46,7 @@ typedef enum
T_NeonGetPageResponse,
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
} NeonMessageTag;
/* base struct for c-style inheritance */
@@ -104,6 +107,13 @@ typedef struct
BlockNumber blkno;
} NeonGetPageRequest;
typedef struct
{
NeonRequest req;
SlruKind kind;
int segno;
} NeonGetSlruSegmentRequest;
/* supertype of all the Neon*Response structs below */
typedef struct
{
@@ -143,6 +153,14 @@ typedef struct
* message */
} NeonErrorResponse;
typedef struct
{
NeonMessageTag tag;
int n_blocks;
char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT];
} NeonGetSlruSegmentResponse;
extern StringInfoData nm_pack_request(NeonRequest *msg);
extern NeonResponse *nm_unpack_response(StringInfo s);
extern char *nm_to_string(NeonMessage *msg);

View File

@@ -1034,12 +1034,25 @@ nm_pack_request(NeonRequest *msg)
break;
}
case T_NeonGetSlruSegmentRequest:
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);
break;
}
/* pagestore -> pagestore_client. We never need to create these. */
case T_NeonExistsResponse:
case T_NeonNblocksResponse:
case T_NeonGetPageResponse:
case T_NeonErrorResponse:
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
break;
@@ -1126,6 +1139,20 @@ nm_unpack_response(StringInfo s)
break;
}
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse *msg_resp;
int n_blocks = pq_getmsgint(s, 4);
msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse));
msg_resp->tag = tag;
msg_resp->n_blocks = n_blocks;
memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ);
pq_getmsgend(s);
resp = (NeonResponse *) msg_resp;
break;
}
/*
* pagestore_client -> pagestore
*
@@ -1135,6 +1162,7 @@ nm_unpack_response(StringInfo s)
case T_NeonNblocksRequest:
case T_NeonGetPageRequest:
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
break;
@@ -1204,7 +1232,18 @@ nm_to_string(NeonMessage *msg)
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetSlruSegmentRequest:
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\"");
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfoChar(&s, '}');
break;
}
/* pagestore -> pagestore_client */
case T_NeonExistsResponse:
{
@@ -1258,6 +1297,17 @@ nm_to_string(NeonMessage *msg)
msg_resp->db_size);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\"");
appendStringInfo(&s, ", \"n_blocks\": %u}",
msg_resp->n_blocks);
appendStringInfoChar(&s, '}');
break;
}
@@ -2727,6 +2777,61 @@ neon_end_unlogged_build(SMgrRelation reln)
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
static int
neon_read_slru_segment(SMgrRelation reln, SlruKind kind, int segno, void* buffer)
{
XLogRecPtr request_lsn;
/* TODO: any better alternative than flush LSN? Actually we to request SLRU at basebackup creation time... */
#if PG_VERSION_NUM >= 150000
request_lsn = GetFlushRecPtr(NULL);
#else
request_lsn = GetFlushRecPtr();
#endif
NeonResponse *resp;
shardno_t shard_no = 0; /* SLRU are at the zero shard */
NeonGetSlruSegmentRequest request = {
.req.tag = T_NeonGetSlruSegmentRequest,
.req.latest = false,
.req.lsn = request_lsn,
.kind = kind,
.segno = segno
};
int n_blocks;
do
{
while (!page_server->send(shard_no, &request.req) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive(shard_no);
} while (resp == NULL);
switch (resp->tag)
{
case T_NeonGetSlruSegmentResponse:
n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks;
memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks*BLCKSZ);
break;
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X",
kind,
segno,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
pfree(resp);
return n_blocks;
}
static void
AtEOXact_neon(XactEvent event, void *arg)
{
@@ -2785,6 +2890,8 @@ static const struct f_smgr neon_smgr =
.smgr_start_unlogged_build = neon_start_unlogged_build,
.smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1,
.smgr_end_unlogged_build = neon_end_unlogged_build,
.smgr_read_slru_segment = neon_read_slru_segment,
};
const f_smgr *

View File

@@ -60,6 +60,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetSlruSegment(_) => {}
PagestreamFeMessage::GetPage(req) => {
total += 1;

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "e3a22b72922055f9212eca12700190f118578362",
"postgres-v15": "bc88f539312fcc4bb292ce94ae9db09ab6656e8a",
"postgres-v14": "dd067cf656f6810a25aca6025633d32d02c5085a"
"postgres-v16": "2db481308645758214bb327d74558e9f846f43e9",
"postgres-v15": "af6924bda1a6d3bbe19105d6879f82aaff837fc2",
"postgres-v14": "bab16da2bb1aba792849b4e5429671122abf0732"
}