Compare commits

...

28 Commits

Author SHA1 Message Date
Arpad Müller
548b28b28b WIP remove generic argument from safekeeper 2024-04-28 01:32:12 +02:00
Arpad Müller
f9039f7d73 Remove async_trait again 2024-04-28 01:15:07 +02:00
Arpad Müller
2a7bc782fd Remove generic IO argument 2024-04-28 01:11:59 +02:00
Arpad Müller
307d10b111 Add HandlerSync trait 2024-04-28 01:01:21 +02:00
Arpad Müller
5546c0de35 Same for SafekeeperPostgresHandler 2024-04-28 00:36:45 +02:00
Arpad Müller
d1fb4a4a00 Move PageServerHandler process_query content 2024-04-28 00:30:46 +02:00
Christian Schwarz
bf369f4268 refactor(owned_buffer_io::util::size_tracking_writer): make generic over underlying writer (#7483)
part of https://github.com/neondatabase/neon/issues/7124
2024-04-26 09:19:41 +00:00
Christian Schwarz
70f4a16a05 refactor(owned_buffers_io::BufferedWriter): be generic over the type of buffer (#7482) 2024-04-26 08:30:20 +00:00
John Spray
d63185fa6c storage controller: log hygiene & better error type (#7508)
These are testability/logging improvements spun off from #7475

- Don't log warnings for shutdown errors in compute hook
- Revise logging around heartbeats and reconcile_all so that we aren't
emitting such a large volume of INFO messages under normal quite
conditions.
- Clean up the `last_error` of TenantShard to hold a ReconcileError
instead of a String, and use that properly typed error to suppress
reconciler cancel errors during reconcile_all_now. This is important for
tests that iteratively call that, as otherwise they would get 500 errors
when some reconciler in flight was cancelled (perhaps due to a state
change on the tenant shard starting a new reconciler).
2024-04-26 08:15:59 +00:00
Heikki Linnakangas
ca8fca0e9f Add test to demonstrate the problem with protocol version 1 (#7377) 2024-04-25 20:45:37 +03:00
Heikki Linnakangas
0397427dcf Add test for SLRU download (#7377)
Before PR #7377, on-demand SLRU download always used the basebackup's
LSN in the SLRU download, but that LSN might get garbage-collected away
in the pageserver. We should request the latest LSN, like with GetPage
requests, with the LSN just indicating that we know that the page hasn't
been changed since the LSN (since the basebackup in this case).

Add test to demonstrate the problem. Without the fix, it fails with
"tried to request a page version that was garbage collected" error from
the pageserver.

I wrote this test as part of earlier PR #6693, but that fell through
the cracks and was never applied. PR #7377 superseded the fix from
that older PR, but the test is still valid.
2024-04-25 20:45:37 +03:00
Heikki Linnakangas
a2a44ea213 Refactor how the request LSNs are tracked in compute (#7377)
Instead of thinking in terms of 'latest' and 'lsn' of the request,
each request has two LSNs: the request LSN and 'not_modified_since'
LSN. The request is nominally made at the request LSN, that determines
what page version we want to see. But as a hint, we also include
'not_modified_since'. It tells the pageserver that the page has not
been modified since that LSN, which allows the pageserver to skip
waiting for newer WAL to arrive, and could allow more optimizations in
the future.

Refactor the internal functions to calculate the request LSN to
calculate both LSNs.

Sending two LSNs to the pageserver requires using the new protocol
version 2. The previous commit added the server support for it, but we
still default to the old protocol for compatibility with old
pageservers. The 'neon.protocol_version' GUC can be used to use the
new protocol.

The new protocol addresses one cause of issue #6211, although you can
still get the same error if you have a standby that is lagging behind
so that the page version it needs is genuinely GC'd away.
2024-04-25 20:45:37 +03:00
Heikki Linnakangas
4917f52c88 Server support for new pagestream protocol version (#7377)
In the old protocol version, the client sent with each request:

- latest: bool. If true, the client requested the latest page
  version, and the 'lsn' was just a hint of when the page was last
  modified
- lsn: Lsn, the page version to return

This protocol didn't allow requesting a page at a particular
non-latest LSN and *also* sending a hint on when the page was last
modified. That put a read only compute into an awkward position where
it had to either request each page at the replay-LSN, which could be
very close to the last LSN written in the primary and therefore
require the pageserver to wait for it to arrive, or an older LSN which
could already be garbage collected in the pageserver, resulting in an
error. The new protocol version fixes that by allowing a read only
compute to send both LSNs.

To use the new protocol version, use "pagestream_v2" command instead
of just "pagestream". The old protocol version is still supported, for
compatibility with old computes (and in fact there is no client
support yet, it is added by the next commit).
2024-04-25 20:45:37 +03:00
Heikki Linnakangas
04a682021f Remove the now-unused 'latest' arguments (#7377)
The 'latest' argument was passed to the functions in
pgdatadir_mapping.rs to know when they can update the relsize
cache. Commit e69ff3fc00 changed how the relsize cache is updated,
making the 'latest' argument unused.
2024-04-25 20:45:37 +03:00
Alex Chi Z
c59abedd85 chore(pageserver): temporary metrics on ingestion time (#7515)
As a follow-up on https://github.com/neondatabase/neon/pull/7467, also
measure the ingestion operation speed.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-25 16:39:27 +00:00
Anna Khanova
5357f40183 proxy: Workaround switch to the regional redis (#7513)
## Problem

Start switching from the global redis to the regional one

## Summary of changes

* Publish cancellations to the regional redis
* Listen notifications from both: global and regional
2024-04-25 15:26:18 +00:00
Vlad Lazar
e4a279db13 pageserver: coalesce read paths (#7477)
## Problem
We are currently supporting two read paths. No bueno.

## Summary of changes
High level: use vectored read path to serve get page requests - gated by
`get_impl` config
Low level:
1. Add ps config, `get_impl` to specify which read path to use when
serving get page requests
2. Fix base cached image handling for the vectored read path. This was
subtly broken: previously we
would not mark keys that went past their cached lsn as complete. This is
a self standing change which
could be its own PR, but I've included it here because writing separate
tests for it is tricky.
3. Fork get page to use either the legacy or vectored implementation 
4. Validate the use of vectored read path when serving get page requests
against the legacy implementation.
Controlled by `validate_vectored_get` ps config.
5. Use the vectored read path to serve get page requests in tests (with
validation).

## Note
Since the vectored read path does not go through the page cache to read
buffers, this change also amounts to a removal of the buffer page cache. Materialized page cache
is still used.
2024-04-25 13:29:17 +01:00
Anna Khanova
b1d47f3911 proxy: Fix cancellations (#7510)
## Problem

Cancellations were published to the channel, that was never read.

## Summary of changes

Fallback to global redis publishing.
2024-04-25 11:38:51 +00:00
Anna Khanova
a3d62b31bb Update connect to compute and wake compute retry configs (#7509)
## Problem

## Summary of changes

Decrease waiting time
2024-04-25 11:16:27 +00:00
Conrad Ludgate
cdccab4bd9 reduce complexity of proxy protocol parse (#7078)
## Problem

The `WithClientIp` AsyncRead/Write abstraction never filled me with much
joy. I would just rather read the protocol header once and then get the
remaining buf and reader.

## Summary of changes

* Replace `WithClientIp::wait_for_addr` with `read_proxy_protocol`.
* Replace `WithClientIp` with `ChainRW`.
* Optimise `ChainRW` to make the standard path more optimal.
2024-04-25 11:14:04 +01:00
John Spray
e8814b6f81 controller: limit Reconciler concurrency (#7493)
## Problem

Storage controller memory can spike very high if we have many tenants
and they all try to reconcile at the same time.

Related:
- https://github.com/neondatabase/neon/issues/7463
- https://github.com/neondatabase/neon/issues/7460

Not closing those issues in this PR, because the test coverage for them
will be in https://github.com/neondatabase/neon/pull/7475

## Summary of changes

- Add a CLI arg `--reconciler-concurrency`, defaulted to 128
- Add a semaphore to Service with this many units
- In `maybe_reconcile_shard`, try to acquire semaphore unit. If we can't
get one, return a ReconcileWaiter for a future sequence number, and push
the TenantShardId onto a channel of delayed IDs.
- In `process_result`, consume from the channel of delayed IDs if there
are semaphore units available and call maybe_reconcile_shard again for
these delayed shards.

This has been tested in https://github.com/neondatabase/neon/pull/7475,
but will land that PR separately because it contains other changes &
needs the test stabilizing. This change is worth merging sooner, because
it fixes a practical issue with larger shard counts.
2024-04-25 10:46:07 +01:00
Arpad Müller
c18d3340b5 Ability to specify the upload_storage_class in S3 bucket configuration (#7461)
Currently we move data to the intended storage class via lifecycle
rules, but those are a daily batch job so data first spends up to a day
in standard storage.

Therefore, make it possible to specify the storage class used for
uploads to S3 so that the data doesn't have to be migrated
automatically.

The advantage of this is that it gives cleaner billing reports.

Part of https://github.com/neondatabase/cloud/issues/11348
2024-04-24 18:48:25 +02:00
Alex Chi Z
447a063f3c fix(metrics): correct maxrss metrics on macos (#7487)
macOS max_rss is in bytes, while Linux is in kilobytes.
https://stackoverflow.com/a/59915669

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-24 15:09:23 +00:00
Vlad Lazar
c12861cccd pageserver: finish vectored get early (#7490)
## Problem
If the previous step of the vectored left no further keyspace to
investigate (i.e. keyspace remains empty after removing keys completed in the previous step),
then we'd still grab the layers lock, potentially add an in-mem layer to the fringe
and at some further point read its index without reading any values from it.

## Summary of changes
If there's nothing left in the current keyspace, then skip the search
and just select the next item from the fringe as usual.

When running `test_pg_regress[release-pg16]` with the vectored read path
for singular gets this improved perf drastically (see PR cover letter).

## Correctness
Since no keys remained from the previous range (i.e. we are on a leaf
node) there's nothing that search can find in deeper nodes.
2024-04-24 15:36:23 +01:00
Vlad Lazar
2a3a8ee31d pageserver: publish the same metrics from both read paths (#7486)
## Problem
Vectored and non-vectored read paths don't publish the same set of
metrics. Metrics parity is needed for coalescing the read paths.

## Summary of changes
* Publish reconstruct time and fetching data for reconstruct time from
the vectored read path
* Remove pageserver_getpage_reconstruct_seconds{res="err"} - wasn't used
anyway
2024-04-24 13:52:46 +00:00
Anna Khanova
5dda371c2b Fix a bug with retries (#7494)
## Problem

## Summary of changes

By default, it's 5s retry.
2024-04-24 14:13:18 +01:00
Joonas Koivunen
a60035b23a fix: avoid starving background task permits in eviction task (#7471)
As seen with a recent incident, eviction tasks can cause pageserver-wide
permit starvation on the background task semaphore when synthetic size
calculation takes a long time for a tenant that has more than our permit
number of timelines or multiple tenants that have slow synthetic size
and total number of timelines exceeds the permits. Metric links can be
found in the internal [slack thread].

As a solution, release the permit while waiting for the state guarding
the synthetic size calculation. This will most likely hurt the eviction
task eviction performance, but that does not matter because we are
hoping to get away from it using OnlyImitiate policy anyway and rely
solely on disk usage-based eviction.

[slack thread]:
https://neondb.slack.com/archives/C06UEMLK7FE/p1713810505587809?thread_ts=1713468604.508969&cid=C06UEMLK7FE
2024-04-24 11:38:59 +03:00
Arpad Müller
18fd73d84a get_lsn_by_timestamp: clamp commit_lsn to be >= min_lsn (#7488)
There was an edge case where
`get_lsn_by_timestamp`/`find_lsn_for_timestamp` could have returned an
lsn that is before the limits we enforce: when we did find SLRU entries
with timestamps before the one we search for.

The API contract of `get_lsn_by_timestamp` is to not return something
before the anchestor lsn.

cc https://neondb.slack.com/archives/C03F5SM1N02/p1713871064147029
2024-04-24 00:46:48 +02:00
64 changed files with 2116 additions and 1011 deletions

View File

@@ -477,6 +477,7 @@ jobs:
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_IMPL: vectored
PAGESERVER_GET_IMPL: vectored
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540

View File

@@ -129,6 +129,7 @@ pub struct PageServerConf {
pub(crate) virtual_file_io_engine: Option<String>,
pub(crate) get_vectored_impl: Option<String>,
pub(crate) get_impl: Option<String>,
}
impl Default for PageServerConf {
@@ -141,6 +142,7 @@ impl Default for PageServerConf {
http_auth_type: AuthType::Trust,
virtual_file_io_engine: None,
get_vectored_impl: None,
get_impl: None,
}
}
}

View File

@@ -92,6 +92,7 @@ impl PageServerNode {
http_auth_type,
virtual_file_io_engine,
get_vectored_impl,
get_impl,
} = &self.conf;
let id = format!("id={}", id);
@@ -111,6 +112,11 @@ impl PageServerNode {
} else {
String::new()
};
let get_impl = if let Some(get_impl) = get_impl {
format!("get_impl='{get_impl}'")
} else {
String::new()
};
let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
@@ -124,6 +130,7 @@ impl PageServerNode {
broker_endpoint_param,
virtual_file_io_engine,
get_vectored_impl,
get_impl,
];
if let Some(control_plane_api) = &self.env.control_plane_api {

View File

@@ -256,7 +256,16 @@ fn update_rusage_metrics() {
DISK_IO_BYTES
.with_label_values(&["write"])
.set(rusage_stats.ru_oublock * BYTES_IN_BLOCK);
MAXRSS_KB.set(rusage_stats.ru_maxrss);
// On macOS, the unit of maxrss is bytes; on Linux, it's kilobytes. https://stackoverflow.com/a/59915669
#[cfg(target_os = "macos")]
{
MAXRSS_KB.set(rusage_stats.ru_maxrss / 1024);
}
#[cfg(not(target_os = "macos"))]
{
MAXRSS_KB.set(rusage_stats.ru_maxrss);
}
}
fn get_rusage_stats() -> libc::rusage {

View File

@@ -162,6 +162,10 @@ impl KeySpace {
.sum()
}
pub fn is_empty(&self) -> bool {
self.total_size() == 0
}
fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
Ok(0) => None,
@@ -178,6 +182,11 @@ impl KeySpace {
pub fn overlaps(&self, range: &Range<Key>) -> bool {
self.overlaps_at(range).is_some()
}
/// Check if the keyspace contains a key
pub fn contains(&self, key: &Key) -> bool {
self.overlaps(&(*key..key.next()))
}
}
///

View File

@@ -848,39 +848,72 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
}
}
// In the V2 protocol version, a GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
//
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
// modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The legacy V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and let the pageserver do the right thing. There is no
// difference in the responses between V1 and V2.
//
// The Request structs below reflect the V2 interface. If V1 is used, the parse function
// maps the old format requests to the new format.
//
#[derive(Clone, Copy)]
pub enum PagestreamProtocolVersion {
V1,
V2,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamExistsRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamNblocksRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamDbSizeRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetSlruSegmentRequest {
pub latest: bool,
pub lsn: Lsn,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
pub kind: u8,
pub segno: u32,
}
@@ -927,14 +960,16 @@ pub struct TenantHistorySize {
}
impl PagestreamFeMessage {
/// Serialize a compute -> pageserver message. This is currently only used in testing
/// tools. Always uses protocol version 2.
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
@@ -943,8 +978,8 @@ impl PagestreamFeMessage {
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
@@ -953,8 +988,8 @@ impl PagestreamFeMessage {
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
@@ -964,15 +999,15 @@ impl PagestreamFeMessage {
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u64(req.request_lsn.0);
bytes.put_u64(req.not_modified_since.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
@@ -981,18 +1016,40 @@ impl PagestreamFeMessage {
bytes.into()
}
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
pub fn parse<R: std::io::Read>(
body: &mut R,
protocol_version: PagestreamProtocolVersion,
) -> anyhow::Result<PagestreamFeMessage> {
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let (request_lsn, not_modified_since) = match protocol_version {
PagestreamProtocolVersion::V2 => (
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
PagestreamProtocolVersion::V1 => {
// In the old protocol, each message starts with a boolean 'latest' flag,
// followed by 'lsn'. Convert that to the two LSNs, 'request_lsn' and
// 'not_modified_since', used in the new protocol version.
let latest = body.read_u8()? != 0;
let request_lsn = Lsn::from(body.read_u64::<BigEndian>()?);
if latest {
(Lsn::MAX, request_lsn) // get latest version
} else {
(request_lsn, request_lsn) // get version at specified LSN
}
}
};
// The rest of the messages are the same between V1 and V2
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1001,8 +1058,8 @@ impl PagestreamFeMessage {
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1011,8 +1068,8 @@ impl PagestreamFeMessage {
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1022,14 +1079,14 @@ impl PagestreamFeMessage {
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
request_lsn,
not_modified_since,
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
@@ -1157,8 +1214,8 @@ mod tests {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: true,
lsn: Lsn(4),
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
rel: RelTag {
forknum: 1,
spcnode: 2,
@@ -1167,8 +1224,8 @@ mod tests {
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: false,
lsn: Lsn(4),
request_lsn: Lsn(4),
not_modified_since: Lsn(4),
rel: RelTag {
forknum: 1,
spcnode: 2,
@@ -1177,8 +1234,8 @@ mod tests {
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: true,
lsn: Lsn(4),
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
rel: RelTag {
forknum: 1,
spcnode: 2,
@@ -1188,14 +1245,16 @@ mod tests {
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: true,
lsn: Lsn(4),
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
dbnode: 7,
}),
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
let reconstructed =
PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V2)
.unwrap();
assert!(msg == reconstructed);
}
}

View File

@@ -78,18 +78,18 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
pub trait Handler<IO>: HandlerSync<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
async fn process_query(
fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError>;
) -> impl Future<Output = Result<(), QueryError>> + Send;
}
pub trait HandlerSync<IO> {
/// Called on startup packet receival, allows to process params.
///
/// If Ok(false) is returned postgres_backend will skip auth -- that is needed for new users

View File

@@ -21,11 +21,13 @@ use std::{
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
pin::Pin,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};
use anyhow::{bail, Context};
use aws_sdk_s3::types::StorageClass;
use camino::{Utf8Path, Utf8PathBuf};
use bytes::Bytes;
@@ -563,6 +565,7 @@ pub struct S3Config {
/// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
pub concurrency_limit: NonZeroUsize,
pub max_keys_per_list_response: Option<i32>,
pub upload_storage_class: Option<StorageClass>,
}
impl Debug for S3Config {
@@ -691,6 +694,18 @@ impl RemoteStorageConfig {
endpoint,
concurrency_limit,
max_keys_per_list_response,
upload_storage_class: toml
.get("upload_storage_class")
.map(|prefix_in_bucket| -> anyhow::Result<_> {
let s = parse_toml_string("upload_storage_class", prefix_in_bucket)?;
let storage_class = StorageClass::from_str(&s).expect("infallible");
#[allow(deprecated)]
if matches!(storage_class, StorageClass::Unknown(_)) {
bail!("Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}", StorageClass::values());
}
Ok(storage_class)
})
.transpose()?,
})
}
(_, _, _, Some(_), None) => {

View File

@@ -30,7 +30,7 @@ use aws_sdk_s3::{
config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
error::SdkError,
operation::get_object::GetObjectError,
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion},
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
Client,
};
use aws_smithy_async::rt::sleep::TokioSleep;
@@ -62,6 +62,7 @@ pub struct S3Bucket {
bucket_name: String,
prefix_in_bucket: Option<String>,
max_keys_per_list_response: Option<i32>,
upload_storage_class: Option<StorageClass>,
concurrency_limiter: ConcurrencyLimiter,
// Per-request timeout. Accessible for tests.
pub timeout: Duration,
@@ -154,6 +155,7 @@ impl S3Bucket {
max_keys_per_list_response: aws_config.max_keys_per_list_response,
prefix_in_bucket,
concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
upload_storage_class: aws_config.upload_storage_class.clone(),
timeout,
})
}
@@ -582,6 +584,7 @@ impl RemoteStorage for S3Bucket {
.bucket(self.bucket_name.clone())
.key(self.relative_path_to_s3_object(to))
.set_metadata(metadata.map(|m| m.0))
.set_storage_class(self.upload_storage_class.clone())
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream)
.send();
@@ -633,6 +636,7 @@ impl RemoteStorage for S3Bucket {
.copy_object()
.bucket(self.bucket_name.clone())
.key(self.relative_path_to_s3_object(to))
.set_storage_class(self.upload_storage_class.clone())
.copy_source(copy_source)
.send();
@@ -890,6 +894,7 @@ impl RemoteStorage for S3Bucket {
.copy_object()
.bucket(self.bucket_name.clone())
.key(key)
.set_storage_class(self.upload_storage_class.clone())
.copy_source(&source_id)
.send();
@@ -1073,6 +1078,7 @@ mod tests {
endpoint: None,
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response: Some(5),
upload_storage_class: None,
};
let storage =
S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init");

View File

@@ -380,6 +380,7 @@ fn create_s3_client(
endpoint: None,
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response,
upload_storage_class: None,
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};

View File

@@ -60,7 +60,7 @@ impl Client {
) -> anyhow::Result<PagestreamClient> {
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
.client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.copy_both_simple(&format!("pagestream_v2 {tenant_id} {timeline_id}"))
.await?;
let Client {
cancel_on_client_drop,

View File

@@ -312,8 +312,12 @@ async fn main_impl(
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}

View File

@@ -376,7 +376,7 @@ where
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
.await?;
// If the relation is empty, create an empty file
@@ -397,7 +397,7 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), self.ctx)
.await?;
segment_data.extend_from_slice(&img[..]);
}

View File

@@ -121,8 +121,10 @@ fn main() -> anyhow::Result<()> {
&[("node_id", &conf.id.to_string())],
);
// after setting up logging, log the effective IO engine choice
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.get_impl, "starting with get page implementation");
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {

View File

@@ -30,9 +30,9 @@ use utils::{
logging::LogFormat,
};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
@@ -91,6 +91,8 @@ pub mod defaults {
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
pub const DEFAULT_GET_IMPL: &str = "legacy";
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
@@ -138,6 +140,8 @@ pub mod defaults {
#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
#get_impl = '{DEFAULT_GET_IMPL}'
#max_vectored_read_bytes = '{DEFAULT_MAX_VECTORED_READ_BYTES}'
#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
@@ -284,6 +288,8 @@ pub struct PageServerConf {
pub get_vectored_impl: GetVectoredImpl,
pub get_impl: GetImpl,
pub max_vectored_read_bytes: MaxVectoredReadBytes,
pub validate_vectored_get: bool,
@@ -414,6 +420,8 @@ struct PageServerConfigBuilder {
get_vectored_impl: BuilderValue<GetVectoredImpl>,
get_impl: BuilderValue<GetImpl>,
max_vectored_read_bytes: BuilderValue<MaxVectoredReadBytes>,
validate_vectored_get: BuilderValue<bool>,
@@ -503,6 +511,7 @@ impl PageServerConfigBuilder {
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
get_impl: Set(DEFAULT_GET_IMPL.parse().unwrap()),
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
@@ -681,6 +690,10 @@ impl PageServerConfigBuilder {
self.get_vectored_impl = BuilderValue::Set(value);
}
pub fn get_impl(&mut self, value: GetImpl) {
self.get_impl = BuilderValue::Set(value);
}
pub fn get_max_vectored_read_bytes(&mut self, value: MaxVectoredReadBytes) {
self.max_vectored_read_bytes = BuilderValue::Set(value);
}
@@ -750,6 +763,7 @@ impl PageServerConfigBuilder {
secondary_download_concurrency,
ingest_batch_size,
get_vectored_impl,
get_impl,
max_vectored_read_bytes,
validate_vectored_get,
ephemeral_bytes_per_memory_kb,
@@ -1035,6 +1049,9 @@ impl PageServerConf {
"get_vectored_impl" => {
builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
}
"get_impl" => {
builder.get_impl(parse_toml_from_str("get_impl", item)?)
}
"max_vectored_read_bytes" => {
let bytes = parse_toml_u64("max_vectored_read_bytes", item)? as usize;
builder.get_max_vectored_read_bytes(
@@ -1126,6 +1143,7 @@ impl PageServerConf {
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
@@ -1365,6 +1383,7 @@ background_task_maximum_delay = '334 s'
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant")
@@ -1438,6 +1457,7 @@ background_task_maximum_delay = '334 s'
ingest_batch_size: 100,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
max_vectored_read_bytes: MaxVectoredReadBytes(
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant")
@@ -1557,6 +1577,7 @@ broker_endpoint = '{broker_endpoint}'
endpoint: Some(endpoint.clone()),
concurrency_limit: s3_concurrency_limit,
max_keys_per_list_response: None,
upload_storage_class: None,
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
},

View File

@@ -105,31 +105,39 @@ pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
});
// Metrics collected on operations on the storage repository.
#[derive(
Clone, Copy, enum_map::Enum, strum_macros::EnumString, strum_macros::Display, IntoStaticStr,
)]
pub(crate) enum GetKind {
Singular,
Vectored,
}
pub(crate) struct ReconstructTimeMetrics {
ok: Histogram,
err: Histogram,
singular: Histogram,
vectored: Histogram,
}
pub(crate) static RECONSTRUCT_TIME: Lazy<ReconstructTimeMetrics> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value (reconstruct a page from deltas)",
&["result"],
&["get_kind"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric");
ReconstructTimeMetrics {
ok: inner.get_metric_with_label_values(&["ok"]).unwrap(),
err: inner.get_metric_with_label_values(&["err"]).unwrap(),
singular: inner.with_label_values(&[GetKind::Singular.into()]),
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
}
});
impl ReconstructTimeMetrics {
pub(crate) fn for_result<T, E>(&self, result: &Result<T, E>) -> &Histogram {
match result {
Ok(_) => &self.ok,
Err(_) => &self.err,
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
match get_kind {
GetKind::Singular => &self.singular,
GetKind::Vectored => &self.vectored,
}
}
}
@@ -142,13 +150,33 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
pub(crate) struct ReconstructDataTimeMetrics {
singular: Histogram,
vectored: Histogram,
}
impl ReconstructDataTimeMetrics {
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
match get_kind {
GetKind::Singular => &self.singular,
GetKind::Vectored => &self.vectored,
}
}
}
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_getpage_get_reconstruct_data_seconds",
"Time spent in get_reconstruct_value_data",
&["get_kind"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
.expect("failed to define a metric");
ReconstructDataTimeMetrics {
singular: inner.with_label_values(&[GetKind::Singular.into()]),
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
}
});
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
@@ -1491,35 +1519,6 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
}
});
pub(crate) struct WalIngestMetrics {
pub(crate) bytes_received: IntCounter,
pub(crate) records_received: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
bytes_received: register_int_counter!(
"pageserver_wal_ingest_bytes_received",
"Bytes of WAL ingested from safekeepers",
)
.unwrap(),
records_received: register_int_counter!(
"pageserver_wal_ingest_records_received",
"Number of WAL records received from safekeepers"
)
.expect("failed to define a metric"),
records_committed: register_int_counter!(
"pageserver_wal_ingest_records_committed",
"Number of WAL records which resulted in writes to pageserver storage"
)
.expect("failed to define a metric"),
records_filtered: register_int_counter!(
"pageserver_wal_ingest_records_filtered",
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
});
pub(crate) struct SecondaryModeMetrics {
pub(crate) upload_heatmap: IntCounter,
pub(crate) upload_heatmap_errors: IntCounter,
@@ -1721,6 +1720,43 @@ macro_rules! redo_bytes_histogram_count_buckets {
};
}
pub(crate) struct WalIngestMetrics {
pub(crate) bytes_received: IntCounter,
pub(crate) records_received: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
pub(crate) time_spent_on_ingest: Histogram,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
bytes_received: register_int_counter!(
"pageserver_wal_ingest_bytes_received",
"Bytes of WAL ingested from safekeepers",
)
.unwrap(),
records_received: register_int_counter!(
"pageserver_wal_ingest_records_received",
"Number of WAL records received from safekeepers"
)
.expect("failed to define a metric"),
records_committed: register_int_counter!(
"pageserver_wal_ingest_records_committed",
"Number of WAL records which resulted in writes to pageserver storage"
)
.expect("failed to define a metric"),
records_filtered: register_int_counter!(
"pageserver_wal_ingest_records_filtered",
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
time_spent_on_ingest: register_histogram!(
"pageserver_wal_ingest_put_value_seconds",
"Actual time spent on ingesting a record",
redo_histogram_time_buckets!(),
)
.expect("failed to define a metric"),
});
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_seconds",

View File

@@ -1,13 +1,5 @@
//
//! The Page Service listens for client connections and serves their GetPage@LSN
//! requests.
//
// It is possible to connect here using usual psql/pgbench/libpq. Following
// commands are supported now:
// *status* -- show actual info about this pageserver,
// *pagestream* -- enter mode where smgr and pageserver talk with their
// custom protocol.
//
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
@@ -23,7 +15,7 @@ use pageserver_api::models::{
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse,
PagestreamNblocksResponse, PagestreamProtocolVersion,
};
use pageserver_api::shard::ShardIndex;
use pageserver_api::shard::ShardNumber;
@@ -551,6 +543,7 @@ impl PageServerHandler {
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
protocol_version: PagestreamProtocolVersion,
ctx: RequestContext,
) -> Result<(), QueryError>
where
@@ -613,14 +606,15 @@ impl PageServerHandler {
t.trace(&copy_data_bytes)
}
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let neon_fe_msg =
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
// TODO: We could create a new per-request context here, with unique ID.
// Currently we use the same per-timeline context for all requests
let (response, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -629,7 +623,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::Nblocks(req) => {
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -639,7 +633,7 @@ impl PageServerHandler {
}
PagestreamFeMessage::GetPage(req) => {
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -648,7 +642,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::DbSize(req) => {
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -657,7 +651,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
@@ -838,83 +832,80 @@ impl PageServerHandler {
/// Helper function to handle the LSN from client request.
///
/// Each GetPage (and Exists and Nblocks) request includes information about
/// which version of the page is being requested. The client can request the
/// latest version of the page, or the version that's valid at a particular
/// LSN. The primary compute node will always request the latest page
/// version, while a standby will request a version at the LSN that it's
/// currently caught up to.
/// which version of the page is being requested. The primary compute node
/// will always request the latest page version, by setting 'request_lsn' to
/// the last inserted or flushed WAL position, while a standby will request
/// a version at the LSN that it's currently caught up to.
///
/// In either case, if the page server hasn't received the WAL up to the
/// requested LSN yet, we will wait for it to arrive. The return value is
/// the LSN that should be used to look up the page versions.
///
/// In addition to the request LSN, each request carries another LSN,
/// 'not_modified_since', which is a hint to the pageserver that the client
/// knows that the page has not been modified between 'not_modified_since'
/// and the request LSN. This allows skipping the wait, as long as the WAL
/// up to 'not_modified_since' has arrived. If the client doesn't have any
/// information about when the page was modified, it will use
/// not_modified_since == lsn. If the client lies and sends a too low
/// not_modified_hint such that there are in fact later page versions, the
/// behavior is undefined: the pageserver may return any of the page versions
/// or an error.
async fn wait_or_get_last_lsn(
timeline: &Timeline,
mut lsn: Lsn,
latest: bool,
request_lsn: Lsn,
not_modified_since: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
ctx: &RequestContext,
) -> Result<Lsn, PageStreamError> {
if latest {
// Latest page version was requested. If LSN is given, it is a hint
// to the page server that there have been no modifications to the
// page after that LSN. If we haven't received WAL up to that point,
// wait until it arrives.
let last_record_lsn = timeline.get_last_record_lsn();
let last_record_lsn = timeline.get_last_record_lsn();
// Note: this covers the special case that lsn == Lsn(0). That
// special case means "return the latest version whatever it is",
// and it's used for bootstrapping purposes, when the page server is
// connected directly to the compute node. That is needed because
// when you connect to the compute node, to receive the WAL, the
// walsender process will do a look up in the pg_authid catalog
// table for authentication. That poses a deadlock problem: the
// catalog table lookup will send a GetPage request, but the GetPage
// request will block in the page server because the recent WAL
// hasn't been received yet, and it cannot be received until the
// walsender completes the authentication and starts streaming the
// WAL.
if lsn <= last_record_lsn {
// It might be better to use max(lsn, latest_gc_cutoff_lsn) instead
// last_record_lsn. That would give the same result, since we know
// that there haven't been modifications since 'lsn'. Using an older
// LSN might be faster, because that could allow skipping recent
// layers when finding the page.
lsn = last_record_lsn;
// Sanity check the request
if request_lsn < not_modified_since {
return Err(PageStreamError::BadRequest(
format!(
"invalid request with request LSN {} and not_modified_since {}",
request_lsn, not_modified_since,
)
.into(),
));
}
if request_lsn < **latest_gc_cutoff_lsn {
// Check explicitly for INVALID just to get a less scary error message if the
// request is obviously bogus
return Err(if request_lsn == Lsn::INVALID {
PageStreamError::BadRequest("invalid LSN(0) in request".into())
} else {
timeline
.wait_lsn(
lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
ctx,
)
.await?;
// Since we waited for 'lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the
// last-record LSN can advance immediately after we return
// anyway)
}
} else {
if lsn == Lsn(0) {
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
}
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
).into())
});
}
// Wait for WAL up to 'not_modified_since' to arrive, if necessary
if not_modified_since > last_record_lsn {
timeline
.wait_lsn(
lsn,
not_modified_since,
crate::tenant::timeline::WaitLsnWaiter::PageService,
ctx,
)
.await?;
// Since we waited for 'not_modified_since' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the last-record LSN can
// advance immediately after we return anyway)
Ok(not_modified_since)
} else {
// It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
// here instead. That would give the same result, since we know that there
// haven't been any modifications since 'not_modified_since'. Using an older
// LSN might be faster, because that could allow skipping recent layers when
// finding the page. However, we have historically used 'last_record_lsn', so
// stick to that for now.
Ok(std::cmp::min(last_record_lsn, request_lsn))
}
if lsn < **latest_gc_cutoff_lsn {
return Err(PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
lsn, **latest_gc_cutoff_lsn
).into()));
}
Ok(lsn)
}
#[instrument(skip_all, fields(shard_id))]
@@ -931,12 +922,17 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -959,12 +955,17 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_size(req.rel, Version::Lsn(lsn), ctx)
.await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
@@ -987,18 +988,17 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let total_blocks = timeline
.get_db_size(
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::Lsn(lsn),
req.latest,
ctx,
)
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;
@@ -1165,12 +1165,17 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -1193,9 +1198,14 @@ impl PageServerHandler {
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let kind = SlruKind::from_repr(req.kind)
.ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
@@ -1358,8 +1368,7 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
impl<IO> postgres_backend::HandlerSync<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
@@ -1399,9 +1408,24 @@ where
) -> Result<(), QueryError> {
Ok(())
}
}
type IO<'s> = std::pin::Pin<&'s mut tokio_io_timeout::TimeoutReader<tokio::net::TcpStream>>;
impl<'s> postgres_backend::Handler<IO<'s>> for PageServerHandler
{
#[instrument(skip_all, fields(tenant_id, timeline_id))]
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO<'s>>,
query_string: &str,
) -> Result<(), QueryError> {
self.process_query_(pgb, &query_string).await
}
}
impl PageServerHandler {
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Sync + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
@@ -1413,7 +1437,34 @@ where
let ctx = self.connection_ctx.attached_child();
debug!("process query {query_string:?}");
if query_string.starts_with("pagestream ") {
if query_string.starts_with("pagestream_v2 ") {
let (_, params_raw) = query_string.split_at("pagestream_v2 ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
if params.len() != 2 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number for pagestream command"
)));
}
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
self.handle_pagerequests(
pgb,
tenant_id,
timeline_id,
PagestreamProtocolVersion::V2,
ctx,
)
.await?;
} else if query_string.starts_with("pagestream ") {
let (_, params_raw) = query_string.split_at("pagestream ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
if params.len() != 2 {
@@ -1432,8 +1483,14 @@ where
self.check_permission(Some(tenant_id))?;
self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
.await?;
self.handle_pagerequests(
pgb,
tenant_id,
timeline_id,
PagestreamProtocolVersion::V1,
ctx,
)
.await?;
} else if query_string.starts_with("basebackup ") {
let (_, params_raw) = query_string.split_at("basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();

View File

@@ -9,6 +9,7 @@
use super::tenant::{PageReconstructError, Timeline};
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::WAL_INGEST;
use crate::repository::*;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::walrecord::NeonWalRecord;
@@ -175,7 +176,6 @@ impl Timeline {
tag: RelTag,
blknum: BlockNumber,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
@@ -184,7 +184,7 @@ impl Timeline {
));
}
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
let nblocks = self.get_rel_size(tag, version, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
@@ -206,7 +206,6 @@ impl Timeline {
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<usize, PageReconstructError> {
let mut total_blocks = 0;
@@ -214,7 +213,7 @@ impl Timeline {
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
for rel in rels {
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
let n_blocks = self.get_rel_size(rel, version, ctx).await?;
total_blocks += n_blocks as usize;
}
Ok(total_blocks)
@@ -225,7 +224,6 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
@@ -239,7 +237,7 @@ impl Timeline {
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, version, latest, ctx).await?
&& !self.get_rel_exists(tag, version, ctx).await?
{
// FIXME: Postgres sometimes calls smgrcreate() to create
// FSM, and smgrnblocks() on it immediately afterwards,
@@ -262,7 +260,6 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
_latest: bool,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
@@ -448,6 +445,11 @@ impl Timeline {
// include physical changes from later commits that will be marked
// as aborted, and will need to be vacuumed away.
let commit_lsn = Lsn((low - 1) * 8);
// This maxing operation is for the edge case that the search above did
// set found_smaller to true but it never increased the lsn. Then, low
// is still the old min_lsn the subtraction above could possibly give a value
// below the anchestor_lsn.
let commit_lsn = commit_lsn.max(min_lsn);
match (found_smaller, found_larger) {
(false, false) => {
// This can happen if no commit records have been processed yet, e.g.
@@ -1089,7 +1091,7 @@ impl<'a> DatadirModification<'a> {
) -> anyhow::Result<()> {
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
.get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
.await?;
// Remove entry from dbdir
@@ -1188,7 +1190,7 @@ impl<'a> DatadirModification<'a> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
.tline
.get_rel_exists(rel, Version::Modified(self), true, ctx)
.get_rel_exists(rel, Version::Modified(self), ctx)
.await?
{
let size_key = rel_size_to_key(rel);
@@ -1546,6 +1548,8 @@ impl<'a> DatadirModification<'a> {
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
let mut writer = self.tline.writer().await;
let timer = WAL_INGEST.time_spent_on_ingest.start_timer();
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
@@ -1585,6 +1589,8 @@ impl<'a> DatadirModification<'a> {
writer.update_directory_entries_count(kind, count as u64);
}
timer.observe_duration();
Ok(())
}

View File

@@ -3865,6 +3865,7 @@ mod tests {
use pageserver_api::key::NON_INHERITED_RANGE;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
static TEST_KEY: Lazy<Key> =
@@ -4653,7 +4654,9 @@ mod tests {
for read in reads {
info!("Doing vectored read on {:?}", read);
let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await;
let vectored_res = tline
.get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx)
.await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
.await;
@@ -4698,7 +4701,12 @@ mod tests {
let read_lsn = child_timeline.get_last_record_lsn();
let vectored_res = child_timeline
.get_vectored_impl(aux_keyspace.clone(), read_lsn, &ctx)
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
ValuesReconstructState::new(),
&ctx,
)
.await;
child_timeline
@@ -4846,7 +4854,12 @@ mod tests {
ranges: vec![key_near_gap..gap_at_key.next(), key_near_end..current_key],
};
let results = child_timeline
.get_vectored_impl(read.clone(), current_lsn, &ctx)
.get_vectored_impl(
read.clone(),
current_lsn,
ValuesReconstructState::new(),
&ctx,
)
.await?;
for (key, img_res) in results {
@@ -4979,6 +4992,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
ValuesReconstructState::new(),
&ctx,
)
.await;

View File

@@ -7,6 +7,7 @@ use std::collections::HashSet;
use std::future::Future;
use anyhow::{anyhow, Context};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs::{self, File, OpenOptions};
@@ -182,6 +183,7 @@ async fn download_object<'a>(
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
use bytes::BytesMut;
async {
let destination_file = VirtualFile::create(dst_path)
.await
@@ -194,10 +196,10 @@ async fn download_object<'a>(
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
let size_tracking = size_tracking_writer::Writer::new(destination_file);
let mut buffered = owned_buffers_io::write::BufferedWriter::<
{ super::BUFFER_SIZE },
_,
>::new(size_tracking);
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
size_tracking,
BytesMut::with_capacity(super::BUFFER_SIZE),
);
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{

View File

@@ -148,6 +148,29 @@ impl ValuesReconstructState {
self.layers_visited
}
/// This function is called after reading a keyspace from a layer.
/// It checks if the read path has now moved past the cached Lsn for any keys.
///
/// Implementation note: We intentionally iterate over the keys for which we've
/// already collected some reconstruct data. This avoids scaling complexity with
/// the size of the search space.
pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
for (key, value) in self.keys.iter_mut() {
if !keyspace.contains(key) {
continue;
}
if let Ok(state) = value {
if state.situation != ValueReconstructSituation::Complete
&& state.get_cached_lsn() >= Some(advanced_to)
{
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
}
}
}
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///
@@ -172,11 +195,18 @@ impl ValuesReconstructState {
true
}
Value::WalRecord(rec) => {
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
debug_assert!(
Some(lsn) > state.get_cached_lsn(),
"Attempt to collect a record below cached LSN for walredo: {} < {}",
lsn,
state
.get_cached_lsn()
.expect("Assertion can only fire if a cached lsn is present")
);
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init || reached_cache
will_init
}
},
};

View File

@@ -217,6 +217,7 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
lsn_range: Range<Lsn>,
file: VirtualFile,
file_id: FileId,
@@ -745,6 +746,7 @@ impl DeltaLayerInner {
file_id,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn_range: actual_summary.lsn_range,
max_vectored_read_bytes,
}))
}
@@ -869,7 +871,7 @@ impl DeltaLayerInner {
let data_end_offset = self.index_start_offset();
let reads = Self::plan_reads(
keyspace,
&keyspace,
lsn_range,
data_end_offset,
index_reader,
@@ -883,11 +885,13 @@ impl DeltaLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
Ok(())
}
async fn plan_reads<Reader>(
keyspace: KeySpace,
keyspace: &KeySpace,
lsn_range: Range<Lsn>,
data_end_offset: u64,
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
@@ -1535,7 +1539,7 @@ mod test {
// Plan and validate
let vectored_reads = DeltaLayerInner::plan_reads(
keyspace.clone(),
&keyspace,
lsn_range.clone(),
disk_offset,
reader,
@@ -1787,7 +1791,7 @@ mod test {
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
let vectored_reads = DeltaLayerInner::plan_reads(
keyspace.clone(),
&keyspace,
entries_meta.lsn_range.clone(),
data_end_offset,
index_reader,

View File

@@ -438,6 +438,8 @@ impl InMemoryLayer {
}
}
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
Ok(())
}
}

View File

@@ -336,6 +336,12 @@ impl Layer {
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
.map_err(|err| match err {
GetVectoredError::Other(err) => GetVectoredError::Other(
err.context(format!("get_values_reconstruct_data for layer {self}")),
),
err => err,
})
}
/// Download the layer if evicted.

View File

@@ -62,7 +62,7 @@ impl BackgroundLoopKind {
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
loop_kind: BackgroundLoopKind,
_ctx: &RequestContext,
) -> impl Drop {
) -> tokio::sync::SemaphorePermit<'static> {
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
.with_label_values(&[loop_kind.as_static_str()])
.guard();

View File

@@ -86,7 +86,7 @@ use crate::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
GetKind, TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
};
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::config::TenantConfOpt;
@@ -119,8 +119,8 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, storage_layer::VectoredValueReconstructState};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -653,6 +653,19 @@ impl From<GetVectoredError> for CreateImageLayersError {
}
}
impl From<GetVectoredError> for PageReconstructError {
fn from(e: GetVectoredError) -> Self {
match e {
GetVectoredError::Cancelled => PageReconstructError::Cancelled,
GetVectoredError::InvalidLsn(_) => PageReconstructError::Other(anyhow!("Invalid LSN")),
err @ GetVectoredError::Oversized(_) => PageReconstructError::Other(err.into()),
err @ GetVectoredError::MissingKey(_) => PageReconstructError::Other(err.into()),
GetVectoredError::GetReadyAncestorError(err) => PageReconstructError::from(err),
GetVectoredError::Other(err) => PageReconstructError::Other(err),
}
}
}
impl From<GetReadyAncestorError> for PageReconstructError {
fn from(e: GetReadyAncestorError) -> Self {
use GetReadyAncestorError::*;
@@ -682,6 +695,23 @@ pub enum GetVectoredImpl {
Vectored,
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetImpl {
Legacy,
Vectored,
}
pub(crate) enum WaitLsnWaiter<'a> {
Timeline(&'a Timeline),
Tenant,
@@ -743,16 +773,6 @@ impl Timeline {
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
self.timeline_get_throttle.throttle(ctx, 1).await;
self.get_impl(key, lsn, ctx).await
}
/// Not subject to [`Self::timeline_get_throttle`].
async fn get_impl(
&self,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
@@ -763,13 +783,7 @@ impl Timeline {
// page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key));
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
key,
lsn,
ctx.task_kind()
);
self.timeline_get_throttle.throttle(ctx, 1).await;
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
@@ -792,12 +806,85 @@ impl Timeline {
None => None,
};
let mut reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};
match self.conf.get_impl {
GetImpl::Legacy => {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
self.get_impl(key, lsn, reconstruct_state, ctx).await
}
GetImpl::Vectored => {
let keyspace = KeySpace {
ranges: vec![key..key.next()],
};
// Initialise the reconstruct state for the key with the cache
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
.await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
let key_value = vectored_res?.pop_first();
match key_value {
Some((got_key, value)) => {
if got_key != key {
error!(
"Expected {}, but singular vectored get returned {}",
key, got_key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get returned wrong key"
)))
} else {
value
}
}
None => {
error!(
"Expected {}, but singular vectored get returned nothing",
key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get did not return a value for {}",
key
)))
}
}
}
}
}
/// Not subject to [`Self::timeline_get_throttle`].
async fn get_impl(
&self,
key: Key,
lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
key,
lsn,
ctx.task_kind()
);
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(GetKind::Singular)
.start_timer();
let path = self
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
.await?;
@@ -807,7 +894,7 @@ impl Timeline {
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
let elapsed = start.elapsed();
crate::metrics::RECONSTRUCT_TIME
.for_result(&res)
.for_get_kind(GetKind::Singular)
.observe(elapsed.as_secs_f64());
if cfg!(feature = "testing") && res.is_err() {
@@ -886,7 +973,9 @@ impl Timeline {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
GetVectoredImpl::Vectored => {
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, ValuesReconstructState::new(), ctx)
.await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
@@ -932,7 +1021,9 @@ impl Timeline {
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
let block = self.get_impl(key, lsn, ctx).await;
let block = self
.get_impl(key, lsn, ValueReconstructState::default(), ctx)
.await;
use PageReconstructError::*;
match block {
@@ -950,6 +1041,23 @@ impl Timeline {
// level error.
return Err(GetVectoredError::MissingKey(key));
}
Err(Other(err))
if err
.to_string()
.contains("downloading evicted layer file failed") =>
{
return Err(GetVectoredError::Other(err))
}
Err(Other(err))
if err
.chain()
.any(|cause| cause.to_string().contains("layer loading failed")) =>
{
// The intent here is to achieve error parity with the vectored read path.
// When vectored read fails to load a layer it fails the whole read, hence
// we mimic this behaviour here to keep the validation happy.
return Err(GetVectoredError::Other(err));
}
_ => {
values.insert(key, block);
key = key.next();
@@ -965,13 +1073,25 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
mut reconstruct_state: ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let get_kind = if keyspace.total_size() == 1 {
GetKind::Singular
} else {
GetKind::Vectored
};
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(get_kind)
.start_timer();
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
.await?;
get_data_timer.stop_and_record();
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in reconstruct_state.keys {
@@ -987,6 +1107,7 @@ impl Timeline {
}
}
}
reconstruct_timer.stop_and_record();
// Note that this is an approximation. Tracking the exact number of layers visited
// per key requires virtually unbounded memory usage and is inefficient
@@ -3127,55 +3248,61 @@ impl Timeline {
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
completed_keyspace.merge(&keys_done_last_step);
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
// Do not descent any further if the last layer we visited
// completed all keys in the keyspace it inspected. This is not
// required for correctness, but avoids visiting extra layers
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
});
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
});
match in_memory_layer {
Some(l) => {
let lsn_range = l.get_lsn_range().start..cont_lsn;
fringe.update(
ReadableLayer::InMemoryLayer(l),
unmapped_keyspace.clone(),
lsn_range,
);
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let results = layers.range_search(range.clone(), cont_lsn);
match in_memory_layer {
Some(l) => {
let lsn_range = l.get_lsn_range().start..cont_lsn;
fringe.update(
ReadableLayer::InMemoryLayer(l),
unmapped_keyspace.clone(),
lsn_range,
);
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let results = layers.range_search(range.clone(), cont_lsn);
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| {
fringe.update(layer, keyspace, lsn_range)
});
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| {
fringe.update(layer, keyspace, lsn_range)
});
}
}
}
}
// It's safe to drop the layer map lock after planning the next round of reads.
// The fringe keeps readable handles for the layers which are safe to read even
// if layers were compacted or flushed.
//
// The more interesting consideration is: "Why is the read algorithm still correct
// if the layer map changes while it is operating?". Doing a vectored read on a
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
// covered by the read. The layer map tells us how to move the lsn downwards for a
// range at *a particular point in time*. It is fine for the answer to be different
// at two different time points.
drop(guard);
// It's safe to drop the layer map lock after planning the next round of reads.
// The fringe keeps readable handles for the layers which are safe to read even
// if layers were compacted or flushed.
//
// The more interesting consideration is: "Why is the read algorithm still correct
// if the layer map changes while it is operating?". Doing a vectored read on a
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
// covered by the read. The layer map tells us how to move the lsn downwards for a
// range at *a particular point in time*. It is fine for the answer to be different
// at two different time points.
drop(guard);
}
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
let next_cont_lsn = lsn_range.start;

View File

@@ -188,24 +188,10 @@ impl Timeline {
) -> ControlFlow<()> {
let now = SystemTime::now();
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Eviction,
ctx,
);
let permit = self.acquire_imitation_permit(cancel, ctx).await?;
let _permit = tokio::select! {
permit = acquire_permit => permit,
_ = cancel.cancelled() => return ControlFlow::Break(()),
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
};
match self
.imitate_layer_accesses(tenant, p, cancel, gate, ctx)
.await
{
ControlFlow::Break(()) => return ControlFlow::Break(()),
ControlFlow::Continue(()) => (),
}
self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
.await?;
#[derive(Debug, Default)]
struct EvictionStats {
@@ -330,19 +316,27 @@ impl Timeline {
gate: &GateGuard,
ctx: &RequestContext,
) -> ControlFlow<()> {
let permit = self.acquire_imitation_permit(cancel, ctx).await?;
self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
.await
}
async fn acquire_imitation_permit(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<(), tokio::sync::SemaphorePermit<'static>> {
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Eviction,
ctx,
);
let _permit = tokio::select! {
permit = acquire_permit => permit,
_ = cancel.cancelled() => return ControlFlow::Break(()),
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
};
self.imitate_layer_accesses(tenant, p, cancel, gate, ctx)
.await
tokio::select! {
permit = acquire_permit => ControlFlow::Continue(permit),
_ = cancel.cancelled() => ControlFlow::Break(()),
_ = self.cancel.cancelled() => ControlFlow::Break(()),
}
}
/// If we evict layers but keep cached values derived from those layers, then
@@ -376,6 +370,7 @@ impl Timeline {
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
permit: tokio::sync::SemaphorePermit<'static>,
ctx: &RequestContext,
) -> ControlFlow<()> {
if !self.tenant_shard_id.is_shard_zero() {
@@ -408,7 +403,28 @@ impl Timeline {
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
// imitated accesses that the winner made.
let mut state = tenant.eviction_task_tenant_state.lock().await;
let (mut state, _permit) = {
if let Ok(locked) = tenant.eviction_task_tenant_state.try_lock() {
(locked, permit)
} else {
// we might need to wait for a long time here in case of pathological synthetic
// size calculation performance
drop(permit);
let locked = tokio::select! {
locked = tenant.eviction_task_tenant_state.lock() => locked,
_ = self.cancel.cancelled() => {
return ControlFlow::Break(())
},
_ = cancel.cancelled() => {
return ControlFlow::Break(())
}
};
// then reacquire -- this will be bad if there is a lot of traffic, but because we
// released the permit, the overall latency will be much better.
let permit = self.acquire_imitation_permit(cancel, ctx).await?;
(locked, permit)
}
};
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
_ => {

View File

@@ -32,6 +32,7 @@ pub use io_engine::feature_test as io_engine_feature_test;
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
@@ -1083,6 +1084,17 @@ impl Drop for VirtualFile {
}
}
impl OwnedAsyncWriter for VirtualFile {
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = VirtualFile::write_all(self, buf).await;
res.map(move |v| (v, buf))
}
}
impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots));

View File

@@ -1,33 +1,36 @@
use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter;
use tokio_epoll_uring::{BoundedBuf, IoBuf};
pub struct Writer {
dst: VirtualFile,
pub struct Writer<W> {
dst: W,
bytes_amount: u64,
}
impl Writer {
pub fn new(dst: VirtualFile) -> Self {
impl<W> Writer<W> {
pub fn new(dst: W) -> Self {
Self {
dst,
bytes_amount: 0,
}
}
/// Returns the wrapped `VirtualFile` object as well as the number
/// of bytes that were written to it through this object.
pub fn into_inner(self) -> (u64, VirtualFile) {
pub fn into_inner(self) -> (u64, W) {
(self.bytes_amount, self.dst)
}
}
impl OwnedAsyncWriter for Writer {
impl<W> OwnedAsyncWriter for Writer<W>
where
W: OwnedAsyncWriter,
{
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = self.dst.write_all(buf).await;
let nwritten = res?;
let (nwritten, buf) = self.dst.write_all(buf).await?;
self.bytes_amount += u64::try_from(nwritten).unwrap();
Ok((nwritten, buf))
}

View File

@@ -10,14 +10,14 @@ pub trait OwnedAsyncWriter {
) -> std::io::Result<(usize, B::Buf)>;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that batches smaller writers
/// into `BUFFER_SIZE`-sized writes.
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
/// small writes into larger writes of size [`Buffer::cap`].
///
/// # Passthrough Of Large Writers
///
/// Buffered writes larger than the `BUFFER_SIZE` cause the internal
/// buffer to be flushed, even if it is not full yet. Then, the large
/// buffered write is passed through to the unerlying [`OwnedAsyncWriter`].
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
/// cause the internal buffer to be flushed prematurely so that the large
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
///
/// This pass-through is generally beneficial for throughput, but if
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
@@ -25,24 +25,25 @@ pub trait OwnedAsyncWriter {
///
/// In such cases, a different implementation that always buffers in memory
/// may be preferable.
pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
pub struct BufferedWriter<B, W> {
writer: W,
// invariant: always remains Some(buf)
// with buf.capacity() == BUFFER_SIZE except
// - while IO is ongoing => goes back to Some() once the IO completed successfully
// - after an IO error => stays `None` forever
// In these exceptional cases, it's `None`.
buf: Option<BytesMut>,
/// invariant: always remains Some(buf) except
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
/// - after an IO error => stays `None` forever
/// In these exceptional cases, it's `None`.
buf: Option<B>,
}
impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
impl<B, Buf, W> BufferedWriter<B, W>
where
B: Buffer<IoBuf = Buf> + Send,
Buf: IoBuf + Send,
W: OwnedAsyncWriter,
{
pub fn new(writer: W) -> Self {
pub fn new(writer: W, buf: B) -> Self {
Self {
writer,
buf: Some(BytesMut::with_capacity(BUFFER_SIZE)),
buf: Some(buf),
}
}
@@ -53,61 +54,121 @@ where
Ok(writer)
}
pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<()>
#[inline(always)]
fn buf(&self) -> &B {
self.buf
.as_ref()
.expect("must not use after we returned an error")
}
pub async fn write_buffered<S: IoBuf>(&mut self, chunk: Slice<S>) -> std::io::Result<(usize, S)>
where
B: IoBuf + Send,
S: IoBuf + Send,
{
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
if chunk.len() >= BUFFER_SIZE {
if chunk.len() >= self.buf().cap() {
self.flush().await?;
// do a big write, bypassing `buf`
assert_eq!(
self.buf
.as_ref()
.expect("must not use after an error")
.len(),
.pending(),
0
);
let chunk_len = chunk.len();
let (nwritten, chunk) = self.writer.write_all(chunk).await?;
assert_eq!(nwritten, chunk_len);
drop(chunk);
return Ok(());
return Ok((nwritten, chunk));
}
// in-memory copy the < BUFFER_SIZED tail of the chunk
assert!(chunk.len() < BUFFER_SIZE);
let mut chunk = &chunk[..];
while !chunk.is_empty() {
assert!(chunk.len() < self.buf().cap());
let mut slice = &chunk[..];
while !slice.is_empty() {
let buf = self.buf.as_mut().expect("must not use after an error");
let need = BUFFER_SIZE - buf.len();
let have = chunk.len();
let need = buf.cap() - buf.pending();
let have = slice.len();
let n = std::cmp::min(need, have);
buf.extend_from_slice(&chunk[..n]);
chunk = &chunk[n..];
if buf.len() >= BUFFER_SIZE {
assert_eq!(buf.len(), BUFFER_SIZE);
buf.extend_from_slice(&slice[..n]);
slice = &slice[n..];
if buf.pending() >= buf.cap() {
assert_eq!(buf.pending(), buf.cap());
self.flush().await?;
}
}
assert!(chunk.is_empty(), "by now we should have drained the chunk");
Ok(())
assert!(slice.is_empty(), "by now we should have drained the chunk");
Ok((chunk_len, chunk.into_inner()))
}
async fn flush(&mut self) -> std::io::Result<()> {
let buf = self.buf.take().expect("must not use after an error");
if buf.is_empty() {
let buf_len = buf.pending();
if buf_len == 0 {
self.buf = Some(buf);
return std::io::Result::Ok(());
return Ok(());
}
let buf_len = buf.len();
let (nwritten, mut buf) = self.writer.write_all(buf).await?;
let (nwritten, io_buf) = self.writer.write_all(buf.flush()).await?;
assert_eq!(nwritten, buf_len);
buf.clear();
self.buf = Some(buf);
self.buf = Some(Buffer::reuse_after_flush(io_buf));
Ok(())
}
}
/// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
pub trait Buffer {
type IoBuf: IoBuf;
/// Capacity of the buffer. Must not change over the lifetime `self`.`
fn cap(&self) -> usize;
/// Add data to the buffer.
/// Panics if there is not enough room to accomodate `other`'s content, i.e.,
/// panics if `other.len() > self.cap() - self.pending()`.
fn extend_from_slice(&mut self, other: &[u8]);
/// Number of bytes in the buffer.
fn pending(&self) -> usize;
/// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data
/// so we can use [`tokio_epoll_uring`] to write it to disk.
fn flush(self) -> Slice<Self::IoBuf>;
/// After the write to disk is done and we have gotten back the slice,
/// [`BufferedWriter`] uses this method to re-use the io buffer.
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
}
impl Buffer for BytesMut {
type IoBuf = BytesMut;
#[inline(always)]
fn cap(&self) -> usize {
self.capacity()
}
fn extend_from_slice(&mut self, other: &[u8]) {
BytesMut::extend_from_slice(self, other)
}
#[inline(always)]
fn pending(&self) -> usize {
self.len()
}
fn flush(self) -> Slice<BytesMut> {
if self.is_empty() {
return self.slice_full();
}
let len = self.len();
self.slice(0..len)
}
fn reuse_after_flush(mut iobuf: BytesMut) -> Self {
iobuf.clear();
iobuf
}
}
impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
@@ -125,6 +186,8 @@ impl OwnedAsyncWriter for Vec<u8> {
#[cfg(test)]
mod tests {
use bytes::BytesMut;
use super::*;
#[derive(Default)]
@@ -158,7 +221,7 @@ mod tests {
#[tokio::test]
async fn test_buffered_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"a");
write!(writer, b"b");
write!(writer, b"c");
@@ -175,7 +238,7 @@ mod tests {
#[tokio::test]
async fn test_passthrough_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"abc");
write!(writer, b"de");
write!(writer, b"");
@@ -191,7 +254,7 @@ mod tests {
#[tokio::test]
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
write!(writer, b"a");
write!(writer, b"bc");
write!(writer, b"d");

View File

@@ -1034,7 +1034,7 @@ impl WalIngest {
let nblocks = modification
.tline
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
.get_rel_size(src_rel, Version::Modified(modification), ctx)
.await?;
let dst_rel = RelTag {
spcnode: tablespace_id,
@@ -1068,13 +1068,7 @@ impl WalIngest {
let content = modification
.tline
.get_rel_page_at_lsn(
src_rel,
blknum,
Version::Modified(modification),
true,
ctx,
)
.get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
.await?;
modification.put_rel_page_image(dst_rel, blknum, content)?;
num_blocks_copied += 1;
@@ -1242,7 +1236,7 @@ impl WalIngest {
};
if modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), ctx)
.await?
{
self.put_rel_drop(modification, rel, ctx).await?;
@@ -1541,7 +1535,7 @@ impl WalIngest {
nblocks
} else if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), ctx)
.await?
{
// create it with 0 size initially, the logic below will extend it
@@ -1553,7 +1547,7 @@ impl WalIngest {
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.get_rel_size(rel, Version::Modified(modification), ctx)
.await?
};
@@ -1650,14 +1644,14 @@ async fn get_relsize(
) -> anyhow::Result<BlockNumber> {
let nblocks = if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), ctx)
.await?
{
0
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.get_rel_size(rel, Version::Modified(modification), ctx)
.await?
};
Ok(nblocks)
@@ -1732,29 +1726,29 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1762,46 +1756,46 @@ mod tests {
// Check page contents at each LSN
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
test_img("foo blk 0 at 2")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img("foo blk 2 at 5")
);
@@ -1817,19 +1811,19 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
test_img("foo blk 1 at 4")
);
@@ -1837,13 +1831,13 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
3
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img("foo blk 2 at 5")
);
@@ -1856,7 +1850,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
.await?,
0
);
@@ -1869,19 +1863,19 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
.await?,
ZERO_PAGE
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
.await?,
test_img("foo blk 1")
);
@@ -1894,21 +1888,21 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
1501
);
for blk in 2..1500 {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
ZERO_PAGE
);
}
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
test_img("foo blk 1500")
);
@@ -1935,13 +1929,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
1
);
@@ -1954,7 +1948,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
.await?,
false
);
@@ -1972,13 +1966,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.await?,
1
);
@@ -2011,24 +2005,24 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.await?,
relsize
);
@@ -2039,7 +2033,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
.await?,
test_img(&data)
);
@@ -2056,7 +2050,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
1
);
@@ -2066,7 +2060,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
.await?,
test_img(&data)
);
@@ -2075,7 +2069,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
relsize
);
@@ -2084,7 +2078,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
.await?,
test_img(&data)
);
@@ -2104,13 +2098,13 @@ mod tests {
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
relsize
);
@@ -2120,7 +2114,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
.await?,
test_img(&data)
);
@@ -2154,7 +2148,7 @@ mod tests {
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE + 1
);
@@ -2168,7 +2162,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE
);
@@ -2183,7 +2177,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE - 1
);
@@ -2201,7 +2195,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.await?,
size as BlockNumber
);

View File

@@ -49,6 +49,8 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
int neon_protocol_version = 1;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
@@ -379,7 +381,17 @@ pageserver_connect(shardno_t shard_no, int elevel)
pfree(msg);
return false;
}
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
switch (neon_protocol_version)
{
case 2:
query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
break;
case 1:
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
break;
default:
elog(ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
}
ret = PQsendQuery(conn, query);
pfree(query);
if (ret != 1)
@@ -440,7 +452,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
return false;
}
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr);
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
@@ -844,6 +856,16 @@ pg_init_libpagestore(void)
PGC_USERSET,
0, /* no flags required */
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
DefineCustomIntVariable("neon.protocol_version",
"Version of compute<->page server protocol",
NULL,
&neon_protocol_version,
1, /* default to old protocol for now */
1, /* min */
2, /* max */
PGC_SU_BACKEND,
0, /* no flags required */
NULL, NULL, NULL);
relsize_hash_init();

View File

@@ -69,18 +69,33 @@ typedef enum {
SLRU_MULTIXACT_OFFSETS
} SlruKind;
/*
* supertype of all the Neon*Request structs below
/*--
* supertype of all the Neon*Request structs below.
*
* If 'latest' is true, we are requesting the latest page version, and 'lsn'
* is just a hint to the server that we know there are no versions of the page
* (or relation size, for exists/nblocks requests) later than the 'lsn'.
* All requests contain two LSNs:
*
* lsn: request page (or relation size, etc) at this LSN
* not_modified_since: Hint that the page hasn't been modified between
* this LSN and the request LSN (`lsn`).
*
* To request the latest version of a page, you can use MAX_LSN as the request
* LSN.
*
* If you don't know any better, you can always set 'not_modified_since' equal
* to 'lsn', but providing a lower value can speed up processing the request
* in the pageserver, as it doesn't need to wait for the WAL to arrive, and it
* can skip traversing through recent layers which we know to not contain any
* versions for the requested page.
*
* These structs describe the V2 of these requests. The old V1 protocol contained
* just one LSN and a boolean 'latest' flag. If the neon_protocol_version GUC is
* set to 1, we will convert these to the V1 requests before sending.
*/
typedef struct
{
NeonMessageTag tag;
bool latest; /* if true, request latest page version */
XLogRecPtr lsn; /* request page version @ this LSN */
XLogRecPtr lsn;
XLogRecPtr not_modified_since;
} NeonRequest;
typedef struct
@@ -193,6 +208,7 @@ extern int readahead_buffer_size;
extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern shardno_t get_shard_number(BufferTag* tag);
@@ -225,14 +241,14 @@ extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
#else
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void *buffer);
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, void *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, const void *buffer, bool skipFsync);
#endif

View File

@@ -168,8 +168,8 @@ typedef enum PrefetchStatus
typedef struct PrefetchRequest
{
BufferTag buftag; /* must be first entry in the struct */
XLogRecPtr effective_request_lsn;
XLogRecPtr actual_request_lsn;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
@@ -269,19 +269,19 @@ static PrefetchState *MyPState;
) \
)
static XLogRecPtr prefetch_lsn = 0;
static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
static uint64 prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
static bool prefetch_read(PrefetchRequest *slot);
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
static void prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
static bool prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup_trailing_unused(void);
static inline void prefetch_set_unused(uint64 ring_index);
static XLogRecPtr neon_get_request_lsn(bool *latest, NRelFileInfo rinfo,
ForkNumber forknum, BlockNumber blkno);
static void neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since);
static bool neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
PrefetchRequest *slot);
static bool
compact_prefetch_buffers(void)
@@ -338,8 +338,8 @@ compact_prefetch_buffers(void)
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
target_slot->actual_request_lsn = source_slot->actual_request_lsn;
target_slot->request_lsn = source_slot->request_lsn;
target_slot->not_modified_since = source_slot->not_modified_since;
target_slot->my_ring_index = empty_ring_index;
prfh_delete(MyPState->prf_hash, source_slot);
@@ -358,7 +358,8 @@ compact_prefetch_buffers(void)
};
source_slot->response = NULL;
source_slot->my_ring_index = 0;
source_slot->effective_request_lsn = 0;
source_slot->request_lsn = InvalidXLogRecPtr;
source_slot->not_modified_since = InvalidXLogRecPtr;
/* update bookkeeping */
n_moved++;
@@ -683,56 +684,39 @@ prefetch_set_unused(uint64 ring_index)
compact_prefetch_buffers();
}
/*
* Send one prefetch request to the pageserver. To wait for the response, call
* prefetch_wait_for().
*/
static void
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since)
{
bool found;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = false,
.req.lsn = 0,
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
.blkno = slot->buftag.blockNum,
};
if (force_lsn && force_latest)
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
if (force_request_lsn)
{
request.req.lsn = *force_lsn;
request.req.latest = *force_latest;
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
request.req.lsn = *force_request_lsn;
request.req.not_modified_since = *force_not_modified_since;
}
else
{
XLogRecPtr lsn = neon_get_request_lsn(
&request.req.latest,
BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum
);
/*
* Note: effective_request_lsn is potentially higher than the
* requested LSN, but still correct:
*
* We know there are no changes between the actual requested LSN and
* the value of effective_request_lsn: If there were, the page would
* have been in cache and evicted between those LSN values, which then
* would have had to result in a larger request LSN for this page.
*
* It is possible that a concurrent backend loads the page, modifies
* it and then evicts it again, but the LSN of that eviction cannot be
* smaller than the current WAL insert/redo pointer, which is already
* larger than this prefetch_lsn. So in any case, that would
* invalidate this cache.
*
* The best LSN to use for effective_request_lsn would be
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
*/
slot->actual_request_lsn = request.req.lsn = lsn;
prefetch_lsn = Max(prefetch_lsn, lsn);
slot->effective_request_lsn = prefetch_lsn;
neon_get_request_lsn(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum,
&request.req.lsn,
&request.req.not_modified_since);
}
slot->request_lsn = request.req.lsn;
slot->not_modified_since = request.req.not_modified_since;
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
@@ -749,7 +733,6 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
/* update slot state */
slot->status = PRFS_REQUESTED;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
}
@@ -759,22 +742,25 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
*
* Register that we may want the contents of BufferTag in the near future.
*
* If force_latest and force_lsn are not NULL, those values are sent to the
* pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure
* to fill in these values manually.
* If force_request_lsn and force_not_modified_since are not NULL, those
* values are sent to the pageserver. If they are NULL, we utilize the
* lastWrittenLsn -infrastructure to fill them in.
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
*/
static uint64
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn,
XLogRecPtr *force_not_modified_since)
{
uint64 ring_index;
PrefetchRequest req;
PrefetchRequest *slot;
PrfHashEntry *entry;
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
req.buftag = tag;
Retry:
@@ -792,40 +778,19 @@ Retry:
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
/*
* If we want a specific lsn, we do not accept requests that were made
* with a potentially different LSN.
* If the caller specified a request LSN to use, only accept prefetch
* responses that satisfy that request.
*/
if (force_latest && force_lsn)
if (force_request_lsn)
{
/*
* if we want the latest version, any effective_request_lsn <
* request lsn is OK
*/
if (*force_latest)
if (!neon_prefetch_response_usable(*force_request_lsn,
*force_not_modified_since, slot))
{
if (*force_lsn > slot->effective_request_lsn)
{
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
}
/*
* if we don't want the latest version, only accept requests with
* the exact same LSN
*/
else
{
if (*force_lsn != slot->effective_request_lsn)
{
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
/* Wait for the old request to finish and discard it */
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
}
@@ -921,7 +886,7 @@ Retry:
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn);
prefetch_do_request(slot, force_request_lsn, force_not_modified_since);
Assert(slot->status == PRFS_REQUESTED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
@@ -950,7 +915,7 @@ page_server_request(void const *req)
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
switch (messageTag(req))
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
@@ -966,11 +931,10 @@ page_server_request(void const *req)
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
}
shard_no = get_shard_number(&tag);
/*
* Current sharding model assumes that all metadata is present only at shard 0.
* We still need to call get_shard_no() to check if shard map is up-to-date.
@@ -997,8 +961,52 @@ nm_pack_request(NeonRequest *msg)
StringInfoData s;
initStringInfo(&s);
pq_sendbyte(&s, msg->tag);
if (neon_protocol_version >= 2)
{
pq_sendbyte(&s, msg->tag);
pq_sendint64(&s, msg->lsn);
pq_sendint64(&s, msg->not_modified_since);
}
else
{
bool latest;
XLogRecPtr lsn;
/*
* In primary, we always request the latest page version.
*/
if (!RecoveryInProgress())
{
latest = true;
lsn = msg->not_modified_since;
}
else
{
/*
* In the protocol V1, we cannot represent that we want to read
* page at LSN X, and we know that it hasn't been modified since
* Y. We can either use 'not_modified_lsn' as the request LSN, and
* risk getting an error if that LSN is too old and has already
* fallen out of the pageserver's GC horizon, or we can send
* 'request_lsn', causing the pageserver to possibly wait for the
* recent WAL to arrive unnecessarily. Or something in between. We
* choose to use the old LSN and risk GC errors, because that's
* what we've done historically.
*/
latest = false;
lsn = msg->not_modified_since;
}
pq_sendbyte(&s, msg->tag);
pq_sendbyte(&s, latest);
pq_sendint64(&s, lsn);
}
/*
* The rest of the request messages are the same between protocol V1 and
* V2
*/
switch (messageTag(msg))
{
/* pagestore_client -> pagestore */
@@ -1006,8 +1014,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1019,8 +1025,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1032,8 +1036,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode);
break;
@@ -1042,8 +1044,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1057,8 +1057,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);
@@ -1209,7 +1207,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1222,7 +1220,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1236,7 +1234,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1247,7 +1245,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1259,7 +1257,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
@@ -1531,44 +1529,38 @@ nm_adjust_lsn(XLogRecPtr lsn)
/*
* Return LSN for requesting pages and number of blocks from page server
*/
static XLogRecPtr
neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
static void
neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since)
{
XLogRecPtr lsn;
XLogRecPtr last_written_lsn;
last_written_lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
last_written_lsn = nm_adjust_lsn(last_written_lsn);
Assert(last_written_lsn != InvalidXLogRecPtr);
if (RecoveryInProgress())
{
/*
* We don't know if WAL has been generated but not yet replayed, so
* we're conservative in our estimates about latest pages.
*/
*latest = false;
/* Request the page at the last replayed LSN. */
*request_lsn = GetXLogReplayRecPtr(NULL);
*not_modified_since = last_written_lsn;
Assert(last_written_lsn <= *request_lsn);
/*
* Get the last written LSN of this page.
*/
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
lsn = nm_adjust_lsn(lsn);
neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
neon_log(DEBUG1, "neon_get_request_lsn request lsn %X/%X, not_modified_since %X/%X",
LSN_FORMAT_ARGS(*request_lsn), LSN_FORMAT_ARGS(*not_modified_since));
}
else
{
XLogRecPtr flushlsn;
/*
* Use the latest LSN that was evicted from the buffer cache. Any
* pages modified by later WAL records must still in the buffer cache,
* so our request cannot concern those.
* Use the latest LSN that was evicted from the buffer cache as the
* 'not_modified_since' hint. Any pages modified by later WAL records
* must still in the buffer cache, so our request cannot concern
* those.
*/
*latest = true;
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
Assert(lsn != InvalidXLogRecPtr);
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
(uint32) ((lsn) >> 32), (uint32) (lsn));
lsn = nm_adjust_lsn(lsn);
LSN_FORMAT_ARGS(last_written_lsn));
/*
* Is it possible that the last-written LSN is ahead of last flush
@@ -1583,16 +1575,109 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
#else
flushlsn = GetFlushRecPtr();
#endif
if (lsn > flushlsn)
if (last_written_lsn > flushlsn)
{
neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
(uint32) (lsn >> 32), (uint32) lsn,
(uint32) (flushlsn >> 32), (uint32) flushlsn);
XLogFlush(lsn);
LSN_FORMAT_ARGS(last_written_lsn),
LSN_FORMAT_ARGS(flushlsn));
XLogFlush(last_written_lsn);
flushlsn = last_written_lsn;
}
/*
* Request the latest version of the page. The most up-to-date request
* LSN we could use would be the current insert LSN, but to avoid the
* overhead of looking it up, use 'flushlsn' instead. This relies on
* the assumption that if the page was modified since the last WAL
* flush, it should still be in the buffer cache, and we wouldn't be
* requesting it.
*/
*request_lsn = flushlsn;
*not_modified_since = last_written_lsn;
}
}
/*
* neon_prefetch_response_usable -- Can a new request be satisfied by old one?
*
* This is used to check if the response to a prefetch request can be used to
* satisfy a page read now.
*/
static bool
neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
PrefetchRequest *slot)
{
/* sanity check the LSN's on the old and the new request */
Assert(request_lsn >= not_modified_since);
Assert(slot->request_lsn >= slot->not_modified_since);
Assert(slot->status != PRFS_UNUSED);
/*
* The new request's LSN should never be older than the old one. This
* could be an Assert, except that for testing purposes, we do provide an
* interface in neon_test_utils to fetch pages at arbitary LSNs, which
* violates this.
*
* Similarly, the not_modified_since value calculated for a page should
* never move backwards. This assumption is a bit fragile; if we updated
* the last-written cache when we read in a page, for example, then it
* might. But as the code stands, it should not.
*
* (If two backends issue a request at the same time, they might race and
* calculate LSNs "out of order" with each other, but the prefetch queue
* is backend-private at the moment.)
*/
if (request_lsn < slot->request_lsn || not_modified_since < slot->not_modified_since)
{
ereport(LOG,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "request with unexpected LSN after prefetch"),
errdetail("Request %X/%X not_modified_since %X/%X, prefetch %X/%X not_modified_since %X/%X)",
LSN_FORMAT_ARGS(request_lsn), LSN_FORMAT_ARGS(not_modified_since),
LSN_FORMAT_ARGS(slot->request_lsn), LSN_FORMAT_ARGS(slot->not_modified_since))));
return false;
}
return lsn;
/*---
* Each request to the pageserver carries two LSN values:
* `not_modified_since` and `request_lsn`. The (not_modified_since,
* request_lsn] range of each request is effectively a claim that the page
* has not been modified between those LSNs. If the range of the old
* request in the queue overlaps with the new request, we know that the
* page hasn't been modified in the union of the ranges. We can use the
* response to old request to satisfy the new request in that case. For
* example:
*
* 100 500
* Old request: +--------+
*
* 400 800
* New request: +--------+
*
* The old request claims that the page was not modified between LSNs 100
* and 500, and the second claims that it was not modified between 400 and
* 800. Together they mean that the page was not modified between 100 and
* 800. Therefore the response to the old request is also valid for the
* new request.
*
* This logic also holds at the boundary case that the old request's LSN
* matches the new request's not_modified_since LSN exactly:
*
* 100 500
* Old request: +--------+
*
* 500 900
* New request: +--------+
*
* The response to the old request is the page as it was at LSN 500, and
* the page hasn't been changed in the range (500, 900], therefore the
* response is valid also for the new request.
*/
/* this follows from the checks above */
Assert(request_lsn >= slot->not_modified_since);
return not_modified_since <= slot->request_lsn;
}
/*
@@ -1604,8 +1689,8 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
bool exists;
NeonResponse *resp;
BlockNumber n_blocks;
bool latest;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -1660,12 +1745,13 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
return false;
}
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonExistsRequest request = {
.req.tag = T_NeonExistsRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forkNum};
@@ -2102,10 +2188,10 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
void
#if PG_MAJORVERSION_NUM < 16
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer)
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer)
#else
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, void *buffer)
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer)
#endif
{
NeonResponse *resp;
@@ -2148,15 +2234,16 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry != NULL)
{
slot = entry->slot;
if (slot->effective_request_lsn >= request_lsn)
if (neon_prefetch_response_usable(request_lsn, not_modified_since, slot))
{
ring_index = slot->my_ring_index;
pgBufferUsage.prefetch.hits += 1;
}
else /* the current prefetch LSN is not large
* enough, so drop the prefetch */
else
{
/*
* Cannot use this prefetch, discard it
*
* We can't drop cache for not-yet-received requested items. It is
* unlikely this happens, but it can happen if prefetch distance
* is large enough and a backend didn't consume all prefetch
@@ -2181,8 +2268,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
pgBufferUsage.prefetch.misses += 1;
ring_index = prefetch_register_buffer(buftag, &request_latest,
&request_lsn);
ring_index = prefetch_register_buffer(buftag, &request_lsn,
&not_modified_since);
slot = GetPrfSlot(ring_index);
}
else
@@ -2246,8 +2333,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
#endif
{
bool latest;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -2272,8 +2359,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
return;
}
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, blkno);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, latest, buffer);
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, blkno,
&request_lsn, &not_modified_since);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, not_modified_since, buffer);
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -2442,8 +2530,8 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
{
NeonResponse *resp;
BlockNumber n_blocks;
bool latest;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -2470,12 +2558,13 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
return n_blocks;
}
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonNblocksRequest request = {
.req.tag = T_NeonNblocksRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forknum,
};
@@ -2523,16 +2612,17 @@ neon_dbsize(Oid dbNode)
{
NeonResponse *resp;
int64 db_size;
XLogRecPtr request_lsn;
bool latest;
XLogRecPtr request_lsn,
not_modified_since;
NRelFileInfo dummy_node = {0};
request_lsn = neon_get_request_lsn(&latest, dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonDbSizeRequest request = {
.req.tag = T_NeonDbSizeRequest,
.req.latest = latest,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.dbNode = dbNode,
};
@@ -2605,7 +2695,6 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
* the most recently inserted WAL record's LSN.
*/
lsn = GetXLogInsertRecPtr();
lsn = nm_adjust_lsn(lsn);
/*
@@ -2805,14 +2894,33 @@ neon_end_unlogged_build(SMgrRelation reln)
static int
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer)
{
XLogRecPtr request_lsn;
/*
* GetRedoStartLsn() returns LSN of basebackup.
* We need to download SLRU segments only once after node startup,
* then SLRUs are maintained locally.
*/
request_lsn = GetRedoStartLsn();
XLogRecPtr request_lsn,
not_modified_since;
if (RecoveryInProgress())
{
request_lsn = GetXLogReplayRecPtr(NULL);
if (request_lsn == InvalidXLogRecPtr)
{
/*
* This happens in neon startup, we start up without replaying any
* records.
*/
request_lsn = GetRedoStartLsn();
}
}
else
request_lsn = GetXLogInsertRecPtr();
request_lsn = nm_adjust_lsn(request_lsn);
/*
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
* segment has not changed since the basebackup, because in order to
* modify it, we would have had to download it already. And once
* downloaded, we never evict SLRU segments from local disk.
*/
not_modified_since = GetRedoStartLsn();
SlruKind kind;
if (STRPREFIX(path, "pg_xact"))
@@ -2827,8 +2935,8 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
NeonResponse *resp;
NeonGetSlruSegmentRequest request = {
.req.tag = T_NeonGetSlruSegmentRequest,
.req.latest = false,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.kind = kind,
.segno = segno
@@ -2956,6 +3064,9 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
{
BlockNumber relsize;
/* This is only used in WAL replay */
Assert(RecoveryInProgress());
/* Extend the relation if we know its size */
if (get_cached_relsize(rinfo, forknum, &relsize))
{
@@ -2974,14 +3085,13 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* This length is later reused when we open the smgr to read the
* block, which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
.lsn = end_recptr,
.not_modified_since = end_recptr,
},
.rinfo = rinfo,
.forknum = forknum,

View File

@@ -7,7 +7,7 @@ OBJS = \
neontest.o
EXTENSION = neon_test_utils
DATA = neon_test_utils--1.0.sql
DATA = neon_test_utils--1.1.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CONFIG = pg_config

View File

@@ -31,12 +31,12 @@ AS 'MODULE_PATHNAME', 'clear_buffer_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION get_raw_page_at_lsn(relname text, forkname text, blocknum int8, lsn pg_lsn)
CREATE FUNCTION get_raw_page_at_lsn(relname text, forkname text, blocknum int8, request_lsn pg_lsn, not_modified_since pg_lsn)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION get_raw_page_at_lsn(tbspc oid, db oid, relfilenode oid, forknum int8, blocknum int8, lsn pg_lsn)
CREATE FUNCTION get_raw_page_at_lsn(tbspc oid, db oid, relfilenode oid, forknum int8, blocknum int8, request_lsn pg_lsn, not_modified_since pg_lsn)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;

View File

@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.0'
default_version = '1.1'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -48,10 +48,10 @@ PG_FUNCTION_INFO_V1(neon_xlogflush);
*/
#if PG_MAJORVERSION_NUM < 16
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
#else
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, void *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
#endif
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
@@ -299,8 +299,11 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
text *forkname;
uint32 blkno;
bool request_latest = PG_ARGISNULL(3);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(3);
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
if (PG_NARGS() != 5)
elog(ERROR, "unexpected number of arguments in SQL function signature");
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
PG_RETURN_NULL();
@@ -309,6 +312,9 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
forkname = PG_GETARG_TEXT_PP(1);
blkno = PG_GETARG_UINT32(2);
request_lsn = PG_ARGISNULL(3) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(3);
not_modified_since = PG_ARGISNULL(4) ? request_lsn : PG_GETARG_LSN(4);
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
@@ -361,7 +367,7 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, read_lsn, request_latest, raw_page_data);
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, request_lsn, not_modified_since, raw_page_data);
relation_close(rel, AccessShareLock);
@@ -380,6 +386,9 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
{
char *raw_page_data;
if (PG_NARGS() != 7)
elog(ERROR, "unexpected number of arguments in SQL function signature");
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
@@ -403,18 +412,20 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
};
ForkNumber forknum = PG_GETARG_UINT32(3);
uint32 blkno = PG_GETARG_UINT32(4);
bool request_latest = PG_ARGISNULL(5);
uint64 read_lsn = request_latest ? GetXLogInsertRecPtr() : PG_GETARG_INT64(5);
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
/* Initialize buffer to copy to */
bytea *raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
request_lsn = PG_ARGISNULL(5) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(5);
not_modified_since = PG_ARGISNULL(6) ? request_lsn : PG_GETARG_LSN(6);
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
neon_read_at_lsn(rinfo, forknum, blkno, read_lsn, request_latest, raw_page_data);
neon_read_at_lsn(rinfo, forknum, blkno, request_lsn, not_modified_since, raw_page_data);
PG_RETURN_BYTEA_P(raw_page);
}
}

View File

@@ -403,27 +403,43 @@ async fn main() -> anyhow::Result<()> {
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
client_tasks.spawn(usage_metrics::task_backup(
&metrics_config.backup_metric_collection_config,
cancellation_token,
cancellation_token.clone(),
));
}
if let auth::BackendType::Console(api, _) = &config.auth_backend {
if let proxy::console::provider::ConsoleBackend::Console(api) = &**api {
if let Some(redis_notifications_client) = redis_notifications_client {
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(
redis_notifications_client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
(client1, client2) => {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
}
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(async move { cache.do_read(con).await }.instrument(span));
maintenance_tasks.spawn(
async move { cache.do_read(con, cancellation_token.clone()).await }
.instrument(span),
);
}
}
}

View File

@@ -4,6 +4,7 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use dashmap::DashSet;
@@ -13,6 +14,7 @@ use redis::{
};
use serde::Deserialize;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::info;
use crate::{
@@ -111,16 +113,22 @@ impl EndpointsCache {
pub async fn do_read(
&self,
mut con: ConnectionWithCredentialsProvider,
cancellation_token: CancellationToken,
) -> anyhow::Result<Infallible> {
let mut last_id = "0-0".to_string();
loop {
self.ready.store(false, Ordering::Release);
if let Err(e) = con.connect().await {
tracing::error!("error connecting to redis: {:?}", e);
continue;
self.ready.store(false, Ordering::Release);
}
if let Err(e) = self.read_from_stream(&mut con, &mut last_id).await {
tracing::error!("error reading from redis: {:?}", e);
self.ready.store(false, Ordering::Release);
}
if cancellation_token.is_cancelled() {
info!("cancellation token is cancelled, exiting");
tokio::time::sleep(Duration::from_secs(60 * 60 * 24 * 7)).await;
// 1 week.
}
tokio::time::sleep(self.config.retry_interval).await;
}

View File

@@ -533,13 +533,13 @@ pub struct RetryConfig {
impl RetryConfig {
/// Default options for RetryConfig.
/// Total delay for 4 retries with 1s base delay and 2.0 backoff factor is 7s.
/// Total delay for 8 retries with 100ms base delay and 1.6 backoff factor is about 7s.
pub const CONNECT_TO_COMPUTE_DEFAULT_VALUES: &'static str =
"num_retries=4,base_retry_wait_duration=1s,retry_wait_exponent_base=2.0";
/// Total delay for 4 retries with 1s base delay and 2.0 backoff factor is 7s.
/// Cplane has timeout of 60s on each request.
"num_retries=8,base_retry_wait_duration=100ms,retry_wait_exponent_base=1.6";
/// Total delay for 8 retries with 100ms base delay and 1.6 backoff factor is about 7s.
/// Cplane has timeout of 60s on each request. 8m7s in total.
pub const WAKE_COMPUTE_DEFAULT_VALUES: &'static str =
"num_retries=4,base_retry_wait_duration=1s,retry_wait_exponent_base=2.0";
"num_retries=8,base_retry_wait_duration=100ms,retry_wait_exponent_base=1.6";
/// Parse retry options passed via cmdline.
/// Example: [`Self::CONNECT_TO_COMPUTE_DEFAULT_VALUES`].

View File

@@ -75,7 +75,6 @@ pub type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,
@@ -89,6 +88,8 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
}
}
impl postgres_backend::HandlerSync<tokio::net::TcpStream> for MgmtHandler {}
fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), QueryError> {
let resp: KickSession = serde_json::from_str(query).context("Failed to parse query as json")?;

View File

@@ -413,6 +413,7 @@ mod tests {
)
.unwrap(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
upload_storage_class: None,
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
})

View File

@@ -1,42 +1,26 @@
//! Proxy Protocol V2 implementation
use std::{
future::{poll_fn, Future},
io,
net::SocketAddr,
pin::{pin, Pin},
task::{ready, Context, Poll},
pin::Pin,
task::{Context, Poll},
};
use bytes::{Buf, BytesMut};
use hyper::server::conn::AddrIncoming;
use bytes::BytesMut;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
pub struct ProxyProtocolAccept {
pub incoming: AddrIncoming,
pub protocol: &'static str,
}
pin_project! {
pub struct WithClientIp<T> {
/// A chained [`AsyncRead`] with [`AsyncWrite`] passthrough
pub struct ChainRW<T> {
#[pin]
pub inner: T,
buf: BytesMut,
tlv_bytes: u16,
state: ProxyParse,
}
}
#[derive(Clone, PartialEq, Debug)]
enum ProxyParse {
NotStarted,
Finished(SocketAddr),
None,
}
impl<T: AsyncWrite> AsyncWrite for WithClientIp<T> {
impl<T: AsyncWrite> AsyncWrite for ChainRW<T> {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
@@ -71,267 +55,174 @@ impl<T: AsyncWrite> AsyncWrite for WithClientIp<T> {
}
}
impl<T> WithClientIp<T> {
pub fn new(inner: T) -> Self {
WithClientIp {
inner,
buf: BytesMut::with_capacity(128),
tlv_bytes: 0,
state: ProxyParse::NotStarted,
}
}
pub fn client_addr(&self) -> Option<SocketAddr> {
match self.state {
ProxyParse::Finished(socket) => Some(socket),
_ => None,
}
}
}
impl<T: AsyncRead + Unpin> WithClientIp<T> {
pub async fn wait_for_addr(&mut self) -> io::Result<Option<SocketAddr>> {
match self.state {
ProxyParse::NotStarted => {
let mut pin = Pin::new(&mut *self);
let addr = poll_fn(|cx| pin.as_mut().poll_client_ip(cx)).await?;
match addr {
Some(addr) => self.state = ProxyParse::Finished(addr),
None => self.state = ProxyParse::None,
}
Ok(addr)
}
ProxyParse::Finished(addr) => Ok(Some(addr)),
ProxyParse::None => Ok(None),
}
}
}
/// Proxy Protocol Version 2 Header
const HEADER: [u8; 12] = [
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
];
impl<T: AsyncRead> WithClientIp<T> {
/// implementation of <https://www.haproxy.org/download/2.4/doc/proxy-protocol.txt>
/// Version 2 (Binary Format)
fn poll_client_ip(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<SocketAddr>>> {
// The binary header format starts with a constant 12 bytes block containing the protocol signature :
// \x0D \x0A \x0D \x0A \x00 \x0D \x0A \x51 \x55 \x49 \x54 \x0A
while self.buf.len() < 16 {
let mut this = self.as_mut().project();
let bytes_read = pin!(this.inner.read_buf(this.buf)).poll(cx)?;
pub async fn read_proxy_protocol<T: AsyncRead + Unpin>(
mut read: T,
) -> std::io::Result<(ChainRW<T>, Option<SocketAddr>)> {
let mut buf = BytesMut::with_capacity(128);
while buf.len() < 16 {
let bytes_read = read.read_buf(&mut buf).await?;
// exit for bad header
let len = usize::min(self.buf.len(), HEADER.len());
if self.buf[..len] != HEADER[..len] {
return Poll::Ready(Ok(None));
}
// if no more bytes available then exit
if ready!(bytes_read) == 0 {
return Poll::Ready(Ok(None));
};
// exit for bad header
let len = usize::min(buf.len(), HEADER.len());
if buf[..len] != HEADER[..len] {
return Ok((ChainRW { inner: read, buf }, None));
}
// The next byte (the 13th one) is the protocol version and command.
// The highest four bits contains the version. As of this specification, it must
// always be sent as \x2 and the receiver must only accept this value.
let vc = self.buf[12];
let version = vc >> 4;
let command = vc & 0b1111;
if version != 2 {
return Poll::Ready(Err(io::Error::new(
// if no more bytes available then exit
if bytes_read == 0 {
return Ok((ChainRW { inner: read, buf }, None));
};
}
let header = buf.split_to(16);
// The next byte (the 13th one) is the protocol version and command.
// The highest four bits contains the version. As of this specification, it must
// always be sent as \x2 and the receiver must only accept this value.
let vc = header[12];
let version = vc >> 4;
let command = vc & 0b1111;
if version != 2 {
return Err(io::Error::new(
io::ErrorKind::Other,
"invalid proxy protocol version. expected version 2",
));
}
match command {
// the connection was established on purpose by the proxy
// without being relayed. The connection endpoints are the sender and the
// receiver. Such connections exist when the proxy sends health-checks to the
// server. The receiver must accept this connection as valid and must use the
// real connection endpoints and discard the protocol block including the
// family which is ignored.
0 => {}
// the connection was established on behalf of another node,
// and reflects the original connection endpoints. The receiver must then use
// the information provided in the protocol block to get original the address.
1 => {}
// other values are unassigned and must not be emitted by senders. Receivers
// must drop connections presenting unexpected values here.
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
"invalid proxy protocol version. expected version 2",
)));
"invalid proxy protocol command. expected local (0) or proxy (1)",
))
}
match command {
// the connection was established on purpose by the proxy
// without being relayed. The connection endpoints are the sender and the
// receiver. Such connections exist when the proxy sends health-checks to the
// server. The receiver must accept this connection as valid and must use the
// real connection endpoints and discard the protocol block including the
// family which is ignored.
0 => {}
// the connection was established on behalf of another node,
// and reflects the original connection endpoints. The receiver must then use
// the information provided in the protocol block to get original the address.
1 => {}
// other values are unassigned and must not be emitted by senders. Receivers
// must drop connections presenting unexpected values here.
_ => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"invalid proxy protocol command. expected local (0) or proxy (1)",
)))
}
};
};
// The 14th byte contains the transport protocol and address family. The highest 4
// bits contain the address family, the lowest 4 bits contain the protocol.
let ft = self.buf[13];
let address_length = match ft {
// - \x11 : TCP over IPv4 : the forwarded connection uses TCP over the AF_INET
// protocol family. Address length is 2*4 + 2*2 = 12 bytes.
// - \x12 : UDP over IPv4 : the forwarded connection uses UDP over the AF_INET
// protocol family. Address length is 2*4 + 2*2 = 12 bytes.
0x11 | 0x12 => 12,
// - \x21 : TCP over IPv6 : the forwarded connection uses TCP over the AF_INET6
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
0x21 | 0x22 => 36,
// unspecified or unix stream. ignore the addresses
_ => 0,
};
// The 14th byte contains the transport protocol and address family. The highest 4
// bits contain the address family, the lowest 4 bits contain the protocol.
let ft = header[13];
let address_length = match ft {
// - \x11 : TCP over IPv4 : the forwarded connection uses TCP over the AF_INET
// protocol family. Address length is 2*4 + 2*2 = 12 bytes.
// - \x12 : UDP over IPv4 : the forwarded connection uses UDP over the AF_INET
// protocol family. Address length is 2*4 + 2*2 = 12 bytes.
0x11 | 0x12 => 12,
// - \x21 : TCP over IPv6 : the forwarded connection uses TCP over the AF_INET6
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
0x21 | 0x22 => 36,
// unspecified or unix stream. ignore the addresses
_ => 0,
};
// The 15th and 16th bytes is the address length in bytes in network endian order.
// It is used so that the receiver knows how many address bytes to skip even when
// it does not implement the presented protocol. Thus the length of the protocol
// header in bytes is always exactly 16 + this value. When a sender presents a
// LOCAL connection, it should not present any address so it sets this field to
// zero. Receivers MUST always consider this field to skip the appropriate number
// of bytes and must not assume zero is presented for LOCAL connections. When a
// receiver accepts an incoming connection showing an UNSPEC address family or
// protocol, it may or may not decide to log the address information if present.
let remaining_length = u16::from_be_bytes(self.buf[14..16].try_into().unwrap());
if remaining_length < address_length {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"invalid proxy protocol length. not enough to fit requested IP addresses",
)));
// The 15th and 16th bytes is the address length in bytes in network endian order.
// It is used so that the receiver knows how many address bytes to skip even when
// it does not implement the presented protocol. Thus the length of the protocol
// header in bytes is always exactly 16 + this value. When a sender presents a
// LOCAL connection, it should not present any address so it sets this field to
// zero. Receivers MUST always consider this field to skip the appropriate number
// of bytes and must not assume zero is presented for LOCAL connections. When a
// receiver accepts an incoming connection showing an UNSPEC address family or
// protocol, it may or may not decide to log the address information if present.
let remaining_length = u16::from_be_bytes(header[14..16].try_into().unwrap());
if remaining_length < address_length {
return Err(io::Error::new(
io::ErrorKind::Other,
"invalid proxy protocol length. not enough to fit requested IP addresses",
));
}
drop(header);
while buf.len() < remaining_length as usize {
if read.read_buf(&mut buf).await? == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"stream closed while waiting for proxy protocol addresses",
));
}
while self.buf.len() < 16 + address_length as usize {
let mut this = self.as_mut().project();
if ready!(pin!(this.inner.read_buf(this.buf)).poll(cx)?) == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"stream closed while waiting for proxy protocol addresses",
)));
}
}
let this = self.as_mut().project();
// we are sure this is a proxy protocol v2 entry and we have read all the bytes we need
// discard the header we have parsed
this.buf.advance(16);
// Starting from the 17th byte, addresses are presented in network byte order.
// The address order is always the same :
// - source layer 3 address in network byte order
// - destination layer 3 address in network byte order
// - source layer 4 address if any, in network byte order (port)
// - destination layer 4 address if any, in network byte order (port)
let addresses = this.buf.split_to(address_length as usize);
let socket = match address_length {
12 => {
let src_addr: [u8; 4] = addresses[0..4].try_into().unwrap();
let src_port = u16::from_be_bytes(addresses[8..10].try_into().unwrap());
Some(SocketAddr::from((src_addr, src_port)))
}
36 => {
let src_addr: [u8; 16] = addresses[0..16].try_into().unwrap();
let src_port = u16::from_be_bytes(addresses[32..34].try_into().unwrap());
Some(SocketAddr::from((src_addr, src_port)))
}
_ => None,
};
*this.tlv_bytes = remaining_length - address_length;
self.as_mut().skip_tlv_inner();
Poll::Ready(Ok(socket))
}
#[cold]
fn read_ip(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let ip = ready!(self.as_mut().poll_client_ip(cx)?);
match ip {
Some(x) => *self.as_mut().project().state = ProxyParse::Finished(x),
None => *self.as_mut().project().state = ProxyParse::None,
// Starting from the 17th byte, addresses are presented in network byte order.
// The address order is always the same :
// - source layer 3 address in network byte order
// - destination layer 3 address in network byte order
// - source layer 4 address if any, in network byte order (port)
// - destination layer 4 address if any, in network byte order (port)
let addresses = buf.split_to(remaining_length as usize);
let socket = match address_length {
12 => {
let src_addr: [u8; 4] = addresses[0..4].try_into().unwrap();
let src_port = u16::from_be_bytes(addresses[8..10].try_into().unwrap());
Some(SocketAddr::from((src_addr, src_port)))
}
Poll::Ready(Ok(()))
}
36 => {
let src_addr: [u8; 16] = addresses[0..16].try_into().unwrap();
let src_port = u16::from_be_bytes(addresses[32..34].try_into().unwrap());
Some(SocketAddr::from((src_addr, src_port)))
}
_ => None,
};
#[cold]
fn skip_tlv(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut this = self.as_mut().project();
// we know that this.buf is empty
debug_assert_eq!(this.buf.len(), 0);
this.buf.reserve((*this.tlv_bytes).clamp(0, 1024) as usize);
ready!(pin!(this.inner.read_buf(this.buf)).poll(cx)?);
self.skip_tlv_inner();
Poll::Ready(Ok(()))
}
fn skip_tlv_inner(self: Pin<&mut Self>) {
let tlv_bytes_read = match u16::try_from(self.buf.len()) {
// we read more than u16::MAX therefore we must have read the full tlv_bytes
Err(_) => self.tlv_bytes,
// we might not have read the full tlv bytes yet
Ok(n) => u16::min(n, self.tlv_bytes),
};
let this = self.project();
*this.tlv_bytes -= tlv_bytes_read;
this.buf.advance(tlv_bytes_read as usize);
}
Ok((ChainRW { inner: read, buf }, socket))
}
impl<T: AsyncRead> AsyncRead for WithClientIp<T> {
impl<T: AsyncRead> AsyncRead for ChainRW<T> {
#[inline]
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// I'm assuming these 3 comparisons will be easy to branch predict.
// especially with the cold attributes
// which should make this read wrapper almost invisible
if let ProxyParse::NotStarted = self.state {
ready!(self.as_mut().read_ip(cx)?);
}
while self.tlv_bytes > 0 {
ready!(self.as_mut().skip_tlv(cx)?)
}
let this = self.project();
if this.buf.is_empty() {
this.inner.poll_read(cx, buf)
if self.buf.is_empty() {
self.project().inner.poll_read(cx, buf)
} else {
// we know that tlv_bytes is 0
debug_assert_eq!(*this.tlv_bytes, 0);
let write = usize::min(this.buf.len(), buf.remaining());
let slice = this.buf.split_to(write).freeze();
buf.put_slice(&slice);
// reset the allocation so it can be freed
if this.buf.is_empty() {
*this.buf = BytesMut::new();
}
Poll::Ready(Ok(()))
self.read_from_buf(buf)
}
}
}
impl<T: AsyncRead> ChainRW<T> {
#[cold]
fn read_from_buf(self: Pin<&mut Self>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
debug_assert!(!self.buf.is_empty());
let this = self.project();
let write = usize::min(this.buf.len(), buf.remaining());
let slice = this.buf.split_to(write).freeze();
buf.put_slice(&slice);
// reset the allocation so it can be freed
if this.buf.is_empty() {
*this.buf = BytesMut::new();
}
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use std::pin::pin;
use tokio::io::AsyncReadExt;
use crate::protocol2::{ProxyParse, WithClientIp};
use crate::protocol2::read_proxy_protocol;
#[tokio::test]
async fn test_ipv4() {
@@ -353,16 +244,15 @@ mod tests {
let extra_data = [0x55; 256];
let mut read = pin!(WithClientIp::new(header.chain(extra_data.as_slice())));
let (mut read, addr) = read_proxy_protocol(header.chain(extra_data.as_slice()))
.await
.unwrap();
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, extra_data);
assert_eq!(
read.state,
ProxyParse::Finished(([127, 0, 0, 1], 65535).into())
);
assert_eq!(addr, Some(([127, 0, 0, 1], 65535).into()));
}
#[tokio::test]
@@ -385,17 +275,17 @@ mod tests {
let extra_data = [0x55; 256];
let mut read = pin!(WithClientIp::new(header.chain(extra_data.as_slice())));
let (mut read, addr) = read_proxy_protocol(header.chain(extra_data.as_slice()))
.await
.unwrap();
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, extra_data);
assert_eq!(
read.state,
ProxyParse::Finished(
([15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0], 257).into()
)
addr,
Some(([15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0], 257).into())
);
}
@@ -403,24 +293,24 @@ mod tests {
async fn test_invalid() {
let data = [0x55; 256];
let mut read = pin!(WithClientIp::new(data.as_slice()));
let (mut read, addr) = read_proxy_protocol(data.as_slice()).await.unwrap();
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, data);
assert_eq!(read.state, ProxyParse::None);
assert_eq!(addr, None);
}
#[tokio::test]
async fn test_short() {
let data = [0x55; 10];
let mut read = pin!(WithClientIp::new(data.as_slice()));
let (mut read, addr) = read_proxy_protocol(data.as_slice()).await.unwrap();
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, data);
assert_eq!(read.state, ProxyParse::None);
assert_eq!(addr, None);
}
#[tokio::test]
@@ -446,15 +336,14 @@ mod tests {
let extra_data = [0xaa; 256];
let mut read = pin!(WithClientIp::new(header.chain(extra_data.as_slice())));
let (mut read, addr) = read_proxy_protocol(header.chain(extra_data.as_slice()))
.await
.unwrap();
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, extra_data);
assert_eq!(
read.state,
ProxyParse::Finished(([55, 56, 57, 58], 65535).into())
);
assert_eq!(addr, Some(([55, 56, 57, 58], 65535).into()));
}
}

View File

@@ -17,7 +17,7 @@ use crate::{
context::RequestMonitoring,
error::ReportableError,
metrics::{Metrics, NumClientConnectionsGuard},
protocol2::WithClientIp,
protocol2::read_proxy_protocol,
proxy::handshake::{handshake, HandshakeData},
stream::{PqStream, Stream},
EndpointCacheKey,
@@ -88,20 +88,18 @@ pub async fn task_main(
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
connections.spawn(async move {
let mut socket = WithClientIp::new(socket);
let mut peer_addr = peer_addr.ip();
match socket.wait_for_addr().await {
Ok(Some(addr)) => peer_addr = addr.ip(),
let (socket, peer_addr) = match read_proxy_protocol(socket).await{
Ok((socket, Some(addr))) => (socket, addr.ip()),
Err(e) => {
error!("per-client task finished with an error: {e:#}");
return;
}
Ok(None) if config.require_client_ip => {
Ok((_socket, None)) if config.require_client_ip => {
error!("missing required client IP");
return;
}
Ok(None) => {}
}
Ok((socket, None)) => (socket, peer_addr.ip())
};
match socket.inner.set_nodelay(true) {
Ok(()) => {},

View File

@@ -174,7 +174,7 @@ async fn dummy_proxy(
tls: Option<TlsConfig>,
auth: impl TestAuth + Send,
) -> anyhow::Result<()> {
let client = WithClientIp::new(client);
let (client, _) = read_proxy_protocol(client).await?;
let mut stream = match handshake(client, tls.as_ref(), false).await? {
HandshakeData::Startup(stream, _) => stream,
HandshakeData::Cancel(_) => bail!("cancellation not supported"),

View File

@@ -33,7 +33,7 @@ use crate::cancellation::CancellationHandlerMain;
use crate::config::ProxyConfig;
use crate::context::RequestMonitoring;
use crate::metrics::Metrics;
use crate::protocol2::WithClientIp;
use crate::protocol2::read_proxy_protocol;
use crate::proxy::run_until_cancelled;
use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
@@ -158,9 +158,8 @@ async fn connection_handler(
.guard(crate::metrics::Protocol::Http);
// handle PROXY protocol
let mut conn = WithClientIp::new(conn);
let peer = match conn.wait_for_addr().await {
Ok(peer) => peer,
let (conn, peer) = match read_proxy_protocol(conn).await {
Ok(c) => c,
Err(e) => {
tracing::error!(?session_id, %peer_addr, "failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}");
return;

View File

@@ -2,10 +2,13 @@
//! protocol commands.
use anyhow::Context;
use std::net::TcpStream;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_io_timeout::TimeoutReader;
use tracing::{debug, info, info_span, Instrument};
use utils::measured_stream::MeasuredStream;
use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
@@ -95,8 +98,7 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::HandlerSync<IO>
for SafekeeperPostgresHandler
{
// tenant_id and timeline_id are passed in connection string params
@@ -191,8 +193,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
self.claims = Some(data.claims);
Ok(())
}
}
type IO<'s, R: FnMut(usize), W> =
MeasuredStream<std::pin::Pin<&'s mut TimeoutReader<TcpStream>>, R, W>;
impl<'s, R: FnMut(usize), W> postgres_backend::Handler<IO<'s, R, W>> for SafekeeperPostgresHandler {
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO<'s, R, W>>,
query_string: &str,
) -> Result<(), QueryError> {
self.process_query_(pgb, &query_string).await
}
}
impl SafekeeperPostgresHandler {
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,

View File

@@ -184,6 +184,19 @@ impl HeartbeaterTask {
}
}
}
tracing::info!(
"Heartbeat round complete for {} nodes, {} offline",
new_state.len(),
new_state
.values()
.filter(|s| match s {
PageserverState::Available { .. } => {
false
}
PageserverState::Offline => true,
})
.count()
);
let mut deltas = Vec::new();
let now = Instant::now();

View File

@@ -9,7 +9,9 @@ use std::time::Duration;
use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::{Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT};
use storage_controller::service::{
Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
};
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
use utils::auth::{JwtAuth, SwappableJwtAuth};
@@ -64,6 +66,10 @@ struct Cli {
/// Grace period before marking unresponsive pageserver offline
#[arg(long)]
max_unavailable_interval: Option<humantime::Duration>,
/// Maximum number of reconcilers that may run in parallel
#[arg(long)]
reconciler_concurrency: Option<usize>,
}
enum StrictMode {
@@ -243,6 +249,9 @@ async fn async_main() -> anyhow::Result<()> {
.max_unavailable_interval
.map(humantime::Duration::into)
.unwrap_or(MAX_UNAVAILABLE_INTERVAL_DEFAULT),
reconciler_concurrency: args
.reconciler_concurrency
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
};
// After loading secrets & config, but before starting anything else, apply database migrations

View File

@@ -51,6 +51,10 @@ pub(super) struct Reconciler {
/// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
pub(crate) compute_notify_failure: bool,
/// Reconciler is responsible for keeping alive semaphore units that limit concurrency on how many
/// we will spawn.
pub(crate) _resource_units: ReconcileUnits,
/// A means to abort background reconciliation: it is essential to
/// call this when something changes in the original TenantShard that
/// will make this reconciliation impossible or unnecessary, for
@@ -66,6 +70,19 @@ pub(super) struct Reconciler {
pub(crate) persistence: Arc<Persistence>,
}
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
pub(crate) struct ReconcileUnits {
_sem_units: tokio::sync::OwnedSemaphorePermit,
}
impl ReconcileUnits {
pub(crate) fn new(sem_units: tokio::sync::OwnedSemaphorePermit) -> Self {
Self {
_sem_units: sem_units,
}
}
}
/// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
/// reference counting for Scheduler. The IntentState is what the scheduler works with,
/// and the TargetState is just the instruction for a particular Reconciler run.
@@ -750,7 +767,10 @@ impl Reconciler {
// It is up to the caller whether they want to drop out on this error, but they don't have to:
// in general we should avoid letting unavailability of the cloud control plane stop us from
// making progress.
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
if !matches!(e, NotifyError::ShuttingDown) {
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
}
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
// needs to retry at some point.
self.compute_notify_failure = true;

View File

@@ -10,8 +10,9 @@ use std::{
use crate::{
id_lock_map::IdLockMap,
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::ReconcileError,
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
tenant_shard::ReconcileNeeded,
};
use anyhow::Context;
use control_plane::storage_controller::{
@@ -48,7 +49,7 @@ use pageserver_api::{
},
};
use pageserver_client::mgmt_api;
use tokio::sync::OwnedRwLockWriteGuard;
use tokio::sync::{mpsc::error::TrySendError, OwnedRwLockWriteGuard};
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{
@@ -90,6 +91,13 @@ pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
// This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
// than they're being pushed onto the queue.
const MAX_DELAYED_RECONCILES: usize = 10000;
// Top level state available to all HTTP handlers
struct ServiceState {
tenants: BTreeMap<TenantShardId, TenantShard>,
@@ -97,6 +105,9 @@ struct ServiceState {
nodes: Arc<HashMap<NodeId, Node>>,
scheduler: Scheduler,
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
}
impl ServiceState {
@@ -104,11 +115,13 @@ impl ServiceState {
nodes: HashMap<NodeId, Node>,
tenants: BTreeMap<TenantShardId, TenantShard>,
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
) -> Self {
Self {
tenants,
nodes: Arc::new(nodes),
scheduler,
delayed_reconcile_rx,
}
}
@@ -142,6 +155,9 @@ pub struct Config {
/// considered active. Once the grace period elapses, the next heartbeat failure will
/// mark the pagseserver offline.
pub max_unavailable_interval: Duration,
/// How many Reconcilers may be spawned concurrently
pub reconciler_concurrency: usize,
}
impl From<DatabaseError> for ApiError {
@@ -180,6 +196,17 @@ pub struct Service {
// that transition it to/from Active.
node_op_locks: IdLockMap<NodeId>,
// Limit how many Reconcilers we will spawn concurrently
reconciler_concurrency: Arc<tokio::sync::Semaphore>,
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
///
/// Note that this state logically lives inside ServiceInner, but carrying Sender here makes the code simpler
/// by avoiding needing a &mut ref to something inside the ServiceInner. This could be optimized to
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
// Process shutdown will fire this token
cancel: CancellationToken,
@@ -742,8 +769,9 @@ impl Service {
}
/// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation
/// was successful, this will update the observed state of the tenant such that subsequent
/// calls to [`TenantShard::maybe_reconcile`] will do nothing.
/// was successful and intent hasn't changed since the Reconciler was spawned, this will update
/// the observed state of the tenant such that subsequent calls to [`TenantShard::get_reconcile_needed`]
/// will indicate that reconciliation is not needed.
#[instrument(skip_all, fields(
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
sequence=%result.sequence
@@ -796,14 +824,28 @@ impl Service {
// Ordering: populate last_error before advancing error_seq,
// so that waiters will see the correct error after waiting.
*(tenant.last_error.lock().unwrap()) = format!("{e}");
tenant.error_waiter.advance(result.sequence);
tenant.set_last_error(result.sequence, e);
for (node_id, o) in result.observed.locations {
tenant.observed.locations.insert(node_id, o);
}
}
}
// Maybe some other work can proceed now that this job finished.
if self.reconciler_concurrency.available_permits() > 0 {
while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
let (nodes, tenants, _scheduler) = locked.parts_mut();
if let Some(shard) = tenants.get_mut(&tenant_shard_id) {
shard.delayed_reconcile = false;
self.maybe_reconcile_shard(shard, nodes);
}
if self.reconciler_concurrency.available_permits() == 0 {
break;
}
}
}
}
async fn process_results(
@@ -986,6 +1028,9 @@ impl Service {
let (startup_completion, startup_complete) = utils::completion::channel();
let (delayed_reconcile_tx, delayed_reconcile_rx) =
tokio::sync::mpsc::channel(MAX_DELAYED_RECONCILES);
let cancel = CancellationToken::new();
let heartbeater = Heartbeater::new(
config.jwt_token.clone(),
@@ -994,13 +1039,20 @@ impl Service {
);
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes, tenants, scheduler,
nodes,
tenants,
scheduler,
delayed_reconcile_rx,
))),
config: config.clone(),
persistence,
compute_hook: Arc::new(ComputeHook::new(config)),
compute_hook: Arc::new(ComputeHook::new(config.clone())),
result_tx,
heartbeater,
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.reconciler_concurrency,
)),
delayed_reconcile_tx,
abort_tx,
startup_complete: startup_complete.clone(),
cancel,
@@ -1535,7 +1587,7 @@ impl Service {
let (response, waiters) = self.do_tenant_create(create_req).await?;
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
if let Err(e) = self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
// Avoid deadlock: reconcile may fail while notifying compute, if the cloud control plane refuses to
// accept compute notifications while it is in the process of creating. Reconciliation will
// be retried in the background.
@@ -2752,7 +2804,14 @@ impl Service {
tenant_shard_id: shard.tenant_shard_id,
node_attached: *shard.intent.get_attached(),
node_secondary: shard.intent.get_secondary().to_vec(),
last_error: shard.last_error.lock().unwrap().clone(),
last_error: shard
.last_error
.lock()
.unwrap()
.as_ref()
.map(|e| format!("{e}"))
.unwrap_or("".to_string())
.clone(),
is_reconciling: shard.reconciler.is_some(),
is_pending_compute_notification: shard.pending_compute_notification,
is_splitting: matches!(shard.splitting, SplitState::Splitting),
@@ -3978,7 +4037,7 @@ impl Service {
// TODO: in the background, we should balance work back onto this pageserver
}
AvailabilityTransition::Unchanged => {
tracing::info!("Node {} no change during config", node_id);
tracing::debug!("Node {} no change during config", node_id);
}
}
@@ -4053,20 +4112,64 @@ impl Service {
Ok(())
}
/// Convenience wrapper around [`TenantShard::maybe_reconcile`] that provides
/// all the references to parts of Self that are needed
/// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
fn maybe_reconcile_shard(
&self,
shard: &mut TenantShard,
nodes: &Arc<HashMap<NodeId, Node>>,
) -> Option<ReconcilerWaiter> {
shard.maybe_reconcile(
let reconcile_needed = shard.get_reconcile_needed(nodes);
match reconcile_needed {
ReconcileNeeded::No => return None,
ReconcileNeeded::WaitExisting(waiter) => return Some(waiter),
ReconcileNeeded::Yes => {
// Fall through to try and acquire units for spawning reconciler
}
};
let units = match self.reconciler_concurrency.clone().try_acquire_owned() {
Ok(u) => ReconcileUnits::new(u),
Err(_) => {
tracing::info!(tenant_id=%shard.tenant_shard_id.tenant_id, shard_id=%shard.tenant_shard_id.shard_slug(),
"Concurrency limited: enqueued for reconcile later");
if !shard.delayed_reconcile {
match self.delayed_reconcile_tx.try_send(shard.tenant_shard_id) {
Err(TrySendError::Closed(_)) => {
// Weird mid-shutdown case?
}
Err(TrySendError::Full(_)) => {
// It is safe to skip sending our ID in the channel: we will eventually get retried by the background reconcile task.
tracing::warn!(
"Many shards are waiting to reconcile: delayed_reconcile queue is full"
);
}
Ok(()) => {
shard.delayed_reconcile = true;
}
}
}
// We won't spawn a reconciler, but we will construct a waiter that waits for the shard's sequence
// number to advance. When this function is eventually called again and succeeds in getting units,
// it will spawn a reconciler that makes this waiter complete.
return Some(shard.future_reconcile_waiter());
}
};
let Ok(gate_guard) = self.gate.enter() else {
// Gate closed: we're shutting down, drop out.
return None;
};
shard.spawn_reconciler(
&self.result_tx,
nodes,
&self.compute_hook,
&self.config,
&self.persistence,
&self.gate,
units,
gate_guard,
&self.cancel,
)
}
@@ -4088,6 +4191,11 @@ impl Service {
schedule_context = ScheduleContext::default();
}
// Skip checking if this shard is already enqueued for reconciliation
if shard.delayed_reconcile {
continue;
}
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another rone
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
@@ -4249,7 +4357,26 @@ impl Service {
};
let waiter_count = waiters.len();
self.await_waiters(waiters, RECONCILE_TIMEOUT).await?;
match self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
Ok(()) => {}
Err(ReconcileWaitError::Failed(_, reconcile_error))
if matches!(*reconcile_error, ReconcileError::Cancel) =>
{
// Ignore reconciler cancel errors: this reconciler might have shut down
// because some other change superceded it. We will return a nonzero number,
// so the caller knows they might have to call again to quiesce the system.
}
Err(e) => {
return Err(e);
}
};
tracing::info!(
"{} reconciles in reconcile_all, {} waiters",
reconciles_spawned,
waiter_count
);
Ok(waiter_count)
}

View File

@@ -7,6 +7,7 @@ use std::{
use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
reconciler::ReconcileUnits,
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
};
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
@@ -22,7 +23,7 @@ use utils::{
generation::Generation,
id::NodeId,
seqwait::{SeqWait, SeqWaitError},
sync::gate::Gate,
sync::gate::GateGuard,
};
use crate::{
@@ -37,12 +38,18 @@ use crate::{
};
/// Serialization helper
fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
T: Clone + std::fmt::Display,
T: std::fmt::Display,
{
serializer.collect_str(&v.lock().unwrap())
serializer.collect_str(
&v.lock()
.unwrap()
.as_ref()
.map(|e| format!("{e}"))
.unwrap_or("".to_string()),
)
}
/// In-memory state for a particular tenant shard.
@@ -95,6 +102,10 @@ pub(crate) struct TenantShard {
/// reconciliation, and timeline creation.
pub(crate) splitting: SplitState,
/// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
/// is set. This flag is cleared when the tenant is popped off the delay queue.
pub(crate) delayed_reconcile: bool,
/// Optionally wait for reconciliation to complete up to a particular
/// sequence number.
#[serde(skip)]
@@ -106,15 +117,19 @@ pub(crate) struct TenantShard {
#[serde(skip)]
pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
/// The most recent error from a reconcile on this tenant
/// The most recent error from a reconcile on this tenant. This is a nested Arc
/// because:
/// - ReconcileWaiters need to Arc-clone the overall object to read it later
/// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
/// many waiters for one shard, and the underlying error types are not Clone.
/// TODO: generalize to an array of recent events
/// TOOD: use a ArcSwap instead of mutex for faster reads?
#[serde(serialize_with = "read_mutex_content")]
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
#[serde(serialize_with = "read_last_error")]
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
/// If we have a pending compute notification that for some reason we weren't able to send,
/// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
/// sending it. This is the mechanism by which compute notifications are included in the scope
/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
/// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
/// of state that we publish externally in an eventually consistent way.
pub(crate) pending_compute_notification: bool,
@@ -288,18 +303,18 @@ pub(crate) struct ReconcilerWaiter {
seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
error: std::sync::Arc<std::sync::Mutex<String>>,
error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
seq: Sequence,
}
#[derive(thiserror::Error, Debug)]
pub enum ReconcileWaitError {
pub(crate) enum ReconcileWaitError {
#[error("Timeout waiting for shard {0}")]
Timeout(TenantShardId),
#[error("shutting down")]
Shutdown,
#[error("Reconcile error on shard {0}: {1}")]
Failed(TenantShardId, String),
Failed(TenantShardId, Arc<ReconcileError>),
}
#[derive(Eq, PartialEq, Debug)]
@@ -337,7 +352,8 @@ impl ReconcilerWaiter {
SeqWaitError::Timeout => unreachable!()
})?;
return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
}
}
@@ -353,6 +369,17 @@ pub(crate) struct ReconcilerHandle {
cancel: CancellationToken,
}
pub(crate) enum ReconcileNeeded {
/// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
/// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
No,
/// shard has a reconciler running, and its intent hasn't changed since that one was
/// spawned: wait for the existing reconciler rather than spawning a new one.
WaitExisting(ReconcilerWaiter),
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
Yes,
}
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
@@ -396,6 +423,7 @@ impl TenantShard {
reconciler: None,
splitting: SplitState::Idle,
sequence: Sequence(1),
delayed_reconcile: false,
waiter: Arc::new(SeqWait::new(Sequence(0))),
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
last_error: Arc::default(),
@@ -831,16 +859,10 @@ impl TenantShard {
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn maybe_reconcile(
pub(crate) fn get_reconcile_needed(
&mut self,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
service_config: &service::Config,
persistence: &Arc<Persistence>,
gate: &Gate,
cancel: &CancellationToken,
) -> Option<ReconcilerWaiter> {
) -> ReconcileNeeded {
// If there are any ambiguous observed states, and the nodes they refer to are available,
// we should reconcile to clean them up.
let mut dirty_observed = false;
@@ -862,8 +884,8 @@ impl TenantShard {
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
if !do_reconcile {
tracing::info!("Not dirty, no reconciliation needed.");
return None;
tracing::debug!("Not dirty, no reconciliation needed.");
return ReconcileNeeded::No;
}
// If we are currently splitting, then never start a reconciler task: the splitting logic
@@ -871,7 +893,7 @@ impl TenantShard {
// up top, so that we only log this message if we would otherwise have done a reconciliation.
if !matches!(self.splitting, SplitState::Idle) {
tracing::info!("Refusing to reconcile, splitting in progress");
return None;
return ReconcileNeeded::No;
}
// Reconcile already in flight for the current sequence?
@@ -881,7 +903,7 @@ impl TenantShard {
"Reconciliation already in progress for sequence {:?}",
self.sequence,
);
return Some(ReconcilerWaiter {
return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
error_seq_wait: self.error_waiter.clone(),
@@ -900,10 +922,67 @@ impl TenantShard {
// We only reach this point if there is work to do and we're going to skip
// doing it: warn it obvious why this tenant isn't doing what it ought to.
tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
return None;
return ReconcileNeeded::No;
}
}
ReconcileNeeded::Yes
}
/// Ensure the sequence number is set to a value where waiting for this value will make us wait
/// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
///
/// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
/// that the waiter will not complete until some future Reconciler is constructed and run.
fn ensure_sequence_ahead(&mut self) {
// Find the highest sequence for which a Reconciler has previously run or is currently
// running
let max_seen = std::cmp::max(
self.reconciler
.as_ref()
.map(|r| r.sequence)
.unwrap_or(Sequence(0)),
std::cmp::max(self.waiter.load(), self.error_waiter.load()),
);
if self.sequence <= max_seen {
self.sequence = max_seen.next();
}
}
/// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
///
/// This is appropriate when you can't spawn a recociler (e.g. due to resource limits), but
/// you would like to wait until one gets spawned in the background.
pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
self.ensure_sequence_ahead();
ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
error_seq_wait: self.error_waiter.clone(),
error: self.last_error.clone(),
seq: self.sequence,
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn spawn_reconciler(
&mut self,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
service_config: &service::Config,
persistence: &Arc<Persistence>,
units: ReconcileUnits,
gate_guard: GateGuard,
cancel: &CancellationToken,
) -> Option<ReconcilerWaiter> {
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
// doing our sequence's work.
let old_handle = self.reconciler.take();
// Build list of nodes from which the reconciler should detach
let mut detach = Vec::new();
for node_id in self.observed.locations.keys() {
@@ -919,18 +998,9 @@ impl TenantShard {
}
}
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
// doing our sequence's work.
let old_handle = self.reconciler.take();
let Ok(gate_guard) = gate.enter() else {
// Shutting down, don't start a reconciler
return None;
};
// Advance the sequence before spawning a reconciler, so that sequence waiters
// can distinguish between before+after the reconcile completes.
self.sequence = self.sequence.next();
self.ensure_sequence_ahead();
let reconciler_cancel = cancel.child_token();
let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
@@ -945,6 +1015,7 @@ impl TenantShard {
compute_hook: compute_hook.clone(),
service_config: service_config.clone(),
_gate_guard: gate_guard,
_resource_units: units,
cancel: reconciler_cancel.clone(),
persistence: persistence.clone(),
compute_notify_failure: false,
@@ -1011,16 +1082,18 @@ impl TenantShard {
status: outcome_label,
});
result_tx
.send(ReconcileResult {
sequence: reconcile_seq,
result,
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
observed: reconciler.observed,
pending_compute_notification: reconciler.compute_notify_failure,
})
.ok();
// Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
// try and schedule more work in response to our result.
let result = ReconcileResult {
sequence: reconcile_seq,
result,
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
observed: reconciler.observed,
pending_compute_notification: reconciler.compute_notify_failure,
};
result_tx.send(result).ok();
}
.instrument(reconciler_span),
);
@@ -1089,6 +1162,13 @@ impl TenantShard {
&self.scheduling_policy
}
pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
// Ordering: always set last_error before advancing sequence, so that sequence
// waiters are guaranteed to see a Some value when they see an error.
*(self.last_error.lock().unwrap()) = Some(Arc::new(error));
self.error_waiter.advance(sequence);
}
pub(crate) fn from_persistent(
tsp: TenantShardPersistence,
intent: IntentState,
@@ -1111,6 +1191,7 @@ impl TenantShard {
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
})
}

View File

@@ -507,6 +507,11 @@ class NeonEnvBuilder:
self.pageserver_get_vectored_impl = "vectored"
log.debug('Overriding pageserver get_vectored_impl config to "vectored"')
self.pageserver_get_impl: Optional[str] = None
if os.getenv("PAGESERVER_GET_IMPL", "") == "vectored":
self.pageserver_get_impl = "vectored"
log.debug('Overriding pageserver get_impl config to "vectored"')
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -1078,6 +1083,8 @@ class NeonEnv:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_get_vectored_impl is not None:
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
if config.pageserver_get_impl is not None:
ps_cfg["get_impl"] = config.pageserver_get_impl
# Create a corresponding NeonPageserver object
self.pageservers.append(

View File

@@ -17,11 +17,16 @@ from fixtures.types import TenantId, TimelineId
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_local_corruption(neon_env_builder: NeonEnvBuilder):
if neon_env_builder.pageserver_get_impl == "vectored":
reconstruct_function_name = "get_values_reconstruct_data"
else:
reconstruct_function_name = "get_value_reconstruct_data"
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*get_value_reconstruct_data for layer .*",
f".*{reconstruct_function_name} for layer .*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
@@ -84,7 +89,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
# (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
with pytest.raises(Exception, match="get_value_reconstruct_data for layer ") as err:
with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err:
pg2.start()
log.info(
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"

View File

@@ -226,6 +226,11 @@ def test_forward_compatibility(
)
try:
# Previous version neon_local and pageserver are not aware
# of the new config.
# TODO: remove this once the code reaches main
neon_env_builder.pageserver_get_impl = None
neon_env_builder.num_safekeepers = 3
neon_local_binpath = neon_env_builder.neon_binpath
env = neon_env_builder.from_repo_dir(

View File

@@ -3,7 +3,7 @@ import re
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, tenant_get_shards, wait_replica_caughtup
# Check for corrupted WAL messages which might otherwise go unnoticed if
@@ -102,3 +102,80 @@ def test_2_replicas_start(neon_simple_env: NeonEnv):
) as secondary2:
wait_replica_caughtup(primary, secondary1)
wait_replica_caughtup(primary, secondary2)
# We had an issue that a standby server made GetPage requests with an
# old LSN, based on the last-written LSN cache, to avoid waits in the
# pageserver. However, requesting a page with a very old LSN, such
# that the GC horizon has already advanced past it, results in an
# error from the pageserver:
# "Bad request: tried to request a page version that was garbage collected"
#
# To avoid that, the compute<-> pageserver protocol was updated so
# that that the standby now sends two LSNs, the old last-written LSN
# and the current replay LSN.
#
# https://github.com/neondatabase/neon/issues/6211
def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder):
tenant_conf = {
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
timeline_id = env.initial_timeline
tenant_id = env.initial_tenant
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
with env.endpoints.new_replica_start(
origin=primary,
endpoint_id="secondary",
# Protocol version 2 was introduced to fix the issue
# that this test exercises. With protocol version 1 it
# fails.
config_lines=["neon.protocol_version=2"],
) as secondary:
p_cur = primary.connect().cursor()
p_cur.execute("CREATE EXTENSION neon_test_utils")
p_cur.execute("CREATE TABLE test (id int primary key) WITH (autovacuum_enabled=false)")
p_cur.execute("INSERT INTO test SELECT generate_series(1, 10000) AS g")
wait_replica_caughtup(primary, secondary)
s_cur = secondary.connect().cursor()
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res[0] == 10000
# Clear the cache in the standby, so that when we
# re-execute the query, it will make GetPage
# requests. This does not clear the last-written LSN cache
# so we still remember the LSNs of the pages.
s_cur.execute("SELECT clear_buffer_cache()")
# Do other stuff on the primary, to advance the WAL
p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g")
# Run GC. The PITR interval is very small, so this advances the GC cutoff LSN
# very close to the primary's current insert LSN.
shards = tenant_get_shards(env, tenant_id, None)
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
client.timeline_checkpoint(tenant_shard_id, timeline_id)
client.timeline_compact(tenant_shard_id, timeline_id)
client.timeline_gc(tenant_shard_id, timeline_id, 0)
# Re-execute the query. The GetPage requests that this
# generates use old not_modified_since LSNs, older than
# the GC cutoff, but new request LSNs. (In protocol
# version 1 there was only one LSN, and this failed.)
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res[0] == 10000

View File

@@ -4,16 +4,21 @@ import threading
import time
from typing import List
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: str):
if build_type == "debug":
# Disable vectored read path cross validation since it makes the test time out.
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
env = neon_env_builder.init_start()
cache_dir = os.path.join(env.repo_dir, "file_cache")
os.mkdir(cache_dir)
env.neon_cli.create_branch("empty", ancestor_branch_name=DEFAULT_BRANCH_NAME)
env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")
endpoint = env.endpoints.create_start(

View File

@@ -1,3 +1,4 @@
import re
import time
from datetime import datetime, timedelta, timezone
@@ -109,6 +110,11 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Test pageserver get_timestamp_of_lsn API
def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
if neon_env_builder.pageserver_get_impl == "vectored":
key_not_found_error = r".*Requested key.*not found,*"
else:
key_not_found_error = r".*could not find data for key.*"
env = neon_env_builder.init_start()
new_timeline_id = env.neon_cli.create_branch("test_ts_of_lsn_api")
@@ -177,8 +183,8 @@ def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
raise RuntimeError("there should have been an 'could not find data for key' error")
except PageserverApiException as error:
assert error.status_code == 500
assert str(error).startswith("could not find data for key")
env.pageserver.allowed_errors.append(".*could not find data for key.*")
assert re.match(key_not_found_error, str(error))
env.pageserver.allowed_errors.append(key_not_found_error)
# Probe a bunch of timestamps in the valid range
step_size = 100

View File

@@ -0,0 +1,131 @@
from typing import Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, tenant_get_shards
from fixtures.types import Lsn
from fixtures.utils import query_scalar
#
# Test on-demand download of the pg_xact SLRUs
#
@pytest.mark.parametrize("shard_count", [None, 4])
def test_ondemand_download_pg_xact(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
tenant_conf = {
"lazy_slru_download": "true",
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
}
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
timeline_id = env.initial_timeline
tenant_id = env.initial_tenant
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")
# Consume a lot of XIDs, to create more pg_xact segments
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (2)")
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (2)")
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (3)")
# Restart postgres. After restart, the new instance will download the
# pg_xact segments lazily.
endpoint.stop()
endpoint.start()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Consume more WAL, so that the pageserver can compact and GC older data,
# including the LSN that we started the new endpoint at,
cur.execute("CREATE TABLE anothertable (i int, t text)")
cur.execute(
"INSERT INTO anothertable SELECT g, 'long string to consume some space' || g FROM generate_series(1, 10000) g"
)
# Run GC
shards = tenant_get_shards(env, tenant_id, None)
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
client.timeline_checkpoint(tenant_shard_id, timeline_id)
client.timeline_compact(tenant_shard_id, timeline_id)
client.timeline_gc(tenant_shard_id, timeline_id, 0)
# Test that this can still on-demand download the old pg_xact segments
cur.execute("select xmin, xmax, * from clogtest")
tup = cur.fetchall()
log.info(f"tuples = {tup}")
@pytest.mark.parametrize("shard_count", [None, 4])
def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
tenant_conf = {
"lazy_slru_download": "true",
}
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")
# Consume a lot of XIDs, to create more pg_xact segments
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
# Open a new connection and insert another row, but leave
# the transaction open
pg_conn2 = endpoint.connect()
cur2 = pg_conn2.cursor()
cur2.execute("BEGIN")
cur2.execute("INSERT INTO clogtest VALUES (2)")
# Another insert on the first connection, which is committed.
for _ in range(1000):
cur.execute("select test_consume_xids(10000);")
cur.execute("INSERT INTO clogtest VALUES (3)")
# Start standby at this point in time
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
endpoint_at_lsn = env.endpoints.create_start(
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
)
# Commit transaction 2, after the standby was launched.
cur2.execute("COMMIT")
# The replica should not see transaction 2 as committed.
conn_replica = endpoint_at_lsn.connect()
cur_replica = conn_replica.cursor()
cur_replica.execute("SELECT * FROM clogtest")
assert cur_replica.fetchall() == [(1,), (3,)]

View File

@@ -18,6 +18,7 @@ from fixtures.remote_storage import s3_storage
def test_pg_regress(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
build_type: str,
pg_bin,
capsys,
base_dir: Path,
@@ -30,6 +31,11 @@ def test_pg_regress(
"""
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
if build_type == "debug":
# Disable vectored read path cross validation since it makes the test time out.
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)

View File

@@ -17,7 +17,14 @@ def test_read_validation(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_read_validation", "empty")
endpoint = env.endpoints.create_start("test_read_validation")
endpoint = env.endpoints.create_start(
"test_read_validation",
# Use protocol version 2, because the code that constructs the V1 messages
# assumes that a primary always wants to read the latest version of a page,
# and therefore doesn't work with the test functions below to read an older
# page version.
config_lines=["neon.protocol_version=2"],
)
with closing(endpoint.connect()) as con:
with con.cursor() as c:
@@ -64,7 +71,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info("Cache is clear, reading stale page version")
c.execute(
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{first[0]}'))"
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{first[0]}', NULL))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn"
@@ -77,7 +84,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info("Cache is clear, reading latest page version without cache")
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, NULL))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, NULL, NULL))"
)
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
@@ -92,7 +99,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}', NULL))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
@@ -102,7 +109,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, NULL))"
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, NULL, NULL))"
)
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
@@ -114,7 +121,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}', NULL))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
@@ -133,7 +140,14 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
endpoint = env.endpoints.create_start("test_read_validation_neg")
endpoint = env.endpoints.create_start(
"test_read_validation_neg",
# Use protocol version 2, because the code that constructs the V1 messages
# assumes that a primary always wants to read the latest version of a page,
# and therefore doesn't work with the test functions below to read an older
# page version.
config_lines=["neon.protocol_version=2"],
)
with closing(endpoint.connect()) as con:
with con.cursor() as c:
@@ -143,7 +157,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
log.info("read a page of a missing relation")
try:
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0', NULL))"
)
raise AssertionError("query should have failed")
except UndefinedTable as e:
@@ -155,7 +169,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
log.info("read a page at lsn 0")
try:
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0', NULL))"
)
raise AssertionError("query should have failed")
except IoError as e:
@@ -164,22 +178,22 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
log.info("Pass NULL as an input")
expected = (None, None, None)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn(NULL, 'main', 0, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn(NULL, 'main', 0, '0/0', NULL))"
)
assert c.fetchone() == expected, "Expected null output"
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', NULL, 0, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', NULL, 0, '0/0', NULL))"
)
assert c.fetchone() == expected, "Expected null output"
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', NULL, '0/0'))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', NULL, '0/0', NULL))"
)
assert c.fetchone() == expected, "Expected null output"
# This check is currently failing, reading beyond EOF is returning a 0-page
log.info("Read beyond EOF")
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 1, NULL))"
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 1, NULL, NULL))"
)

View File

@@ -173,7 +173,9 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
# which changes the LSN on the page.
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
vm_page_in_cache = (cur.fetchall()[0][0])[8:100].hex()
cur.execute("select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn() )")
cur.execute(
"select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn(), NULL )"
)
vm_page_at_pageserver = (cur.fetchall()[0][0])[8:100].hex()
assert vm_page_at_pageserver == vm_page_in_cache

View File

@@ -7,7 +7,9 @@ use std::{
io::BufReader,
};
use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest};
use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamProtocolVersion,
};
use utils::id::{ConnectionId, TenantId, TimelineId};
use clap::{Parser, Subcommand};
@@ -56,7 +58,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
let mut prev: Option<PagestreamGetPageRequest> = None;
// Compute stats
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2) {
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
@@ -89,7 +91,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
}
fn dump_trace<R: std::io::Read>(mut reader: R) {
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader, PagestreamProtocolVersion::V2) {
println!("{msg:?}");
}
}