Compare commits

...

14 Commits

Author SHA1 Message Date
Arpad Müller
548b28b28b WIP remove generic argument from safekeeper 2024-04-28 01:32:12 +02:00
Arpad Müller
f9039f7d73 Remove async_trait again 2024-04-28 01:15:07 +02:00
Arpad Müller
2a7bc782fd Remove generic IO argument 2024-04-28 01:11:59 +02:00
Arpad Müller
307d10b111 Add HandlerSync trait 2024-04-28 01:01:21 +02:00
Arpad Müller
5546c0de35 Same for SafekeeperPostgresHandler 2024-04-28 00:36:45 +02:00
Arpad Müller
d1fb4a4a00 Move PageServerHandler process_query content 2024-04-28 00:30:46 +02:00
Christian Schwarz
bf369f4268 refactor(owned_buffer_io::util::size_tracking_writer): make generic over underlying writer (#7483)
part of https://github.com/neondatabase/neon/issues/7124
2024-04-26 09:19:41 +00:00
Christian Schwarz
70f4a16a05 refactor(owned_buffers_io::BufferedWriter): be generic over the type of buffer (#7482) 2024-04-26 08:30:20 +00:00
John Spray
d63185fa6c storage controller: log hygiene & better error type (#7508)
These are testability/logging improvements spun off from #7475

- Don't log warnings for shutdown errors in compute hook
- Revise logging around heartbeats and reconcile_all so that we aren't
emitting such a large volume of INFO messages under normal quite
conditions.
- Clean up the `last_error` of TenantShard to hold a ReconcileError
instead of a String, and use that properly typed error to suppress
reconciler cancel errors during reconcile_all_now. This is important for
tests that iteratively call that, as otherwise they would get 500 errors
when some reconciler in flight was cancelled (perhaps due to a state
change on the tenant shard starting a new reconciler).
2024-04-26 08:15:59 +00:00
Heikki Linnakangas
ca8fca0e9f Add test to demonstrate the problem with protocol version 1 (#7377) 2024-04-25 20:45:37 +03:00
Heikki Linnakangas
0397427dcf Add test for SLRU download (#7377)
Before PR #7377, on-demand SLRU download always used the basebackup's
LSN in the SLRU download, but that LSN might get garbage-collected away
in the pageserver. We should request the latest LSN, like with GetPage
requests, with the LSN just indicating that we know that the page hasn't
been changed since the LSN (since the basebackup in this case).

Add test to demonstrate the problem. Without the fix, it fails with
"tried to request a page version that was garbage collected" error from
the pageserver.

I wrote this test as part of earlier PR #6693, but that fell through
the cracks and was never applied. PR #7377 superseded the fix from
that older PR, but the test is still valid.
2024-04-25 20:45:37 +03:00
Heikki Linnakangas
a2a44ea213 Refactor how the request LSNs are tracked in compute (#7377)
Instead of thinking in terms of 'latest' and 'lsn' of the request,
each request has two LSNs: the request LSN and 'not_modified_since'
LSN. The request is nominally made at the request LSN, that determines
what page version we want to see. But as a hint, we also include
'not_modified_since'. It tells the pageserver that the page has not
been modified since that LSN, which allows the pageserver to skip
waiting for newer WAL to arrive, and could allow more optimizations in
the future.

Refactor the internal functions to calculate the request LSN to
calculate both LSNs.

Sending two LSNs to the pageserver requires using the new protocol
version 2. The previous commit added the server support for it, but we
still default to the old protocol for compatibility with old
pageservers. The 'neon.protocol_version' GUC can be used to use the
new protocol.

The new protocol addresses one cause of issue #6211, although you can
still get the same error if you have a standby that is lagging behind
so that the page version it needs is genuinely GC'd away.
2024-04-25 20:45:37 +03:00
Heikki Linnakangas
4917f52c88 Server support for new pagestream protocol version (#7377)
In the old protocol version, the client sent with each request:

- latest: bool. If true, the client requested the latest page
  version, and the 'lsn' was just a hint of when the page was last
  modified
- lsn: Lsn, the page version to return

This protocol didn't allow requesting a page at a particular
non-latest LSN and *also* sending a hint on when the page was last
modified. That put a read only compute into an awkward position where
it had to either request each page at the replay-LSN, which could be
very close to the last LSN written in the primary and therefore
require the pageserver to wait for it to arrive, or an older LSN which
could already be garbage collected in the pageserver, resulting in an
error. The new protocol version fixes that by allowing a read only
compute to send both LSNs.

To use the new protocol version, use "pagestream_v2" command instead
of just "pagestream". The old protocol version is still supported, for
compatibility with old computes (and in fact there is no client
support yet, it is added by the next commit).
2024-04-25 20:45:37 +03:00
Heikki Linnakangas
04a682021f Remove the now-unused 'latest' arguments (#7377)
The 'latest' argument was passed to the functions in
pgdatadir_mapping.rs to know when they can update the relsize
cache. Commit e69ff3fc00 changed how the relsize cache is updated,
making the 'latest' argument unused.
2024-04-25 20:45:37 +03:00
30 changed files with 1155 additions and 504 deletions

View File

@@ -848,39 +848,72 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
}
}
// In the V2 protocol version, a GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
//
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
// modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The legacy V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and let the pageserver do the right thing. There is no
// difference in the responses between V1 and V2.
//
// The Request structs below reflect the V2 interface. If V1 is used, the parse function
// maps the old format requests to the new format.
//
#[derive(Clone, Copy)]
pub enum PagestreamProtocolVersion {
V1,
V2,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamExistsRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamNblocksRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamDbSizeRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetSlruSegmentRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub kind: u8,
pub segno: u32,
}
@@ -927,14 +960,16 @@ pub struct TenantHistorySize {
}
impl PagestreamFeMessage {
/// Serialize a compute -> pageserver message. This is currently only used in testing
/// tools. Always uses protocol version 2.
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
@@ -943,8 +978,8 @@ impl PagestreamFeMessage {
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
@@ -953,8 +988,8 @@ impl PagestreamFeMessage {
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
@@ -964,15 +999,15 @@ impl PagestreamFeMessage {
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
@@ -981,18 +1016,40 @@ impl PagestreamFeMessage {
bytes.into()
}
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
pub fn parse<R: std::io::Read>(
body: &mut R,
protocol_version: PagestreamProtocolVersion,
) -> anyhow::Result<PagestreamFeMessage> {
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let (request_lsn, not_modified_since) = match protocol_version {
PagestreamProtocolVersion::V2 => (
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
PagestreamProtocolVersion::V1 => {
// In the old protocol, each message starts with a boolean 'latest' flag,
// followed by 'lsn'. Convert that to the two LSNs, 'request_lsn' and
// 'not_modified_since', used in the new protocol version.
let latest = body.read_u8()? != 0;
let request_lsn = Lsn::from(body.read_u64::<BigEndian>()?);
if latest {
(Lsn::MAX, request_lsn) // get latest version
} else {
(request_lsn, request_lsn) // get version at specified LSN
}
}
};
// The rest of the messages are the same between V1 and V2
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1001,8 +1058,8 @@ impl PagestreamFeMessage {
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1011,8 +1068,8 @@ impl PagestreamFeMessage {
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1022,14 +1079,14 @@ impl PagestreamFeMessage {
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
@@ -1157,8 +1214,8 @@ mod tests {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: true,
lsn: Lsn(4),
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
rel: RelTag {
forknum: 1,
spcnode: 2,
@@ -1167,8 +1224,8 @@ mod tests {
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: false,
lsn: Lsn(4),
request_lsn: Lsn(4),
not_modified_since: Lsn(4),
rel: RelTag {
forknum: 1,
spcnode: 2,
@@ -1177,8 +1234,8 @@ mod tests {
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: true,
lsn: Lsn(4),
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
rel: RelTag {
forknum: 1,
spcnode: 2,
@@ -1188,14 +1245,16 @@ mod tests {
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: true,
lsn: Lsn(4),
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
dbnode: 7,
}),
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
let reconstructed =
PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V2)
.unwrap();
assert!(msg == reconstructed);
}
}

View File

@@ -78,18 +78,18 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
pub trait Handler<IO>: HandlerSync<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
async fn process_query(
fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError>;
) -> impl Future<Output = Result<(), QueryError>> + Send;
}
pub trait HandlerSync<IO> {
/// Called on startup packet receival, allows to process params.
///
/// If Ok(false) is returned postgres_backend will skip auth -- that is needed for new users

View File

@@ -60,7 +60,7 @@ impl Client {
) -> anyhow::Result<PagestreamClient> {
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
.client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.copy_both_simple(&format!("pagestream_v2 {tenant_id} {timeline_id}"))
.await?;
let Client {
cancel_on_client_drop,

View File

@@ -312,8 +312,12 @@ async fn main_impl(
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}

View File

@@ -376,7 +376,7 @@ where
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
.await?;
// If the relation is empty, create an empty file
@@ -397,7 +397,7 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), self.ctx)
.await?;
segment_data.extend_from_slice(&img[..]);
}

View File

@@ -1,13 +1,5 @@
//
//! The Page Service listens for client connections and serves their GetPage@LSN
//! requests.
//
// It is possible to connect here using usual psql/pgbench/libpq. Following
// commands are supported now:
// *status* -- show actual info about this pageserver,
// *pagestream* -- enter mode where smgr and pageserver talk with their
// custom protocol.
//
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
@@ -23,7 +15,7 @@ use pageserver_api::models::{
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse,
PagestreamNblocksResponse, PagestreamProtocolVersion,
};
use pageserver_api::shard::ShardIndex;
use pageserver_api::shard::ShardNumber;
@@ -551,6 +543,7 @@ impl PageServerHandler {
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
protocol_version: PagestreamProtocolVersion,
ctx: RequestContext,
) -> Result<(), QueryError>
where
@@ -613,14 +606,15 @@ impl PageServerHandler {
t.trace(&copy_data_bytes)
}
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let neon_fe_msg =
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
// TODO: We could create a new per-request context here, with unique ID.
// Currently we use the same per-timeline context for all requests
let (response, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -629,7 +623,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::Nblocks(req) => {
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -639,7 +633,7 @@ impl PageServerHandler {
}
PagestreamFeMessage::GetPage(req) => {
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -648,7 +642,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::DbSize(req) => {
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -657,7 +651,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -838,83 +832,80 @@ impl PageServerHandler {
/// Helper function to handle the LSN from client request.
///
/// Each GetPage (and Exists and Nblocks) request includes information about
/// which version of the page is being requested. The client can request the
/// latest version of the page, or the version that's valid at a particular
/// LSN. The primary compute node will always request the latest page
/// version, while a standby will request a version at the LSN that it's
/// currently caught up to.
/// which version of the page is being requested. The primary compute node
/// will always request the latest page version, by setting 'request_lsn' to
/// the last inserted or flushed WAL position, while a standby will request
/// a version at the LSN that it's currently caught up to.
///
/// In either case, if the page server hasn't received the WAL up to the
/// requested LSN yet, we will wait for it to arrive. The return value is
/// the LSN that should be used to look up the page versions.
///
/// In addition to the request LSN, each request carries another LSN,
/// 'not_modified_since', which is a hint to the pageserver that the client
/// knows that the page has not been modified between 'not_modified_since'
/// and the request LSN. This allows skipping the wait, as long as the WAL
/// up to 'not_modified_since' has arrived. If the client doesn't have any
/// information about when the page was modified, it will use
/// not_modified_since == lsn. If the client lies and sends a too low
/// not_modified_hint such that there are in fact later page versions, the
/// behavior is undefined: the pageserver may return any of the page versions
/// or an error.
async fn wait_or_get_last_lsn(
timeline: &Timeline,
mut lsn: Lsn,
latest: bool,
request_lsn: Lsn,
not_modified_since: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
ctx: &RequestContext,
) -> Result<Lsn, PageStreamError> {
if latest {
// Latest page version was requested. If LSN is given, it is a hint
// to the page server that there have been no modifications to the
// page after that LSN. If we haven't received WAL up to that point,
// wait until it arrives.
let last_record_lsn = timeline.get_last_record_lsn();
let last_record_lsn = timeline.get_last_record_lsn();
// Note: this covers the special case that lsn == Lsn(0). That
// special case means "return the latest version whatever it is",
// and it's used for bootstrapping purposes, when the page server is
// connected directly to the compute node. That is needed because
// when you connect to the compute node, to receive the WAL, the
// walsender process will do a look up in the pg_authid catalog
// table for authentication. That poses a deadlock problem: the
// catalog table lookup will send a GetPage request, but the GetPage
// request will block in the page server because the recent WAL
// hasn't been received yet, and it cannot be received until the
// walsender completes the authentication and starts streaming the
// WAL.
if lsn <= last_record_lsn {
// It might be better to use max(lsn, latest_gc_cutoff_lsn) instead
// last_record_lsn. That would give the same result, since we know
// that there haven't been modifications since 'lsn'. Using an older
// LSN might be faster, because that could allow skipping recent
// layers when finding the page.
lsn = last_record_lsn;
// Sanity check the request
if request_lsn < not_modified_since {
return Err(PageStreamError::BadRequest(
format!(
"invalid request with request LSN {} and not_modified_since {}",
request_lsn, not_modified_since,
)
.into(),
));
}
if request_lsn < **latest_gc_cutoff_lsn {
// Check explicitly for INVALID just to get a less scary error message if the
// request is obviously bogus
return Err(if request_lsn == Lsn::INVALID {
PageStreamError::BadRequest("invalid LSN(0) in request".into())
} else {
timeline
.wait_lsn(
lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
ctx,
)
.await?;
// Since we waited for 'lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the
// last-record LSN can advance immediately after we return
// anyway)
}
} else {
if lsn == Lsn(0) {
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
}
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
).into())
});
}
// Wait for WAL up to 'not_modified_since' to arrive, if necessary
if not_modified_since > last_record_lsn {
timeline
.wait_lsn(
lsn,
not_modified_since,
crate::tenant::timeline::WaitLsnWaiter::PageService,
ctx,
)
.await?;
// Since we waited for 'not_modified_since' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the last-record LSN can
// advance immediately after we return anyway)
Ok(not_modified_since)
} else {
// It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
// here instead. That would give the same result, since we know that there
// haven't been any modifications since 'not_modified_since'. Using an older
// LSN might be faster, because that could allow skipping recent layers when
// finding the page. However, we have historically used 'last_record_lsn', so
// stick to that for now.
Ok(std::cmp::min(last_record_lsn, request_lsn))
}
if lsn < **latest_gc_cutoff_lsn {
return Err(PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
lsn, **latest_gc_cutoff_lsn
).into()));
}
Ok(lsn)
}
#[instrument(skip_all, fields(shard_id))]
@@ -931,12 +922,17 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -959,12 +955,17 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_size(req.rel, Version::Lsn(lsn), ctx)
.await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
@@ -987,18 +988,17 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let total_blocks = timeline
.get_db_size(
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::Lsn(lsn),
req.latest,
ctx,
)
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;
@@ -1165,12 +1165,17 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -1193,9 +1198,14 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let kind = SlruKind::from_repr(req.kind)
.ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
@@ -1358,8 +1368,7 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
impl<IO> postgres_backend::HandlerSync<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
@@ -1399,9 +1408,24 @@ where
) -> Result<(), QueryError> {
Ok(())
}
}
type IO<'s> = std::pin::Pin<&'s mut tokio_io_timeout::TimeoutReader<tokio::net::TcpStream>>;
impl<'s> postgres_backend::Handler<IO<'s>> for PageServerHandler
{
#[instrument(skip_all, fields(tenant_id, timeline_id))]
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO<'s>>,
query_string: &str,
) -> Result<(), QueryError> {
self.process_query_(pgb, &query_string).await
}
}
impl PageServerHandler {
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Sync + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
@@ -1413,7 +1437,34 @@ where
let ctx = self.connection_ctx.attached_child();
debug!("process query {query_string:?}");
if query_string.starts_with("pagestream ") {
if query_string.starts_with("pagestream_v2 ") {
let (_, params_raw) = query_string.split_at("pagestream_v2 ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
if params.len() != 2 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number for pagestream command"
)));
}
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
self.handle_pagerequests(
pgb,
tenant_id,
timeline_id,
PagestreamProtocolVersion::V2,
ctx,
)
.await?;
} else if query_string.starts_with("pagestream ") {
let (_, params_raw) = query_string.split_at("pagestream ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
if params.len() != 2 {
@@ -1432,8 +1483,14 @@ where
self.check_permission(Some(tenant_id))?;
self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
.await?;
self.handle_pagerequests(
pgb,
tenant_id,
timeline_id,
PagestreamProtocolVersion::V1,
ctx,
)
.await?;
} else if query_string.starts_with("basebackup ") {
let (_, params_raw) = query_string.split_at("basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();

View File

@@ -176,7 +176,6 @@ impl Timeline {
tag: RelTag,
blknum: BlockNumber,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
@@ -185,7 +184,7 @@ impl Timeline {
));
}
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
let nblocks = self.get_rel_size(tag, version, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
@@ -207,7 +206,6 @@ impl Timeline {
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<usize, PageReconstructError> {
let mut total_blocks = 0;
@@ -215,7 +213,7 @@ impl Timeline {
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
for rel in rels {
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
let n_blocks = self.get_rel_size(rel, version, ctx).await?;
total_blocks += n_blocks as usize;
}
Ok(total_blocks)
@@ -226,7 +224,6 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
@@ -240,7 +237,7 @@ impl Timeline {
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, version, latest, ctx).await?
&& !self.get_rel_exists(tag, version, ctx).await?
{
// FIXME: Postgres sometimes calls smgrcreate() to create
// FSM, and smgrnblocks() on it immediately afterwards,
@@ -263,7 +260,6 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
_latest: bool,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
@@ -1095,7 +1091,7 @@ impl<'a> DatadirModification<'a> {
) -> anyhow::Result<()> {
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
.get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
.await?;
// Remove entry from dbdir
@@ -1194,7 +1190,7 @@ impl<'a> DatadirModification<'a> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
.tline
.get_rel_exists(rel, Version::Modified(self), true, ctx)
.get_rel_exists(rel, Version::Modified(self), ctx)
.await?
{
let size_key = rel_size_to_key(rel);

View File

@@ -7,6 +7,7 @@ use std::collections::HashSet;
use std::future::Future;
use anyhow::{anyhow, Context};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs::{self, File, OpenOptions};
@@ -182,6 +183,7 @@ async fn download_object<'a>(
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
use bytes::BytesMut;
async {
let destination_file = VirtualFile::create(dst_path)
.await
@@ -194,10 +196,10 @@ async fn download_object<'a>(
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
let size_tracking = size_tracking_writer::Writer::new(destination_file);
let mut buffered = owned_buffers_io::write::BufferedWriter::<
{ super::BUFFER_SIZE },
_,
>::new(size_tracking);
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
size_tracking,
BytesMut::with_capacity(super::BUFFER_SIZE),
);
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{

View File

@@ -32,6 +32,7 @@ pub use io_engine::feature_test as io_engine_feature_test;
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
@@ -1083,6 +1084,17 @@ impl Drop for VirtualFile {
}
}
impl OwnedAsyncWriter for VirtualFile {
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = VirtualFile::write_all(self, buf).await;
res.map(move |v| (v, buf))
}
}
impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots));

View File

@@ -1,33 +1,36 @@
use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter;
use tokio_epoll_uring::{BoundedBuf, IoBuf};
pub struct Writer {
dst: VirtualFile,
pub struct Writer<W> {
dst: W,
bytes_amount: u64,
}
impl Writer {
pub fn new(dst: VirtualFile) -> Self {
impl<W> Writer<W> {
pub fn new(dst: W) -> Self {
Self {
dst,
bytes_amount: 0,
}
}
/// Returns the wrapped `VirtualFile` object as well as the number
/// of bytes that were written to it through this object.
pub fn into_inner(self) -> (u64, VirtualFile) {
pub fn into_inner(self) -> (u64, W) {
(self.bytes_amount, self.dst)
}
}
impl OwnedAsyncWriter for Writer {
impl<W> OwnedAsyncWriter for Writer<W>
where
W: OwnedAsyncWriter,
{
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = self.dst.write_all(buf).await;
let nwritten = res?;
let (nwritten, buf) = self.dst.write_all(buf).await?;
self.bytes_amount += u64::try_from(nwritten).unwrap();
Ok((nwritten, buf))
}

View File

@@ -10,14 +10,14 @@ pub trait OwnedAsyncWriter {
) -> std::io::Result<(usize, B::Buf)>;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that batches smaller writers
/// into `BUFFER_SIZE`-sized writes.
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
/// small writes into larger writes of size [`Buffer::cap`].
///
/// # Passthrough Of Large Writers
///
/// Buffered writes larger than the `BUFFER_SIZE` cause the internal
/// buffer to be flushed, even if it is not full yet. Then, the large
/// buffered write is passed through to the unerlying [`OwnedAsyncWriter`].
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
/// cause the internal buffer to be flushed prematurely so that the large
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
///
/// This pass-through is generally beneficial for throughput, but if
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
@@ -25,24 +25,25 @@ pub trait OwnedAsyncWriter {
///
/// In such cases, a different implementation that always buffers in memory
/// may be preferable.
pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
pub struct BufferedWriter<B, W> {
writer: W,
// invariant: always remains Some(buf)
// with buf.capacity() == BUFFER_SIZE except
// - while IO is ongoing => goes back to Some() once the IO completed successfully
// - after an IO error => stays `None` forever
// In these exceptional cases, it's `None`.
buf: Option<BytesMut>,
/// invariant: always remains Some(buf) except
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
/// - after an IO error => stays `None` forever
/// In these exceptional cases, it's `None`.
buf: Option<B>,
}
impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
impl<B, Buf, W> BufferedWriter<B, W>
where
B: Buffer<IoBuf = Buf> + Send,
Buf: IoBuf + Send,
W: OwnedAsyncWriter,
{
pub fn new(writer: W) -> Self {
pub fn new(writer: W, buf: B) -> Self {
Self {
writer,
buf: Some(BytesMut::with_capacity(BUFFER_SIZE)),
buf: Some(buf),
}
}
@@ -53,61 +54,121 @@ where
Ok(writer)
}
pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<()>
#[inline(always)]
fn buf(&self) -> &B {
self.buf
.as_ref()
.expect("must not use after we returned an error")
}
pub async fn write_buffered<S: IoBuf>(&mut self, chunk: Slice<S>) -> std::io::Result<(usize, S)>
where
B: IoBuf + Send,
S: IoBuf + Send,
{
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
if chunk.len() >= BUFFER_SIZE {
if chunk.len() >= self.buf().cap() {
self.flush().await?;
// do a big write, bypassing `buf`
assert_eq!(
self.buf
.as_ref()
.expect("must not use after an error")
.len(),
.pending(),
0
);
let chunk_len = chunk.len();
let (nwritten, chunk) = self.writer.write_all(chunk).await?;
assert_eq!(nwritten, chunk_len);
drop(chunk);
return Ok(());
return Ok((nwritten, chunk));
}
// in-memory copy the < BUFFER_SIZED tail of the chunk
assert!(chunk.len() < BUFFER_SIZE);
let mut chunk = &chunk[..];
while !chunk.is_empty() {
assert!(chunk.len() < self.buf().cap());
let mut slice = &chunk[..];
while !slice.is_empty() {
let buf = self.buf.as_mut().expect("must not use after an error");
let need = BUFFER_SIZE - buf.len();
let have = chunk.len();
let need = buf.cap() - buf.pending();
let have = slice.len();
let n = std::cmp::min(need, have);
buf.extend_from_slice(&chunk[..n]);
chunk = &chunk[n..];
if buf.len() >= BUFFER_SIZE {
assert_eq!(buf.len(), BUFFER_SIZE);
buf.extend_from_slice(&slice[..n]);
slice = &slice[n..];
if buf.pending() >= buf.cap() {
assert_eq!(buf.pending(), buf.cap());
self.flush().await?;
}
}
assert!(chunk.is_empty(), "by now we should have drained the chunk");
Ok(())
assert!(slice.is_empty(), "by now we should have drained the chunk");
Ok((chunk_len, chunk.into_inner()))
}
async fn flush(&mut self) -> std::io::Result<()> {
let buf = self.buf.take().expect("must not use after an error");
if buf.is_empty() {
let buf_len = buf.pending();
if buf_len == 0 {
self.buf = Some(buf);
return std::io::Result::Ok(());
return Ok(());
}
let buf_len = buf.len();
let (nwritten, mut buf) = self.writer.write_all(buf).await?;
let (nwritten, io_buf) = self.writer.write_all(buf.flush()).await?;
assert_eq!(nwritten, buf_len);
buf.clear();
self.buf = Some(buf);
self.buf = Some(Buffer::reuse_after_flush(io_buf));
Ok(())
}
}
/// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
pub trait Buffer {
type IoBuf: IoBuf;
/// Capacity of the buffer. Must not change over the lifetime `self`.`
fn cap(&self) -> usize;
/// Add data to the buffer.
/// Panics if there is not enough room to accomodate `other`'s content, i.e.,
/// panics if `other.len() > self.cap() - self.pending()`.
fn extend_from_slice(&mut self, other: &[u8]);
/// Number of bytes in the buffer.
fn pending(&self) -> usize;
/// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data
/// so we can use [`tokio_epoll_uring`] to write it to disk.
fn flush(self) -> Slice<Self::IoBuf>;
/// After the write to disk is done and we have gotten back the slice,
/// [`BufferedWriter`] uses this method to re-use the io buffer.
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
}
impl Buffer for BytesMut {
type IoBuf = BytesMut;
#[inline(always)]
fn cap(&self) -> usize {
self.capacity()
}
fn extend_from_slice(&mut self, other: &[u8]) {
BytesMut::extend_from_slice(self, other)
}
#[inline(always)]
fn pending(&self) -> usize {
self.len()
}
fn flush(self) -> Slice<BytesMut> {
if self.is_empty() {
return self.slice_full();
}
let len = self.len();
self.slice(0..len)
}
fn reuse_after_flush(mut iobuf: BytesMut) -> Self {
iobuf.clear();
iobuf
}
}
impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
@@ -125,6 +186,8 @@ impl OwnedAsyncWriter for Vec<u8> {
#[cfg(test)]
mod tests {
use bytes::BytesMut;
use super::*;
#[derive(Default)]
@@ -158,7 +221,7 @@ mod tests {
#[tokio::test]
async fn test_buffered_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"a");
write!(writer, b"b");
write!(writer, b"c");
@@ -175,7 +238,7 @@ mod tests {
#[tokio::test]
async fn test_passthrough_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"abc");
write!(writer, b"de");
write!(writer, b"");
@@ -191,7 +254,7 @@ mod tests {
#[tokio::test]
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"a");
write!(writer, b"bc");
write!(writer, b"d");

View File

@@ -1034,7 +1034,7 @@ impl WalIngest {
let nblocks = modification
.tline
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
.get_rel_size(src_rel, Version::Modified(modification), ctx)
.await?;
let dst_rel = RelTag {
spcnode: tablespace_id,
@@ -1068,13 +1068,7 @@ impl WalIngest {
let content = modification
.tline
.get_rel_page_at_lsn(
src_rel,
blknum,
Version::Modified(modification),
true,
ctx,
)
.get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
.await?;
modification.put_rel_page_image(dst_rel, blknum, content)?;
num_blocks_copied += 1;
@@ -1242,7 +1236,7 @@ impl WalIngest {
};
if modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), ctx)
.await?
{
self.put_rel_drop(modification, rel, ctx).await?;
@@ -1541,7 +1535,7 @@ impl WalIngest {
nblocks
} else if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), ctx)
.await?
{
// create it with 0 size initially, the logic below will extend it
@@ -1553,7 +1547,7 @@ impl WalIngest {
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.get_rel_size(rel, Version::Modified(modification), ctx)
.await?
};
@@ -1650,14 +1644,14 @@ async fn get_relsize(
) -> anyhow::Result<BlockNumber> {
let nblocks = if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), ctx)
.await?
{
0
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.get_rel_size(rel, Version::Modified(modification), ctx)
.await?
};
Ok(nblocks)
@@ -1732,29 +1726,29 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1762,46 +1756,46 @@ mod tests {
// Check page contents at each LSN
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
test_img("foo blk 0 at 2")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img("foo blk 2 at 5")
);
@@ -1817,19 +1811,19 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
test_img("foo blk 1 at 4")
);
@@ -1837,13 +1831,13 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
3
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img("foo blk 2 at 5")
);
@@ -1856,7 +1850,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
.await?,
0
);
@@ -1869,19 +1863,19 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
.await?,
ZERO_PAGE
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
.await?,
test_img("foo blk 1")
);
@@ -1894,21 +1888,21 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
1501
);
for blk in 2..1500 {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
ZERO_PAGE
);
}
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
test_img("foo blk 1500")
);
@@ -1935,13 +1929,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
1
);
@@ -1954,7 +1948,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
.await?,
false
);
@@ -1972,13 +1966,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.await?,
1
);
@@ -2011,24 +2005,24 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
relsize
);
@@ -2039,7 +2033,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
.await?,
test_img(&data)
);
@@ -2056,7 +2050,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
1
);
@@ -2066,7 +2060,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
test_img(&data)
);
@@ -2075,7 +2069,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
relsize
);
@@ -2084,7 +2078,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img(&data)
);
@@ -2104,13 +2098,13 @@ mod tests {
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
relsize
);
@@ -2120,7 +2114,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
test_img(&data)
);
@@ -2154,7 +2148,7 @@ mod tests {
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE + 1
);
@@ -2168,7 +2162,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE
);
@@ -2183,7 +2177,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE - 1
);
@@ -2201,7 +2195,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.await?,
size as BlockNumber
);

View File

@@ -49,6 +49,8 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
int neon_protocol_version = 1;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
@@ -379,7 +381,17 @@ pageserver_connect(shardno_t shard_no, int elevel)
pfree(msg);
return false;
}
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
switch (neon_protocol_version)
{
case 2:
query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
break;
case 1:
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
break;
default:
elog(ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
}
ret = PQsendQuery(conn, query);
pfree(query);
if (ret != 1)
@@ -440,7 +452,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
return false;
}
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr);
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
@@ -844,6 +856,16 @@ pg_init_libpagestore(void)
PGC_USERSET,
0, /* no flags required */
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
DefineCustomIntVariable("neon.protocol_version",
"Version of compute<->page server protocol",
NULL,
&neon_protocol_version,
1, /* default to old protocol for now */
1, /* min */
2, /* max */
PGC_SU_BACKEND,
0, /* no flags required */
NULL, NULL, NULL);
relsize_hash_init();

View File

@@ -69,18 +69,33 @@ typedef enum {
SLRU_MULTIXACT_OFFSETS
} SlruKind;
/*
* supertype of all the Neon*Request structs below
/*--
* supertype of all the Neon*Request structs below.
*
* If 'latest' is true, we are requesting the latest page version, and 'lsn'
* is just a hint to the server that we know there are no versions of the page
* (or relation size, for exists/nblocks requests) later than the 'lsn'.
* All requests contain two LSNs:
*
* lsn: request page (or relation size, etc) at this LSN
* not_modified_since: Hint that the page hasn't been modified between
* this LSN and the request LSN (`lsn`).
*
* To request the latest version of a page, you can use MAX_LSN as the request
* LSN.
*
* If you don't know any better, you can always set 'not_modified_since' equal
* to 'lsn', but providing a lower value can speed up processing the request
* in the pageserver, as it doesn't need to wait for the WAL to arrive, and it
* can skip traversing through recent layers which we know to not contain any
* versions for the requested page.
*
* These structs describe the V2 of these requests. The old V1 protocol contained
* just one LSN and a boolean 'latest' flag. If the neon_protocol_version GUC is
* set to 1, we will convert these to the V1 requests before sending.
*/
typedef struct
{
NeonMessageTag tag;
bool latest; /* if true, request latest page version */
XLogRecPtr lsn; /* request page version @ this LSN */
XLogRecPtr lsn;
XLogRecPtr not_modified_since;
} NeonRequest;
typedef struct
@@ -193,6 +208,7 @@ extern int readahead_buffer_size;
extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern shardno_t get_shard_number(BufferTag* tag);
@@ -225,14 +241,14 @@ extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
#else
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void *buffer);
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, void *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, const void *buffer, bool skipFsync);
#endif

View File

@@ -168,8 +168,8 @@ typedef enum PrefetchStatus
typedef struct PrefetchRequest
{
BufferTag buftag; /* must be first entry in the struct */
XLogRecPtr effective_request_lsn;
XLogRecPtr actual_request_lsn;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
@@ -269,19 +269,19 @@ static PrefetchState *MyPState;
) \
)
static XLogRecPtr prefetch_lsn = 0;
static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
static uint64 prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
static bool prefetch_read(PrefetchRequest *slot);
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
static void prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
static bool prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup_trailing_unused(void);
static inline void prefetch_set_unused(uint64 ring_index);
static XLogRecPtr neon_get_request_lsn(bool *latest, NRelFileInfo rinfo,
ForkNumber forknum, BlockNumber blkno);
static void neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since);
static bool neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
PrefetchRequest *slot);
static bool
compact_prefetch_buffers(void)
@@ -338,8 +338,8 @@ compact_prefetch_buffers(void)
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
target_slot->actual_request_lsn = source_slot->actual_request_lsn;
target_slot->request_lsn = source_slot->request_lsn;
target_slot->not_modified_since = source_slot->not_modified_since;
target_slot->my_ring_index = empty_ring_index;
prfh_delete(MyPState->prf_hash, source_slot);
@@ -358,7 +358,8 @@ compact_prefetch_buffers(void)
};
source_slot->response = NULL;
source_slot->my_ring_index = 0;
source_slot->effective_request_lsn = 0;
source_slot->request_lsn = InvalidXLogRecPtr;
source_slot->not_modified_since = InvalidXLogRecPtr;
/* update bookkeeping */
n_moved++;
@@ -683,56 +684,39 @@ prefetch_set_unused(uint64 ring_index)
compact_prefetch_buffers();
}
/*
* Send one prefetch request to the pageserver. To wait for the response, call
* prefetch_wait_for().
*/
static void
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since)
{
bool found;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = false,
.req.lsn = 0,
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
.blkno = slot->buftag.blockNum,
};
if (force_lsn && force_latest)
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
if (force_request_lsn)
{
request.req.lsn = *force_lsn;
request.req.latest = *force_latest;
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
request.req.lsn = *force_request_lsn;
request.req.not_modified_since = *force_not_modified_since;
}
else
{
XLogRecPtr lsn = neon_get_request_lsn(
&request.req.latest,
BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum
);
/*
* Note: effective_request_lsn is potentially higher than the
* requested LSN, but still correct:
*
* We know there are no changes between the actual requested LSN and
* the value of effective_request_lsn: If there were, the page would
* have been in cache and evicted between those LSN values, which then
* would have had to result in a larger request LSN for this page.
*
* It is possible that a concurrent backend loads the page, modifies
* it and then evicts it again, but the LSN of that eviction cannot be
* smaller than the current WAL insert/redo pointer, which is already
* larger than this prefetch_lsn. So in any case, that would
* invalidate this cache.
*
* The best LSN to use for effective_request_lsn would be
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
*/
slot->actual_request_lsn = request.req.lsn = lsn;
prefetch_lsn = Max(prefetch_lsn, lsn);
slot->effective_request_lsn = prefetch_lsn;
neon_get_request_lsn(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum,
&request.req.lsn,
&request.req.not_modified_since);
}
slot->request_lsn = request.req.lsn;
slot->not_modified_since = request.req.not_modified_since;
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
@@ -749,7 +733,6 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
/* update slot state */
slot->status = PRFS_REQUESTED;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
}
@@ -759,22 +742,25 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
*
* Register that we may want the contents of BufferTag in the near future.
*
* If force_latest and force_lsn are not NULL, those values are sent to the
* pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure
* to fill in these values manually.
* If force_request_lsn and force_not_modified_since are not NULL, those
* values are sent to the pageserver. If they are NULL, we utilize the
* lastWrittenLsn -infrastructure to fill them in.
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
*/
static uint64
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn,
XLogRecPtr *force_not_modified_since)
{
uint64 ring_index;
PrefetchRequest req;
PrefetchRequest *slot;
PrfHashEntry *entry;
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
req.buftag = tag;
Retry:
@@ -792,40 +778,19 @@ Retry:
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
/*
* If we want a specific lsn, we do not accept requests that were made
* with a potentially different LSN.
* If the caller specified a request LSN to use, only accept prefetch
* responses that satisfy that request.
*/
if (force_latest && force_lsn)
if (force_request_lsn)
{
/*
* if we want the latest version, any effective_request_lsn <
* request lsn is OK
*/
if (*force_latest)
if (!neon_prefetch_response_usable(*force_request_lsn,
*force_not_modified_since, slot))
{
if (*force_lsn > slot->effective_request_lsn)
{
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
}
/*
* if we don't want the latest version, only accept requests with
* the exact same LSN
*/
else
{
if (*force_lsn != slot->effective_request_lsn)
{
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
/* Wait for the old request to finish and discard it */
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
}
@@ -921,7 +886,7 @@ Retry:
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn);
prefetch_do_request(slot, force_request_lsn, force_not_modified_since);
Assert(slot->status == PRFS_REQUESTED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
@@ -950,7 +915,7 @@ page_server_request(void const *req)
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
switch (messageTag(req))
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
@@ -966,11 +931,10 @@ page_server_request(void const *req)
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
}
shard_no = get_shard_number(&tag);
/*
* Current sharding model assumes that all metadata is present only at shard 0.
* We still need to call get_shard_no() to check if shard map is up-to-date.
@@ -997,8 +961,52 @@ nm_pack_request(NeonRequest *msg)
StringInfoData s;
initStringInfo(&s);
pq_sendbyte(&s, msg->tag);
if (neon_protocol_version >= 2)
{
pq_sendbyte(&s, msg->tag);
pq_sendint64(&s, msg->lsn);
pq_sendint64(&s, msg->not_modified_since);
}
else
{
bool latest;
XLogRecPtr lsn;
/*
* In primary, we always request the latest page version.
*/
if (!RecoveryInProgress())
{
latest = true;
lsn = msg->not_modified_since;
}
else
{
/*
* In the protocol V1, we cannot represent that we want to read
* page at LSN X, and we know that it hasn't been modified since
* Y. We can either use 'not_modified_lsn' as the request LSN, and
* risk getting an error if that LSN is too old and has already
* fallen out of the pageserver's GC horizon, or we can send
* 'request_lsn', causing the pageserver to possibly wait for the
* recent WAL to arrive unnecessarily. Or something in between. We
* choose to use the old LSN and risk GC errors, because that's
* what we've done historically.
*/
latest = false;
lsn = msg->not_modified_since;
}
pq_sendbyte(&s, msg->tag);
pq_sendbyte(&s, latest);
pq_sendint64(&s, lsn);
}
/*
* The rest of the request messages are the same between protocol V1 and
* V2
*/
switch (messageTag(msg))
{
/* pagestore_client -> pagestore */
@@ -1006,8 +1014,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1019,8 +1025,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1032,8 +1036,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode);
break;
@@ -1042,8 +1044,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1057,8 +1057,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);
@@ -1209,7 +1207,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1222,7 +1220,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1236,7 +1234,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1247,7 +1245,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1259,7 +1257,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1531,44 +1529,38 @@ nm_adjust_lsn(XLogRecPtr lsn)
/*
* Return LSN for requesting pages and number of blocks from page server
*/
static XLogRecPtr
neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
static void
neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since)
{
XLogRecPtr lsn;
XLogRecPtr last_written_lsn;
last_written_lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
last_written_lsn = nm_adjust_lsn(last_written_lsn);
Assert(last_written_lsn != InvalidXLogRecPtr);
if (RecoveryInProgress())
{
/*
* We don't know if WAL has been generated but not yet replayed, so
* we're conservative in our estimates about latest pages.
*/
*latest = false;
/* Request the page at the last replayed LSN. */
*request_lsn = GetXLogReplayRecPtr(NULL);
*not_modified_since = last_written_lsn;
Assert(last_written_lsn <= *request_lsn);
/*
* Get the last written LSN of this page.
*/
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
lsn = nm_adjust_lsn(lsn);
neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
neon_log(DEBUG1, "neon_get_request_lsn request lsn %X/%X, not_modified_since %X/%X",
LSN_FORMAT_ARGS(*request_lsn), LSN_FORMAT_ARGS(*not_modified_since));
}
else
{
XLogRecPtr flushlsn;
/*
* Use the latest LSN that was evicted from the buffer cache. Any
* pages modified by later WAL records must still in the buffer cache,
* so our request cannot concern those.
* Use the latest LSN that was evicted from the buffer cache as the
* 'not_modified_since' hint. Any pages modified by later WAL records
* must still in the buffer cache, so our request cannot concern
* those.
*/
*latest = true;
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
Assert(lsn != InvalidXLogRecPtr);
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
lsn = nm_adjust_lsn(lsn);
LSN_FORMAT_ARGS(last_written_lsn));
/*
* Is it possible that the last-written LSN is ahead of last flush
@@ -1583,16 +1575,109 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
#else
flushlsn = GetFlushRecPtr();
#endif
if (lsn > flushlsn)
if (last_written_lsn > flushlsn)
{
neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
(uint32) (lsn >> 32), (uint32) lsn,
(uint32) (flushlsn >> 32), (uint32) flushlsn);
XLogFlush(lsn);
LSN_FORMAT_ARGS(last_written_lsn),
LSN_FORMAT_ARGS(flushlsn));
XLogFlush(last_written_lsn);
flushlsn = last_written_lsn;
}
/*
* Request the latest version of the page. The most up-to-date request
* LSN we could use would be the current insert LSN, but to avoid the
* overhead of looking it up, use 'flushlsn' instead. This relies on
* the assumption that if the page was modified since the last WAL
* flush, it should still be in the buffer cache, and we wouldn't be
* requesting it.
*/
*request_lsn = flushlsn;
*not_modified_since = last_written_lsn;
}
}
/*
* neon_prefetch_response_usable -- Can a new request be satisfied by old one?
*
* This is used to check if the response to a prefetch request can be used to
* satisfy a page read now.
*/
static bool
neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
PrefetchRequest *slot)
{
/* sanity check the LSN's on the old and the new request */
Assert(request_lsn >= not_modified_since);
Assert(slot->request_lsn >= slot->not_modified_since);
Assert(slot->status != PRFS_UNUSED);
/*
* The new request's LSN should never be older than the old one. This
* could be an Assert, except that for testing purposes, we do provide an
* interface in neon_test_utils to fetch pages at arbitary LSNs, which
* violates this.
*
* Similarly, the not_modified_since value calculated for a page should
* never move backwards. This assumption is a bit fragile; if we updated
* the last-written cache when we read in a page, for example, then it
* might. But as the code stands, it should not.
*
* (If two backends issue a request at the same time, they might race and
* calculate LSNs "out of order" with each other, but the prefetch queue
* is backend-private at the moment.)
*/
if (request_lsn < slot->request_lsn || not_modified_since < slot->not_modified_since)
{
ereport(LOG,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "request with unexpected LSN after prefetch"),
errdetail("Request %X/%X not_modified_since %X/%X, prefetch %X/%X not_modified_since %X/%X)",
LSN_FORMAT_ARGS(request_lsn), LSN_FORMAT_ARGS(not_modified_since),
LSN_FORMAT_ARGS(slot->request_lsn), LSN_FORMAT_ARGS(slot->not_modified_since))));
return false;
}
return lsn;
/*---
* Each request to the pageserver carries two LSN values:
* `not_modified_since` and `request_lsn`. The (not_modified_since,
* request_lsn] range of each request is effectively a claim that the page
* has not been modified between those LSNs. If the range of the old
* request in the queue overlaps with the new request, we know that the
* page hasn't been modified in the union of the ranges. We can use the
* response to old request to satisfy the new request in that case. For
* example:
*
* 100 500
* Old request: +--------+
*
* 400 800
* New request: +--------+
*
* The old request claims that the page was not modified between LSNs 100
* and 500, and the second claims that it was not modified between 400 and
* 800. Together they mean that the page was not modified between 100 and
* 800. Therefore the response to the old request is also valid for the
* new request.
*
* This logic also holds at the boundary case that the old request's LSN
* matches the new request's not_modified_since LSN exactly:
*
* 100 500
* Old request: +--------+
*
* 500 900
* New request: +--------+
*
* The response to the old request is the page as it was at LSN 500, and
* the page hasn't been changed in the range (500, 900], therefore the
* response is valid also for the new request.
*/
/* this follows from the checks above */
Assert(request_lsn >= slot->not_modified_since);
return not_modified_since <= slot->request_lsn;
}
/*
@@ -1604,8 +1689,8 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
bool exists;
NeonResponse *resp;
BlockNumber n_blocks;
bool latest;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -1660,12 +1745,13 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
return false;
}
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonExistsRequest request = {
.req.tag = T_NeonExistsRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forkNum};
@@ -2102,10 +2188,10 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
void
#if PG_MAJORVERSION_NUM < 16
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer)
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer)
#else
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, void *buffer)
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer)
#endif
{
NeonResponse *resp;
@@ -2148,15 +2234,16 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry != NULL)
{
slot = entry->slot;
if (slot->effective_request_lsn >= request_lsn)
if (neon_prefetch_response_usable(request_lsn, not_modified_since, slot))
{
ring_index = slot->my_ring_index;
pgBufferUsage.prefetch.hits += 1;
}
else /* the current prefetch LSN is not large
* enough, so drop the prefetch */
else
{
/*
* Cannot use this prefetch, discard it
*
* We can't drop cache for not-yet-received requested items. It is
* unlikely this happens, but it can happen if prefetch distance
* is large enough and a backend didn't consume all prefetch
@@ -2181,8 +2268,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
pgBufferUsage.prefetch.misses += 1;
ring_index = prefetch_register_buffer(buftag, &request_latest,
&request_lsn);
ring_index = prefetch_register_buffer(buftag, &request_lsn,
&not_modified_since);
slot = GetPrfSlot(ring_index);
}
else
@@ -2246,8 +2333,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
#endif
{
bool latest;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -2272,8 +2359,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
return;
}
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, blkno);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, latest, buffer);
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, blkno,
&request_lsn, &not_modified_since);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, not_modified_since, buffer);
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -2442,8 +2530,8 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
{
NeonResponse *resp;
BlockNumber n_blocks;
bool latest;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -2470,12 +2558,13 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
return n_blocks;
}
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonNblocksRequest request = {
.req.tag = T_NeonNblocksRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forknum,
};
@@ -2523,16 +2612,17 @@ neon_dbsize(Oid dbNode)
{
NeonResponse *resp;
int64 db_size;
XLogRecPtr request_lsn;
bool latest;
XLogRecPtr request_lsn,
not_modified_since;
NRelFileInfo dummy_node = {0};
request_lsn = neon_get_request_lsn(&latest, dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonDbSizeRequest request = {
.req.tag = T_NeonDbSizeRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.dbNode = dbNode,
};
@@ -2605,7 +2695,6 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
* the most recently inserted WAL record's LSN.
*/
lsn = GetXLogInsertRecPtr();
lsn = nm_adjust_lsn(lsn);
/*
@@ -2805,14 +2894,33 @@ neon_end_unlogged_build(SMgrRelation reln)
static int
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer)
{
XLogRecPtr request_lsn;
/*
* GetRedoStartLsn() returns LSN of basebackup.
* We need to download SLRU segments only once after node startup,
* then SLRUs are maintained locally.
*/
request_lsn = GetRedoStartLsn();
XLogRecPtr request_lsn,
not_modified_since;
if (RecoveryInProgress())
{
request_lsn = GetXLogReplayRecPtr(NULL);
if (request_lsn == InvalidXLogRecPtr)
{
/*
* This happens in neon startup, we start up without replaying any
* records.
*/
request_lsn = GetRedoStartLsn();
}
}
else
request_lsn = GetXLogInsertRecPtr();
request_lsn = nm_adjust_lsn(request_lsn);
/*
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
* segment has not changed since the basebackup, because in order to
* modify it, we would have had to download it already. And once
* downloaded, we never evict SLRU segments from local disk.
*/
not_modified_since = GetRedoStartLsn();
SlruKind kind;
if (STRPREFIX(path, "pg_xact"))
@@ -2827,8 +2935,8 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
NeonResponse *resp;
NeonGetSlruSegmentRequest request = {
.req.tag = T_NeonGetSlruSegmentRequest,
.req.latest = false,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.kind = kind,
.segno = segno
@@ -2956,6 +3064,9 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
{
BlockNumber relsize;
/* This is only used in WAL replay */
Assert(RecoveryInProgress());
/* Extend the relation if we know its size */
if (get_cached_relsize(rinfo, forknum, &relsize))
{
@@ -2974,14 +3085,13 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* This length is later reused when we open the smgr to read the
* block, which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
.lsn = end_recptr,
.not_modified_since = end_recptr,
},
.rinfo = rinfo,
.forknum = forknum,

View File

@@ -7,7 +7,7 @@ OBJS = \
neontest.o
EXTENSION = neon_test_utils
DATA = neon_test_utils--1.0.sql
DATA = neon_test_utils--1.1.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CONFIG = pg_config

View File

@@ -31,12 +31,12 @@ AS 'MODULE_PATHNAME', 'clear_buffer_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION get_raw_page_at_lsn(relname text, forkname text, blocknum int8, lsn pg_lsn)
CREATE FUNCTION get_raw_page_at_lsn(relname text, forkname text, blocknum int8, request_lsn pg_lsn, not_modified_since pg_lsn)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION get_raw_page_at_lsn(tbspc oid, db oid, relfilenode oid, forknum int8, blocknum int8, lsn pg_lsn)
CREATE FUNCTION get_raw_page_at_lsn(tbspc oid, db oid, relfilenode oid, forknum int8, blocknum int8, request_lsn pg_lsn, not_modified_since pg_lsn)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;

View File

@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.0'
default_version = '1.1'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -48,10 +48,10 @@ PG_FUNCTION_INFO_V1(neon_xlogflush);
*/
#if PG_MAJORVERSION_NUM < 16
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
#else
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, void *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
#endif
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
@@ -299,8 +299,11 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
text *forkname;
uint32 blkno;
bool request_latest = PG_ARGISNULL(3);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(3);
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
if (PG_NARGS() != 5)
elog(ERROR, "unexpected number of arguments in SQL function signature");
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
PG_RETURN_NULL();
@@ -309,6 +312,9 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
forkname = PG_GETARG_TEXT_PP(1);
blkno = PG_GETARG_UINT32(2);
request_lsn = PG_ARGISNULL(3) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(3);
not_modified_since = PG_ARGISNULL(4) ? request_lsn : PG_GETARG_LSN(4);
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
@@ -361,7 +367,7 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, read_lsn, request_latest, raw_page_data);
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, request_lsn, not_modified_since, raw_page_data);
relation_close(rel, AccessShareLock);
@@ -380,6 +386,9 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
{
char *raw_page_data;
if (PG_NARGS() != 7)
elog(ERROR, "unexpected number of arguments in SQL function signature");
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
@@ -403,18 +412,20 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
};
ForkNumber forknum = PG_GETARG_UINT32(3);
uint32 blkno = PG_GETARG_UINT32(4);
bool request_latest = PG_ARGISNULL(5);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(5);
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
/* Initialize buffer to copy to */
bytea *raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
request_lsn = PG_ARGISNULL(5) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(5);
not_modified_since = PG_ARGISNULL(6) ? request_lsn : PG_GETARG_LSN(6);
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
neon_read_at_lsn(rinfo, forknum, blkno, read_lsn, request_latest, raw_page_data);
neon_read_at_lsn(rinfo, forknum, blkno, request_lsn, not_modified_since, raw_page_data);
PG_RETURN_BYTEA_P(raw_page);
}
}

View File

@@ -75,7 +75,6 @@ pub type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,
@@ -89,6 +88,8 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
}
}
impl postgres_backend::HandlerSync<tokio::net::TcpStream> for MgmtHandler {}
fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), QueryError> {
let resp: KickSession = serde_json::from_str(query).context("Failed to parse query as json")?;

View File

@@ -2,10 +2,13 @@
//! protocol commands.
use anyhow::Context;
use std::net::TcpStream;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_io_timeout::TimeoutReader;
use tracing::{debug, info, info_span, Instrument};
use utils::measured_stream::MeasuredStream;
use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
@@ -95,8 +98,7 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::HandlerSync<IO>
for SafekeeperPostgresHandler
{
// tenant_id and timeline_id are passed in connection string params
@@ -191,8 +193,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
self.claims = Some(data.claims);
Ok(())
}
}
type IO<'s, R: FnMut(usize), W> =
MeasuredStream<std::pin::Pin<&'s mut TimeoutReader<TcpStream>>, R, W>;
impl<'s, R: FnMut(usize), W> postgres_backend::Handler<IO<'s, R, W>> for SafekeeperPostgresHandler {
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO<'s, R, W>>,
query_string: &str,
) -> Result<(), QueryError> {
self.process_query_(pgb, &query_string).await
}
}
impl SafekeeperPostgresHandler {
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,

View File

@@ -184,6 +184,19 @@ impl HeartbeaterTask {
}
}
}
tracing::info!(
"Heartbeat round complete for {} nodes, {} offline",
new_state.len(),
new_state
.values()
.filter(|s| match s {
PageserverState::Available { .. } => {
false
}
PageserverState::Offline => true,
})
.count()
);
let mut deltas = Vec::new();
let now = Instant::now();

View File

@@ -767,7 +767,10 @@ impl Reconciler {
// It is up to the caller whether they want to drop out on this error, but they don't have to:
// in general we should avoid letting unavailability of the cloud control plane stop us from
// making progress.
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
if !matches!(e, NotifyError::ShuttingDown) {
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
}
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
// needs to retry at some point.
self.compute_notify_failure = true;

View File

@@ -824,8 +824,7 @@ impl Service {
// Ordering: populate last_error before advancing error_seq,
// so that waiters will see the correct error after waiting.
*(tenant.last_error.lock().unwrap()) = format!("{e}");
tenant.error_waiter.advance(result.sequence);
tenant.set_last_error(result.sequence, e);
for (node_id, o) in result.observed.locations {
tenant.observed.locations.insert(node_id, o);
@@ -2805,7 +2804,14 @@ impl Service {
tenant_shard_id: shard.tenant_shard_id,
node_attached: *shard.intent.get_attached(),
node_secondary: shard.intent.get_secondary().to_vec(),
last_error: shard.last_error.lock().unwrap().clone(),
last_error: shard
.last_error
.lock()
.unwrap()
.as_ref()
.map(|e| format!("{e}"))
.unwrap_or("".to_string())
.clone(),
is_reconciling: shard.reconciler.is_some(),
is_pending_compute_notification: shard.pending_compute_notification,
is_splitting: matches!(shard.splitting, SplitState::Splitting),
@@ -4031,7 +4037,7 @@ impl Service {
// TODO: in the background, we should balance work back onto this pageserver
}
AvailabilityTransition::Unchanged => {
tracing::info!("Node {} no change during config", node_id);
tracing::debug!("Node {} no change during config", node_id);
}
}
@@ -4351,7 +4357,26 @@ impl Service {
};
let waiter_count = waiters.len();
self.await_waiters(waiters, RECONCILE_TIMEOUT).await?;
match self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
Ok(()) => {}
Err(ReconcileWaitError::Failed(_, reconcile_error))
if matches!(*reconcile_error, ReconcileError::Cancel) =>
{
// Ignore reconciler cancel errors: this reconciler might have shut down
// because some other change superceded it. We will return a nonzero number,
// so the caller knows they might have to call again to quiesce the system.
}
Err(e) => {
return Err(e);
}
};
tracing::info!(
"{} reconciles in reconcile_all, {} waiters",
reconciles_spawned,
waiter_count
);
Ok(waiter_count)
}

View File

@@ -38,12 +38,18 @@ use crate::{
};
/// Serialization helper
fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
T: Clone + std::fmt::Display,
T: std::fmt::Display,
{
serializer.collect_str(&v.lock().unwrap())
serializer.collect_str(
&v.lock()
.unwrap()
.as_ref()
.map(|e| format!("{e}"))
.unwrap_or("".to_string()),
)
}
/// In-memory state for a particular tenant shard.
@@ -111,11 +117,15 @@ pub(crate) struct TenantShard {
#[serde(skip)]
pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
/// The most recent error from a reconcile on this tenant
/// The most recent error from a reconcile on this tenant. This is a nested Arc
/// because:
/// - ReconcileWaiters need to Arc-clone the overall object to read it later
/// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
/// many waiters for one shard, and the underlying error types are not Clone.
/// TODO: generalize to an array of recent events
/// TOOD: use a ArcSwap instead of mutex for faster reads?
#[serde(serialize_with = "read_mutex_content")]
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
#[serde(serialize_with = "read_last_error")]
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
/// If we have a pending compute notification that for some reason we weren't able to send,
/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
@@ -293,18 +303,18 @@ pub(crate) struct ReconcilerWaiter {
seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
error: std::sync::Arc<std::sync::Mutex<String>>,
error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
seq: Sequence,
}
#[derive(thiserror::Error, Debug)]
pub enum ReconcileWaitError {
pub(crate) enum ReconcileWaitError {
#[error("Timeout waiting for shard {0}")]
Timeout(TenantShardId),
#[error("shutting down")]
Shutdown,
#[error("Reconcile error on shard {0}: {1}")]
Failed(TenantShardId, String),
Failed(TenantShardId, Arc<ReconcileError>),
}
#[derive(Eq, PartialEq, Debug)]
@@ -342,7 +352,8 @@ impl ReconcilerWaiter {
SeqWaitError::Timeout => unreachable!()
})?;
return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
}
}
@@ -873,7 +884,7 @@ impl TenantShard {
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
if !do_reconcile {
tracing::info!("Not dirty, no reconciliation needed.");
tracing::debug!("Not dirty, no reconciliation needed.");
return ReconcileNeeded::No;
}
@@ -1151,6 +1162,13 @@ impl TenantShard {
&self.scheduling_policy
}
pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
// Ordering: always set last_error before advancing sequence, so that sequence
// waiters are guaranteed to see a Some value when they see an error.
*(self.last_error.lock().unwrap()) = Some(Arc::new(error));
self.error_waiter.advance(sequence);
}
pub(crate) fn from_persistent(
tsp: TenantShardPersistence,
intent: IntentState,

View File

@@ -3,7 +3,7 @@ import re
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, tenant_get_shards, wait_replica_caughtup
# Check for corrupted WAL messages which might otherwise go unnoticed if
@@ -102,3 +102,80 @@ def test_2_replicas_start(neon_simple_env: NeonEnv):
) as secondary2:
wait_replica_caughtup(primary, secondary1)
wait_replica_caughtup(primary, secondary2)
# We had an issue that a standby server made GetPage requests with an
# old LSN, based on the last-written LSN cache, to avoid waits in the
# pageserver. However, requesting a page with a very old LSN, such
# that the GC horizon has already advanced past it, results in an
# error from the pageserver:
# "Bad request: tried to request a page version that was garbage collected"
#
# To avoid that, the compute<-> pageserver protocol was updated so
# that that the standby now sends two LSNs, the old last-written LSN
# and the current replay LSN.
#
# https://github.com/neondatabase/neon/issues/6211
def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder):
tenant_conf = {
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
timeline_id = env.initial_timeline
tenant_id = env.initial_tenant
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
with env.endpoints.new_replica_start(
origin=primary,
endpoint_id="secondary",
# Protocol version 2 was introduced to fix the issue
# that this test exercises. With protocol version 1 it
# fails.
config_lines=["neon.protocol_version=2"],
) as secondary:
p_cur = primary.connect().cursor()
p_cur.execute("CREATE EXTENSION neon_test_utils")
p_cur.execute("CREATE TABLE test (id int primary key) WITH (autovacuum_enabled=false)")
p_cur.execute("INSERT INTO test SELECT generate_series(1, 10000) AS g")
wait_replica_caughtup(primary, secondary)
s_cur = secondary.connect().cursor()
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res[0] == 10000
# Clear the cache in the standby, so that when we
# re-execute the query, it will make GetPage
# requests. This does not clear the last-written LSN cache
# so we still remember the LSNs of the pages.
s_cur.execute("SELECT clear_buffer_cache()")
# Do other stuff on the primary, to advance the WAL
p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g")
# Run GC. The PITR interval is very small, so this advances the GC cutoff LSN
# very close to the primary's current insert LSN.
shards = tenant_get_shards(env, tenant_id, None)
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
client.timeline_checkpoint(tenant_shard_id, timeline_id)
client.timeline_compact(tenant_shard_id, timeline_id)
client.timeline_gc(tenant_shard_id, timeline_id, 0)
# Re-execute the query. The GetPage requests that this
# generates use old not_modified_since LSNs, older than
# the GC cutoff, but new request LSNs. (In protocol
# version 1 there was only one LSN, and this failed.)
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res[0] == 10000

View File

@@ -0,0 +1,131 @@
from typing import Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, tenant_get_shards
from fixtures.types import Lsn
from fixtures.utils import query_scalar
#
# Test on-demand download of the pg_xact SLRUs
#
@pytest.mark.parametrize("shard_count", [None, 4])
def test_ondemand_download_pg_xact(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
tenant_conf = {
"lazy_slru_download": "true",
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
}
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
timeline_id = env.initial_timeline
tenant_id = env.initial_tenant
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")
# Consume a lot of XIDs, to create more pg_xact segments
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (2)")
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (2)")
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (3)")
# Restart postgres. After restart, the new instance will download the
# pg_xact segments lazily.
endpoint.stop()
endpoint.start()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Consume more WAL, so that the pageserver can compact and GC older data,
# including the LSN that we started the new endpoint at,
cur.execute("CREATE TABLE anothertable (i int, t text)")
cur.execute(
"INSERT INTO anothertable SELECT g, 'long string to consume some space' || g FROM generate_series(1, 10000) g"
)
# Run GC
shards = tenant_get_shards(env, tenant_id, None)
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
client.timeline_checkpoint(tenant_shard_id, timeline_id)
client.timeline_compact(tenant_shard_id, timeline_id)
client.timeline_gc(tenant_shard_id, timeline_id, 0)
# Test that this can still on-demand download the old pg_xact segments
cur.execute("select xmin, xmax, * from clogtest")
tup = cur.fetchall()
log.info(f"tuples = {tup}")
@pytest.mark.parametrize("shard_count", [None, 4])
def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
tenant_conf = {
"lazy_slru_download": "true",
}
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")
# Consume a lot of XIDs, to create more pg_xact segments
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
# Open a new connection and insert another row, but leave
# the transaction open
pg_conn2 = endpoint.connect()
cur2 = pg_conn2.cursor()
cur2.execute("BEGIN")
cur2.execute("INSERT INTO clogtest VALUES (2)")
# Another insert on the first connection, which is committed.
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (3)")
# Start standby at this point in time
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
endpoint_at_lsn = env.endpoints.create_start(
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
)
# Commit transaction 2, after the standby was launched.
cur2.execute("COMMIT")
# The replica should not see transaction 2 as committed.
conn_replica = endpoint_at_lsn.connect()
cur_replica = conn_replica.cursor()
cur_replica.execute("SELECT * FROM clogtest")
assert cur_replica.fetchall() == [(1,), (3,)]

View File

@@ -17,7 +17,14 @@ def test_read_validation(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_read_validation", "empty")
endpoint = env.endpoints.create_start("test_read_validation")
endpoint = env.endpoints.create_start(
"test_read_validation",
# Use protocol version 2, because the code that constructs the V1 messages
# assumes that a primary always wants to read the latest version of a page,
# and therefore doesn't work with the test functions below to read an older
# page version.
config_lines=["neon.protocol_version=2"],
)
with closing(endpoint.connect()) as con:
with con.cursor() as c:
@@ -64,7 +71,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info("Cache is clear, reading stale page version")
c.execute(
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{first[0]}'))"
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{first[0]}', NULL))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn"
@@ -77,7 +84,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info("Cache is clear, reading latest page version without cache")
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, NULL))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, NULL, NULL))"
)
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
@@ -92,7 +99,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}', NULL))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
@@ -102,7 +109,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, NULL))"
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, NULL, NULL))"
)
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
@@ -114,7 +121,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}', NULL))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
@@ -133,7 +140,14 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
endpoint = env.endpoints.create_start("test_read_validation_neg")
endpoint = env.endpoints.create_start(
"test_read_validation_neg",
# Use protocol version 2, because the code that constructs the V1 messages
# assumes that a primary always wants to read the latest version of a page,
# and therefore doesn't work with the test functions below to read an older
# page version.
config_lines=["neon.protocol_version=2"],
)
with closing(endpoint.connect()) as con:
with con.cursor() as c:
@@ -143,7 +157,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
log.info("read a page of a missing relation")
try:
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0', NULL))"
)
raise AssertionError("query should have failed")
except UndefinedTable as e:
@@ -155,7 +169,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
log.info("read a page at lsn 0")
try:
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0', NULL))"
)
raise AssertionError("query should have failed")
except IoError as e:
@@ -164,22 +178,22 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
log.info("Pass NULL as an input")
expected = (None, None, None)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn(NULL, 'main', 0, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn(NULL, 'main', 0, '0/0', NULL))"
)
assert c.fetchone() == expected, "Expected null output"
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', NULL, 0, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', NULL, 0, '0/0', NULL))"
)
assert c.fetchone() == expected, "Expected null output"
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', NULL, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', NULL, '0/0', NULL))"
)
assert c.fetchone() == expected, "Expected null output"
# This check is currently failing, reading beyond EOF is returning a 0-page
log.info("Read beyond EOF")
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 1, NULL))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 1, NULL, NULL))"
)

View File

@@ -173,7 +173,9 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
# which changes the LSN on the page.
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
vm_page_in_cache = (cur.fetchall()[0][0])[8:100].hex()
cur.execute("select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn() )")
cur.execute(
"select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn(), NULL )"
)
vm_page_at_pageserver = (cur.fetchall()[0][0])[8:100].hex()
assert vm_page_at_pageserver == vm_page_in_cache

View File

@@ -7,7 +7,9 @@ use std::{
io::BufReader,
};
use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest};
use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamProtocolVersion,
};
use utils::id::{ConnectionId, TenantId, TimelineId};
use clap::{Parser, Subcommand};
@@ -56,7 +58,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
let mut prev: Option<PagestreamGetPageRequest> = None;
// Compute stats
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2) {
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
@@ -89,7 +91,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
}
fn dump_trace<R: std::io::Read>(mut reader: R) {
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2) {
println!("{msg:?}");
}
}