Compare commits

..

12 Commits

Author SHA1 Message Date
Arseny Sher
f164085515 Move sk waiting functions to sk_utils.py
Triggered by one of these now being used outside test_wal_acceptor.py
2023-08-31 07:58:42 +03:00
Arseny Sher
cc3acd4201 Make test_pageserver_catchup_while_compute_down exercise fastpath.
Fastpath here means avoiding sync-safekeepers on compute start. This uncovers a
bug we had with not (re)activating the timeline on safekeeper restart, fixed by
87f7d6bce3.
2023-08-30 14:14:11 +03:00
Joonas Koivunen
05773708d3 fix: add context for ancestor lsn wait (#5143)
In logs it is confusing to see seqwait timeouts which seemingly arise
from the branched lsn but actually are about the ancestor, leading to
questions like "has the last_record_lsn went back".

Noticed by @problame.
2023-08-30 12:21:41 +03:00
John Spray
382473d9a5 docs: add RFC for remote storage generation numbers (#4919)
## Summary

A scheme of logical "generation numbers" for pageservers and their
attachments is proposed, along with
changes to the remote storage format to include these generation numbers
in S3 keys.

Using the control plane as the issuer of these generation numbers
enables strong anti-split-brain
properties in the pageserver cluster without implementing a consensus
mechanism directly
in the pageservers.

## Motivation

Currently, the pageserver's remote storage format does not provide a
mechanism for addressing
split brain conditions that may happen when replacing a node during
failover or when migrating
a tenant from one pageserver to another. From a remote storage
perspective, a split brain condition
occurs whenever two nodes both think they have the same tenant attached,
and both can write to S3. This
can happen in the case of a network partition, pathologically long
delays (e.g. suspended VM), or software
bugs.

This blocks robust implementation of failover from unresponsive
pageservers, due to the risk that
the unresponsive pageserver is still writing to S3.

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-08-30 09:49:55 +01:00
Arpad Müller
eb0a698adc Make page cache and read_blk async (#5023)
## Problem

`read_blk` does I/O and thus we would like to make it async. We can't
make the function async as long as the `PageReadGuard` returned by
`read_blk` isn't `Send`. The page cache is called by `read_blk`, and
thus it can't be async without `read_blk` being async. Thus, we have a
circular dependency.

## Summary of changes

Due to the circular dependency, we convert both the page cache and
`read_blk` to async at the same time:

We make the page cache use `tokio::sync` synchronization primitives as
those are `Send`. This makes all the places that acquire a lock require
async though, which we then also do. This includes also asyncification
of the `read_blk` function.

Builds upon #4994, #5015, #5056, and #5129.

Part of #4743.
2023-08-30 09:04:31 +02:00
Arseny Sher
81b6578c44 Allow walsender in recovery mode to give WAL till dynamic flush_lsn.
Instead of one fixed at the start of replication. To this end, create a
term_flush_lsn watch channel similar to the commit_lsn one. This allows recovery
streaming to continue if new data appears.
2023-08-29 23:19:40 +03:00
Arseny Sher
bc49c73fee Move wal_stream_connection_config to utils.
It will be used by safekeeper as well.
2023-08-29 23:19:40 +03:00
Arseny Sher
e98580b092 Add term and http endpoint to broker message SkTimelineInfo.
We need them for safekeeper peer recovery
https://github.com/neondatabase/neon/pull/4875
2023-08-29 23:19:40 +03:00
Arseny Sher
804ef23043 Rename TermSwitchEntry to TermLsn.
Add derive Ord for easy comparison of <term, lsn> pairs.

part of https://github.com/neondatabase/neon/pull/4875
2023-08-29 23:19:40 +03:00
Arseny Sher
87f7d6bce3 Start and stop per timeline recovery task.
Slightly refactors init: now load_tenant_timelines is also async to properly
init the timeline, but to keep global map lock sync we just acquire it anew for
each timeline.

Recovery task itself is just a stub here.

part of
https://github.com/neondatabase/neon/pull/4875
2023-08-29 23:19:40 +03:00
Arseny Sher
39e3fbbeb0 Add safekeeper peers to TimelineInfo.
Now available under GET /tenant/xxx/timeline/yyy for inspection.
2023-08-29 23:19:40 +03:00
Em Sharnoff
8d2a4aa5f8 vm-monitor: Add flag for when file cache on disk (#5130)
Part 1 of 2, for moving the file cache onto disk.

Because VMs are created by the control plane (and that's where the
filesystem for the file cache is defined), we can't rely on any kind of
synchronization between releases, so the change needs to be
feature-gated (kind of), with the default remaining the same for now.

See also: neondatabase/cloud#6593
2023-08-29 12:44:48 -07:00
39 changed files with 1562 additions and 317 deletions

Cargo.lock generated
View File

@@ -5014,6 +5014,7 @@ dependencies = [
"nix 0.26.2",
"once_cell",
"pin-project-lite",
"postgres_connection",
"pq_proto",
"rand",
"regex",

View File

@@ -50,8 +50,6 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
RUN for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql ; do echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO PUBLIC;' >> $file ; done
#########################################################################################
#
# Layer "postgis-build"

View File

@@ -281,6 +281,7 @@ fn main() -> Result<()> {
let vm_monitor_addr = matches.get_one::<String>("vm-monitor-addr");
let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
let cgroup = matches.get_one::<String>("cgroup");
let file_cache_on_disk = matches.get_flag("file-cache-on-disk");
// Only make a runtime if we need to.
// Note: it seems like you can make a runtime in an inner scope and
@@ -313,6 +314,7 @@ fn main() -> Result<()> {
cgroup: cgroup.cloned(),
pgconnstr: file_cache_connstr.cloned(),
addr: vm_monitor_addr.cloned().unwrap(),
file_cache_on_disk,
})),
token.clone(),
))
@@ -483,6 +485,11 @@ fn cli() -> clap::Command {
)
.value_name("FILECACHE_CONNSTR"),
)
.arg(
Arg::new("file-cache-on-disk")
.long("file-cache-on-disk")
.action(clap::ArgAction::SetTrue),
)
}
#[test]

View File

@@ -0,0 +1,957 @@
# Pageserver: split-brain safety for remote storage through generation numbers
## Summary
A scheme of logical "generation numbers" for tenant attachment to pageservers is proposed, along with
changes to the remote storage format to include these generation numbers in S3 keys.
Using the control plane as the issuer of these generation numbers enables strong anti-split-brain
properties in the pageserver cluster without implementing a consensus mechanism directly
in the pageservers.
## Motivation
Currently, the pageserver's remote storage format does not provide a mechanism for addressing
split brain conditions that may happen when replacing a node or when migrating
a tenant from one pageserver to another.
From a remote storage perspective, a split brain condition occurs whenever two nodes both think
they have the same tenant attached, and both can write to S3. This can happen in the case of a
network partition, pathologically long delays (e.g. suspended VM), or software bugs.
In the current deployment model, control plane guarantees that a tenant is attached to one
pageserver at a time, thereby ruling out split-brain conditions resulting from dual
attachment (however, there is always the risk of a control plane bug). This control
plane guarantee prevents robust response to failures, as if a pageserver is unresponsive
we may not detach from it. The mechanism in this RFC fixes this, by making it safe to
attach to a new, different pageserver even if an unresponsive pageserver may be running.
Further, lack of safety during split-brain conditions blocks two important features where occasional
split-brain conditions are part of the design assumptions:
- seamless tenant migration ([RFC PR](https://github.com/neondatabase/neon/pull/5029))
- automatic pageserver instance failure handling (aka "failover") (RFC TBD)
### Prior art
- 020-pageserver-s3-coordination.md
- 023-the-state-of-pageserver-tenant-relocation.md
- 026-pageserver-s3-mvcc.md
This RFC has broad similarities to the proposal to implement a MVCC scheme in
S3 object names, but this RFC avoids a general purpose transaction scheme in
favour of more specialized "generations" that work like a transaction ID that
always has the same lifetime as a pageserver process or tenant attachment, whichever
is shorter.
## Requirements
- Accommodate storage backends with no atomic or fencing capability (i.e. work within
S3's limitation that there are no atomics and clients can't be fenced)
- Don't depend on any STONITH or node fencing in the compute layer (i.e. we will not
assume that we can reliably kill an EC2 instance and have it die)
- Scoped per-tenant, not per-pageserver; for _seamless tenant migration_, we need
per-tenant granularity, and for _failover_, we likely want to spread the workload
of the failed pageserver instance to a number of peers, rather than monolithically
moving the entire workload to another machine.
We do not rule out the latter case, but should not constrain ourselves to it.
## Design Tenets
These are not requirements, but are ideas that guide the following design:
- Avoid implementing another consensus system: we already have a strongly consistent
database in the control plane that can do atomic operations where needed, and we also
have a Paxos implementation in the safekeeper.
- Avoid locking in to specific models of how failover will work (e.g. do not assume that
all the tenants on a pageserver will fail over as a unit).
- Be strictly correct when it comes to data integrity. Occasional failures of availability
are tolerable, occasional data loss is not.
## Non Goals
The changes in this RFC intentionally isolate the design decision of how to define
logical generation numbers and object storage format in a way that is somewhat flexible with
respect to how actual orchestration of failover works.
This RFC intentionally does not cover:
- Failure detection
- Orchestration of failover
- Standby modes to keep data ready for fast migration
- Intentional multi-writer operation on tenants (multi-writer scenarios are assumed to be transient split-brain situations).
- Sharding.
The interaction between this RFC and those features is discussed in [Appendix B](#appendix-b-interoperability-with-other-features)
## Impacted Components
pageserver, control plane, safekeeper (optional)
## Implementation Part 1: Correctness
### Summary
- A per-tenant **generation number** is introduced to uniquely identify tenant attachments to pageserver processes.
- This generation number increments each time the control plane modifies a tenant (`Project`)'s assigned pageserver, or when the assigned pageserver restarts.
- the control plane is the authority for generation numbers: only it may
increment a generation number.
- **Object keys are suffixed** with the generation number
- **Safety for multiply-attached tenants** is provided by the
generation number in the object key: the competing pageservers will not
try to write to the same keys.
- **Safety in split brain for multiple nodes running with
the same node ID** is provided by the pageserver calling out to the control plane
on startup, to re-attach and thereby increment the generations of any attached tenants
- **Safety for deletions** is achieved by deferring the DELETE from S3 to a point in time where the deleting node has validated with control plane that no attachment with a higher generation has a reference to the to-be-DELETEd key.
- **The control plane is used to issue generation numbers** to avoid the need for
a built-in consensus system in the pageserver, although this could in principle
be changed without changing the storage format.
### Generation numbers
A generation number is associated with each tenant in the control plane,
and each time the attachment status of the tenant changes, this is incremented.
Changes in attachment status include:
- Attaching the tenant to a different pageserver
- A pageserver restarting, and "re-attaching" its tenants on startup
These increments of attachment generation provide invariants we need to avoid
split-brain issues in storage:
- If two pageservers have the same tenant attached, the attachments are guaranteed to have different generation numbers, because the generation would increment
while attaching the second one.
- If there are multiple pageservers running with the same node ID, all the attachments on all pageservers are guaranteed to have different generation numbers, because the generation would increment
when the second node started and re-attached its tenants.
As long as the infrastructure does not transparently replace an underlying
physical machine, we are totally safe. See the later [unsafe case](#unsafe-case-on-badly-behaved-infrastructure) section for details.
### Object Key Changes
#### Generation suffix
All object keys (layer objects and index objects) will contain the attachment
generation as a [suffix](#why-a-generation-suffix-rather-than-prefix).
This suffix is the primary mechanism for protecting against split-brain situations, and
enabling safe multi-attachment of tenants:
- Two pageservers running with the same node ID (e.g. after a failure, where there is
some rogue pageserver still running) will not try to write to the same objects, because at startup they will have re-attached tenants and thereby incremented
generation numbers.
- Multiple attachments (to different pageservers) of the same tenant will not try to write to the same objects, as each attachment would have a distinct generation.
The generation is appended in hex format (an 8-byte string representing a
u32) to all our existing key names. A u32's range limit would permit
27 restarts _per second_ over a 5 year system lifetime: orders of magnitude more than
is realistic.
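As a concrete illustration only (the exact separator and helper are assumptions, not the real pageserver code), appending the suffix might look like this:
```rust
/// Hypothetical helper: append an attachment generation to an existing remote
/// key as an 8-character, zero-padded hex suffix.
fn key_with_generation(base_key: &str, generation: u32) -> String {
    format!("{base_key}-{generation:08x}")
}

fn main() {
    // e.g. an index written under generation 2
    assert_eq!(
        key_with_generation("index_part.json", 2),
        "index_part.json-00000002"
    );
}
```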
The exact meaning of the generation suffix can evolve over time if necessary, for
example if we chose to implement a failover mechanism internally to the pageservers
rather than going via the control plane. The storage format just sees it as a number,
with the only semantic property being that the highest numbered index is the latest.
#### Index changes
Since object keys now include a generation suffix, the index of these keys must also be updated. IndexPart currently stores keys and LSNs sufficient to reconstruct key names: this would be extended to store the generation as well.
This will increase the size of the file, but only modestly: layers are already encoded as
their string-ized form, so the overhead is about 10 bytes per layer. This will be less if/when
the index storage format is migrated to a binary format from JSON.
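A rough sketch of what recording generations in the index could look like; the field and type names here are hypothetical and do not reflect the actual `IndexPart` schema:
```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

/// Hypothetical, simplified index shape: each layer's metadata additionally
/// records the generation it was written under (the real IndexPart differs).
#[derive(Serialize, Deserialize)]
struct IndexSketch {
    layers: HashMap<String, LayerMetaSketch>,
}

#[derive(Serialize, Deserialize)]
struct LayerMetaSketch {
    file_size: u64,
    /// Generation suffix used in this layer's object key; `None` for layers
    /// written before generations existed (see on-disk backward compatibility).
    generation: Option<u32>,
}
```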
#### Visibility
_This section doesn't describe code changes, but extends on the consequences of the
object key changes given above_
##### Visibility of objects to pageservers
Pageservers can of course list objects in S3 at any time, but in practice their
visible set is based on the contents of their LayerMap, which is initialized
from the `index_part.json.???` that they load.
Starting with the `index_part` from the most recent previous generation
(see [loading index_part](#finding-the-remote-indices-for-timelines)), a pageserver
initially has visibility of all the objects that were referenced in the loaded index.
These objects are guaranteed to remain visible until the current generation is
superseded, via pageservers in older generations avoiding deletions (see [deletion](#deletion)).
The "most recent previous generation" is _not_ necessarily the most recent
in terms of walltime, it is the one that is readable at the time a new generation
starts. Consider the following sequence of a tenant being re-attached to different
pageserver nodes:
- Create + attach on PS1 in generation 1
- PS1 does some work, writes out index_part.json-0001
- Attach to PS2 in generation 2
- Read index_part.json-0001
- PS2 starts doing some work...
- Attach to PS3 in generation 3
- Read index_part.json-0001
- **...PS2 finishes its work: now it writes index_part.json-0002**
- PS3 writes out index_part.json-0003
In the above sequence, the ancestry of indices is:
```
0001 -> 0002
|
-> 0003
```
This is not an issue for safety: if the 0002 references some object that is
not in 0001, then 0003 simply does not see it, and will re-do whatever
work was required (e.g. ingesting WAL or doing compaction). Objects referenced
by only the 0002 index will never be read by future attachment generations, and
will eventually be cleaned up by a scrub (see [scrubbing](#cleaning-up-orphan-objects-scrubbing)).
##### Visibility of LSNs to clients
Because index_part.json is now written with a generation suffix, which data
is visible depends on which generation the reader is operating in:
- If one was passively reading from S3 from outside of a pageserver, the
visibility of data would depend on which index_part.json-<generation> file
one had chosen to read from.
- If two pageservers have the same tenant attached, they may have different
data visible as they're independently replaying the WAL, and maintaining
independent LayerMaps that are written to independent index_part.json files.
Data does not have to be remotely committed to be visible.
- For a pageserver writing with a stale generation, historic LSNs
remain readable until another pageserver (with a higher generation suffix)
decides to execute GC deletions. At this point, we may think of the stale
attachment's generation as having logically ended: during its existence
the generation had a consistent view of the world.
- For a newly attached pageserver, its highest visible LSN may appear to
go backwards with respect to an earlier attachment, if that earlier
attachment had not uploaded all data to S3 before the new attachment.
### Deletion
#### Generation number validation
While writes are de-conflicted by writers always using their own generation number in the key,
deletions are slightly more challenging: if a pageserver A is isolated, and the true active node is
pageserver B, then it is dangerous for A to do any object deletions, even of objects that it wrote
itself, because pageserver B's metadata might reference those objects.
We solve this by inserting a "generation validation" step between the write of a remote index
that un-links a particular object from the index, and the actual deletion of the object, such
that deletions strictly obey the following ordering:
1. Write out index_part.json: this guarantees that any subsequent reader of the metadata will
not try and read the object we unlinked.
2. Call out to control plane to validate that the generation which we use for our attachment is still the latest.
3. If step 2 passes, it is safe to delete the object. Why? The check-in with control plane
together with our visibility rules guarantees that any later generation
will use either the exact `index_part.json` that we uploaded in step 1, or a successor
of it; not an earlier one. In both cases, the `index_part.json` doesn't reference the
key we are deleting anymore, so, the key is invisible to any later attachment generation.
Hence it's safe to delete it.
Note that at step 2 we are only confirming that deletions of objects _no longer referenced
by the specific `index_part.json` written in step 1_ are safe. If we were attempting other deletions concurrently,
these would need their own generation validation step.
If step 2 fails, we may leak the object. This is safe, but has a cost: see [scrubbing](#cleaning-up-orphan-objects-scrubbing). We may avoid this entirely outside of node
failures, if we do proper flushing of deletions on clean shutdown and clean migration.
To avoid doing a huge number of control plane requests to perform generation validation,
validation of many tenants will be done in a single request, and deletions will be queued up
prior to validation: see [Persistent deletion queue](#persistent-deletion-queue) for more.
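A schematic sketch of the ordering above; the traits and function here are hypothetical stand-ins for remote storage and the control plane client, not the real APIs:
```rust
/// Hypothetical interfaces standing in for remote storage and the control plane.
trait RemoteStorage {
    fn upload_index(&self, index_key: &str, body: &[u8]) -> anyhow::Result<()>;
    fn delete(&self, key: &str) -> anyhow::Result<()>;
}

trait ControlPlane {
    /// Returns true if `generation` is still the latest attachment generation.
    fn validate(&self, tenant_id: &str, generation: u32) -> anyhow::Result<bool>;
}

/// Delete `unlinked_key` only after (1) uploading an index that no longer
/// references it and (2) confirming our generation is still current.
fn delete_after_validation(
    storage: &dyn RemoteStorage,
    control_plane: &dyn ControlPlane,
    tenant_id: &str,
    my_generation: u32,
    new_index_key: &str,
    new_index_body: &[u8],
    unlinked_key: &str,
) -> anyhow::Result<()> {
    // Step 1: make the unlink visible to any later generation.
    storage.upload_index(new_index_key, new_index_body)?;
    // Step 2: check we are still the latest generation for this tenant.
    if control_plane.validate(tenant_id, my_generation)? {
        // Step 3: safe to delete; later generations read our index or a successor.
        storage.delete(unlinked_key)?;
    } else {
        // Stale generation: skip the delete. The object may be leaked and
        // later cleaned up by scrubbing.
    }
    Ok(())
}
```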
#### `remote_consistent_lsn` updates
Remote objects are not the only kind of deletion the pageserver does: it also indirectly deletes
WAL data, by feeding back remote_consistent_lsn to safekeepers, as a signal to the safekeepers that
they may drop data below this LSN.
For the same reasons that deletion of objects must be guarded by an attachment generation number
validation step, updates to `remote_consistent_lsn` are subject to the same rules, using
an ordering as follows:
1. upload the index_part that covers data up to LSN `L0` to S3
2. Call out to control plane to validate that the generation which we use for our attachment is still the latest.
3. advance the `remote_consistent_lsn` that we advertise to the safekeepers to `L0`
If step 2 fails, then the `remote_consistent_lsn` advertised
to safekeepers will not advance again until a pageserver
with the latest generation is ready to do so.
**Note:** at step 3 we are not advertising the _latest_ remote_consistent_lsn, we are
advertising the value in the index_part that we uploaded in step 1. This provides
a strong ordering guarantee.
Internally to the pageserver, each timeline will have two remote_consistent_lsn values: the one that
reflects its latest write to remote storage, and the one that reflects the most
recent validation of generation number. It is only the latter value that may
be advertised to the outside world (i.e. to the safekeeper).
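A small illustrative sketch of that split; the names are made up and the real timeline code differs:
```rust
/// Illustrative only: a timeline tracks two values, and only the validated
/// one is advertised to safekeepers.
struct RemoteConsistentLsnSketch {
    /// LSN covered by the index_part most recently uploaded to remote storage.
    uploaded: u64,
    /// LSN covered by an upload whose generation has since been validated
    /// with the control plane; this is what we advertise to safekeepers.
    validated: u64,
}

impl RemoteConsistentLsnSketch {
    /// Called after a successful generation validation for the upload at `uploaded`.
    fn on_validation_success(&mut self) {
        self.validated = self.uploaded;
    }

    fn advertised_to_safekeeper(&self) -> u64 {
        self.validated
    }
}
```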
The control plane remains unaware of `remote_consistent_lsn`: it only has to validate
the freshness of generation numbers, thereby granting the pageserver permission to
share the information with the safekeeper.
For convenience, in subsequent sections and RFCs we will use "deletion" to mean both deletion
of objects in S3, and updates to the `remote_consistent_lsn`, as updates to the remote consistent
LSN are de-facto deletions done via the safekeeper, and both kinds of deletion are subject to
the same generation validation requirement.
### Pageserver attach/startup changes
#### Attachment
Calls to `/v1/tenant/{tenant_id}/attach` are augmented with an additional
`generation` field in the body.
The pageserver does not persist this: a generation is only good for the lifetime
of a process.
#### Finding the remote indices for timelines
Because index files are now suffixed with generation numbers, the pageserver
cannot always GET the remote index in one request, because it can't always
know a-priori what the latest remote index is.
Typically, the most recent generation to write an index would be our own
generation minus 1. However, this might not be the case: the previous
node might have started and acquired a generation number, and then crashed
before writing out a remote index.
In the general case and as a fallback, the pageserver may list all the `index_part.json`
files for a timeline, sort them by generation, and pick the highest that is `<=`
its current generation for this attachment. The tenant should never load an index
with an attachment generation _newer_ than its own.
These two rules combined ensure that objects written by later generations are never visible to earlier generations.
Note that if a given attachment picks an index part from an earlier generation (say n-2), but crashes & restarts before it writes its own generation's index part, next time it tries to pick an index part there may be an index part from generation n-1.
It would pick the n-1 index part in that case, because it's sorted higher than the previous one from generation n-2.
So, the above rules do not guarantee a deterministic choice of index part. Tenants
are allowed to be attached with stale attachment generations during a multiply-attached
phase in a migration, and in this instance if the old location's pageserver restarts,
it should not try and load the newer generation's index.
To summarize, on starting a timeline, the pageserver will:
1. Issue a GET for index_part.json-<my generation - 1>
2. If 1 failed, issue a ListObjectsV2 request for index_part.json\* and
pick the newest whose generation is `<=` its own.
One could optimize this further by using the control plane to record specifically
which generation most recently wrote an index_part.json, if necessary, to increase
the probability of finding the index_part.json in one GET. One could also improve
the chances by having pageservers proactively write out index_part.json after they
get a new generation ID.
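A minimal sketch of the fallback selection over a listing, assuming keys of the form `index_part.json-<hex generation>`; the parsing helper is hypothetical:
```rust
/// Given listed index keys of the form "index_part.json-<hex generation>",
/// pick the one with the highest generation that is <= our own generation.
/// (Key layout and parsing here are assumptions for illustration.)
fn pick_index_key(listed_keys: &[String], my_generation: u32) -> Option<&String> {
    listed_keys
        .iter()
        .filter_map(|key| {
            let gen_hex = key.rsplit('-').next()?;
            let generation = u32::from_str_radix(gen_hex, 16).ok()?;
            (generation <= my_generation).then_some((generation, key))
        })
        .max_by_key(|(generation, _)| *generation)
        .map(|(_, key)| key)
}
```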
#### Re-attachment on startup
On startup, the pageserver will call out to a new control plane `/re-attach`
API (see [Generation API](#generation-api)). This returns a list of
tenants that should be attached to the pageserver, and their generation numbers, which
the control plane will increment before returning.
The pageserver should still scan its local disk on startup, but should _delete_
any local content for tenants not indicated in the `/re-attach` response: their
absence is an implicit detach operation.
**Note** if a tenant is omitted from the re-attach response, its local disk content
will be deleted. This will change in subsequent work, when the control plane gains
the concept of a secondary/standby location: a node with local content may revert
to this status and retain some local content.
#### Cleaning up previous generations' remote indices
Deletion of old indices is not necessary for correctness, although it is necessary
to avoid the ListObjects fallback in the previous section becoming ever more expensive.
Once the new attachment has written out its index_part.json, it may asynchronously clean up historic index_part.json
objects that were found.
We may choose to implement this deletion either as an explicit step after we
write out index_part for the first time in a pageserver's lifetime, or for
simplicity just do it periodically as part of the background scrub (see [scrubbing](#cleaning-up-orphan-objects-scrubbing));
### Control Plane Changes
#### Store generations for attaching tenants
- The `Project` table must store the generation number for use when
attaching the tenant to a new pageserver.
- The `/v1/tenant/:tenant_id/attach` pageserver API will require the generation number,
which the control plane can supply by simply incrementing the `Project`'s
generation number each time the tenant is attached to a different server: the same database
transaction that changes the assigned pageserver should also change the generation number.
#### Generation API
This section describes an API that could be provided directly by the control plane,
or built as a separate microservice. In earlier parts of the RFC, when we
discuss the control plane providing generation numbers, we are referring to this API.
The API endpoints used by the pageserver to acquire and validate generation
numbers are quite simple, and only require access to some persistent and
linearizable storage (such as a database).
Building this into the control plane is proposed as a least-effort option to exploit existing infrastructure and implement generation number issuance in the same transaction that mandates it (i.e., the transaction that updates the `Project` assignment to another pageserver).
However, this is not mandatory: this "Generation Number Issuer" could
be built as a microservice. In practice, we will write such a miniature service
anyway, to enable E2E pageserver/compute testing without control plane.
The endpoints required by pageservers are:
##### `/re-attach`
- Request: `{node_id: <u32>}`
- Response:
- 200 `{tenants: [{id: <TenantId>, gen: <u32>}]}`
- 404: unknown node_id
- (Future: 429: flapping detected, perhaps nodes are fighting for the same node ID,
or perhaps this node was in a retry loop)
- (On unknown tenants, omit tenant from `tenants` array)
- Server behavior: query database for which tenants should be attached to this pageserver.
- for each tenant that should be attached, increment the attachment generation and
include the new generation in the response
- Client behavior:
- for all tenants in the response, activate with the new generation number
- for any local disk content _not_ referenced in the response, act as if we
had been asked to detach it (i.e. delete local files)
**Note** the `node_id` in this request will change in future if we move to ephemeral
node IDs, to be replaced with some correlation ID that helps the control plane realize
if a process is running with the same storage as a previous pageserver process (e.g.
we might use the EC2 instance ID, or we might just write some UUID to the disk the first
time we use it)
##### `/validate`
- Request: `{'tenants': [{tenant: <tenant id>, attach_gen: <gen>}, ...]}'`
- Response:
- 200 `{'tenants': [{tenant: <tenant id>, status: <bool>}...]}`
- (On unknown tenants, omit tenant from `tenants` array)
- Purpose: enable the pageserver to discover for the given attachments whether they are still the latest.
- Server behavior: this is a read-only operation: simply compare the generations in the request with
the generations known to the server, and set status to `true` if they match.
- Client behavior: clients must not do deletions within a tenant's remote data until they have
received a response indicating the generation they hold for the attachment is current.
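For illustration, the request and response bodies described above could be modeled roughly as follows; the field names follow the endpoint descriptions, while everything else (types, structure) is an assumption:
```rust
use serde::{Deserialize, Serialize};

// --- /re-attach ---

#[derive(Serialize, Deserialize)]
struct ReAttachRequest {
    node_id: u32,
}

#[derive(Serialize, Deserialize)]
struct ReAttachResponseTenant {
    id: String, // TenantId, hex-encoded in the real API
    #[serde(rename = "gen")]
    generation: u32,
}

#[derive(Serialize, Deserialize)]
struct ReAttachResponse {
    /// Tenants that should be attached to this node, each with a freshly
    /// incremented generation number.
    tenants: Vec<ReAttachResponseTenant>,
}

// --- /validate ---

#[derive(Serialize, Deserialize)]
struct ValidateRequestTenant {
    tenant: String,
    attach_gen: u32,
}

#[derive(Serialize, Deserialize)]
struct ValidateRequest {
    tenants: Vec<ValidateRequestTenant>,
}

#[derive(Serialize, Deserialize)]
struct ValidateResponseTenant {
    tenant: String,
    /// true if attach_gen is still the latest generation for this tenant.
    status: bool,
}

#[derive(Serialize, Deserialize)]
struct ValidateResponse {
    tenants: Vec<ValidateResponseTenant>,
}
```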
#### Use of `/load` and `/ignore` APIs
Because the pageserver will be changed to only attach tenants on startup
based on the control plane's response to a `/re-attach` request, the load/ignore
APIs no longer make sense in their current form.
The `/load` API becomes functionally equivalent to attach, and will be removed:
any location that used `/load` before should just attach instead.
The `/ignore` API is equivalent to detaching, but without deleting local files.
### Timeline/Branch creation & deletion
All of the previous arguments for safety have described operations within
a timeline, where we may describe a sequence that includes updates to
index_part.json, and where reads and writes are coming from a postgres
endpoint (writes via the safekeeper).
Creating or destroying a timeline is a bit different, because writes
are coming from the control plane.
We must be safe against scenarios such as:
- A tenant is attached to pageserver B while pageserver A is
in the middle of servicing an RPC from the control plane to
create or delete a tenant.
- A pageserver A has been sent a timeline creation request
but becomes unresponsive. The tenant is attached to a
different pageserver B, and the timeline creation request
is sent there too.
#### Timeline Creation
If some very slow node tries to do a timeline creation _after_
a more recent generation node has already created the timeline
and written some data into it, that must not cause harm. This
is provided in timeline creations by the way all the objects
within the timeline's remote path include a generation suffix:
a slow node in an old generation that attempts to "create" a timeline
that already exists will just emit an index_part.json with
an old generation suffix.
Timeline IDs are never reused, so we don't have
to worry about the case of create/delete/create cycles. If they
were re-used during a disaster recovery "un-delete" of a timeline,
that special case can be handled by calling out to all available pageservers
to check that they return 404 for the timeline, and to flush their
deletion queues in case they had any deletions pending from the
timeline.
The above makes it safe for control plane to change the assignment of
a tenant to a pageserver while a timeline creation is ongoing.
The reason is that the creation request against the newly assigned pageserver
uses a new generation number. However, care must be taken by control plane
to ensure that a "timeline creation successful" response from some pageserver
is checked to confirm that the pageserver's generation for that timeline's tenant is still the latest.
If it is not the latest, the response does not constitute a successful timeline creation.
It is acceptable to discard such responses; the scrubber will clean up the S3 state.
It is better to issue a timeline deletion request to the stale attachment.
#### Timeline Deletion
Tenant/timeline deletion operations are exempt from generation validation
on deletes, and therefore don't have to go through the same deletion
queue as GC/compaction layer deletions. This is because once a
delete is issued by the control plane, it is a promise that the
control plane will keep trying until the deletion is done, so even stale
pageservers are permitted to go ahead and delete the objects.
The implications of this for control plane are:
- During timeline/tenant deletion, the control plane must wait for the deletion to
be truly complete (status 404) and also handle the case where the pageserver
becomes unavailable, either by waiting for a replacement with the same node_id,
or by re-attaching the tenant elsewhere.
- The control plane must persist its intent to delete
a timeline/tenant before issuing any RPCs, and then once it starts, it must
keep retrying until the tenant/timeline is gone. This is already handled
by using a persistent `Operation` record that is retried indefinitely.
Timeline deletion may result in a special kind of object leak, where
the latest generation attachment completes a deletion (including erasing
all objects in the timeline path), but some slow/partitioned node is
writing into the timeline path with a stale generation number. This would
not be caught by any per-timeline scrubbing (see [scrubbing](#cleaning-up-orphan-objects-scrubbing)), since scrubbing happens on the
attached pageserver, and once the timeline is deleted it isn't attached anywhere.
This scenario should be pretty rare, and the control plane can make it even
rarer by ensuring that if a tenant is in a multi-attached state (e.g. during
migration), we wait for that to complete before processing the deletion. Beyond
that, we may implement some other top-level scrub of timelines in
an external tool, to identify any tenant/timeline paths that are not found
in the control plane database.
#### Examples
- Deletion, node restarts partway through:
- By the time we returned 202, we have written a remote delete marker
- Any subsequent incarnation of the same node_id will see the remote
delete marker and continue to process the deletion
- If the original pageserver is lost permanently and no replacement
with the same node_id is available, then the control plane must recover
by re-attaching the tenant to a different node.
- Creation, node becomes unresponsive partway through.
- Control plane will see HTTP request timeout, keep re-issuing
request to whoever is the latest attachment point for the tenant
until it succeeds.
- Stale nodes may be trying to execute timeline creation: they will
write out index_part.json files with a
stale attachment generation; these will eventually be cleaned up
by the same mechanism as other old indices.
### Unsafe case on badly behaved infrastructure
This section is only relevant if running on a different environment
than EC2 machines with ephemeral disks.
If we ever run pageservers on infrastructure that might transparently restart
a pageserver while leaving an old process running (e.g. a VM gets rescheduled
without the old one being fenced), then there is a risk of corruption, when
the control plane attaches the tenant, as follows:
- If the control plane sends an `/attach` request to node A, then node A dies
and is replaced, and the control plane retries the request without
incrementing the attachment generation, then it could end up with two physical nodes
both using the same generation number.
- This is not an issue when using EC2 instances with ephemeral storage, as long
as the control plane never re-uses a node ID, but it would need re-examining
if running on different infrastructure.
- To robustly protect against this class of issue, we would either:
- add a "node generation" to distinguish between different processes holding the
same node_id.
- or, dispense with static node_id entirely and issue an ephemeral ID to each
pageserver process when it starts.
## Implementation Part 2: Optimizations
### Persistent deletion queue
Between writing out a new index_part.json that doesn't reference an object,
and executing the deletion, an object passes through a window where it is
only referenced in memory, and could be leaked if the pageserver is stopped
uncleanly. That introduces conflicting incentives: on the one hand, we would
like to delay and batch deletions to
1. minimize the cost of the mandatory validation calls to control plane, and
2. minimize cost for DeleteObjects requests.
On the other hand we would also like to minimize leakage by executing
deletions promptly.
To resolve this, we may make the deletion queue persistent,
and then execute queued deletions in the background at a later time.
_Note: The deletion queue's reason for existence is optimization rather than correctness,
so there is a lot of flexibility in exactly how it should work,
as long as it obeys the rule to validate generations before executing deletions;
the following details are therefore not essential to the overall RFC._
#### Scope
The deletion queue will be global per pageserver, not per-tenant. There
are several reasons for this choice:
- Use the queue as a central point to coalesce validation requests to the
control plane: this avoids individual `Timeline` objects ever touching
the control plane API, and avoids them having to know the rules about
validating deletions. This separation of concerns will avoid burdening
the already many-LoC `Timeline` type with even more responsibility.
- Decouple the deletion queue from Tenant attachment lifetime: we may
"hibernate" an inactive tenant by tearing down its `Tenant`/`Timeline`
objects in the pageserver, without having to wait for deletions to be done.
- Amortize the cost of I/O for the persistent queue, instead of having many
tiny queues.
- Coalesce deletions into a smaller number of larger DeleteObjects calls
Because of the cost of doing I/O for persistence, and the desire to coalesce
generation validation requests across tenants, and coalesce deletions into
larger DeleteObjects requests, there will be one deletion queue per pageserver
rather than one per tenant. This has the added benefit that when deactivating
a tenant, we do not have to drain their deletion queue: deletions can proceed
for a tenant whose main `Tenant` object has been torn down.
#### Flow of deletion
The flow of a deletion becomes:
1. Need for deletion of an object (=> layer file) is identified.
2. Unlink the object from all the places that reference it (=> `index_part.json`).
3. Enqueue the deletion to a persistent queue.
Each entry is `tenant_id, attachment_generation, S3 key`.
4. Validate & execute in batches:
4.1 For a batch of entries, call into control plane.
4.2 For the subset of entries that passed validation, execute a `DeleteObjects` S3 DELETE request for their S3 keys.
As outlined in the Part 1 on correctness, it is critical that deletions are only
executed once the key is not referenced anywhere in S3.
This property is obviously upheld by the scheme above.
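A condensed sketch of steps 4.1 and 4.2; the types and the validation/deletion callbacks are hypothetical stand-ins, not the real deletion queue implementation:
```rust
use std::collections::{HashMap, HashSet};

/// One queued deletion: enough information to later validate the generation
/// it was enqueued under. (Field types are simplified for illustration.)
struct DeletionEntry {
    tenant_id: String,
    attachment_generation: u32,
    key: String,
}

fn process_batch(
    batch: Vec<DeletionEntry>,
    // Hypothetical control plane call: for each (tenant, generation), is it still current?
    validate: impl Fn(&HashSet<(String, u32)>) -> HashMap<(String, u32), bool>,
    // Hypothetical wrapper around an S3 DeleteObjects call.
    delete_objects: impl Fn(&[String]),
) {
    // 4.1: one validation request covering every (tenant, generation) in the batch.
    let to_validate: HashSet<(String, u32)> = batch
        .iter()
        .map(|e| (e.tenant_id.clone(), e.attachment_generation))
        .collect();
    let verdicts = validate(&to_validate);

    // 4.2: execute a DeleteObjects call only for keys whose generation is still current.
    let deletable: Vec<String> = batch
        .into_iter()
        .filter(|e| {
            verdicts
                .get(&(e.tenant_id.clone(), e.attachment_generation))
                .copied()
                .unwrap_or(false)
        })
        .map(|e| e.key)
        .collect();
    if !deletable.is_empty() {
        delete_objects(&deletable);
    }
}
```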
#### We Accept Object Leakage In Acceptable Circumstances
If we crash in the flow above between (2) and (3), we lose track of the unreferenced object.
Further, enqueuing a single entry to the persistent queue may not be immediately durable, in order to amortize the cost of flushing to disk.
This is acceptable for now; it can be caught by [the scrubber](#cleaning-up-orphan-objects-scrubbing).
There are various measures we can take to improve this in the future.
1. Cap the amount of time until an enqueued entry becomes durable (timeout for flush-to-disk)
2. Proactively flush:
- On graceful shutdown, as we anticipate that some or
all of our attachments may be re-assigned while we are offline.
- On tenant detach.
3. For each entry, keep track of whether it has passed (2).
Only admit entries to (4) once they have passed (2).
This requires re-writing entries, or two queue entries (intent, commit), per deletion.
The important take-away with any of the above is that it's not
disastrous to leak objects in exceptional circumstances.
#### Operations that may skip the queue
Deletions of an entire timeline are [exempt](#Timeline-Deletion) from generation number validation. Once the
control plane sends the deletion request, there is no requirement to retain the readability
of any data within the timeline, and all objects within the timeline path may be deleted
at any time from the control plane's deletion request onwards.
Since deletions of smaller timelines won't have enough objects to compose a full-sized
DeleteObjects request, it is still useful to send these through the last part of the
deletion pipeline to coalesce with other executing deletions: to enable this, the
deletion queue should expose two input channels: one for deletions that must be
processed in a generation-aware way, and a fast path for timeline deletions, where
that fast path may skip validation and the persistent queue.
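One illustrative way to express those two inputs (names and shape are assumptions):
```rust
/// Illustrative shape of the deletion queue's two inputs: layer deletions must
/// be generation-validated, while timeline-deletion keys may go straight to the
/// executor for batching.
enum DeletionQueueInput {
    /// GC/compaction layer deletions: persisted and validated before execution.
    Validated {
        tenant_id: String,
        attachment_generation: u32,
        keys: Vec<String>,
    },
    /// Timeline deletion: exempt from validation, only coalesced into larger
    /// DeleteObjects requests.
    TimelineFastPath { keys: Vec<String> },
}
```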
### Cleaning up orphan objects (scrubbing)
An orphan object is any object which is no longer referenced by a running node or by metadata.
Examples of how orphan objects arise:
- A node PUTs a layer object, then crashes before it writes the
index_part.json that references that layer.
- A stale node carries on running for some time, and writes out an unbounded number of
objects while it believes itself to be the rightful writer for a tenant.
- A pageserver crashes between un-linking an object from the index, and persisting
the object to its deletion queue.
Orphan objects are functionally harmless, but have a small cost due to S3 capacity consumed. We
may clean them up at some time in the future by doing a ListObjectsV2 operation and cross
referencing with the latest metadata to identify objects which are not referenced.
Scrubbing will be done only by an attached pageserver (not some third party process), and deletions requested during scrub will go through the same
validation as all other deletions: the attachment generation must be
fresh. This avoids the possibility of a stale pageserver incorrectly
thinking that an object written by a newer generation is stale, and deleting
it.
It is not strictly necessary that scrubbing be done by an attached
pageserver: it could also be done externally. However, an external
scrubber would still require the same validation procedure that
a pageserver's deletion queue performs, before actually erasing
objects.
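A minimal sketch of the cross-referencing step, assuming the listing and the referenced-key set are already in hand; any deletions of the resulting candidates would still go through generation validation:
```rust
use std::collections::HashSet;

/// Given all keys listed under a timeline prefix and the set of keys referenced
/// by the latest index (plus any index objects we want to keep), return the
/// orphan candidates. Purely illustrative.
fn orphan_candidates(
    listed_keys: &HashSet<String>,
    referenced_keys: &HashSet<String>,
) -> Vec<String> {
    listed_keys.difference(referenced_keys).cloned().collect()
}
```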
## Operational impact
### Availability
Coordination of generation numbers via the control plane introduces a dependency for certain
operations:
1. Starting new pageservers (or activating pageservers after a restart)
2. Executing enqueued deletions
3. Advertising updated `remote_consistent_lsn` to enable WAL trimming
Item 1. would mean that some in-place restarts that previously would have resumed service even if the control plane were
unavailable, will now not resume service to users until the control plane is available. We could
avoid this by having a timeout on communication with the control plane, and after some timeout,
resume service with the previous generation numbers (assuming this was persisted to disk). However,
this is unlikely to be needed as the control plane is already an essential & highly available component. Also, having a node re-use an old generation number would complicate
reasoning about the system, as it would break the invariant that a generation number uniquely identifies
a tenant's attachment to a given pageserver _process_: it would merely identify the tenant's attachment
to the pageserver _machine_ or its _on-disk-state_.
Item 2. is a non-issue operationally: it's harmless to delay deletions, the only impact of objects pending deletion is
the S3 capacity cost.
Item 3. could be an issue if safekeepers are low on disk space and the control plane is unavailable for a long time. If this became an issue,
we could adjust the safekeeper to delete segments from local disk sooner, as soon as they're uploaded to S3, rather than waiting for
remote_consistent_lsn to advance.
For a managed service, the general approach should be to make sure we are monitoring & respond fast enough
that control plane outages are bounded in time.
There is also the fact that control plane runs in a single region.
The latency for distant regions is not a big concern for us because all request types added by this RFC are either infrequent or not in the way of the data path.
However, we lose region isolation for the operations listed above.
The ongoing work to split the console and control plane will give us a per-region control plane, and all operations in this RFC can be handled by these per-region control planes.
With that in mind, we accept the trade-offs outlined in this paragraph.
We will also implement an "escape hatch" config for generation numbers, where in a major disaster or outage,
we may manually run pageservers with a hand-selected generation number, so that we can bring them online
independently of a control plane.
### Rollout
Although there is coupling between components, we may deploy most of the new data plane components
independently of the control plane: initially they can just use a static generation number.
#### Phase 1
The pageserver is deployed with some special config to:
- Always act like everything is generation 1 and do not wait for a control plane issued generation on attach
- Skip the places in deletion and remote_consistent_lsn updates where we would call into control plane
#### Phase 2
The control plane changes are deployed: control plane will now track and increment generation numbers.
#### Phase 3
The pageserver is deployed with its control-plane-dependent changes enabled: it will now require
the control plane to service re-attach requests on startup, and handle generation
validation requests.
### On-disk backward compatibility
Backward compatibility with existing data is straightforward:
- When reading the index, we may assume that any layer whose metadata doesn't include
a generation will have a path without a generation suffix.
- When locating the index file on attachment, we may use the "fallback" listing path
and if there is only an index without a generation suffix, that is the one we load.
It is not necessary to re-write existing layers: even new index files will be able
to represent generation-less layers.
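A sketch of a backward-compatible key construction under that assumption (hypothetical helper, not the real code):
```rust
/// Build a layer's remote key, omitting the suffix for legacy layers whose
/// metadata carries no generation (illustrative only).
fn layer_remote_key(layer_name: &str, generation: Option<u32>) -> String {
    match generation {
        Some(g) => format!("{layer_name}-{g:08x}"),
        None => layer_name.to_string(),
    }
}
```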
### On-disk forward compatibility
We will do a two phase rollout, probably over multiple releases because we will naturally
have some of the read-side code ready before the overall functionality is ready:
1. Deploy pageservers which understand the new index format and generation suffixes
in keys, but do not write objects with generation numbers in the keys.
2. Deploy pageservers that write objects with generation numbers in the keys.
Old pageservers will be oblivious to generation numbers. That means that they can't
read objects with generation numbers in the name. This is why the
first step must deploy the ability to read, before the second step
starts writing them.
# Frequently Asked Questions
## Why a generation _suffix_ rather than _prefix_?
The choice is motivated by object listing, since one can list by prefix but not
suffix.
In [finding remote indices](#finding-the-remote-indices-for-timelines), we rely
on being able to do a prefix listing for `<tenant>/<timeline>/index_part.json*`.
The converse case of using a generation prefix and listing by generation is
not needed: one could imagine listing by generation while scrubbing (so that
a particular generation's layers could be scrubbed), but this is not part
of normal operations, and the [scrubber](#cleaning-up-orphan-objects-scrubbing) probably won't work that way anyway.
## Wouldn't it be simpler to have a separate deletion queue per timeline?
Functionally speaking, we could. That's how RemoteTimelineClient currently works,
but this approach does not map well to a long-lived persistent queue with
generation validation.
Anything we do per-timeline generates tiny random I/O on a pageserver with
tens of thousands of timelines operating: to be ready for high scale, we should:
- A) Amortize costs where we can (e.g. a shared deletion queue)
- B) Expect to put tenants into a quiescent state while they're not
busy: i.e. we shouldn't keep a tenant alive to service its deletion queue.
This was discussed in the [scope](#scope) part of the deletion queue section.
# Appendix A: Examples of use in high availability/failover
The generation numbers proposed in this RFC are adaptable to a variety of different
failover scenarios and models. The sections below sketch how they would work in practice.
### In-place restart of a pageserver
"In-place" here means that the restart is done before any other element in the system
has taken action in response to the node being down.
- After restart, the node issues a re-attach request to the control plane, and
receives new generation numbers for all its attached tenants.
- Tenants may be activated with the generation number in the re-attach response.
- If any of its attachments were in fact stale (i.e. had been reassigned to another
node while this node was offline), then
- the re-attach response will inform the pageserver of this by _not_ incrementing
the generation for that attachment.
- This will implicitly block deletions in the tenant, but as an optimization
the pageserver should also proactively stop doing S3 uploads when it notices this stale-generation state.
- The control plane is expected to eventually detach this tenant from the
pageserver.
If the control plane does not include a tenant in the re-attach response,
but there is still local state for the tenant in the filesystem, the pageserver
deletes the local state in response and does not load/activate the tenant.
See the [earlier section on pageserver startup](#pageserver-attachstartup-changes) for details.
Control plane can use this mechanism to clean up a pageserver that has been
down for so long that all its tenants were migrated away before it came back
up again and asked for re-attach.
### Failure of a pageserver
In this context, read "failure" as the most ambiguous possible case, where
a pageserver is unavailable to clients and control plane, but may still be executing and talking
to S3.
#### Case A: re-attachment to other nodes
1. Let's say node 0 becomes unresponsive in a cluster of three nodes 0, 1, 2.
2. Some external mechanism notices that the node is unavailable and initiates
movement of all tenants attached to that node to a different node according
to some distribution rule.
In this example, it would mean incrementing the generation
of all tenants that were attached to node 0, as each tenant's assigned pageserver changes.
3. A tenant which is now attached to node 1 will _also_ still be attached to node
0, from the perspective of node 0. Node 0 will still be using its old generation,
node 1 will be using a newer generation.
4. S3 writes will continue from nodes 0 and 1: there will be an index_part.json-00000001
_and_ an index_part.json-00000002. Objects written under the old suffix
after the new attachment was created do not matter from the rest of the system's
perspective: the endpoints are reading from the new attachment location. Objects
written by node 0 are just garbage that can be cleaned up at leisure. Node 0 will
not do any deletions because it can't synchronize with control plane, or if it could,
its deletion queue processing would get errors for the validation requests.
#### Case B: direct node replacement with same node_id and drive
This is the scenario we would experience if running pageservers in some dynamic
VM/container environment that would auto-replace a given node_id when it became
unresponsive, with the node's storage supplied by some network block device
that is attached to the replacement VM/container.
1. Let's say node 0 fails, and there may be some other peers but they aren't relevant.
2. Some external mechanism notices that the node is unavailable, and creates
a "new node 0" (Node 0b) which is a physically separate server. The original node 0
(Node 0a) may still be running, because we do not assume the environment fences nodes.
3. On startup, node 0b re-attaches and gets higher generation numbers for
all tenants.
4. S3 writes continue from nodes 0a and 0b, but the writes do not collide due to different
generation in the suffix, and the writes from node 0a are not visible to the rest
of the system because endpoints are reading only from node 0b.
# Appendix B: interoperability with other features
## Sharded Keyspace
The design in this RFC maps neatly to a sharded keyspace design where subsets of the key space
for a tenant are assigned to different pageservers:
- the "unit of work" for attachments becomes something like a TenantShard rather than a Tenant
- TenantShards get generation numbers just as Tenants do.
- Write workload (ingest, compaction) for a tenant is spread out across pageservers via
TenantShards, but each TenantShard still has exactly one valid writer at a time.
## Read replicas
_This section is about a passive reader of S3 pageserver state, not a postgres
read replica_
For historical reads to LSNs below the remote persistent LSN, any node may act as a reader at any
time: remote data is logically immutable data, and the use of deferred deletion in this RFC helps
mitigate the fact that remote data is not _physically_ immutable (i.e. the actual data for a given
page moves around as compaction happens).
A read replica needs to be aware of generations in remote data in order to read the latest
metadata (find the index_part.json with the latest suffix). It may either query this
from the control plane, or find it with a ListObjectsV2 request.
## Seamless migration
To make tenant migration totally seamless, we will probably want to intentionally double-attach
a tenant briefly, serving reads from the old node while waiting for the new node to be ready.
This RFC enables that double-attachment: two nodes may be attached at the same time, with the migration destination
having a higher generation number. The old node will be able to ingest and serve reads, but not
do any deletes. The new node's attachment must also avoid deleting layers that the old node may
still use. A new piece of state
will be needed for this in the control plane's definition of an attachment.
## Warm secondary locations
To enable faster tenant movement after a pageserver is lost, we will probably want to spend some
disk capacity on keeping standby locations populated with local disk data.
There's no conflict between this RFC and that: implementing warm secondary locations on a per-tenant basis
would be a separate change to the control plane to store standby location(s) for a tenant. Because
the standbys do not write to S3, they do not need to be assigned generation numbers. When a tenant is
re-attached to a standby location, that would increment the tenant attachment generation and this
would work the same as any other attachment change, but with a warm cache.
## Ephemeral node IDs
This RFC intentionally avoids changing anything fundamental about how pageservers are identified
and registered with the control plane, to avoid coupling the implementation of pageserver split
brain protection with more fundamental changes in the management of the pageservers.
Moving to ephemeral node IDs would provide an extra layer of
resilience in the system, as it would prevent the control plane
accidentally attaching a tenant to two physical nodes with the same
generation, if somehow there were two physical nodes with
the same node IDs (currently we rely on EC2 guarantees to
eliminate this scenario). With ephemeral node IDs, there would be
no possibility of that happening, no matter the behavior of
underlying infrastructure.
Nothing fundamental in the pageserver's handling of generations needs to change to handle ephemeral node IDs, since we hardly use the
`node_id` anywhere. The `/re-attach` API would be extended
to enable the pageserver to obtain its ephemeral ID, and provide
some correlation identifier (e.g. EC2 instance ID), to help the
control plane re-attach tenants to the same physical server that
previously had them attached.

View File

@@ -31,6 +31,8 @@ fn lsn_invalid() -> Lsn {
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term.
pub term: Option<u64>,
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
@@ -58,4 +60,6 @@ pub struct SkTimelineInfo {
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub http_connstr: Option<String>,
}

View File

@@ -38,6 +38,7 @@ url.workspace = true
uuid.workspace = true
pq_proto.workspace = true
postgres_connection.workspace = true
metrics.workspace = true
workspace_hack.workspace = true

View File

@@ -58,6 +58,8 @@ pub mod serde_regex;
pub mod pageserver_feedback;
pub mod postgres_client;
pub mod tracing_span_assert;
pub mod rate_limit;

View File

@@ -0,0 +1,37 @@
//! Postgres client connection code common to other crates (safekeeper and
//! pageserver) which depends on tenant/timeline ids and thus not fitting into
//! postgres_connection crate.
use anyhow::Context;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use crate::id::TenantTimelineId;
/// Create client config for fetching WAL from safekeeper on particular timeline.
/// listen_pg_addr_str is in form host:\[port\].
pub fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}
Ok(connstr)
}
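For context (not part of the diff above), a hedged sketch of how a caller such as the safekeeper or
pageserver WAL receiver might use this moved helper; the address, availability zone, and function
name `example` are made-up values for illustration.

```rust
// Sketch only: illustrative values, not taken from the diff above.
use utils::id::TenantTimelineId;
use utils::postgres_client::wal_stream_connection_config;

fn example(ttid: TenantTimelineId) -> anyhow::Result<()> {
    let cfg = wal_stream_connection_config(
        ttid,
        "safekeeper-0.local:5454", // host:[port] of the safekeeper's pg listener
        None,                      // no auth token
        Some("eu-west-1a"),        // availability zone hint, if any
    )?;
    // cfg is a PgConnectionConfig carrying timeline_id/tenant_id options.
    let _ = cfg;
    Ok(())
}
```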

View File

@@ -59,8 +59,8 @@ pub struct FileCacheConfig {
spread_factor: f64,
}
impl Default for FileCacheConfig {
fn default() -> Self {
impl FileCacheConfig {
pub fn default_in_memory() -> Self {
Self {
in_memory: true,
// 75 %
@@ -71,9 +71,19 @@ impl Default for FileCacheConfig {
spread_factor: 0.1,
}
}
}
impl FileCacheConfig {
pub fn default_on_disk() -> Self {
Self {
in_memory: false,
resource_multiplier: 0.75,
// 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
// memory, the kernel will just evict from its page cache, rather than e.g. killing
// everything.
min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
spread_factor: 0.1,
}
}
/// Make sure fields of the config are consistent.
pub fn validate(&self) -> anyhow::Result<()> {
// Single field validity

View File

@@ -39,6 +39,16 @@ pub struct Args {
#[arg(short, long)]
pub pgconnstr: Option<String>,
/// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
/// kernel's page cache), and therefore should not count against available memory.
//
// NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
// than a roundabout way, via whether it's on disk), but in order to be backwards compatible
// during the switch away from an in-memory file cache, we had to default to the previous
// behavior.
#[arg(long)]
pub file_cache_on_disk: bool,
/// The address we should listen on for connection requests. For the
/// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
#[arg(short, long)]

View File

@@ -110,10 +110,10 @@ impl Runner {
// memory limits.
if let Some(connstr) = &args.pgconnstr {
info!("initializing file cache");
let config: FileCacheConfig = Default::default();
if !config.in_memory {
panic!("file cache not in-memory implemented")
}
let config = match args.file_cache_on_disk {
true => FileCacheConfig::default_on_disk(),
false => FileCacheConfig::default_in_memory(),
};
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
.await
@@ -140,7 +140,10 @@ impl Runner {
if actual_size != new_size {
info!("file cache size actually got set to {actual_size}")
}
file_cache_reserved_bytes = actual_size;
// Mark the resources given to the file cache as reserved, but only if it's in memory.
if !args.file_cache_on_disk {
file_cache_reserved_bytes = actual_size;
}
state.filecache = Some(file_cache);
}
@@ -227,18 +230,17 @@ impl Runner {
let mut status = vec![];
let mut file_cache_mem_usage = 0;
if let Some(file_cache) = &mut self.filecache {
if !file_cache.config.in_memory {
panic!("file cache not in-memory unimplemented")
}
let actual_usage = file_cache
.set_file_cache_size(expected_file_cache_mem_usage)
.await
.context("failed to set file cache size")?;
file_cache_mem_usage = actual_usage;
if file_cache.config.in_memory {
file_cache_mem_usage = actual_usage;
}
let message = format!(
"set file cache size to {} MiB",
bytes_to_mebibytes(actual_usage)
"set file cache size to {} MiB (in memory = {})",
bytes_to_mebibytes(actual_usage),
file_cache.config.in_memory,
);
info!("downscale: {message}");
status.push(message);
@@ -289,10 +291,6 @@ impl Runner {
// Get the file cache's expected contribution to the memory usage
let mut file_cache_mem_usage = 0;
if let Some(file_cache) = &mut self.filecache {
if !file_cache.config.in_memory {
panic!("file cache not in-memory unimplemented");
}
let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
info!(
target = bytes_to_mebibytes(expected_usage),
@@ -304,6 +302,9 @@ impl Runner {
.set_file_cache_size(expected_usage)
.await
.context("failed to set file cache size")?;
if file_cache.config.in_memory {
file_cache_mem_usage = actual_usage;
}
if actual_usage != expected_usage {
warn!(
@@ -312,7 +313,6 @@ impl Runner {
bytes_to_mebibytes(actual_usage)
)
}
file_cache_mem_usage = actual_usage;
}
if let Some(cgroup) = &self.cgroup {

View File

@@ -97,7 +97,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,

View File

@@ -48,7 +48,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,

View File

@@ -75,10 +75,7 @@
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
},
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
};
use anyhow::Context;
@@ -162,7 +159,7 @@ struct Version {
}
struct Slot {
inner: RwLock<SlotInner>,
inner: tokio::sync::RwLock<SlotInner>,
usage_count: AtomicU8,
}
@@ -220,9 +217,9 @@ pub struct PageCache {
///
/// If you add support for caching different kinds of objects, each object kind
/// can have a separate mapping map, next to this field.
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
@@ -238,7 +235,7 @@ pub struct PageCache {
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i>(RwLockReadGuard<'i, SlotInner>);
pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];
@@ -265,7 +262,7 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: RwLockWriteGuard<'i, SlotInner>,
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
@@ -343,7 +340,7 @@ impl PageCache {
/// The 'lsn' is an upper bound, this will return the latest version of
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
/// returned page.
pub fn lookup_materialized_page(
pub async fn lookup_materialized_page(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -363,7 +360,7 @@ impl PageCache {
lsn,
};
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
@@ -390,7 +387,7 @@ impl PageCache {
///
/// Store an image of the given page in the cache.
///
pub fn memorize_materialized_page(
pub async fn memorize_materialized_page(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -407,7 +404,7 @@ impl PageCache {
lsn,
};
match self.lock_for_write(&cache_key)? {
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(write_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
@@ -425,10 +422,14 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
pub async fn read_immutable_buf(
&self,
file_id: FileId,
blkno: u32,
) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key)
self.lock_for_read(&mut cache_key).await
}
//
@@ -448,14 +449,14 @@ impl PageCache {
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.read().unwrap();
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard(inner));
@@ -496,7 +497,7 @@ impl PageCache {
/// }
/// ```
///
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
@@ -511,7 +512,7 @@ impl PageCache {
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
if is_first_iteration {
hit.inc();
}
@@ -554,13 +555,13 @@ impl PageCache {
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().unwrap();
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageWriteGuard { inner, valid: true });
@@ -573,10 +574,10 @@ impl PageCache {
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key) {
if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
return Ok(WriteBufResult::Found(write_guard));
}
@@ -757,7 +758,7 @@ impl PageCache {
/// Find a slot to evict.
///
/// On return, the slot is empty and write-locked.
fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard<SlotInner>)> {
fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
let mut iters = 0;
loop {
@@ -769,10 +770,7 @@ impl PageCache {
if slot.dec_usage_count() == 0 {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(TryLockError::Poisoned(err)) => {
anyhow::bail!("buffer lock was poisoned: {err:?}")
}
Err(TryLockError::WouldBlock) => {
Err(_err) => {
// If we have looped through the whole buffer pool 10 times
// and still haven't found a victim buffer, something's wrong.
// Maybe all the buffers were in locked. That could happen in
@@ -816,7 +814,7 @@ impl PageCache {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: RwLock::new(SlotInner { key: None, buf }),
inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
usage_count: AtomicU8::new(0),
}
})

View File

@@ -33,7 +33,7 @@ impl<'a> BlockCursor<'a> {
let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize;
let mut buf = self.read_blk(blknum)?;
let mut buf = self.read_blk(blknum).await?;
// peek at the first byte, to determine if it's a 1- or 4-byte length
let first_len_byte = buf[off];
@@ -49,7 +49,7 @@ impl<'a> BlockCursor<'a> {
// it is split across two pages
len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
blknum += 1;
buf = self.read_blk(blknum)?;
buf = self.read_blk(blknum).await?;
len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
off = 4 - thislen;
} else {
@@ -70,7 +70,7 @@ impl<'a> BlockCursor<'a> {
if page_remain == 0 {
// continue on next page
blknum += 1;
buf = self.read_blk(blknum)?;
buf = self.read_blk(blknum).await?;
off = 0;
page_remain = PAGE_SZ;
}

View File

@@ -39,7 +39,7 @@ pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Rc(std::rc::Rc<[u8; PAGE_SZ]>),
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
@@ -49,9 +49,9 @@ impl From<PageReadGuard<'static>> for BlockLease<'static> {
}
#[cfg(test)]
impl<'a> From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease<'a> {
fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Rc(value)
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Arc(value)
}
}
@@ -63,7 +63,7 @@ impl<'a> Deref for BlockLease<'a> {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Rc(v) => v.deref(),
BlockLease::Arc(v) => v.deref(),
}
}
}
@@ -83,13 +83,13 @@ pub(crate) enum BlockReaderRef<'a> {
impl<'a> BlockReaderRef<'a> {
#[inline(always)]
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
use BlockReaderRef::*;
match self {
FileBlockReaderVirtual(r) => r.read_blk(blknum),
FileBlockReaderFile(r) => r.read_blk(blknum),
EphemeralFile(r) => r.read_blk(blknum),
Adapter(r) => r.read_blk(blknum),
FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
FileBlockReaderFile(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
}
@@ -134,8 +134,8 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum)
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum).await
}
}
@@ -170,11 +170,12 @@ where
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,

View File

@@ -262,7 +262,7 @@ where
let block_cursor = self.reader.block_cursor();
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum)?;
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let prefix_len = node.prefix_len as usize;
@@ -357,7 +357,7 @@ where
let block_cursor = self.reader.block_cursor();
while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
let blk = block_cursor.read_blk(self.start_blk + blknum)?;
let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
let buf: &[u8] = blk.as_ref();
let node = OnDiskNode::<L>::deparse(buf)?;
@@ -704,7 +704,7 @@ pub(crate) mod tests {
pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
let mut buf = [0u8; PAGE_SZ];
buf.copy_from_slice(&self.blocks[blknum as usize]);
Ok(std::rc::Rc::new(buf).into())
Ok(std::sync::Arc::new(buf).into())
}
}
impl BlockReader for TestDisk {

View File

@@ -61,13 +61,14 @@ impl EphemeralFile {
self.len
}
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
@@ -135,10 +136,13 @@ impl EphemeralFile {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
let cache = page_cache::get();
match cache.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
) {
match cache
.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
)
.await
{
Ok(page_cache::ReadBufResult::Found(_guard)) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);

View File

@@ -467,7 +467,7 @@ impl DeltaLayer {
PathOrConf::Path(_) => None,
};
let loaded = DeltaLayerInner::load(&path, summary)?;
let loaded = DeltaLayerInner::load(&path, summary).await?;
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
@@ -841,12 +841,15 @@ impl Drop for DeltaLayerWriter {
}
impl DeltaLayerInner {
pub(super) fn load(path: &std::path::Path, summary: Option<Summary>) -> anyhow::Result<Self> {
pub(super) async fn load(
path: &std::path::Path,
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {
@@ -1028,7 +1031,7 @@ impl<'a> ValueRef<'a> {
pub(crate) struct Adapter<T>(T);
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.0.as_ref().file.read_blk(blknum)
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.0.as_ref().file.read_blk(blknum).await
}
}

View File

@@ -349,7 +349,8 @@ impl ImageLayer {
PathOrConf::Path(_) => None,
};
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary).await?;
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
@@ -432,7 +433,7 @@ impl ImageLayer {
}
impl ImageLayerInner {
pub(super) fn load(
pub(super) async fn load(
path: &std::path::Path,
lsn: Lsn,
summary: Option<Summary>,
@@ -440,7 +441,7 @@ impl ImageLayerInner {
let file = VirtualFile::open(path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {

View File

@@ -465,7 +465,7 @@ impl Timeline {
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
let cached_page_img = match self.lookup_cached_page(&key, lsn) {
let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
@@ -494,6 +494,7 @@ impl Timeline {
RECONSTRUCT_TIME
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
.await
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
@@ -2264,7 +2265,15 @@ impl Timeline {
)));
}
}
ancestor.wait_lsn(timeline.ancestor_lsn, ctx).await?;
ancestor
.wait_lsn(timeline.ancestor_lsn, ctx)
.await
.with_context(|| {
format!(
"wait for lsn {} on ancestor timeline_id={}",
timeline.ancestor_lsn, ancestor.timeline_id
)
})?;
timeline_owned = ancestor;
timeline = &*timeline_owned;
@@ -2443,13 +2452,14 @@ impl Timeline {
}
}
fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) =
cache.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)?;
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
}
@@ -4131,7 +4141,7 @@ impl Timeline {
///
/// Reconstruct a value, using the given base image and WAL records in 'data'.
///
fn reconstruct_value(
async fn reconstruct_value(
&self,
key: Key,
request_lsn: Lsn,
@@ -4200,6 +4210,7 @@ impl Timeline {
last_rec_lsn,
&img,
)
.await
.context("Materialized page memoization failed")
{
return Err(PageReconstructError::from(e));

View File

@@ -31,10 +31,11 @@ use storage_broker::Streaming;
use tokio::select;
use tracing::*;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use postgres_connection::PgConnectionConfig;
use utils::backoff::{
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::postgres_client::wal_stream_connection_config;
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
@@ -879,33 +880,6 @@ impl ReconnectReason {
}
}
fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}
Ok(connstr)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -921,6 +895,7 @@ mod tests {
timeline: SafekeeperTimelineInfo {
safekeeper_id: 0,
tenant_timeline_id: None,
term: 0,
last_log_term: 0,
flush_lsn: 0,
commit_lsn,
@@ -929,6 +904,7 @@ mod tests {
peer_horizon_lsn: 0,
local_start_lsn: 0,
safekeeper_connstr: safekeeper_connstr.to_owned(),
http_connstr: safekeeper_connstr.to_owned(),
availability_zone: None,
},
latest_update,

View File

@@ -341,21 +341,35 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
// Keep handles to main tasks to die if any of them disappears.
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
FuturesUnordered::new();
// Start wal backup launcher before loading timelines as we'll notify it
// through the channel about timelines which need offloading, not draining
// the channel would cause deadlock.
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));
// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
let conf_ = conf.clone();
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");
}
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let wal_service_handle = current_thread_rt
.as_ref()
@@ -408,17 +422,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("WAL remover".to_owned(), res));
tasks_handles.push(Box::pin(wal_remover_handle));
let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));
set_build_info_metric(GIT_VERSION);
// TODO: update tokio-stream, convert to real async Stream with

View File

@@ -1,7 +1,6 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory,
TermSwitchEntry,
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn,
};
use anyhow::{bail, Result};
use pq_proto::SystemId;
@@ -145,7 +144,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?;
let ac = AcceptorState {
term: oldstate.acceptor_state.term,
term_history: TermHistory(vec![TermSwitchEntry {
term_history: TermHistory(vec![TermLsn {
term: oldstate.acceptor_state.epoch,
lsn: Lsn(0),
}]),

View File

@@ -19,6 +19,7 @@ use crate::receive_wal::WalReceiverState;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::{debug_dump, pull_timeline};
use crate::timelines_global_map::TimelineDeleteForceResult;
@@ -101,6 +102,7 @@ pub struct TimelineStatus {
pub peer_horizon_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
pub walsenders: Vec<WalSenderState>,
pub walreceivers: Vec<WalReceiverState>,
}
@@ -140,6 +142,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
term_history,
};
let conf = get_conf(&request);
// Note: we report in memory values which can be lost.
let status = TimelineStatus {
tenant_id: ttid.tenant_id,
@@ -153,6 +156,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
backup_lsn: inmem.backup_lsn,
peer_horizon_lsn: inmem.peer_horizon_lsn,
remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
peers: tli.get_peers(conf).await,
walsenders: tli.get_walsenders().get_all(),
walreceivers: tli.get_walreceivers().get_all(),
};
@@ -282,12 +286,14 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
tenant_id: ttid.tenant_id.as_ref().to_owned(),
timeline_id: ttid.timeline_id.as_ref().to_owned(),
}),
term: sk_info.term.unwrap_or(0),
last_log_term: sk_info.last_log_term.unwrap_or(0),
flush_lsn: sk_info.flush_lsn.0,
commit_lsn: sk_info.commit_lsn.0,
remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
backup_lsn: sk_info.backup_lsn.0,
local_start_lsn: sk_info.local_start_lsn.0,
availability_zone: None,

View File

@@ -21,7 +21,7 @@ use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermLsn};
use crate::timeline::Timeline;
use crate::GlobalTimelines;
use postgres_backend::PostgresBackend;
@@ -119,7 +119,7 @@ async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> any
let history = tli.get_state().await.1.acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn });
history_entries.push(TermLsn { term, lsn });
let history = TermHistory(history_entries);
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {

View File

@@ -19,6 +19,7 @@ pub mod json_ctrl;
pub mod metrics;
pub mod pull_timeline;
pub mod receive_wal;
pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;

View File

@@ -227,7 +227,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
tokio::fs::rename(tli_dir_path, &timeline_path).await?;
let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
let tli = GlobalTimelines::load_timeline(ttid)
.await
.context("Failed to load timeline after copy")?;
info!(
"Loaded timeline {}, flush_lsn={}",

View File

@@ -0,0 +1,40 @@
//! This module implements pulling WAL from peer safekeepers if compute can't
//! provide it, i.e. safekeeper lags too much.
use std::sync::Arc;
use tokio::{select, time::sleep, time::Duration};
use tracing::{info, instrument};
use crate::{timeline::Timeline, SafeKeeperConf};
/// Entrypoint for per timeline task which always runs, checking whether
/// recovery for this safekeeper is needed and starting it if so.
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};
select! {
_ = recovery_main_loop(tli) => { unreachable!() }
_ = cancellation_rx.changed() => {
info!("stopped");
}
}
}
const CHECK_INTERVAL_MS: u64 = 2000;
/// Check regularly whether we need to start recovery.
async fn recovery_main_loop(_tli: Arc<Timeline>) {
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
loop {
sleep(check_duration).await;
}
}

View File

@@ -34,22 +34,33 @@ pub const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp.
pub type Term = u64;
const INVALID_TERM: Term = 0;
pub const INVALID_TERM: Term = 0;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchEntry {
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct TermLsn {
pub term: Term,
pub lsn: Lsn,
}
// Creation from tuple provides less typing (e.g. for unit tests).
impl From<(Term, Lsn)> for TermLsn {
fn from(pair: (Term, Lsn)) -> TermLsn {
TermLsn {
term: pair.0,
lsn: pair.1,
}
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct TermHistory(pub Vec<TermSwitchEntry>);
pub struct TermHistory(pub Vec<TermLsn>);
impl TermHistory {
pub fn empty() -> TermHistory {
TermHistory(Vec::new())
}
// Parse TermHistory as n_entries followed by TermSwitchEntry pairs
// Parse TermHistory as n_entries followed by TermLsn pairs
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
@@ -60,7 +71,7 @@ impl TermHistory {
if bytes.remaining() < 16 {
bail!("TermHistory is incomplete");
}
res.push(TermSwitchEntry {
res.push(TermLsn {
term: bytes.get_u64_le(),
lsn: bytes.get_u64_le().into(),
})
@@ -557,12 +568,17 @@ where
.up_to(self.flush_lsn())
}
/// Get current term.
pub fn get_term(&self) -> Term {
self.state.acceptor_state.term
}
pub fn get_epoch(&self) -> Term {
self.state.acceptor_state.get_epoch(self.flush_lsn())
}
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
fn flush_lsn(&self) -> Lsn {
pub fn flush_lsn(&self) -> Lsn {
max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
}
@@ -1138,7 +1154,7 @@ mod tests {
let pem = ProposerElected {
term: 1,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermSwitchEntry {
term_history: TermHistory(vec![TermLsn {
term: 1,
lsn: Lsn(3),
}]),

View File

@@ -2,12 +2,12 @@
//! with the "START_REPLICATION" message, and registry of walsenders.
use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::Term;
use crate::safekeeper::{Term, TermLsn};
use crate::timeline::Timeline;
use crate::wal_service::ConnectionId;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use anyhow::Context as AnyhowContext;
use anyhow::{bail, Context as AnyhowContext};
use bytes::Bytes;
use parking_lot::Mutex;
use postgres_backend::PostgresBackend;
@@ -390,26 +390,25 @@ impl SafekeeperPostgresHandler {
self.appname.clone(),
));
let commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx();
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
// hang otherwise). That's because walproposer runs the consensus and
// synchronizes safekeepers on the most advanced one.
// Walsender can operate in one of two modes which we select by
// application_name: give only committed WAL (used by pageserver) or all
// existing WAL (up to flush_lsn, used by walproposer or peer recovery).
// The second case is always driven by a consensus leader which term
// must generally be also supplied. However we're sloppy to do this in
// walproposer recovery which will be removed soon. So TODO is to make
// it not Option'al then.
//
// There is a small risk of this WAL getting concurrently garbaged if
// another compute rises which collects majority and starts fixing log
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if self.is_walproposer_recovery() {
let wal_end = tli.get_flush_lsn().await;
Some(wal_end)
// Fetching WAL without term in recovery creates a small risk of this
// WAL getting concurrently garbaged if another compute rises which
// collects majority and starts fixing log on this safekeeper itself.
// That's ok as (old) proposer will never be able to commit such WAL.
let end_watch = if self.is_walproposer_recovery() {
EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
} else {
None
EndWatch::Commit(tli.get_commit_lsn_watch_rx())
};
// take the latest commit_lsn if don't have stop_pos
let end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow());
// we don't check term here; it will be checked on first waiting/WAL reading anyway.
let end_pos = end_watch.get();
if end_pos < start_pos {
warn!(
@@ -419,8 +418,10 @@ impl SafekeeperPostgresHandler {
}
info!(
"starting streaming from {:?} till {:?}, available WAL ends at {}",
start_pos, stop_pos, end_pos
"starting streaming from {:?}, available WAL ends at {}, recovery={}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_))
);
// switch to copy
@@ -445,9 +446,8 @@ impl SafekeeperPostgresHandler {
appname,
start_pos,
end_pos,
stop_pos,
term,
commit_lsn_watch_rx,
end_watch,
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
@@ -466,6 +466,32 @@ impl SafekeeperPostgresHandler {
}
}
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
/// given term (recovery by walproposer or peer safekeeper).
enum EndWatch {
Commit(Receiver<Lsn>),
Flush(Receiver<TermLsn>),
}
impl EndWatch {
/// Get current end of WAL.
fn get(&self) -> Lsn {
match self {
EndWatch::Commit(r) => *r.borrow(),
EndWatch::Flush(r) => r.borrow().lsn,
}
}
/// Wait for the update.
async fn changed(&mut self) -> anyhow::Result<()> {
match self {
EndWatch::Commit(r) => r.changed().await?,
EndWatch::Flush(r) => r.changed().await?,
}
Ok(())
}
}
/// A half driving sending WAL.
struct WalSender<'a, IO> {
pgb: &'a mut PostgresBackend<IO>,
@@ -480,14 +506,12 @@ struct WalSender<'a, IO> {
// We send this LSN to the receiver as wal_end, so that it knows how much
// WAL this safekeeper has. This LSN should be as fresh as possible.
end_pos: Lsn,
// If present, terminate after reaching this position; used by walproposer
// in recovery.
stop_pos: Option<Lsn>,
/// When streaming uncommitted part, the term the client acts as the leader
/// in. Streaming is stopped if local term changes to a different (higher)
/// value.
term: Option<Term>,
commit_lsn_watch_rx: Receiver<Lsn>,
/// Watch channel receiver to learn end of available WAL (and wait for its advancement).
end_watch: EndWatch,
ws_guard: Arc<WalSenderGuard>,
wal_reader: WalReader,
// buffer for readling WAL into to send it
@@ -497,29 +521,20 @@ struct WalSender<'a, IO> {
impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// Send WAL until
/// - an error occurs
/// - if we are streaming to walproposer, we've streamed until stop_pos
/// (recovery finished)
/// - receiver is caughtup and there is no computes
/// - receiver is caughtup and there is no computes (if streaming up to commit_lsn)
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
// If we are streaming to walproposer, check it is time to stop.
if let Some(stop_pos) = self.stop_pos {
if self.start_pos >= stop_pos {
// recovery finished
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to walproposer at {}, recovery finished",
self.start_pos
)));
}
} else {
// Wait for the next portion if it is not there yet, or just
// update our end of WAL available for sending value, we
// communicate it to the receiver.
self.wait_wal().await?;
}
// Wait for the next portion if it is not there yet, or just
// update our end of WAL available for sending value, we
// communicate it to the receiver.
self.wait_wal().await?;
assert!(
self.end_pos > self.start_pos,
"nothing to send after waiting for WAL"
);
// try to send as much as available, capped by MAX_SEND_SIZE
let mut send_size = self
@@ -567,7 +582,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// exit in the meanwhile
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
self.end_pos = *self.commit_lsn_watch_rx.borrow();
self.end_pos = self.end_watch.get();
if self.end_pos > self.start_pos {
// We have something to send.
trace!("got end_pos {:?}, streaming", self.end_pos);
@@ -575,27 +590,31 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
}
// Wait for WAL to appear, now self.end_pos == self.start_pos.
if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? {
if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
self.end_pos = lsn;
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
}
// Timed out waiting for WAL, check for termination and send KA
if let Some(remote_consistent_lsn) = self
.ws_guard
.walsenders
.get_ws_remote_consistent_lsn(self.ws_guard.id)
{
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send.
// Note that "ending streaming" part of the string is used by
// pageserver to identify WalReceiverError::SuccessfulCompletion,
// do not change this string without updating pageserver.
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
// Timed out waiting for WAL, check for termination and send KA.
// Check for termination only if we are streaming up to commit_lsn
// (to pageserver).
if let EndWatch::Commit(_) = self.end_watch {
if let Some(remote_consistent_lsn) = self
.ws_guard
.walsenders
.get_ws_remote_consistent_lsn(self.ws_guard.id)
{
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send.
// Note that "ending streaming" part of the string is used by
// pageserver to identify WalReceiverError::SuccessfulCompletion,
// do not change this string without updating pageserver.
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, self.start_pos,
)));
}
}
}
@@ -663,22 +682,32 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
/// Wait until we have commit_lsn > lsn or timeout expires. Returns
/// - Ok(Some(commit_lsn)) if needed lsn is successfully observed;
/// Wait until we have available WAL > start_pos or timeout expires. Returns
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
/// - Ok(None) if timeout expired;
/// - Err in case of error (if watch channel is in trouble, shouldn't happen).
async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> anyhow::Result<Option<Lsn>> {
/// - Err in case of error -- only if 1) term changed while fetching in recovery
/// mode 2) watch channel closed, which must never happen.
async fn wait_for_lsn(
rx: &mut EndWatch,
client_term: Option<Term>,
start_pos: Lsn,
) -> anyhow::Result<Option<Lsn>> {
let res = timeout(POLL_STATE_TIMEOUT, async move {
let mut commit_lsn;
loop {
rx.changed().await?;
commit_lsn = *rx.borrow();
if commit_lsn > lsn {
break;
let end_pos = rx.get();
if end_pos > start_pos {
return Ok(end_pos);
}
if let EndWatch::Flush(rx) = rx {
let curr_term = rx.borrow().term;
if let Some(client_term) = client_term {
if curr_term != client_term {
bail!("term changed: requested {}, now {}", client_term, curr_term);
}
}
}
rx.changed().await?;
}
Ok(commit_lsn)
})
.await;

View File

@@ -3,8 +3,11 @@
use anyhow::{anyhow, bail, Result};
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::fs;
use serde_with::DisplayFromStr;
use std::cmp::max;
use std::path::PathBuf;
use std::sync::Arc;
@@ -24,9 +27,10 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::receive_wal::WalReceivers;
use crate::recovery::recovery_main;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, Term,
SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
};
use crate::send_wal::WalSenders;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
@@ -37,18 +41,25 @@ use crate::SafeKeeperConf;
use crate::{debug_dump, wal_storage};
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone)]
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub sk_id: NodeId,
/// Term of the last entry.
_last_log_term: Term,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
_flush_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
#[serde_as(as = "DisplayFromStr")]
pub local_start_lsn: Lsn,
/// When info was received.
/// When info was received. Serde annotations are not very useful but make
/// the code compile -- we don't rely on this field externally.
#[serde(skip)]
#[serde(default = "Instant::now")]
ts: Instant,
}
@@ -237,8 +248,9 @@ impl SharedState {
tenant_id: ttid.tenant_id.as_ref().to_owned(),
timeline_id: ttid.timeline_id.as_ref().to_owned(),
}),
term: self.sk.state.acceptor_state.term,
last_log_term: self.sk.get_epoch(),
flush_lsn: self.sk.wal_store.flush_lsn().0,
flush_lsn: self.sk.flush_lsn().0,
// note: this value is not flushed to control file yet and can be lost
commit_lsn: self.sk.inmem.commit_lsn.0,
remote_consistent_lsn: remote_consistent_lsn.0,
@@ -247,6 +259,7 @@ impl SharedState {
.advertise_pg_addr
.to_owned()
.unwrap_or(conf.listen_pg_addr.clone()),
http_connstr: conf.listen_http_addr.to_owned(),
backup_lsn: self.sk.inmem.backup_lsn.0,
local_start_lsn: self.sk.state.local_start_lsn.0,
availability_zone: conf.availability_zone.clone(),
@@ -296,6 +309,13 @@ pub struct Timeline {
commit_lsn_watch_tx: watch::Sender<Lsn>,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
/// Broadcasts (current term, flush_lsn) updates, walsender is interested in
/// them when sending in recovery mode (to walproposer or peers). Note: this
/// is just a notification, WAL reading should always done with lock held as
/// term can change otherwise.
term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
/// Safekeeper and other state, that should remain consistent and
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
/// while holding it, ensuring that consensus checks are in order.
@@ -317,16 +337,20 @@ pub struct Timeline {
impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(
conf: SafeKeeperConf,
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<Timeline> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(&conf, &ttid)?;
let shared_state = SharedState::restore(conf, &ttid)?;
let rcl = shared_state.sk.state.remote_consistent_lsn;
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.state.commit_lsn);
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
shared_state.sk.get_term(),
shared_state.sk.flush_lsn(),
)));
let (cancellation_tx, cancellation_rx) = watch::channel(false);
Ok(Timeline {
@@ -334,6 +358,8 @@ impl Timeline {
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(shared_state),
walsenders: WalSenders::new(rcl),
walreceivers: WalReceivers::new(),
@@ -345,7 +371,7 @@ impl Timeline {
/// Create a new timeline, which is not yet persisted to disk.
pub fn create_empty(
conf: SafeKeeperConf,
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
server_info: ServerInfo,
@@ -353,6 +379,8 @@ impl Timeline {
local_start_lsn: Lsn,
) -> Result<Timeline> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
@@ -361,7 +389,9 @@ impl Timeline {
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?),
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
walsenders: WalSenders::new(Lsn(0)),
walreceivers: WalReceivers::new(),
cancellation_rx,
@@ -370,12 +400,16 @@ impl Timeline {
})
}
/// Initialize fresh timeline on disk and start background tasks. If bootstrap
/// Initialize fresh timeline on disk and start background tasks. If init
/// fails, timeline is cancelled and cannot be used anymore.
///
/// Bootstrap is transactional, so if it fails, created files will be deleted,
/// Init is transactional, so if it fails, created files will be deleted,
/// and state on disk should remain unchanged.
pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
pub async fn init_new(
self: &Arc<Timeline>,
shared_state: &mut MutexGuard<'_, SharedState>,
conf: &SafeKeeperConf,
) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged
@@ -391,7 +425,7 @@ impl Timeline {
// Create timeline directory.
fs::create_dir_all(&self.timeline_dir).await?;
// Write timeline to disk and TODO: start background tasks.
// Write timeline to disk and start background tasks.
if let Err(e) = shared_state.sk.persist().await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
@@ -405,12 +439,16 @@ impl Timeline {
return Err(e);
}
// TODO: add more initialization steps here
self.update_status(shared_state);
self.bootstrap(conf);
Ok(())
}
/// Bootstrap new or existing timeline starting background stasks.
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
// Start recovery task which always runs on the timeline.
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}
/// Delete timeline from disk completely, by removing timeline directory. Background
/// timeline activities will stop eventually.
pub async fn delete_from_disk(
@@ -444,6 +482,16 @@ impl Timeline {
*self.cancellation_rx.borrow()
}
/// Returns watch channel which gets value when timeline is cancelled. It is
/// guaranteed to have not cancelled value observed (errors otherwise).
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
let rx = self.cancellation_rx.clone();
if *rx.borrow() {
bail!(TimelineError::Cancelled(self.ttid));
}
Ok(rx)
}
/// Take a writing mutual exclusive lock on timeline shared_state.
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock().await
@@ -520,6 +568,11 @@ impl Timeline {
self.commit_lsn_watch_rx.clone()
}
/// Returns term_flush_lsn watch channel.
pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
self.term_flush_lsn_watch_rx.clone()
}
/// Pass arrived message to the safekeeper.
pub async fn process_msg(
&self,
@@ -531,6 +584,7 @@ impl Timeline {
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
let term_flush_lsn: TermLsn;
{
let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg).await?;
@@ -544,8 +598,11 @@ impl Timeline {
}
commit_lsn = shared_state.sk.inmem.commit_lsn;
term_flush_lsn =
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
Ok(rmsg)
}

View File

@@ -11,7 +11,7 @@ use serde::Serialize;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
@@ -71,19 +71,23 @@ pub struct GlobalTimelines;
impl GlobalTimelines {
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
pub fn init(
pub async fn init(
conf: SafeKeeperConf,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<()> {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
state.conf = Some(conf);
// clippy isn't smart enough to understand that drop(state) releases the
// lock, so use explicit block
let tenants_dir = {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
state.conf = Some(conf);
// Iterate through all directories and load tenants for all directories
// named as a valid tenant_id.
// Iterate through all directories and load tenants for all directories
// named as a valid tenant_id.
state.get_conf().workdir.clone()
};
let mut tenant_count = 0;
let tenants_dir = state.get_conf().workdir.clone();
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
.with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
{
@@ -93,7 +97,7 @@ impl GlobalTimelines {
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
{
tenant_count += 1;
GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?;
GlobalTimelines::load_tenant_timelines(tenant_id).await?;
}
}
Err(e) => error!(
@@ -108,7 +112,7 @@ impl GlobalTimelines {
info!(
"found {} tenants directories, successfully loaded {} timelines",
tenant_count,
state.timelines.len()
TIMELINES_STATE.lock().unwrap().timelines.len()
);
Ok(())
}
@@ -116,17 +120,21 @@ impl GlobalTimelines {
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
/// errors if any.
///
/// Note: This function (and all reading/loading below) is sync because
/// timelines are loaded while holding GlobalTimelinesState lock. Which is
/// fine as this is called only from single threaded main runtime on boot,
/// but clippy complains anyway, and suppressing that isn't trivial as async
/// is the keyword, ha. That only other user is pull_timeline.rs for which
/// being blocked is not that bad, and we can do spawn_blocking.
fn load_tenant_timelines(
state: &mut MutexGuard<'_, GlobalTimelinesState>,
tenant_id: TenantId,
) -> Result<()> {
let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
/// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
/// sync and there is no important reason to make it async (it is always
/// held for a short while) we just lock and unlock it for each timeline --
/// this function is called during init when nothing else is running, so
/// this is fine.
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let (conf, wal_backup_launcher_tx) = {
let state = TIMELINES_STATE.lock().unwrap();
(
state.get_conf().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
)
};
let timelines_dir = conf.tenant_dir(&tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
{
@@ -136,13 +144,16 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(
state.get_conf().clone(),
ttid,
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
) {
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
Ok(timeline) => {
state.timelines.insert(ttid, Arc::new(timeline));
let tli = Arc::new(timeline);
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf);
tli.update_status_notify().await.unwrap();
}
// If we can't load a timeline, it's most likely because of a corrupted
// directory. We will log an error and won't allow to delete/recreate
@@ -168,18 +179,22 @@ impl GlobalTimelines {
}
/// Load timeline from disk to the memory.
pub fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) {
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
Ok(timeline) => {
let tli = Arc::new(timeline);
// TODO: prevent concurrent timeline creation/loading
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf);
Ok(tli)
}
// If we can't load a timeline, it's bad. Caller will figure it out.
@@ -217,7 +232,7 @@ impl GlobalTimelines {
info!("creating new timeline {}", ttid);
let timeline = Arc::new(Timeline::create_empty(
conf,
&conf,
ttid,
wal_backup_launcher_tx,
server_info,
@@ -240,23 +255,24 @@ impl GlobalTimelines {
// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline.bootstrap(&mut shared_state).await {
// Note: the most likely reason for bootstrap failure is that the timeline
if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
// Note: the most likely reason for init failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve
// the timeline directory in this case, for further inspection.
// TODO: this is an unusual error, perhaps we should send it to sentry
// TODO: compute will try to create timeline every second, we should add backoff
error!("failed to bootstrap timeline {}: {}", ttid, e);
error!("failed to init new timeline {}: {}", ttid, e);
// Timeline failed to bootstrap, it cannot be used. Remove it from the map.
// Timeline failed to init, it cannot be used. Remove it from the map.
TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
return Err(e);
}
// We are done with bootstrap, release the lock, return the timeline.
// {} block forces release before .await
}
timeline.update_status_notify().await?;
timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
Ok(timeline)
}
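Creation follows the same transactional shape: register the timeline, initialize it, drop the registration again on failure, and send notifications only after the lock has been released. A rough Python sketch under those assumptions (TIMELINES and create_timeline are illustrative names, not the real API):
import asyncio
import threading
TIMELINES_LOCK = threading.Lock()
TIMELINES = {}
async def create_timeline(ttid):
    timeline = {"ttid": ttid}
    with TIMELINES_LOCK:
        TIMELINES[ttid] = timeline
    try:
        await asyncio.sleep(0)  # stands in for writing the on-disk state
    except Exception:
        # Init failed, the timeline cannot be used: drop it from the map.
        with TIMELINES_LOCK:
            TIMELINES.pop(ttid, None)
        raise
    # The lock is not held here, so awaiting the notifications is safe.
    await asyncio.sleep(0)  # stands in for the status/backup notifications
    return timeline
asyncio.run(create_timeline("tenant/timeline"))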

View File

@@ -125,6 +125,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
tenant_id: vec![0xFF; 16],
timeline_id: tli_from_u64(counter % n_keys),
}),
term: 0,
last_log_term: 0,
flush_lsn: counter,
commit_lsn: 2,
@@ -132,6 +133,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
remote_consistent_lsn: 4,
peer_horizon_lsn: 5,
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
local_start_lsn: 0,
availability_zone: None,
};

View File

@@ -22,6 +22,8 @@ message SubscribeSafekeeperInfoRequest {
message SafekeeperTimelineInfo {
uint64 safekeeper_id = 1;
TenantTimelineId tenant_timeline_id = 2;
// Safekeeper term.
uint64 term = 12;
// Term of the last entry.
uint64 last_log_term = 3;
// LSN of the last record.
@@ -36,6 +38,8 @@ message SafekeeperTimelineInfo {
uint64 local_start_lsn = 9;
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
// HTTP endpoint connection string.
string http_connstr = 13;
// Availability zone of a safekeeper.
optional string availability_zone = 11;
}
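One plausible use of the two new fields, sketched with plain dicts rather than the generated protobuf classes: pick the safekeeper whose WAL is most advanced and reach it via http_connstr. The selection rule here is an assumption for illustration, not the actual safekeeper recovery logic.
def pick_donor(infos):
    # Prefer the safekeeper with the highest (last_log_term, flush_lsn) pair.
    best = max(infos, key=lambda i: (i["last_log_term"], i["flush_lsn"]))
    return best["http_connstr"]
infos = [
    {"safekeeper_id": 1, "term": 3, "last_log_term": 3, "flush_lsn": 128, "http_connstr": "sk-1:7677"},
    {"safekeeper_id": 2, "term": 3, "last_log_term": 2, "flush_lsn": 256, "http_connstr": "sk-2:7677"},
]
assert pick_donor(infos) == "sk-1:7677"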

View File

@@ -519,6 +519,7 @@ mod tests {
tenant_id: vec![0x00; 16],
timeline_id,
}),
term: 0,
last_log_term: 0,
flush_lsn: 1,
commit_lsn: 2,
@@ -526,6 +527,7 @@ mod tests {
remote_consistent_lsn: 4,
peer_horizon_lsn: 5,
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
local_start_lsn: 0,
availability_zone: None,
}

View File

@@ -0,0 +1,63 @@
# safekeeper utils
import os
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import Safekeeper, get_dir_size
from fixtures.types import Lsn, TenantId, TimelineId
# The name reads a bit oddly; treat it as shorthand for "is it true that the segments do not exist".
def is_segs_not_exist(segs, http_cli, tenant_id, timeline_id):
segs_existence = [f"{f}: {os.path.exists(f)}" for f in segs]
log.info(
f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}, segments_existence: {segs_existence}"
)
return all(not os.path.exists(p) for p in segs)
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.flush_lsn >= lsn
def is_commit_lsn_equals_flush_lsn(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.flush_lsn == tli_status.commit_lsn
def is_segment_offloaded(
sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.backup_lsn >= seg_end
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
sk_wal_size_mb = sk_wal_size / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
return sk_wal_size_mb <= target_size_mb
# Wait for something, defined as f() returning True, raising an error if this
# doesn't happen within timeout seconds.
# TODO: unify with wait_until while preserving the good logs
def wait(f, desc, timeout=30):
started_at = time.time()
while True:
if f():
break
elapsed = time.time() - started_at
if elapsed > timeout:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
time.sleep(0.5)
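The predicates above are meant to be bound with functools.partial and polled by wait(), mirroring the callers elsewhere in this change. A small usage sketch, assuming env, tenant_id, timeline_id and lsn come from a test; wait_flush_lsn_on_all is a hypothetical wrapper:
from functools import partial
from fixtures.sk_utils import is_flush_lsn_caught_up, wait
def wait_flush_lsn_on_all(env, tenant_id, timeline_id, lsn):
    # Poll every safekeeper until its flush_lsn reaches the given LSN.
    for sk in env.safekeepers:
        wait(
            partial(is_flush_lsn_caught_up, sk, tenant_id, timeline_id, lsn),
            f"flush_lsn to reach {lsn} on safekeeper {sk.id}",
        )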

View File

@@ -1,4 +1,8 @@
import time
from functools import partial
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.sk_utils import is_commit_lsn_equals_flush_lsn, wait
# Test safekeeper sync and pageserver catch up
@@ -9,7 +13,9 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
# Make shared_buffers large to ensure we won't query pageserver while it is down.
endpoint = env.endpoints.create_start(
"test_pageserver_catchup_while_compute_down", config_lines=["shared_buffers=512MB"]
@@ -45,12 +51,26 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
FROM generate_series(1, 10000) g
"""
)
endpoint.stop()
# Wait until safekeepers catch up. This exercises the fast path, which avoids
# sync-safekeepers on the next compute start.
for sk in env.safekeepers:
wait(
partial(is_commit_lsn_equals_flush_lsn, sk, tenant_id, timeline_id),
"commit_lsn to reach flush_lsn",
)
# stop safekeepers gracefully
env.safekeepers[0].stop()
env.safekeepers[1].stop()
env.safekeepers[2].stop()
# Wait for in-flight broker messages to arrive, so that after the restart the
# pageserver won't know where to connect unless the timeline is (re)activated
# on the safekeepers -- we had such a bug once.
time.sleep(1)
# start everything again
# safekeepers must synchronize and pageserver must catch up
env.pageserver.start()
@@ -59,7 +79,7 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
env.safekeepers[2].start()
# restart compute node
endpoint.stop_and_destroy().create_start("test_pageserver_catchup_while_compute_down")
endpoint = env.endpoints.create_start("test_pageserver_catchup_while_compute_down")
# Ensure that the basebackup was correct and the pageserver returned all data
pg_conn = endpoint.connect()

View File

@@ -40,6 +40,13 @@ from fixtures.remote_storage import (
RemoteStorageUsers,
available_remote_storages,
)
from fixtures.sk_utils import (
is_flush_lsn_caught_up,
is_segment_offloaded,
is_segs_not_exist,
is_wal_trimmed,
wait,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background
@@ -385,54 +392,11 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
# wait till first segment is removed on all safekeepers
wait(
lambda first_segments=first_segments: all(not os.path.exists(p) for p in first_segments),
partial(is_segs_not_exist, first_segments, http_cli, tenant_id, timeline_id),
"first segment get removed",
wait_f=lambda http_cli=http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}"
),
)
# Wait for something, defined as f() returning True, raising an error if this
# doesn't happen within timeout seconds, and calling wait_f while waiting.
def wait(f, desc, timeout=30, wait_f=None):
started_at = time.time()
while True:
if f():
break
elapsed = time.time() - started_at
if elapsed > timeout:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
time.sleep(0.5)
if wait_f is not None:
wait_f()
def is_segment_offloaded(
sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.backup_lsn >= seg_end
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.flush_lsn >= lsn
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
sk_wal_size_mb = sk_wal_size / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
return sk_wal_size_mb <= target_size_mb
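With the helpers moved to sk_utils, the separate wait_f logging hook goes away: a predicate now logs its own diagnostics and returns a bool, keeping wait() minimal. A sketch of a new predicate written in that style (all_segments_removed is a hypothetical example, not part of the suite):
import os
from fixtures.log_helper import log
def all_segments_removed(segs):
    # Log what is still on disk, then report whether everything is gone.
    existing = [p for p in segs if os.path.exists(p)]
    log.info(f"still waiting for removal of: {existing}")
    return not existing
# usage: wait(partial(all_segments_removed, first_segments), "first segment gets removed")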
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.num_safekeepers = 3