mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 01:50:38 +00:00
Compare commits
14 Commits
jcsp/shard
...
arpad/less
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
548b28b28b | ||
|
|
f9039f7d73 | ||
|
|
2a7bc782fd | ||
|
|
307d10b111 | ||
|
|
5546c0de35 | ||
|
|
d1fb4a4a00 | ||
|
|
bf369f4268 | ||
|
|
70f4a16a05 | ||
|
|
d63185fa6c | ||
|
|
ca8fca0e9f | ||
|
|
0397427dcf | ||
|
|
a2a44ea213 | ||
|
|
4917f52c88 | ||
|
|
04a682021f |
@@ -848,39 +848,72 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
}
|
||||
}
|
||||
|
||||
// In the V2 protocol version, a GetPage request contains two LSN values:
|
||||
//
|
||||
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
|
||||
// "get the latest version present". It's used by the primary server, which knows that no one else
|
||||
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
|
||||
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
|
||||
//
|
||||
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
|
||||
// modified between 'not_modified_since' and the request LSN. It's always correct to set
|
||||
// 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
|
||||
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
|
||||
// request without waiting for 'request_lsn' to arrive.
|
||||
//
|
||||
// The legacy V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
|
||||
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
|
||||
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
|
||||
// standby to request a page at a particular non-latest LSN, and also include the
|
||||
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
|
||||
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
|
||||
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
|
||||
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
|
||||
// interface allows sending both LSNs, and let the pageserver do the right thing. There is no
|
||||
// difference in the responses between V1 and V2.
|
||||
//
|
||||
// The Request structs below reflect the V2 interface. If V1 is used, the parse function
|
||||
// maps the old format requests to the new format.
|
||||
//
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum PagestreamProtocolVersion {
|
||||
V1,
|
||||
V2,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamExistsRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub request_lsn: Lsn,
|
||||
pub not_modified_since: Lsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamNblocksRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub request_lsn: Lsn,
|
||||
pub not_modified_since: Lsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub request_lsn: Lsn,
|
||||
pub not_modified_since: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamDbSizeRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub request_lsn: Lsn,
|
||||
pub not_modified_since: Lsn,
|
||||
pub dbnode: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetSlruSegmentRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub request_lsn: Lsn,
|
||||
pub not_modified_since: Lsn,
|
||||
pub kind: u8,
|
||||
pub segno: u32,
|
||||
}
|
||||
@@ -927,14 +960,16 @@ pub struct TenantHistorySize {
|
||||
}
|
||||
|
||||
impl PagestreamFeMessage {
|
||||
/// Serialize a compute -> pageserver message. This is currently only used in testing
|
||||
/// tools. Always uses protocol version 2.
|
||||
pub fn serialize(&self) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(0);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u64(req.request_lsn.0);
|
||||
bytes.put_u64(req.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
@@ -943,8 +978,8 @@ impl PagestreamFeMessage {
|
||||
|
||||
Self::Nblocks(req) => {
|
||||
bytes.put_u8(1);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u64(req.request_lsn.0);
|
||||
bytes.put_u64(req.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
@@ -953,8 +988,8 @@ impl PagestreamFeMessage {
|
||||
|
||||
Self::GetPage(req) => {
|
||||
bytes.put_u8(2);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u64(req.request_lsn.0);
|
||||
bytes.put_u64(req.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
@@ -964,15 +999,15 @@ impl PagestreamFeMessage {
|
||||
|
||||
Self::DbSize(req) => {
|
||||
bytes.put_u8(3);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u64(req.request_lsn.0);
|
||||
bytes.put_u64(req.not_modified_since.0);
|
||||
bytes.put_u32(req.dbnode);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(req) => {
|
||||
bytes.put_u8(4);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u64(req.request_lsn.0);
|
||||
bytes.put_u64(req.not_modified_since.0);
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
@@ -981,18 +1016,40 @@ impl PagestreamFeMessage {
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
|
||||
// TODO these gets can fail
|
||||
|
||||
pub fn parse<R: std::io::Read>(
|
||||
body: &mut R,
|
||||
protocol_version: PagestreamProtocolVersion,
|
||||
) -> anyhow::Result<PagestreamFeMessage> {
|
||||
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
||||
//
|
||||
// TODO: consider using protobuf or serde bincode for less error prone
|
||||
// serialization.
|
||||
let msg_tag = body.read_u8()?;
|
||||
|
||||
let (request_lsn, not_modified_since) = match protocol_version {
|
||||
PagestreamProtocolVersion::V2 => (
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
),
|
||||
PagestreamProtocolVersion::V1 => {
|
||||
// In the old protocol, each message starts with a boolean 'latest' flag,
|
||||
// followed by 'lsn'. Convert that to the two LSNs, 'request_lsn' and
|
||||
// 'not_modified_since', used in the new protocol version.
|
||||
let latest = body.read_u8()? != 0;
|
||||
let request_lsn = Lsn::from(body.read_u64::<BigEndian>()?);
|
||||
if latest {
|
||||
(Lsn::MAX, request_lsn) // get latest version
|
||||
} else {
|
||||
(request_lsn, request_lsn) // get version at specified LSN
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// The rest of the messages are the same between V1 and V2
|
||||
match msg_tag {
|
||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
@@ -1001,8 +1058,8 @@ impl PagestreamFeMessage {
|
||||
},
|
||||
})),
|
||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
@@ -1011,8 +1068,8 @@ impl PagestreamFeMessage {
|
||||
},
|
||||
})),
|
||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
@@ -1022,14 +1079,14 @@ impl PagestreamFeMessage {
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
4 => Ok(PagestreamFeMessage::GetSlruSegment(
|
||||
PagestreamGetSlruSegmentRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
kind: body.read_u8()?,
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
@@ -1157,8 +1214,8 @@ mod tests {
|
||||
// Test serialization/deserialization of PagestreamFeMessage
|
||||
let messages = vec![
|
||||
PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
latest: true,
|
||||
lsn: Lsn(4),
|
||||
request_lsn: Lsn(4),
|
||||
not_modified_since: Lsn(3),
|
||||
rel: RelTag {
|
||||
forknum: 1,
|
||||
spcnode: 2,
|
||||
@@ -1167,8 +1224,8 @@ mod tests {
|
||||
},
|
||||
}),
|
||||
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
latest: false,
|
||||
lsn: Lsn(4),
|
||||
request_lsn: Lsn(4),
|
||||
not_modified_since: Lsn(4),
|
||||
rel: RelTag {
|
||||
forknum: 1,
|
||||
spcnode: 2,
|
||||
@@ -1177,8 +1234,8 @@ mod tests {
|
||||
},
|
||||
}),
|
||||
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
latest: true,
|
||||
lsn: Lsn(4),
|
||||
request_lsn: Lsn(4),
|
||||
not_modified_since: Lsn(3),
|
||||
rel: RelTag {
|
||||
forknum: 1,
|
||||
spcnode: 2,
|
||||
@@ -1188,14 +1245,16 @@ mod tests {
|
||||
blkno: 7,
|
||||
}),
|
||||
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
latest: true,
|
||||
lsn: Lsn(4),
|
||||
request_lsn: Lsn(4),
|
||||
not_modified_since: Lsn(3),
|
||||
dbnode: 7,
|
||||
}),
|
||||
];
|
||||
for msg in messages {
|
||||
let bytes = msg.serialize();
|
||||
let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
|
||||
let reconstructed =
|
||||
PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V2)
|
||||
.unwrap();
|
||||
assert!(msg == reconstructed);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,18 +78,18 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
|
||||
)
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Handler<IO> {
|
||||
pub trait Handler<IO>: HandlerSync<IO> {
|
||||
/// Handle single query.
|
||||
/// postgres_backend will issue ReadyForQuery after calling this (this
|
||||
/// might be not what we want after CopyData streaming, but currently we don't
|
||||
/// care). It will also flush out the output buffer.
|
||||
async fn process_query(
|
||||
fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
) -> Result<(), QueryError>;
|
||||
|
||||
) -> impl Future<Output = Result<(), QueryError>> + Send;
|
||||
}
|
||||
pub trait HandlerSync<IO> {
|
||||
/// Called on startup packet receival, allows to process params.
|
||||
///
|
||||
/// If Ok(false) is returned postgres_backend will skip auth -- that is needed for new users
|
||||
|
||||
@@ -60,7 +60,7 @@ impl Client {
|
||||
) -> anyhow::Result<PagestreamClient> {
|
||||
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
|
||||
.client
|
||||
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
|
||||
.copy_both_simple(&format!("pagestream_v2 {tenant_id} {timeline_id}"))
|
||||
.await?;
|
||||
let Client {
|
||||
cancel_on_client_drop,
|
||||
|
||||
@@ -312,8 +312,12 @@ async fn main_impl(
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
|
||||
@@ -376,7 +376,7 @@ where
|
||||
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
|
||||
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
|
||||
.await?;
|
||||
|
||||
// If the relation is empty, create an empty file
|
||||
@@ -397,7 +397,7 @@ where
|
||||
for blknum in startblk..endblk {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
|
||||
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), self.ctx)
|
||||
.await?;
|
||||
segment_data.extend_from_slice(&img[..]);
|
||||
}
|
||||
|
||||
@@ -1,13 +1,5 @@
|
||||
//
|
||||
//! The Page Service listens for client connections and serves their GetPage@LSN
|
||||
//! requests.
|
||||
//
|
||||
// It is possible to connect here using usual psql/pgbench/libpq. Following
|
||||
// commands are supported now:
|
||||
// *status* -- show actual info about this pageserver,
|
||||
// *pagestream* -- enter mode where smgr and pageserver talk with their
|
||||
// custom protocol.
|
||||
//
|
||||
|
||||
use anyhow::Context;
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
@@ -23,7 +15,7 @@ use pageserver_api::models::{
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
|
||||
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
|
||||
PagestreamNblocksResponse,
|
||||
PagestreamNblocksResponse, PagestreamProtocolVersion,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use pageserver_api::shard::ShardNumber;
|
||||
@@ -551,6 +543,7 @@ impl PageServerHandler {
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
protocol_version: PagestreamProtocolVersion,
|
||||
ctx: RequestContext,
|
||||
) -> Result<(), QueryError>
|
||||
where
|
||||
@@ -613,14 +606,15 @@ impl PageServerHandler {
|
||||
t.trace(©_data_bytes)
|
||||
}
|
||||
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
let neon_fe_msg =
|
||||
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
|
||||
|
||||
// TODO: We could create a new per-request context here, with unique ID.
|
||||
// Currently we use the same per-timeline context for all requests
|
||||
|
||||
let (response, span) = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
|
||||
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
@@ -629,7 +623,7 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
|
||||
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
@@ -639,7 +633,7 @@ impl PageServerHandler {
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
// shard_id is filled in by the handler
|
||||
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
|
||||
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
@@ -648,7 +642,7 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
|
||||
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
@@ -657,7 +651,7 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetSlruSegment(req) => {
|
||||
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
|
||||
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
@@ -838,83 +832,80 @@ impl PageServerHandler {
|
||||
/// Helper function to handle the LSN from client request.
|
||||
///
|
||||
/// Each GetPage (and Exists and Nblocks) request includes information about
|
||||
/// which version of the page is being requested. The client can request the
|
||||
/// latest version of the page, or the version that's valid at a particular
|
||||
/// LSN. The primary compute node will always request the latest page
|
||||
/// version, while a standby will request a version at the LSN that it's
|
||||
/// currently caught up to.
|
||||
/// which version of the page is being requested. The primary compute node
|
||||
/// will always request the latest page version, by setting 'request_lsn' to
|
||||
/// the last inserted or flushed WAL position, while a standby will request
|
||||
/// a version at the LSN that it's currently caught up to.
|
||||
///
|
||||
/// In either case, if the page server hasn't received the WAL up to the
|
||||
/// requested LSN yet, we will wait for it to arrive. The return value is
|
||||
/// the LSN that should be used to look up the page versions.
|
||||
///
|
||||
/// In addition to the request LSN, each request carries another LSN,
|
||||
/// 'not_modified_since', which is a hint to the pageserver that the client
|
||||
/// knows that the page has not been modified between 'not_modified_since'
|
||||
/// and the request LSN. This allows skipping the wait, as long as the WAL
|
||||
/// up to 'not_modified_since' has arrived. If the client doesn't have any
|
||||
/// information about when the page was modified, it will use
|
||||
/// not_modified_since == lsn. If the client lies and sends a too low
|
||||
/// not_modified_hint such that there are in fact later page versions, the
|
||||
/// behavior is undefined: the pageserver may return any of the page versions
|
||||
/// or an error.
|
||||
async fn wait_or_get_last_lsn(
|
||||
timeline: &Timeline,
|
||||
mut lsn: Lsn,
|
||||
latest: bool,
|
||||
request_lsn: Lsn,
|
||||
not_modified_since: Lsn,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Lsn, PageStreamError> {
|
||||
if latest {
|
||||
// Latest page version was requested. If LSN is given, it is a hint
|
||||
// to the page server that there have been no modifications to the
|
||||
// page after that LSN. If we haven't received WAL up to that point,
|
||||
// wait until it arrives.
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
|
||||
// Note: this covers the special case that lsn == Lsn(0). That
|
||||
// special case means "return the latest version whatever it is",
|
||||
// and it's used for bootstrapping purposes, when the page server is
|
||||
// connected directly to the compute node. That is needed because
|
||||
// when you connect to the compute node, to receive the WAL, the
|
||||
// walsender process will do a look up in the pg_authid catalog
|
||||
// table for authentication. That poses a deadlock problem: the
|
||||
// catalog table lookup will send a GetPage request, but the GetPage
|
||||
// request will block in the page server because the recent WAL
|
||||
// hasn't been received yet, and it cannot be received until the
|
||||
// walsender completes the authentication and starts streaming the
|
||||
// WAL.
|
||||
if lsn <= last_record_lsn {
|
||||
// It might be better to use max(lsn, latest_gc_cutoff_lsn) instead
|
||||
// last_record_lsn. That would give the same result, since we know
|
||||
// that there haven't been modifications since 'lsn'. Using an older
|
||||
// LSN might be faster, because that could allow skipping recent
|
||||
// layers when finding the page.
|
||||
lsn = last_record_lsn;
|
||||
// Sanity check the request
|
||||
if request_lsn < not_modified_since {
|
||||
return Err(PageStreamError::BadRequest(
|
||||
format!(
|
||||
"invalid request with request LSN {} and not_modified_since {}",
|
||||
request_lsn, not_modified_since,
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
|
||||
if request_lsn < **latest_gc_cutoff_lsn {
|
||||
// Check explicitly for INVALID just to get a less scary error message if the
|
||||
// request is obviously bogus
|
||||
return Err(if request_lsn == Lsn::INVALID {
|
||||
PageStreamError::BadRequest("invalid LSN(0) in request".into())
|
||||
} else {
|
||||
timeline
|
||||
.wait_lsn(
|
||||
lsn,
|
||||
crate::tenant::timeline::WaitLsnWaiter::PageService,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
// Since we waited for 'lsn' to arrive, that is now the last
|
||||
// record LSN. (Or close enough for our purposes; the
|
||||
// last-record LSN can advance immediately after we return
|
||||
// anyway)
|
||||
}
|
||||
} else {
|
||||
if lsn == Lsn(0) {
|
||||
return Err(PageStreamError::BadRequest(
|
||||
"invalid LSN(0) in request".into(),
|
||||
));
|
||||
}
|
||||
PageStreamError::BadRequest(format!(
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
request_lsn, **latest_gc_cutoff_lsn
|
||||
).into())
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for WAL up to 'not_modified_since' to arrive, if necessary
|
||||
if not_modified_since > last_record_lsn {
|
||||
timeline
|
||||
.wait_lsn(
|
||||
lsn,
|
||||
not_modified_since,
|
||||
crate::tenant::timeline::WaitLsnWaiter::PageService,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
// Since we waited for 'not_modified_since' to arrive, that is now the last
|
||||
// record LSN. (Or close enough for our purposes; the last-record LSN can
|
||||
// advance immediately after we return anyway)
|
||||
Ok(not_modified_since)
|
||||
} else {
|
||||
// It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
|
||||
// here instead. That would give the same result, since we know that there
|
||||
// haven't been any modifications since 'not_modified_since'. Using an older
|
||||
// LSN might be faster, because that could allow skipping recent layers when
|
||||
// finding the page. However, we have historically used 'last_record_lsn', so
|
||||
// stick to that for now.
|
||||
Ok(std::cmp::min(last_record_lsn, request_lsn))
|
||||
}
|
||||
|
||||
if lsn < **latest_gc_cutoff_lsn {
|
||||
return Err(PageStreamError::BadRequest(format!(
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, **latest_gc_cutoff_lsn
|
||||
).into()));
|
||||
}
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
@@ -931,12 +922,17 @@ impl PageServerHandler {
|
||||
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let exists = timeline
|
||||
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
|
||||
.get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||
@@ -959,12 +955,17 @@ impl PageServerHandler {
|
||||
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let n_blocks = timeline
|
||||
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
|
||||
.get_rel_size(req.rel, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||
@@ -987,18 +988,17 @@ impl PageServerHandler {
|
||||
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let total_blocks = timeline
|
||||
.get_db_size(
|
||||
DEFAULTTABLESPACE_OID,
|
||||
req.dbnode,
|
||||
Version::Lsn(lsn),
|
||||
req.latest,
|
||||
ctx,
|
||||
)
|
||||
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
let db_size = total_blocks as i64 * BLCKSZ as i64;
|
||||
|
||||
@@ -1165,12 +1165,17 @@ impl PageServerHandler {
|
||||
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
@@ -1193,9 +1198,14 @@ impl PageServerHandler {
|
||||
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let kind = SlruKind::from_repr(req.kind)
|
||||
.ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
|
||||
@@ -1358,8 +1368,7 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
|
||||
impl<IO> postgres_backend::HandlerSync<IO> for PageServerHandler
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
@@ -1399,9 +1408,24 @@ where
|
||||
) -> Result<(), QueryError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
type IO<'s> = std::pin::Pin<&'s mut tokio_io_timeout::TimeoutReader<tokio::net::TcpStream>>;
|
||||
|
||||
impl<'s> postgres_backend::Handler<IO<'s>> for PageServerHandler
|
||||
{
|
||||
#[instrument(skip_all, fields(tenant_id, timeline_id))]
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO<'s>>,
|
||||
query_string: &str,
|
||||
) -> Result<(), QueryError> {
|
||||
self.process_query_(pgb, &query_string).await
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Sync + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
@@ -1413,7 +1437,34 @@ where
|
||||
|
||||
let ctx = self.connection_ctx.attached_child();
|
||||
debug!("process query {query_string:?}");
|
||||
if query_string.starts_with("pagestream ") {
|
||||
if query_string.starts_with("pagestream_v2 ") {
|
||||
let (_, params_raw) = query_string.split_at("pagestream_v2 ".len());
|
||||
let params = params_raw.split(' ').collect::<Vec<_>>();
|
||||
if params.len() != 2 {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"invalid param number for pagestream command"
|
||||
)));
|
||||
}
|
||||
let tenant_id = TenantId::from_str(params[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
self.handle_pagerequests(
|
||||
pgb,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
PagestreamProtocolVersion::V2,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else if query_string.starts_with("pagestream ") {
|
||||
let (_, params_raw) = query_string.split_at("pagestream ".len());
|
||||
let params = params_raw.split(' ').collect::<Vec<_>>();
|
||||
if params.len() != 2 {
|
||||
@@ -1432,8 +1483,14 @@ where
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
|
||||
.await?;
|
||||
self.handle_pagerequests(
|
||||
pgb,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
PagestreamProtocolVersion::V1,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else if query_string.starts_with("basebackup ") {
|
||||
let (_, params_raw) = query_string.split_at("basebackup ".len());
|
||||
let params = params_raw.split_whitespace().collect::<Vec<_>>();
|
||||
|
||||
@@ -176,7 +176,6 @@ impl Timeline {
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
@@ -185,7 +184,7 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
|
||||
let nblocks = self.get_rel_size(tag, version, ctx).await?;
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
@@ -207,7 +206,6 @@ impl Timeline {
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<usize, PageReconstructError> {
|
||||
let mut total_blocks = 0;
|
||||
@@ -215,7 +213,7 @@ impl Timeline {
|
||||
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
|
||||
|
||||
for rel in rels {
|
||||
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
|
||||
let n_blocks = self.get_rel_size(rel, version, ctx).await?;
|
||||
total_blocks += n_blocks as usize;
|
||||
}
|
||||
Ok(total_blocks)
|
||||
@@ -226,7 +224,6 @@ impl Timeline {
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
@@ -240,7 +237,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|
||||
&& !self.get_rel_exists(tag, version, latest, ctx).await?
|
||||
&& !self.get_rel_exists(tag, version, ctx).await?
|
||||
{
|
||||
// FIXME: Postgres sometimes calls smgrcreate() to create
|
||||
// FSM, and smgrnblocks() on it immediately afterwards,
|
||||
@@ -263,7 +260,6 @@ impl Timeline {
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
_latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
@@ -1095,7 +1091,7 @@ impl<'a> DatadirModification<'a> {
|
||||
) -> anyhow::Result<()> {
|
||||
let total_blocks = self
|
||||
.tline
|
||||
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
|
||||
.get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
|
||||
.await?;
|
||||
|
||||
// Remove entry from dbdir
|
||||
@@ -1194,7 +1190,7 @@ impl<'a> DatadirModification<'a> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
if self
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(self), true, ctx)
|
||||
.get_rel_exists(rel, Version::Modified(self), ctx)
|
||||
.await?
|
||||
{
|
||||
let size_key = rel_size_to_key(rel);
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::fs::{self, File, OpenOptions};
|
||||
@@ -182,6 +183,7 @@ async fn download_object<'a>(
|
||||
#[cfg(target_os = "linux")]
|
||||
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
|
||||
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
|
||||
use bytes::BytesMut;
|
||||
async {
|
||||
let destination_file = VirtualFile::create(dst_path)
|
||||
.await
|
||||
@@ -194,10 +196,10 @@ async fn download_object<'a>(
|
||||
// There's chunks_vectored() on the stream.
|
||||
let (bytes_amount, destination_file) = async {
|
||||
let size_tracking = size_tracking_writer::Writer::new(destination_file);
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<
|
||||
{ super::BUFFER_SIZE },
|
||||
_,
|
||||
>::new(size_tracking);
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
|
||||
size_tracking,
|
||||
BytesMut::with_capacity(super::BUFFER_SIZE),
|
||||
);
|
||||
while let Some(res) =
|
||||
futures::StreamExt::next(&mut download.download_stream).await
|
||||
{
|
||||
|
||||
@@ -32,6 +32,7 @@ pub use io_engine::feature_test as io_engine_feature_test;
|
||||
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
|
||||
mod metadata;
|
||||
mod open_options;
|
||||
use self::owned_buffers_io::write::OwnedAsyncWriter;
|
||||
pub(crate) use io_engine::IoEngineKind;
|
||||
pub(crate) use metadata::Metadata;
|
||||
pub(crate) use open_options::*;
|
||||
@@ -1083,6 +1084,17 @@ impl Drop for VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for VirtualFile {
|
||||
#[inline(always)]
|
||||
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: B,
|
||||
) -> std::io::Result<(usize, B::Buf)> {
|
||||
let (buf, res) = VirtualFile::write_all(self, buf).await;
|
||||
res.map(move |v| (v, buf))
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
fn new(num_slots: usize) -> OpenFiles {
|
||||
let mut slots = Box::new(Vec::with_capacity(num_slots));
|
||||
|
||||
@@ -1,33 +1,36 @@
|
||||
use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile};
|
||||
use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf};
|
||||
|
||||
pub struct Writer {
|
||||
dst: VirtualFile,
|
||||
pub struct Writer<W> {
|
||||
dst: W,
|
||||
bytes_amount: u64,
|
||||
}
|
||||
|
||||
impl Writer {
|
||||
pub fn new(dst: VirtualFile) -> Self {
|
||||
impl<W> Writer<W> {
|
||||
pub fn new(dst: W) -> Self {
|
||||
Self {
|
||||
dst,
|
||||
bytes_amount: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the wrapped `VirtualFile` object as well as the number
|
||||
/// of bytes that were written to it through this object.
|
||||
pub fn into_inner(self) -> (u64, VirtualFile) {
|
||||
pub fn into_inner(self) -> (u64, W) {
|
||||
(self.bytes_amount, self.dst)
|
||||
}
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for Writer {
|
||||
impl<W> OwnedAsyncWriter for Writer<W>
|
||||
where
|
||||
W: OwnedAsyncWriter,
|
||||
{
|
||||
#[inline(always)]
|
||||
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: B,
|
||||
) -> std::io::Result<(usize, B::Buf)> {
|
||||
let (buf, res) = self.dst.write_all(buf).await;
|
||||
let nwritten = res?;
|
||||
let (nwritten, buf) = self.dst.write_all(buf).await?;
|
||||
self.bytes_amount += u64::try_from(nwritten).unwrap();
|
||||
Ok((nwritten, buf))
|
||||
}
|
||||
|
||||
@@ -10,14 +10,14 @@ pub trait OwnedAsyncWriter {
|
||||
) -> std::io::Result<(usize, B::Buf)>;
|
||||
}
|
||||
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that batches smaller writers
|
||||
/// into `BUFFER_SIZE`-sized writes.
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
|
||||
/// small writes into larger writes of size [`Buffer::cap`].
|
||||
///
|
||||
/// # Passthrough Of Large Writers
|
||||
///
|
||||
/// Buffered writes larger than the `BUFFER_SIZE` cause the internal
|
||||
/// buffer to be flushed, even if it is not full yet. Then, the large
|
||||
/// buffered write is passed through to the unerlying [`OwnedAsyncWriter`].
|
||||
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
|
||||
/// cause the internal buffer to be flushed prematurely so that the large
|
||||
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
|
||||
///
|
||||
/// This pass-through is generally beneficial for throughput, but if
|
||||
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
|
||||
@@ -25,24 +25,25 @@ pub trait OwnedAsyncWriter {
|
||||
///
|
||||
/// In such cases, a different implementation that always buffers in memory
|
||||
/// may be preferable.
|
||||
pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
|
||||
pub struct BufferedWriter<B, W> {
|
||||
writer: W,
|
||||
// invariant: always remains Some(buf)
|
||||
// with buf.capacity() == BUFFER_SIZE except
|
||||
// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
// - after an IO error => stays `None` forever
|
||||
// In these exceptional cases, it's `None`.
|
||||
buf: Option<BytesMut>,
|
||||
/// invariant: always remains Some(buf) except
|
||||
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
/// - after an IO error => stays `None` forever
|
||||
/// In these exceptional cases, it's `None`.
|
||||
buf: Option<B>,
|
||||
}
|
||||
|
||||
impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
|
||||
impl<B, Buf, W> BufferedWriter<B, W>
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send,
|
||||
Buf: IoBuf + Send,
|
||||
W: OwnedAsyncWriter,
|
||||
{
|
||||
pub fn new(writer: W) -> Self {
|
||||
pub fn new(writer: W, buf: B) -> Self {
|
||||
Self {
|
||||
writer,
|
||||
buf: Some(BytesMut::with_capacity(BUFFER_SIZE)),
|
||||
buf: Some(buf),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,61 +54,121 @@ where
|
||||
Ok(writer)
|
||||
}
|
||||
|
||||
pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<()>
|
||||
#[inline(always)]
|
||||
fn buf(&self) -> &B {
|
||||
self.buf
|
||||
.as_ref()
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
|
||||
pub async fn write_buffered<S: IoBuf>(&mut self, chunk: Slice<S>) -> std::io::Result<(usize, S)>
|
||||
where
|
||||
B: IoBuf + Send,
|
||||
S: IoBuf + Send,
|
||||
{
|
||||
let chunk_len = chunk.len();
|
||||
// avoid memcpy for the middle of the chunk
|
||||
if chunk.len() >= BUFFER_SIZE {
|
||||
if chunk.len() >= self.buf().cap() {
|
||||
self.flush().await?;
|
||||
// do a big write, bypassing `buf`
|
||||
assert_eq!(
|
||||
self.buf
|
||||
.as_ref()
|
||||
.expect("must not use after an error")
|
||||
.len(),
|
||||
.pending(),
|
||||
0
|
||||
);
|
||||
let chunk_len = chunk.len();
|
||||
let (nwritten, chunk) = self.writer.write_all(chunk).await?;
|
||||
assert_eq!(nwritten, chunk_len);
|
||||
drop(chunk);
|
||||
return Ok(());
|
||||
return Ok((nwritten, chunk));
|
||||
}
|
||||
// in-memory copy the < BUFFER_SIZED tail of the chunk
|
||||
assert!(chunk.len() < BUFFER_SIZE);
|
||||
let mut chunk = &chunk[..];
|
||||
while !chunk.is_empty() {
|
||||
assert!(chunk.len() < self.buf().cap());
|
||||
let mut slice = &chunk[..];
|
||||
while !slice.is_empty() {
|
||||
let buf = self.buf.as_mut().expect("must not use after an error");
|
||||
let need = BUFFER_SIZE - buf.len();
|
||||
let have = chunk.len();
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = slice.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
buf.extend_from_slice(&chunk[..n]);
|
||||
chunk = &chunk[n..];
|
||||
if buf.len() >= BUFFER_SIZE {
|
||||
assert_eq!(buf.len(), BUFFER_SIZE);
|
||||
buf.extend_from_slice(&slice[..n]);
|
||||
slice = &slice[n..];
|
||||
if buf.pending() >= buf.cap() {
|
||||
assert_eq!(buf.pending(), buf.cap());
|
||||
self.flush().await?;
|
||||
}
|
||||
}
|
||||
assert!(chunk.is_empty(), "by now we should have drained the chunk");
|
||||
Ok(())
|
||||
assert!(slice.is_empty(), "by now we should have drained the chunk");
|
||||
Ok((chunk_len, chunk.into_inner()))
|
||||
}
|
||||
|
||||
async fn flush(&mut self) -> std::io::Result<()> {
|
||||
let buf = self.buf.take().expect("must not use after an error");
|
||||
if buf.is_empty() {
|
||||
let buf_len = buf.pending();
|
||||
if buf_len == 0 {
|
||||
self.buf = Some(buf);
|
||||
return std::io::Result::Ok(());
|
||||
return Ok(());
|
||||
}
|
||||
let buf_len = buf.len();
|
||||
let (nwritten, mut buf) = self.writer.write_all(buf).await?;
|
||||
let (nwritten, io_buf) = self.writer.write_all(buf.flush()).await?;
|
||||
assert_eq!(nwritten, buf_len);
|
||||
buf.clear();
|
||||
self.buf = Some(buf);
|
||||
self.buf = Some(Buffer::reuse_after_flush(io_buf));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
|
||||
pub trait Buffer {
|
||||
type IoBuf: IoBuf;
|
||||
|
||||
/// Capacity of the buffer. Must not change over the lifetime `self`.`
|
||||
fn cap(&self) -> usize;
|
||||
|
||||
/// Add data to the buffer.
|
||||
/// Panics if there is not enough room to accomodate `other`'s content, i.e.,
|
||||
/// panics if `other.len() > self.cap() - self.pending()`.
|
||||
fn extend_from_slice(&mut self, other: &[u8]);
|
||||
|
||||
/// Number of bytes in the buffer.
|
||||
fn pending(&self) -> usize;
|
||||
|
||||
/// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data
|
||||
/// so we can use [`tokio_epoll_uring`] to write it to disk.
|
||||
fn flush(self) -> Slice<Self::IoBuf>;
|
||||
|
||||
/// After the write to disk is done and we have gotten back the slice,
|
||||
/// [`BufferedWriter`] uses this method to re-use the io buffer.
|
||||
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
|
||||
}
|
||||
|
||||
impl Buffer for BytesMut {
|
||||
type IoBuf = BytesMut;
|
||||
|
||||
#[inline(always)]
|
||||
fn cap(&self) -> usize {
|
||||
self.capacity()
|
||||
}
|
||||
|
||||
fn extend_from_slice(&mut self, other: &[u8]) {
|
||||
BytesMut::extend_from_slice(self, other)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn pending(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn flush(self) -> Slice<BytesMut> {
|
||||
if self.is_empty() {
|
||||
return self.slice_full();
|
||||
}
|
||||
let len = self.len();
|
||||
self.slice(0..len)
|
||||
}
|
||||
|
||||
fn reuse_after_flush(mut iobuf: BytesMut) -> Self {
|
||||
iobuf.clear();
|
||||
iobuf
|
||||
}
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for Vec<u8> {
|
||||
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
@@ -125,6 +186,8 @@ impl OwnedAsyncWriter for Vec<u8> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bytes::BytesMut;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -158,7 +221,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_buffered_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::<2, _>::new(recorder);
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"b");
|
||||
write!(writer, b"c");
|
||||
@@ -175,7 +238,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::<2, _>::new(recorder);
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"abc");
|
||||
write!(writer, b"de");
|
||||
write!(writer, b"");
|
||||
@@ -191,7 +254,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::<2, _>::new(recorder);
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"bc");
|
||||
write!(writer, b"d");
|
||||
|
||||
@@ -1034,7 +1034,7 @@ impl WalIngest {
|
||||
|
||||
let nblocks = modification
|
||||
.tline
|
||||
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_size(src_rel, Version::Modified(modification), ctx)
|
||||
.await?;
|
||||
let dst_rel = RelTag {
|
||||
spcnode: tablespace_id,
|
||||
@@ -1068,13 +1068,7 @@ impl WalIngest {
|
||||
|
||||
let content = modification
|
||||
.tline
|
||||
.get_rel_page_at_lsn(
|
||||
src_rel,
|
||||
blknum,
|
||||
Version::Modified(modification),
|
||||
true,
|
||||
ctx,
|
||||
)
|
||||
.get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
|
||||
.await?;
|
||||
modification.put_rel_page_image(dst_rel, blknum, content)?;
|
||||
num_blocks_copied += 1;
|
||||
@@ -1242,7 +1236,7 @@ impl WalIngest {
|
||||
};
|
||||
if modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
{
|
||||
self.put_rel_drop(modification, rel, ctx).await?;
|
||||
@@ -1541,7 +1535,7 @@ impl WalIngest {
|
||||
nblocks
|
||||
} else if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
{
|
||||
// create it with 0 size initially, the logic below will extend it
|
||||
@@ -1553,7 +1547,7 @@ impl WalIngest {
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
};
|
||||
|
||||
@@ -1650,14 +1644,14 @@ async fn get_relsize(
|
||||
) -> anyhow::Result<BlockNumber> {
|
||||
let nblocks = if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
{
|
||||
0
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
};
|
||||
Ok(nblocks)
|
||||
@@ -1732,29 +1726,29 @@ mod tests {
|
||||
// The relation was created at LSN 2, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
assert!(tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
|
||||
.await
|
||||
.is_err());
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
3
|
||||
);
|
||||
@@ -1762,46 +1756,46 @@ mod tests {
|
||||
// Check page contents at each LSN
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 2")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 3")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1 at 4")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1 at 4")
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 2 at 5")
|
||||
);
|
||||
@@ -1817,19 +1811,19 @@ mod tests {
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
|
||||
.await?,
|
||||
2
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1 at 4")
|
||||
);
|
||||
@@ -1837,13 +1831,13 @@ mod tests {
|
||||
// should still see the truncated block with older LSN
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
3
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 2 at 5")
|
||||
);
|
||||
@@ -1856,7 +1850,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
|
||||
.await?,
|
||||
0
|
||||
);
|
||||
@@ -1869,19 +1863,19 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
|
||||
.await?,
|
||||
2
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
|
||||
.await?,
|
||||
ZERO_PAGE
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1")
|
||||
);
|
||||
@@ -1894,21 +1888,21 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
1501
|
||||
);
|
||||
for blk in 2..1500 {
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
ZERO_PAGE
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
test_img("foo blk 1500")
|
||||
);
|
||||
@@ -1935,13 +1929,13 @@ mod tests {
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -1954,7 +1948,7 @@ mod tests {
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
@@ -1972,13 +1966,13 @@ mod tests {
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -2011,24 +2005,24 @@ mod tests {
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
assert!(tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
|
||||
.await
|
||||
.is_err());
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2039,7 +2033,7 @@ mod tests {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
|
||||
.await?,
|
||||
test_img(&data)
|
||||
);
|
||||
@@ -2056,7 +2050,7 @@ mod tests {
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -2066,7 +2060,7 @@ mod tests {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
|
||||
.await?,
|
||||
test_img(&data)
|
||||
);
|
||||
@@ -2075,7 +2069,7 @@ mod tests {
|
||||
// should still see all blocks with older LSN
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2084,7 +2078,7 @@ mod tests {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
test_img(&data)
|
||||
);
|
||||
@@ -2104,13 +2098,13 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2120,7 +2114,7 @@ mod tests {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
|
||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
test_img(&data)
|
||||
);
|
||||
@@ -2154,7 +2148,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
|
||||
.await?,
|
||||
RELSEG_SIZE + 1
|
||||
);
|
||||
@@ -2168,7 +2162,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
|
||||
.await?,
|
||||
RELSEG_SIZE
|
||||
);
|
||||
@@ -2183,7 +2177,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
|
||||
.await?,
|
||||
RELSEG_SIZE - 1
|
||||
);
|
||||
@@ -2201,7 +2195,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
|
||||
.await?,
|
||||
size as BlockNumber
|
||||
);
|
||||
|
||||
@@ -49,6 +49,8 @@ char *neon_auth_token;
|
||||
int readahead_buffer_size = 128;
|
||||
int flush_every_n_requests = 8;
|
||||
|
||||
int neon_protocol_version = 1;
|
||||
|
||||
static int n_reconnect_attempts = 0;
|
||||
static int max_reconnect_attempts = 60;
|
||||
static int stripe_size;
|
||||
@@ -379,7 +381,17 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
pfree(msg);
|
||||
return false;
|
||||
}
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
switch (neon_protocol_version)
|
||||
{
|
||||
case 2:
|
||||
query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
|
||||
break;
|
||||
case 1:
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
|
||||
}
|
||||
ret = PQsendQuery(conn, query);
|
||||
pfree(query);
|
||||
if (ret != 1)
|
||||
@@ -440,7 +452,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
return false;
|
||||
}
|
||||
|
||||
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr);
|
||||
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
|
||||
page_servers[shard_no].conn = conn;
|
||||
page_servers[shard_no].wes = wes;
|
||||
|
||||
@@ -844,6 +856,16 @@ pg_init_libpagestore(void)
|
||||
PGC_USERSET,
|
||||
0, /* no flags required */
|
||||
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
|
||||
DefineCustomIntVariable("neon.protocol_version",
|
||||
"Version of compute<->page server protocol",
|
||||
NULL,
|
||||
&neon_protocol_version,
|
||||
1, /* default to old protocol for now */
|
||||
1, /* min */
|
||||
2, /* max */
|
||||
PGC_SU_BACKEND,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
relsize_hash_init();
|
||||
|
||||
|
||||
@@ -69,18 +69,33 @@ typedef enum {
|
||||
SLRU_MULTIXACT_OFFSETS
|
||||
} SlruKind;
|
||||
|
||||
/*
|
||||
* supertype of all the Neon*Request structs below
|
||||
/*--
|
||||
* supertype of all the Neon*Request structs below.
|
||||
*
|
||||
* If 'latest' is true, we are requesting the latest page version, and 'lsn'
|
||||
* is just a hint to the server that we know there are no versions of the page
|
||||
* (or relation size, for exists/nblocks requests) later than the 'lsn'.
|
||||
* All requests contain two LSNs:
|
||||
*
|
||||
* lsn: request page (or relation size, etc) at this LSN
|
||||
* not_modified_since: Hint that the page hasn't been modified between
|
||||
* this LSN and the request LSN (`lsn`).
|
||||
*
|
||||
* To request the latest version of a page, you can use MAX_LSN as the request
|
||||
* LSN.
|
||||
*
|
||||
* If you don't know any better, you can always set 'not_modified_since' equal
|
||||
* to 'lsn', but providing a lower value can speed up processing the request
|
||||
* in the pageserver, as it doesn't need to wait for the WAL to arrive, and it
|
||||
* can skip traversing through recent layers which we know to not contain any
|
||||
* versions for the requested page.
|
||||
*
|
||||
* These structs describe the V2 of these requests. The old V1 protocol contained
|
||||
* just one LSN and a boolean 'latest' flag. If the neon_protocol_version GUC is
|
||||
* set to 1, we will convert these to the V1 requests before sending.
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
bool latest; /* if true, request latest page version */
|
||||
XLogRecPtr lsn; /* request page version @ this LSN */
|
||||
XLogRecPtr lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
} NeonRequest;
|
||||
|
||||
typedef struct
|
||||
@@ -193,6 +208,7 @@ extern int readahead_buffer_size;
|
||||
extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern int32 max_cluster_size;
|
||||
extern int neon_protocol_version;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
@@ -225,14 +241,14 @@ extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
|
||||
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
char *buffer);
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer);
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
|
||||
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||
#else
|
||||
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
void *buffer);
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, void *buffer);
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
|
||||
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, const void *buffer, bool skipFsync);
|
||||
#endif
|
||||
|
||||
@@ -168,8 +168,8 @@ typedef enum PrefetchStatus
|
||||
typedef struct PrefetchRequest
|
||||
{
|
||||
BufferTag buftag; /* must be first entry in the struct */
|
||||
XLogRecPtr effective_request_lsn;
|
||||
XLogRecPtr actual_request_lsn;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
NeonResponse *response; /* may be null */
|
||||
PrefetchStatus status;
|
||||
shardno_t shard_no;
|
||||
@@ -269,19 +269,19 @@ static PrefetchState *MyPState;
|
||||
) \
|
||||
)
|
||||
|
||||
static XLogRecPtr prefetch_lsn = 0;
|
||||
|
||||
static bool compact_prefetch_buffers(void);
|
||||
static void consume_prefetch_responses(void);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
|
||||
static bool prefetch_read(PrefetchRequest *slot);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
|
||||
static bool prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup_trailing_unused(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index);
|
||||
|
||||
static XLogRecPtr neon_get_request_lsn(bool *latest, NRelFileInfo rinfo,
|
||||
ForkNumber forknum, BlockNumber blkno);
|
||||
static void neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since);
|
||||
static bool neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
|
||||
PrefetchRequest *slot);
|
||||
|
||||
static bool
|
||||
compact_prefetch_buffers(void)
|
||||
@@ -338,8 +338,8 @@ compact_prefetch_buffers(void)
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
|
||||
target_slot->actual_request_lsn = source_slot->actual_request_lsn;
|
||||
target_slot->request_lsn = source_slot->request_lsn;
|
||||
target_slot->not_modified_since = source_slot->not_modified_since;
|
||||
target_slot->my_ring_index = empty_ring_index;
|
||||
|
||||
prfh_delete(MyPState->prf_hash, source_slot);
|
||||
@@ -358,7 +358,8 @@ compact_prefetch_buffers(void)
|
||||
};
|
||||
source_slot->response = NULL;
|
||||
source_slot->my_ring_index = 0;
|
||||
source_slot->effective_request_lsn = 0;
|
||||
source_slot->request_lsn = InvalidXLogRecPtr;
|
||||
source_slot->not_modified_since = InvalidXLogRecPtr;
|
||||
|
||||
/* update bookkeeping */
|
||||
n_moved++;
|
||||
@@ -683,56 +684,39 @@ prefetch_set_unused(uint64 ring_index)
|
||||
compact_prefetch_buffers();
|
||||
}
|
||||
|
||||
/*
|
||||
* Send one prefetch request to the pageserver. To wait for the response, call
|
||||
* prefetch_wait_for().
|
||||
*/
|
||||
static void
|
||||
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since)
|
||||
{
|
||||
bool found;
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = false,
|
||||
.req.lsn = 0,
|
||||
/* lsn and not_modified_since are filled in below */
|
||||
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
|
||||
.forknum = slot->buftag.forkNum,
|
||||
.blkno = slot->buftag.blockNum,
|
||||
};
|
||||
|
||||
if (force_lsn && force_latest)
|
||||
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
|
||||
|
||||
if (force_request_lsn)
|
||||
{
|
||||
request.req.lsn = *force_lsn;
|
||||
request.req.latest = *force_latest;
|
||||
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
|
||||
request.req.lsn = *force_request_lsn;
|
||||
request.req.not_modified_since = *force_not_modified_since;
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr lsn = neon_get_request_lsn(
|
||||
&request.req.latest,
|
||||
BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum
|
||||
);
|
||||
|
||||
/*
|
||||
* Note: effective_request_lsn is potentially higher than the
|
||||
* requested LSN, but still correct:
|
||||
*
|
||||
* We know there are no changes between the actual requested LSN and
|
||||
* the value of effective_request_lsn: If there were, the page would
|
||||
* have been in cache and evicted between those LSN values, which then
|
||||
* would have had to result in a larger request LSN for this page.
|
||||
*
|
||||
* It is possible that a concurrent backend loads the page, modifies
|
||||
* it and then evicts it again, but the LSN of that eviction cannot be
|
||||
* smaller than the current WAL insert/redo pointer, which is already
|
||||
* larger than this prefetch_lsn. So in any case, that would
|
||||
* invalidate this cache.
|
||||
*
|
||||
* The best LSN to use for effective_request_lsn would be
|
||||
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
|
||||
*/
|
||||
slot->actual_request_lsn = request.req.lsn = lsn;
|
||||
prefetch_lsn = Max(prefetch_lsn, lsn);
|
||||
slot->effective_request_lsn = prefetch_lsn;
|
||||
neon_get_request_lsn(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum,
|
||||
&request.req.lsn,
|
||||
&request.req.not_modified_since);
|
||||
}
|
||||
slot->request_lsn = request.req.lsn;
|
||||
slot->not_modified_since = request.req.not_modified_since;
|
||||
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
@@ -749,7 +733,6 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
|
||||
|
||||
prfh_insert(MyPState->prf_hash, slot, &found);
|
||||
Assert(!found);
|
||||
}
|
||||
@@ -759,22 +742,25 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
||||
*
|
||||
* Register that we may want the contents of BufferTag in the near future.
|
||||
*
|
||||
* If force_latest and force_lsn are not NULL, those values are sent to the
|
||||
* pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure
|
||||
* to fill in these values manually.
|
||||
* If force_request_lsn and force_not_modified_since are not NULL, those
|
||||
* values are sent to the pageserver. If they are NULL, we utilize the
|
||||
* lastWrittenLsn -infrastructure to fill them in.
|
||||
*
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
*/
|
||||
|
||||
static uint64
|
||||
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn,
|
||||
XLogRecPtr *force_not_modified_since)
|
||||
{
|
||||
uint64 ring_index;
|
||||
PrefetchRequest req;
|
||||
PrefetchRequest *slot;
|
||||
PrfHashEntry *entry;
|
||||
|
||||
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
|
||||
|
||||
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
||||
req.buftag = tag;
|
||||
Retry:
|
||||
@@ -792,40 +778,19 @@ Retry:
|
||||
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
|
||||
|
||||
/*
|
||||
* If we want a specific lsn, we do not accept requests that were made
|
||||
* with a potentially different LSN.
|
||||
* If the caller specified a request LSN to use, only accept prefetch
|
||||
* responses that satisfy that request.
|
||||
*/
|
||||
if (force_latest && force_lsn)
|
||||
if (force_request_lsn)
|
||||
{
|
||||
/*
|
||||
* if we want the latest version, any effective_request_lsn <
|
||||
* request lsn is OK
|
||||
*/
|
||||
if (*force_latest)
|
||||
if (!neon_prefetch_response_usable(*force_request_lsn,
|
||||
*force_not_modified_since, slot))
|
||||
{
|
||||
if (*force_lsn > slot->effective_request_lsn)
|
||||
{
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* if we don't want the latest version, only accept requests with
|
||||
* the exact same LSN
|
||||
*/
|
||||
else
|
||||
{
|
||||
if (*force_lsn != slot->effective_request_lsn)
|
||||
{
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
/* Wait for the old request to finish and discard it */
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -921,7 +886,7 @@ Retry:
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
|
||||
prefetch_do_request(slot, force_latest, force_lsn);
|
||||
prefetch_do_request(slot, force_request_lsn, force_not_modified_since);
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
@@ -950,7 +915,7 @@ page_server_request(void const *req)
|
||||
BufferTag tag = {0};
|
||||
shardno_t shard_no;
|
||||
|
||||
switch (((NeonRequest *) req)->tag)
|
||||
switch (messageTag(req))
|
||||
{
|
||||
case T_NeonExistsRequest:
|
||||
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
|
||||
@@ -966,11 +931,10 @@ page_server_request(void const *req)
|
||||
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
|
||||
break;
|
||||
default:
|
||||
neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
|
||||
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
|
||||
}
|
||||
shard_no = get_shard_number(&tag);
|
||||
|
||||
|
||||
/*
|
||||
* Current sharding model assumes that all metadata is present only at shard 0.
|
||||
* We still need to call get_shard_no() to check if shard map is up-to-date.
|
||||
@@ -997,8 +961,52 @@ nm_pack_request(NeonRequest *msg)
|
||||
StringInfoData s;
|
||||
|
||||
initStringInfo(&s);
|
||||
pq_sendbyte(&s, msg->tag);
|
||||
|
||||
if (neon_protocol_version >= 2)
|
||||
{
|
||||
pq_sendbyte(&s, msg->tag);
|
||||
pq_sendint64(&s, msg->lsn);
|
||||
pq_sendint64(&s, msg->not_modified_since);
|
||||
}
|
||||
else
|
||||
{
|
||||
bool latest;
|
||||
XLogRecPtr lsn;
|
||||
|
||||
/*
|
||||
* In primary, we always request the latest page version.
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
{
|
||||
latest = true;
|
||||
lsn = msg->not_modified_since;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* In the protocol V1, we cannot represent that we want to read
|
||||
* page at LSN X, and we know that it hasn't been modified since
|
||||
* Y. We can either use 'not_modified_lsn' as the request LSN, and
|
||||
* risk getting an error if that LSN is too old and has already
|
||||
* fallen out of the pageserver's GC horizon, or we can send
|
||||
* 'request_lsn', causing the pageserver to possibly wait for the
|
||||
* recent WAL to arrive unnecessarily. Or something in between. We
|
||||
* choose to use the old LSN and risk GC errors, because that's
|
||||
* what we've done historically.
|
||||
*/
|
||||
latest = false;
|
||||
lsn = msg->not_modified_since;
|
||||
}
|
||||
|
||||
pq_sendbyte(&s, msg->tag);
|
||||
pq_sendbyte(&s, latest);
|
||||
pq_sendint64(&s, lsn);
|
||||
}
|
||||
|
||||
/*
|
||||
* The rest of the request messages are the same between protocol V1 and
|
||||
* V2
|
||||
*/
|
||||
switch (messageTag(msg))
|
||||
{
|
||||
/* pagestore_client -> pagestore */
|
||||
@@ -1006,8 +1014,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||
@@ -1019,8 +1025,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||
@@ -1032,8 +1036,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, msg_req->dbNode);
|
||||
|
||||
break;
|
||||
@@ -1042,8 +1044,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||
@@ -1057,8 +1057,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendbyte(&s, msg_req->kind);
|
||||
pq_sendint32(&s, msg_req->segno);
|
||||
|
||||
@@ -1209,7 +1207,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1222,7 +1220,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1236,7 +1234,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1247,7 +1245,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
|
||||
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1259,7 +1257,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
|
||||
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1531,44 +1529,38 @@ nm_adjust_lsn(XLogRecPtr lsn)
|
||||
/*
|
||||
* Return LSN for requesting pages and number of blocks from page server
|
||||
*/
|
||||
static XLogRecPtr
|
||||
neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
|
||||
static void
|
||||
neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
XLogRecPtr last_written_lsn;
|
||||
|
||||
last_written_lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
last_written_lsn = nm_adjust_lsn(last_written_lsn);
|
||||
Assert(last_written_lsn != InvalidXLogRecPtr);
|
||||
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/*
|
||||
* We don't know if WAL has been generated but not yet replayed, so
|
||||
* we're conservative in our estimates about latest pages.
|
||||
*/
|
||||
*latest = false;
|
||||
/* Request the page at the last replayed LSN. */
|
||||
*request_lsn = GetXLogReplayRecPtr(NULL);
|
||||
*not_modified_since = last_written_lsn;
|
||||
Assert(last_written_lsn <= *request_lsn);
|
||||
|
||||
/*
|
||||
* Get the last written LSN of this page.
|
||||
*/
|
||||
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
lsn = nm_adjust_lsn(lsn);
|
||||
|
||||
neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
|
||||
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
||||
neon_log(DEBUG1, "neon_get_request_lsn request lsn %X/%X, not_modified_since %X/%X",
|
||||
LSN_FORMAT_ARGS(*request_lsn), LSN_FORMAT_ARGS(*not_modified_since));
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr flushlsn;
|
||||
|
||||
/*
|
||||
* Use the latest LSN that was evicted from the buffer cache. Any
|
||||
* pages modified by later WAL records must still in the buffer cache,
|
||||
* so our request cannot concern those.
|
||||
* Use the latest LSN that was evicted from the buffer cache as the
|
||||
* 'not_modified_since' hint. Any pages modified by later WAL records
|
||||
* must still in the buffer cache, so our request cannot concern
|
||||
* those.
|
||||
*/
|
||||
*latest = true;
|
||||
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
Assert(lsn != InvalidXLogRecPtr);
|
||||
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
|
||||
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
||||
|
||||
lsn = nm_adjust_lsn(lsn);
|
||||
LSN_FORMAT_ARGS(last_written_lsn));
|
||||
|
||||
/*
|
||||
* Is it possible that the last-written LSN is ahead of last flush
|
||||
@@ -1583,16 +1575,109 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
|
||||
#else
|
||||
flushlsn = GetFlushRecPtr();
|
||||
#endif
|
||||
if (lsn > flushlsn)
|
||||
if (last_written_lsn > flushlsn)
|
||||
{
|
||||
neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
|
||||
(uint32) (lsn >> 32), (uint32) lsn,
|
||||
(uint32) (flushlsn >> 32), (uint32) flushlsn);
|
||||
XLogFlush(lsn);
|
||||
LSN_FORMAT_ARGS(last_written_lsn),
|
||||
LSN_FORMAT_ARGS(flushlsn));
|
||||
XLogFlush(last_written_lsn);
|
||||
flushlsn = last_written_lsn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Request the latest version of the page. The most up-to-date request
|
||||
* LSN we could use would be the current insert LSN, but to avoid the
|
||||
* overhead of looking it up, use 'flushlsn' instead. This relies on
|
||||
* the assumption that if the page was modified since the last WAL
|
||||
* flush, it should still be in the buffer cache, and we wouldn't be
|
||||
* requesting it.
|
||||
*/
|
||||
*request_lsn = flushlsn;
|
||||
*not_modified_since = last_written_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* neon_prefetch_response_usable -- Can a new request be satisfied by old one?
|
||||
*
|
||||
* This is used to check if the response to a prefetch request can be used to
|
||||
* satisfy a page read now.
|
||||
*/
|
||||
static bool
|
||||
neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
|
||||
PrefetchRequest *slot)
|
||||
{
|
||||
/* sanity check the LSN's on the old and the new request */
|
||||
Assert(request_lsn >= not_modified_since);
|
||||
Assert(slot->request_lsn >= slot->not_modified_since);
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
|
||||
/*
|
||||
* The new request's LSN should never be older than the old one. This
|
||||
* could be an Assert, except that for testing purposes, we do provide an
|
||||
* interface in neon_test_utils to fetch pages at arbitary LSNs, which
|
||||
* violates this.
|
||||
*
|
||||
* Similarly, the not_modified_since value calculated for a page should
|
||||
* never move backwards. This assumption is a bit fragile; if we updated
|
||||
* the last-written cache when we read in a page, for example, then it
|
||||
* might. But as the code stands, it should not.
|
||||
*
|
||||
* (If two backends issue a request at the same time, they might race and
|
||||
* calculate LSNs "out of order" with each other, but the prefetch queue
|
||||
* is backend-private at the moment.)
|
||||
*/
|
||||
if (request_lsn < slot->request_lsn || not_modified_since < slot->not_modified_since)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg(NEON_TAG "request with unexpected LSN after prefetch"),
|
||||
errdetail("Request %X/%X not_modified_since %X/%X, prefetch %X/%X not_modified_since %X/%X)",
|
||||
LSN_FORMAT_ARGS(request_lsn), LSN_FORMAT_ARGS(not_modified_since),
|
||||
LSN_FORMAT_ARGS(slot->request_lsn), LSN_FORMAT_ARGS(slot->not_modified_since))));
|
||||
return false;
|
||||
}
|
||||
|
||||
return lsn;
|
||||
/*---
|
||||
* Each request to the pageserver carries two LSN values:
|
||||
* `not_modified_since` and `request_lsn`. The (not_modified_since,
|
||||
* request_lsn] range of each request is effectively a claim that the page
|
||||
* has not been modified between those LSNs. If the range of the old
|
||||
* request in the queue overlaps with the new request, we know that the
|
||||
* page hasn't been modified in the union of the ranges. We can use the
|
||||
* response to old request to satisfy the new request in that case. For
|
||||
* example:
|
||||
*
|
||||
* 100 500
|
||||
* Old request: +--------+
|
||||
*
|
||||
* 400 800
|
||||
* New request: +--------+
|
||||
*
|
||||
* The old request claims that the page was not modified between LSNs 100
|
||||
* and 500, and the second claims that it was not modified between 400 and
|
||||
* 800. Together they mean that the page was not modified between 100 and
|
||||
* 800. Therefore the response to the old request is also valid for the
|
||||
* new request.
|
||||
*
|
||||
* This logic also holds at the boundary case that the old request's LSN
|
||||
* matches the new request's not_modified_since LSN exactly:
|
||||
*
|
||||
* 100 500
|
||||
* Old request: +--------+
|
||||
*
|
||||
* 500 900
|
||||
* New request: +--------+
|
||||
*
|
||||
* The response to the old request is the page as it was at LSN 500, and
|
||||
* the page hasn't been changed in the range (500, 900], therefore the
|
||||
* response is valid also for the new request.
|
||||
*/
|
||||
|
||||
/* this follows from the checks above */
|
||||
Assert(request_lsn >= slot->not_modified_since);
|
||||
|
||||
return not_modified_since <= slot->request_lsn;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1604,8 +1689,8 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
bool exists;
|
||||
NeonResponse *resp;
|
||||
BlockNumber n_blocks;
|
||||
bool latest;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -1660,12 +1745,13 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
return false;
|
||||
}
|
||||
|
||||
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
{
|
||||
NeonExistsRequest request = {
|
||||
.req.tag = T_NeonExistsRequest,
|
||||
.req.latest = latest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.rinfo = InfoFromSMgrRel(reln),
|
||||
.forknum = forkNum};
|
||||
|
||||
@@ -2102,10 +2188,10 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer)
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer)
|
||||
#else
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, void *buffer)
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer)
|
||||
#endif
|
||||
{
|
||||
NeonResponse *resp;
|
||||
@@ -2148,15 +2234,16 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
if (entry != NULL)
|
||||
{
|
||||
slot = entry->slot;
|
||||
if (slot->effective_request_lsn >= request_lsn)
|
||||
if (neon_prefetch_response_usable(request_lsn, not_modified_since, slot))
|
||||
{
|
||||
ring_index = slot->my_ring_index;
|
||||
pgBufferUsage.prefetch.hits += 1;
|
||||
}
|
||||
else /* the current prefetch LSN is not large
|
||||
* enough, so drop the prefetch */
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Cannot use this prefetch, discard it
|
||||
*
|
||||
* We can't drop cache for not-yet-received requested items. It is
|
||||
* unlikely this happens, but it can happen if prefetch distance
|
||||
* is large enough and a backend didn't consume all prefetch
|
||||
@@ -2181,8 +2268,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
pgBufferUsage.prefetch.misses += 1;
|
||||
|
||||
ring_index = prefetch_register_buffer(buftag, &request_latest,
|
||||
&request_lsn);
|
||||
ring_index = prefetch_register_buffer(buftag, &request_lsn,
|
||||
¬_modified_since);
|
||||
slot = GetPrfSlot(ring_index);
|
||||
}
|
||||
else
|
||||
@@ -2246,8 +2333,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer
|
||||
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
|
||||
#endif
|
||||
{
|
||||
bool latest;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -2272,8 +2359,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
return;
|
||||
}
|
||||
|
||||
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, blkno);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, latest, buffer);
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, blkno,
|
||||
&request_lsn, ¬_modified_since);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, not_modified_since, buffer);
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -2442,8 +2530,8 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
BlockNumber n_blocks;
|
||||
bool latest;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -2470,12 +2558,13 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
return n_blocks;
|
||||
}
|
||||
|
||||
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
{
|
||||
NeonNblocksRequest request = {
|
||||
.req.tag = T_NeonNblocksRequest,
|
||||
.req.latest = latest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.rinfo = InfoFromSMgrRel(reln),
|
||||
.forknum = forknum,
|
||||
};
|
||||
@@ -2523,16 +2612,17 @@ neon_dbsize(Oid dbNode)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
int64 db_size;
|
||||
XLogRecPtr request_lsn;
|
||||
bool latest;
|
||||
XLogRecPtr request_lsn,
|
||||
not_modified_since;
|
||||
NRelFileInfo dummy_node = {0};
|
||||
|
||||
request_lsn = neon_get_request_lsn(&latest, dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
neon_get_request_lsn(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
{
|
||||
NeonDbSizeRequest request = {
|
||||
.req.tag = T_NeonDbSizeRequest,
|
||||
.req.latest = latest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.dbNode = dbNode,
|
||||
};
|
||||
|
||||
@@ -2605,7 +2695,6 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||
* the most recently inserted WAL record's LSN.
|
||||
*/
|
||||
lsn = GetXLogInsertRecPtr();
|
||||
|
||||
lsn = nm_adjust_lsn(lsn);
|
||||
|
||||
/*
|
||||
@@ -2805,14 +2894,33 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
static int
|
||||
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer)
|
||||
{
|
||||
XLogRecPtr request_lsn;
|
||||
/*
|
||||
* GetRedoStartLsn() returns LSN of basebackup.
|
||||
* We need to download SLRU segments only once after node startup,
|
||||
* then SLRUs are maintained locally.
|
||||
*/
|
||||
request_lsn = GetRedoStartLsn();
|
||||
XLogRecPtr request_lsn,
|
||||
not_modified_since;
|
||||
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
request_lsn = GetXLogReplayRecPtr(NULL);
|
||||
if (request_lsn == InvalidXLogRecPtr)
|
||||
{
|
||||
/*
|
||||
* This happens in neon startup, we start up without replaying any
|
||||
* records.
|
||||
*/
|
||||
request_lsn = GetRedoStartLsn();
|
||||
}
|
||||
}
|
||||
else
|
||||
request_lsn = GetXLogInsertRecPtr();
|
||||
request_lsn = nm_adjust_lsn(request_lsn);
|
||||
|
||||
/*
|
||||
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
|
||||
* segment has not changed since the basebackup, because in order to
|
||||
* modify it, we would have had to download it already. And once
|
||||
* downloaded, we never evict SLRU segments from local disk.
|
||||
*/
|
||||
not_modified_since = GetRedoStartLsn();
|
||||
|
||||
SlruKind kind;
|
||||
|
||||
if (STRPREFIX(path, "pg_xact"))
|
||||
@@ -2827,8 +2935,8 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
NeonResponse *resp;
|
||||
NeonGetSlruSegmentRequest request = {
|
||||
.req.tag = T_NeonGetSlruSegmentRequest,
|
||||
.req.latest = false,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
|
||||
.kind = kind,
|
||||
.segno = segno
|
||||
@@ -2956,6 +3064,9 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
{
|
||||
BlockNumber relsize;
|
||||
|
||||
/* This is only used in WAL replay */
|
||||
Assert(RecoveryInProgress());
|
||||
|
||||
/* Extend the relation if we know its size */
|
||||
if (get_cached_relsize(rinfo, forknum, &relsize))
|
||||
{
|
||||
@@ -2974,14 +3085,13 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
* This length is later reused when we open the smgr to read the
|
||||
* block, which is fine and expected.
|
||||
*/
|
||||
|
||||
NeonResponse *response;
|
||||
NeonNblocksResponse *nbresponse;
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.tag = T_NeonNblocksRequest,
|
||||
.lsn = end_recptr,
|
||||
.not_modified_since = end_recptr,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
.forknum = forknum,
|
||||
|
||||
@@ -7,7 +7,7 @@ OBJS = \
|
||||
neontest.o
|
||||
|
||||
EXTENSION = neon_test_utils
|
||||
DATA = neon_test_utils--1.0.sql
|
||||
DATA = neon_test_utils--1.1.sql
|
||||
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
|
||||
|
||||
PG_CONFIG = pg_config
|
||||
|
||||
@@ -31,12 +31,12 @@ AS 'MODULE_PATHNAME', 'clear_buffer_cache'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION get_raw_page_at_lsn(relname text, forkname text, blocknum int8, lsn pg_lsn)
|
||||
CREATE FUNCTION get_raw_page_at_lsn(relname text, forkname text, blocknum int8, request_lsn pg_lsn, not_modified_since pg_lsn)
|
||||
RETURNS bytea
|
||||
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION get_raw_page_at_lsn(tbspc oid, db oid, relfilenode oid, forknum int8, blocknum int8, lsn pg_lsn)
|
||||
CREATE FUNCTION get_raw_page_at_lsn(tbspc oid, db oid, relfilenode oid, forknum int8, blocknum int8, request_lsn pg_lsn, not_modified_since pg_lsn)
|
||||
RETURNS bytea
|
||||
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
@@ -1,6 +1,6 @@
|
||||
# neon_test_utils extension
|
||||
comment = 'helpers for neon testing and debugging'
|
||||
default_version = '1.0'
|
||||
default_version = '1.1'
|
||||
module_pathname = '$libdir/neon_test_utils'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -48,10 +48,10 @@ PG_FUNCTION_INFO_V1(neon_xlogflush);
|
||||
*/
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer);
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
|
||||
#else
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, void *buffer);
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
|
||||
#endif
|
||||
|
||||
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
|
||||
@@ -299,8 +299,11 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
|
||||
text *forkname;
|
||||
uint32 blkno;
|
||||
|
||||
bool request_latest = PG_ARGISNULL(3);
|
||||
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(3);
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
|
||||
if (PG_NARGS() != 5)
|
||||
elog(ERROR, "unexpected number of arguments in SQL function signature");
|
||||
|
||||
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
|
||||
PG_RETURN_NULL();
|
||||
@@ -309,6 +312,9 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
|
||||
forkname = PG_GETARG_TEXT_PP(1);
|
||||
blkno = PG_GETARG_UINT32(2);
|
||||
|
||||
request_lsn = PG_ARGISNULL(3) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(3);
|
||||
not_modified_since = PG_ARGISNULL(4) ? request_lsn : PG_GETARG_LSN(4);
|
||||
|
||||
if (!superuser())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
@@ -361,7 +367,7 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
|
||||
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
|
||||
raw_page_data = VARDATA(raw_page);
|
||||
|
||||
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, read_lsn, request_latest, raw_page_data);
|
||||
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, request_lsn, not_modified_since, raw_page_data);
|
||||
|
||||
relation_close(rel, AccessShareLock);
|
||||
|
||||
@@ -380,6 +386,9 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *raw_page_data;
|
||||
|
||||
if (PG_NARGS() != 7)
|
||||
elog(ERROR, "unexpected number of arguments in SQL function signature");
|
||||
|
||||
if (!superuser())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
@@ -403,18 +412,20 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
|
||||
};
|
||||
|
||||
ForkNumber forknum = PG_GETARG_UINT32(3);
|
||||
|
||||
uint32 blkno = PG_GETARG_UINT32(4);
|
||||
bool request_latest = PG_ARGISNULL(5);
|
||||
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(5);
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
|
||||
/* Initialize buffer to copy to */
|
||||
bytea *raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
|
||||
|
||||
request_lsn = PG_ARGISNULL(5) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(5);
|
||||
not_modified_since = PG_ARGISNULL(6) ? request_lsn : PG_GETARG_LSN(6);
|
||||
|
||||
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
|
||||
raw_page_data = VARDATA(raw_page);
|
||||
|
||||
neon_read_at_lsn(rinfo, forknum, blkno, read_lsn, request_latest, raw_page_data);
|
||||
neon_read_at_lsn(rinfo, forknum, blkno, request_lsn, not_modified_since, raw_page_data);
|
||||
PG_RETURN_BYTEA_P(raw_page);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,7 +75,6 @@ pub type ComputeReady = DatabaseInfo;
|
||||
|
||||
// TODO: replace with an http-based protocol.
|
||||
struct MgmtHandler;
|
||||
#[async_trait::async_trait]
|
||||
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
@@ -89,6 +88,8 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
|
||||
}
|
||||
}
|
||||
|
||||
impl postgres_backend::HandlerSync<tokio::net::TcpStream> for MgmtHandler {}
|
||||
|
||||
fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), QueryError> {
|
||||
let resp: KickSession = serde_json::from_str(query).context("Failed to parse query as json")?;
|
||||
|
||||
|
||||
@@ -2,10 +2,13 @@
|
||||
//! protocol commands.
|
||||
|
||||
use anyhow::Context;
|
||||
use std::net::TcpStream;
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_io_timeout::TimeoutReader;
|
||||
use tracing::{debug, info, info_span, Instrument};
|
||||
use utils::measured_stream::MeasuredStream;
|
||||
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
@@ -95,8 +98,7 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::HandlerSync<IO>
|
||||
for SafekeeperPostgresHandler
|
||||
{
|
||||
// tenant_id and timeline_id are passed in connection string params
|
||||
@@ -191,8 +193,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
self.claims = Some(data.claims);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
type IO<'s, R: FnMut(usize), W> =
|
||||
MeasuredStream<std::pin::Pin<&'s mut TimeoutReader<TcpStream>>, R, W>;
|
||||
|
||||
impl<'s, R: FnMut(usize), W> postgres_backend::Handler<IO<'s, R, W>> for SafekeeperPostgresHandler {
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO<'s, R, W>>,
|
||||
query_string: &str,
|
||||
) -> Result<(), QueryError> {
|
||||
self.process_query_(pgb, &query_string).await
|
||||
}
|
||||
}
|
||||
impl SafekeeperPostgresHandler {
|
||||
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
|
||||
@@ -184,6 +184,19 @@ impl HeartbeaterTask {
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!(
|
||||
"Heartbeat round complete for {} nodes, {} offline",
|
||||
new_state.len(),
|
||||
new_state
|
||||
.values()
|
||||
.filter(|s| match s {
|
||||
PageserverState::Available { .. } => {
|
||||
false
|
||||
}
|
||||
PageserverState::Offline => true,
|
||||
})
|
||||
.count()
|
||||
);
|
||||
|
||||
let mut deltas = Vec::new();
|
||||
let now = Instant::now();
|
||||
|
||||
@@ -767,7 +767,10 @@ impl Reconciler {
|
||||
// It is up to the caller whether they want to drop out on this error, but they don't have to:
|
||||
// in general we should avoid letting unavailability of the cloud control plane stop us from
|
||||
// making progress.
|
||||
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
|
||||
if !matches!(e, NotifyError::ShuttingDown) {
|
||||
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
|
||||
}
|
||||
|
||||
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
|
||||
// needs to retry at some point.
|
||||
self.compute_notify_failure = true;
|
||||
|
||||
@@ -824,8 +824,7 @@ impl Service {
|
||||
|
||||
// Ordering: populate last_error before advancing error_seq,
|
||||
// so that waiters will see the correct error after waiting.
|
||||
*(tenant.last_error.lock().unwrap()) = format!("{e}");
|
||||
tenant.error_waiter.advance(result.sequence);
|
||||
tenant.set_last_error(result.sequence, e);
|
||||
|
||||
for (node_id, o) in result.observed.locations {
|
||||
tenant.observed.locations.insert(node_id, o);
|
||||
@@ -2805,7 +2804,14 @@ impl Service {
|
||||
tenant_shard_id: shard.tenant_shard_id,
|
||||
node_attached: *shard.intent.get_attached(),
|
||||
node_secondary: shard.intent.get_secondary().to_vec(),
|
||||
last_error: shard.last_error.lock().unwrap().clone(),
|
||||
last_error: shard
|
||||
.last_error
|
||||
.lock()
|
||||
.unwrap()
|
||||
.as_ref()
|
||||
.map(|e| format!("{e}"))
|
||||
.unwrap_or("".to_string())
|
||||
.clone(),
|
||||
is_reconciling: shard.reconciler.is_some(),
|
||||
is_pending_compute_notification: shard.pending_compute_notification,
|
||||
is_splitting: matches!(shard.splitting, SplitState::Splitting),
|
||||
@@ -4031,7 +4037,7 @@ impl Service {
|
||||
// TODO: in the background, we should balance work back onto this pageserver
|
||||
}
|
||||
AvailabilityTransition::Unchanged => {
|
||||
tracing::info!("Node {} no change during config", node_id);
|
||||
tracing::debug!("Node {} no change during config", node_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4351,7 +4357,26 @@ impl Service {
|
||||
};
|
||||
|
||||
let waiter_count = waiters.len();
|
||||
self.await_waiters(waiters, RECONCILE_TIMEOUT).await?;
|
||||
match self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
|
||||
Ok(()) => {}
|
||||
Err(ReconcileWaitError::Failed(_, reconcile_error))
|
||||
if matches!(*reconcile_error, ReconcileError::Cancel) =>
|
||||
{
|
||||
// Ignore reconciler cancel errors: this reconciler might have shut down
|
||||
// because some other change superceded it. We will return a nonzero number,
|
||||
// so the caller knows they might have to call again to quiesce the system.
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"{} reconciles in reconcile_all, {} waiters",
|
||||
reconciles_spawned,
|
||||
waiter_count
|
||||
);
|
||||
|
||||
Ok(waiter_count)
|
||||
}
|
||||
|
||||
|
||||
@@ -38,12 +38,18 @@ use crate::{
|
||||
};
|
||||
|
||||
/// Serialization helper
|
||||
fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
|
||||
fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
T: Clone + std::fmt::Display,
|
||||
T: std::fmt::Display,
|
||||
{
|
||||
serializer.collect_str(&v.lock().unwrap())
|
||||
serializer.collect_str(
|
||||
&v.lock()
|
||||
.unwrap()
|
||||
.as_ref()
|
||||
.map(|e| format!("{e}"))
|
||||
.unwrap_or("".to_string()),
|
||||
)
|
||||
}
|
||||
|
||||
/// In-memory state for a particular tenant shard.
|
||||
@@ -111,11 +117,15 @@ pub(crate) struct TenantShard {
|
||||
#[serde(skip)]
|
||||
pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
||||
|
||||
/// The most recent error from a reconcile on this tenant
|
||||
/// The most recent error from a reconcile on this tenant. This is a nested Arc
|
||||
/// because:
|
||||
/// - ReconcileWaiters need to Arc-clone the overall object to read it later
|
||||
/// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
|
||||
/// many waiters for one shard, and the underlying error types are not Clone.
|
||||
/// TODO: generalize to an array of recent events
|
||||
/// TOOD: use a ArcSwap instead of mutex for faster reads?
|
||||
#[serde(serialize_with = "read_mutex_content")]
|
||||
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
|
||||
#[serde(serialize_with = "read_last_error")]
|
||||
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
|
||||
|
||||
/// If we have a pending compute notification that for some reason we weren't able to send,
|
||||
/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
|
||||
@@ -293,18 +303,18 @@ pub(crate) struct ReconcilerWaiter {
|
||||
|
||||
seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
||||
error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
||||
error: std::sync::Arc<std::sync::Mutex<String>>,
|
||||
error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
|
||||
seq: Sequence,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ReconcileWaitError {
|
||||
pub(crate) enum ReconcileWaitError {
|
||||
#[error("Timeout waiting for shard {0}")]
|
||||
Timeout(TenantShardId),
|
||||
#[error("shutting down")]
|
||||
Shutdown,
|
||||
#[error("Reconcile error on shard {0}: {1}")]
|
||||
Failed(TenantShardId, String),
|
||||
Failed(TenantShardId, Arc<ReconcileError>),
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
@@ -342,7 +352,8 @@ impl ReconcilerWaiter {
|
||||
SeqWaitError::Timeout => unreachable!()
|
||||
})?;
|
||||
|
||||
return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
|
||||
return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
|
||||
self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -873,7 +884,7 @@ impl TenantShard {
|
||||
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
|
||||
|
||||
if !do_reconcile {
|
||||
tracing::info!("Not dirty, no reconciliation needed.");
|
||||
tracing::debug!("Not dirty, no reconciliation needed.");
|
||||
return ReconcileNeeded::No;
|
||||
}
|
||||
|
||||
@@ -1151,6 +1162,13 @@ impl TenantShard {
|
||||
&self.scheduling_policy
|
||||
}
|
||||
|
||||
pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
|
||||
// Ordering: always set last_error before advancing sequence, so that sequence
|
||||
// waiters are guaranteed to see a Some value when they see an error.
|
||||
*(self.last_error.lock().unwrap()) = Some(Arc::new(error));
|
||||
self.error_waiter.advance(sequence);
|
||||
}
|
||||
|
||||
pub(crate) fn from_persistent(
|
||||
tsp: TenantShardPersistence,
|
||||
intent: IntentState,
|
||||
|
||||
@@ -3,7 +3,7 @@ import re
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, tenant_get_shards, wait_replica_caughtup
|
||||
|
||||
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
@@ -102,3 +102,80 @@ def test_2_replicas_start(neon_simple_env: NeonEnv):
|
||||
) as secondary2:
|
||||
wait_replica_caughtup(primary, secondary1)
|
||||
wait_replica_caughtup(primary, secondary2)
|
||||
|
||||
|
||||
# We had an issue that a standby server made GetPage requests with an
|
||||
# old LSN, based on the last-written LSN cache, to avoid waits in the
|
||||
# pageserver. However, requesting a page with a very old LSN, such
|
||||
# that the GC horizon has already advanced past it, results in an
|
||||
# error from the pageserver:
|
||||
# "Bad request: tried to request a page version that was garbage collected"
|
||||
#
|
||||
# To avoid that, the compute<-> pageserver protocol was updated so
|
||||
# that that the standby now sends two LSNs, the old last-written LSN
|
||||
# and the current replay LSN.
|
||||
#
|
||||
# https://github.com/neondatabase/neon/issues/6211
|
||||
def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_conf = {
|
||||
# set PITR interval to be small, so we can do GC
|
||||
"pitr_interval": "0 s",
|
||||
}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
|
||||
timeline_id = env.initial_timeline
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
) as primary:
|
||||
with env.endpoints.new_replica_start(
|
||||
origin=primary,
|
||||
endpoint_id="secondary",
|
||||
# Protocol version 2 was introduced to fix the issue
|
||||
# that this test exercises. With protocol version 1 it
|
||||
# fails.
|
||||
config_lines=["neon.protocol_version=2"],
|
||||
) as secondary:
|
||||
p_cur = primary.connect().cursor()
|
||||
p_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
p_cur.execute("CREATE TABLE test (id int primary key) WITH (autovacuum_enabled=false)")
|
||||
p_cur.execute("INSERT INTO test SELECT generate_series(1, 10000) AS g")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
s_cur = secondary.connect().cursor()
|
||||
|
||||
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
|
||||
res = s_cur.fetchone()
|
||||
assert res is not None
|
||||
|
||||
s_cur.execute("SELECT COUNT(*) FROM test")
|
||||
res = s_cur.fetchone()
|
||||
assert res[0] == 10000
|
||||
|
||||
# Clear the cache in the standby, so that when we
|
||||
# re-execute the query, it will make GetPage
|
||||
# requests. This does not clear the last-written LSN cache
|
||||
# so we still remember the LSNs of the pages.
|
||||
s_cur.execute("SELECT clear_buffer_cache()")
|
||||
|
||||
# Do other stuff on the primary, to advance the WAL
|
||||
p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g")
|
||||
|
||||
# Run GC. The PITR interval is very small, so this advances the GC cutoff LSN
|
||||
# very close to the primary's current insert LSN.
|
||||
shards = tenant_get_shards(env, tenant_id, None)
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
client = pageserver.http_client()
|
||||
client.timeline_checkpoint(tenant_shard_id, timeline_id)
|
||||
client.timeline_compact(tenant_shard_id, timeline_id)
|
||||
client.timeline_gc(tenant_shard_id, timeline_id, 0)
|
||||
|
||||
# Re-execute the query. The GetPage requests that this
|
||||
# generates use old not_modified_since LSNs, older than
|
||||
# the GC cutoff, but new request LSNs. (In protocol
|
||||
# version 1 there was only one LSN, and this failed.)
|
||||
s_cur.execute("SELECT COUNT(*) FROM test")
|
||||
res = s_cur.fetchone()
|
||||
assert res[0] == 10000
|
||||
|
||||
131
test_runner/regress/test_ondemand_slru_download.py
Normal file
131
test_runner/regress/test_ondemand_slru_download.py
Normal file
@@ -0,0 +1,131 @@
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, tenant_get_shards
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
#
|
||||
# Test on-demand download of the pg_xact SLRUs
|
||||
#
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
def test_ondemand_download_pg_xact(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
|
||||
if shard_count is not None:
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
|
||||
tenant_conf = {
|
||||
"lazy_slru_download": "true",
|
||||
# set PITR interval to be small, so we can do GC
|
||||
"pitr_interval": "0 s",
|
||||
}
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
|
||||
)
|
||||
|
||||
timeline_id = env.initial_timeline
|
||||
tenant_id = env.initial_tenant
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
# Create a test table
|
||||
cur.execute("CREATE TABLE clogtest (id integer)")
|
||||
cur.execute("INSERT INTO clogtest VALUES (1)")
|
||||
|
||||
# Consume a lot of XIDs, to create more pg_xact segments
|
||||
for _ in range(1000):
|
||||
cur.execute("select test_consume_xids(10000);")
|
||||
cur.execute("INSERT INTO clogtest VALUES (2)")
|
||||
for _ in range(1000):
|
||||
cur.execute("select test_consume_xids(10000);")
|
||||
cur.execute("INSERT INTO clogtest VALUES (2)")
|
||||
for _ in range(1000):
|
||||
cur.execute("select test_consume_xids(10000);")
|
||||
cur.execute("INSERT INTO clogtest VALUES (3)")
|
||||
|
||||
# Restart postgres. After restart, the new instance will download the
|
||||
# pg_xact segments lazily.
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Consume more WAL, so that the pageserver can compact and GC older data,
|
||||
# including the LSN that we started the new endpoint at,
|
||||
cur.execute("CREATE TABLE anothertable (i int, t text)")
|
||||
cur.execute(
|
||||
"INSERT INTO anothertable SELECT g, 'long string to consume some space' || g FROM generate_series(1, 10000) g"
|
||||
)
|
||||
|
||||
# Run GC
|
||||
shards = tenant_get_shards(env, tenant_id, None)
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
client = pageserver.http_client()
|
||||
client.timeline_checkpoint(tenant_shard_id, timeline_id)
|
||||
client.timeline_compact(tenant_shard_id, timeline_id)
|
||||
client.timeline_gc(tenant_shard_id, timeline_id, 0)
|
||||
|
||||
# Test that this can still on-demand download the old pg_xact segments
|
||||
cur.execute("select xmin, xmax, * from clogtest")
|
||||
tup = cur.fetchall()
|
||||
log.info(f"tuples = {tup}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
|
||||
if shard_count is not None:
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
|
||||
tenant_conf = {
|
||||
"lazy_slru_download": "true",
|
||||
}
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
|
||||
)
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
# Create a test table
|
||||
cur.execute("CREATE TABLE clogtest (id integer)")
|
||||
cur.execute("INSERT INTO clogtest VALUES (1)")
|
||||
|
||||
# Consume a lot of XIDs, to create more pg_xact segments
|
||||
for _ in range(1000):
|
||||
cur.execute("select test_consume_xids(10000);")
|
||||
|
||||
# Open a new connection and insert another row, but leave
|
||||
# the transaction open
|
||||
pg_conn2 = endpoint.connect()
|
||||
cur2 = pg_conn2.cursor()
|
||||
cur2.execute("BEGIN")
|
||||
cur2.execute("INSERT INTO clogtest VALUES (2)")
|
||||
|
||||
# Another insert on the first connection, which is committed.
|
||||
for _ in range(1000):
|
||||
cur.execute("select test_consume_xids(10000);")
|
||||
cur.execute("INSERT INTO clogtest VALUES (3)")
|
||||
|
||||
# Start standby at this point in time
|
||||
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
|
||||
endpoint_at_lsn = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
|
||||
)
|
||||
|
||||
# Commit transaction 2, after the standby was launched.
|
||||
cur2.execute("COMMIT")
|
||||
|
||||
# The replica should not see transaction 2 as committed.
|
||||
conn_replica = endpoint_at_lsn.connect()
|
||||
cur_replica = conn_replica.cursor()
|
||||
cur_replica.execute("SELECT * FROM clogtest")
|
||||
assert cur_replica.fetchall() == [(1,), (3,)]
|
||||
@@ -17,7 +17,14 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_read_validation", "empty")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_read_validation")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_read_validation",
|
||||
# Use protocol version 2, because the code that constructs the V1 messages
|
||||
# assumes that a primary always wants to read the latest version of a page,
|
||||
# and therefore doesn't work with the test functions below to read an older
|
||||
# page version.
|
||||
config_lines=["neon.protocol_version=2"],
|
||||
)
|
||||
|
||||
with closing(endpoint.connect()) as con:
|
||||
with con.cursor() as c:
|
||||
@@ -64,7 +71,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
log.info("Cache is clear, reading stale page version")
|
||||
|
||||
c.execute(
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{first[0]}'))"
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{first[0]}', NULL))"
|
||||
)
|
||||
direct_first = c.fetchone()
|
||||
assert first == direct_first, "Failed fetch page at historic lsn"
|
||||
@@ -77,7 +84,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
log.info("Cache is clear, reading latest page version without cache")
|
||||
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, NULL))"
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, NULL, NULL))"
|
||||
)
|
||||
direct_latest = c.fetchone()
|
||||
assert second == direct_latest, "Failed fetch page at latest lsn"
|
||||
@@ -92,7 +99,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
c.execute(
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}', NULL))"
|
||||
)
|
||||
direct_first = c.fetchone()
|
||||
assert first == direct_first, "Failed fetch page at historic lsn using oid"
|
||||
@@ -102,7 +109,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
c.execute(
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, NULL))"
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, NULL, NULL))"
|
||||
)
|
||||
direct_latest = c.fetchone()
|
||||
assert second == direct_latest, "Failed fetch page at latest lsn"
|
||||
@@ -114,7 +121,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
c.execute(
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}', NULL))"
|
||||
)
|
||||
direct_first = c.fetchone()
|
||||
assert first == direct_first, "Failed fetch page at historic lsn using oid"
|
||||
@@ -133,7 +140,14 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
|
||||
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_read_validation_neg")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_read_validation_neg",
|
||||
# Use protocol version 2, because the code that constructs the V1 messages
|
||||
# assumes that a primary always wants to read the latest version of a page,
|
||||
# and therefore doesn't work with the test functions below to read an older
|
||||
# page version.
|
||||
config_lines=["neon.protocol_version=2"],
|
||||
)
|
||||
|
||||
with closing(endpoint.connect()) as con:
|
||||
with con.cursor() as c:
|
||||
@@ -143,7 +157,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
log.info("read a page of a missing relation")
|
||||
try:
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0'))"
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0', NULL))"
|
||||
)
|
||||
raise AssertionError("query should have failed")
|
||||
except UndefinedTable as e:
|
||||
@@ -155,7 +169,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
log.info("read a page at lsn 0")
|
||||
try:
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0'))"
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0', NULL))"
|
||||
)
|
||||
raise AssertionError("query should have failed")
|
||||
except IoError as e:
|
||||
@@ -164,22 +178,22 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
log.info("Pass NULL as an input")
|
||||
expected = (None, None, None)
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn(NULL, 'main', 0, '0/0'))"
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn(NULL, 'main', 0, '0/0', NULL))"
|
||||
)
|
||||
assert c.fetchone() == expected, "Expected null output"
|
||||
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', NULL, 0, '0/0'))"
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', NULL, 0, '0/0', NULL))"
|
||||
)
|
||||
assert c.fetchone() == expected, "Expected null output"
|
||||
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', NULL, '0/0'))"
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', NULL, '0/0', NULL))"
|
||||
)
|
||||
assert c.fetchone() == expected, "Expected null output"
|
||||
|
||||
# This check is currently failing, reading beyond EOF is returning a 0-page
|
||||
log.info("Read beyond EOF")
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 1, NULL))"
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 1, NULL, NULL))"
|
||||
)
|
||||
|
||||
@@ -173,7 +173,9 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
|
||||
# which changes the LSN on the page.
|
||||
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
|
||||
vm_page_in_cache = (cur.fetchall()[0][0])[8:100].hex()
|
||||
cur.execute("select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn() )")
|
||||
cur.execute(
|
||||
"select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn(), NULL )"
|
||||
)
|
||||
vm_page_at_pageserver = (cur.fetchall()[0][0])[8:100].hex()
|
||||
|
||||
assert vm_page_at_pageserver == vm_page_in_cache
|
||||
|
||||
@@ -7,7 +7,9 @@ use std::{
|
||||
io::BufReader,
|
||||
};
|
||||
|
||||
use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest};
|
||||
use pageserver_api::models::{
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamProtocolVersion,
|
||||
};
|
||||
use utils::id::{ConnectionId, TenantId, TimelineId};
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
@@ -56,7 +58,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
|
||||
let mut prev: Option<PagestreamGetPageRequest> = None;
|
||||
|
||||
// Compute stats
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2) {
|
||||
match msg {
|
||||
PagestreamFeMessage::Exists(_) => {}
|
||||
PagestreamFeMessage::Nblocks(_) => {}
|
||||
@@ -89,7 +91,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
|
||||
}
|
||||
|
||||
fn dump_trace<R: std::io::Read>(mut reader: R) {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2) {
|
||||
println!("{msg:?}");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user