mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-20 19:10:37 +00:00
Compare commits
28 Commits
alek/delet
...
problame/l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
15b8618d25 | ||
|
|
3e9a731e76 | ||
|
|
257b5d4865 | ||
|
|
05773708d3 | ||
|
|
382473d9a5 | ||
|
|
eb0a698adc | ||
|
|
81b6578c44 | ||
|
|
bc49c73fee | ||
|
|
e98580b092 | ||
|
|
804ef23043 | ||
|
|
87f7d6bce3 | ||
|
|
39e3fbbeb0 | ||
|
|
8d2a4aa5f8 | ||
|
|
d1fcdf75b3 | ||
|
|
7e39a96441 | ||
|
|
babefdd3f9 | ||
|
|
805fee1483 | ||
|
|
85d6d9dc85 | ||
|
|
e40ee7c3d1 | ||
|
|
0fe3b3646a | ||
|
|
529f8b5016 | ||
|
|
fbcd174489 | ||
|
|
7b5489a0bb | ||
|
|
40268dcd8d | ||
|
|
4436c84751 | ||
|
|
b758bf47ca | ||
|
|
024e306f73 | ||
|
|
f71c82e5de |
@@ -145,7 +145,11 @@ runs:
|
||||
|
||||
if [ "${RERUN_FLAKY}" == "true" ]; then
|
||||
mkdir -p $TEST_OUTPUT
|
||||
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/flaky.json"
|
||||
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" \
|
||||
--days 7 \
|
||||
--output "$TEST_OUTPUT/flaky.json" \
|
||||
--pg-version "${DEFAULT_PG_VERSION}" \
|
||||
--build-type "${BUILD_TYPE}"
|
||||
|
||||
EXTRA_PARAMS="--flaky-tests-json $TEST_OUTPUT/flaky.json $EXTRA_PARAMS"
|
||||
fi
|
||||
|
||||
5
.github/workflows/build_and_test.yml
vendored
5
.github/workflows/build_and_test.yml
vendored
@@ -752,7 +752,7 @@ jobs:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.16.3
|
||||
VM_BUILDER_VERSION: v0.17.5
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -775,6 +775,7 @@ jobs:
|
||||
run: |
|
||||
./vm-builder \
|
||||
-enable-file-cache \
|
||||
-cgroup-uid=postgres \
|
||||
-src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
|
||||
-dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
@@ -903,7 +904,7 @@ jobs:
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
|
||||
options: --init
|
||||
needs: [ promote-images, tag ]
|
||||
needs: [ tag ]
|
||||
steps:
|
||||
- name: Set PR's status to pending and request a remote CI test
|
||||
run: |
|
||||
|
||||
13
CODEOWNERS
13
CODEOWNERS
@@ -1,11 +1,12 @@
|
||||
/compute_tools/ @neondatabase/control-plane
|
||||
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
|
||||
/control_plane/ @neondatabase/compute @neondatabase/storage
|
||||
/libs/pageserver_api/ @neondatabase/compute @neondatabase/storage
|
||||
/libs/postgres_ffi/ @neondatabase/compute
|
||||
/libs/remote_storage/ @neondatabase/storage
|
||||
/libs/safekeeper_api/ @neondatabase/safekeepers
|
||||
/pageserver/ @neondatabase/compute @neondatabase/storage
|
||||
/libs/postgres_ffi/ @neondatabase/compute
|
||||
/libs/remote_storage/ @neondatabase/storage
|
||||
/libs/safekeeper_api/ @neondatabase/safekeepers
|
||||
/libs/vm_monitor/ @neondatabase/autoscaling @neondatabase/compute
|
||||
/pageserver/ @neondatabase/compute @neondatabase/storage
|
||||
/pgxn/ @neondatabase/compute
|
||||
/proxy/ @neondatabase/control-plane
|
||||
/proxy/ @neondatabase/proxy
|
||||
/safekeeper/ @neondatabase/safekeepers
|
||||
/vendor/ @neondatabase/compute
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5014,6 +5014,7 @@ dependencies = [
|
||||
"nix 0.26.2",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"postgres_connection",
|
||||
"pq_proto",
|
||||
"rand",
|
||||
"regex",
|
||||
|
||||
@@ -211,8 +211,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
|
||||
FROM build-deps AS vector-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.4.tar.gz -O pgvector.tar.gz && \
|
||||
echo "1cb70a63f8928e396474796c22a20be9f7285a8a013009deb8152445b61b72e6 pgvector.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
|
||||
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
|
||||
@@ -19,9 +19,10 @@ Also `compute_ctl` spawns two separate service threads:
|
||||
- `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
|
||||
last activity requests.
|
||||
|
||||
If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
|
||||
compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
|
||||
downscaling and (eventually) will request immediate upscaling under resource pressure.
|
||||
If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
|
||||
`vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
|
||||
`vm-monitor` communicates with the VM autoscaling system. It coordinates
|
||||
downscaling and requests immediate upscaling under resource pressure.
|
||||
|
||||
Usage example:
|
||||
```sh
|
||||
|
||||
@@ -20,9 +20,10 @@
|
||||
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
|
||||
//! last activity requests.
|
||||
//!
|
||||
//! If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
|
||||
//! compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
|
||||
//! downscaling and (eventually) will request immediate upscaling under resource pressure.
|
||||
//! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
|
||||
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
|
||||
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
|
||||
//! downscaling and requests immediate upscaling under resource pressure.
|
||||
//!
|
||||
//! Usage example:
|
||||
//! ```sh
|
||||
@@ -278,8 +279,9 @@ fn main() -> Result<()> {
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
let vm_monitor_addr = matches.get_one::<String>("vm-monitor-addr");
|
||||
let cgroup = matches.get_one::<String>("filecache-connstr");
|
||||
let file_cache_connstr = matches.get_one::<String>("cgroup");
|
||||
let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
|
||||
let cgroup = matches.get_one::<String>("cgroup");
|
||||
let file_cache_on_disk = matches.get_flag("file-cache-on-disk");
|
||||
|
||||
// Only make a runtime if we need to.
|
||||
// Note: it seems like you can make a runtime in an inner scope and
|
||||
@@ -312,6 +314,7 @@ fn main() -> Result<()> {
|
||||
cgroup: cgroup.cloned(),
|
||||
pgconnstr: file_cache_connstr.cloned(),
|
||||
addr: vm_monitor_addr.cloned().unwrap(),
|
||||
file_cache_on_disk,
|
||||
})),
|
||||
token.clone(),
|
||||
))
|
||||
@@ -482,6 +485,11 @@ fn cli() -> clap::Command {
|
||||
)
|
||||
.value_name("FILECACHE_CONNSTR"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("file-cache-on-disk")
|
||||
.long("file-cache-on-disk")
|
||||
.action(clap::ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::io::BufRead;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
@@ -175,6 +176,27 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
}
|
||||
}
|
||||
|
||||
/// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
|
||||
/// cgroup. Otherwise returns the default `Command::new(cmd)`
|
||||
///
|
||||
/// This function should be used to start postgres, as it will start it in the
|
||||
/// neon-postgres cgroup if we are a VM. This allows autoscaling to control
|
||||
/// postgres' resource usage. The cgroup will exist in VMs because vm-builder
|
||||
/// creates it during the sysinit phase of its inittab.
|
||||
fn maybe_cgexec(cmd: &str) -> Command {
|
||||
// The cplane sets this env var for autoscaling computes.
|
||||
// use `var_os` so we don't have to worry about the variable being valid
|
||||
// unicode. Should never be an concern . . . but just in case
|
||||
if env::var_os("AUTOSCALING").is_some() {
|
||||
let mut command = Command::new("cgexec");
|
||||
command.args(["-g", "memory:neon-postgres"]);
|
||||
command.arg(cmd);
|
||||
command
|
||||
} else {
|
||||
Command::new(cmd)
|
||||
}
|
||||
}
|
||||
|
||||
/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
|
||||
/// that we give to customers
|
||||
fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
@@ -451,7 +473,7 @@ impl ComputeNode {
|
||||
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
let sync_handle = Command::new(&self.pgbin)
|
||||
let sync_handle = maybe_cgexec(&self.pgbin)
|
||||
.args(["--sync-safekeepers"])
|
||||
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
|
||||
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
||||
@@ -586,7 +608,7 @@ impl ComputeNode {
|
||||
|
||||
// Start postgres
|
||||
info!("starting postgres");
|
||||
let mut pg = Command::new(&self.pgbin)
|
||||
let mut pg = maybe_cgexec(&self.pgbin)
|
||||
.args(["-D", pgdata])
|
||||
.spawn()
|
||||
.expect("cannot start postgres process");
|
||||
@@ -614,7 +636,7 @@ impl ComputeNode {
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
// Run postgres as a child process.
|
||||
let mut pg = Command::new(&self.pgbin)
|
||||
let mut pg = maybe_cgexec(&self.pgbin)
|
||||
.args(["-D", &self.pgdata])
|
||||
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
||||
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
|
||||
|
||||
957
docs/rfcs/025-generation-numbers.md
Normal file
957
docs/rfcs/025-generation-numbers.md
Normal file
@@ -0,0 +1,957 @@
|
||||
# Pageserver: split-brain safety for remote storage through generation numbers
|
||||
|
||||
## Summary
|
||||
|
||||
A scheme of logical "generation numbers" for tenant attachment to pageservers is proposed, along with
|
||||
changes to the remote storage format to include these generation numbers in S3 keys.
|
||||
|
||||
Using the control plane as the issuer of these generation numbers enables strong anti-split-brain
|
||||
properties in the pageserver cluster without implementing a consensus mechanism directly
|
||||
in the pageservers.
|
||||
|
||||
## Motivation
|
||||
|
||||
Currently, the pageserver's remote storage format does not provide a mechanism for addressing
|
||||
split brain conditions that may happen when replacing a node or when migrating
|
||||
a tenant from one pageserver to another.
|
||||
|
||||
From a remote storage perspective, a split brain condition occurs whenever two nodes both think
|
||||
they have the same tenant attached, and both can write to S3. This can happen in the case of a
|
||||
network partition, pathologically long delays (e.g. suspended VM), or software bugs.
|
||||
|
||||
In the current deployment model, control plane guarantees that a tenant is attached to one
|
||||
pageserver at a time, thereby ruling out split-brain conditions resulting from dual
|
||||
attachment (however, there is always the risk of a control plane bug). This control
|
||||
plane guarantee prevents robust response to failures, as if a pageserver is unresponsive
|
||||
we may not detach from it. The mechanism in this RFC fixes this, by making it safe to
|
||||
attach to a new, different pageserver even if an unresponsive pageserver may be running.
|
||||
|
||||
Futher, lack of safety during split-brain conditions blocks two important features where occasional
|
||||
split-brain conditions are part of the design assumptions:
|
||||
|
||||
- seamless tenant migration ([RFC PR](https://github.com/neondatabase/neon/pull/5029))
|
||||
- automatic pageserver instance failure handling (aka "failover") (RFC TBD)
|
||||
|
||||
### Prior art
|
||||
|
||||
- 020-pageserver-s3-coordination.md
|
||||
- 023-the-state-of-pageserver-tenant-relocation.md
|
||||
- 026-pageserver-s3-mvcc.md
|
||||
|
||||
This RFC has broad similarities to the proposal to implement a MVCC scheme in
|
||||
S3 object names, but this RFC avoids a general purpose transaction scheme in
|
||||
favour of more specialized "generations" that work like a transaction ID that
|
||||
always has the same lifetime as a pageserver process or tenant attachment, whichever
|
||||
is shorter.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Accommodate storage backends with no atomic or fencing capability (i.e. work within
|
||||
S3's limitation that there are no atomics and clients can't be fenced)
|
||||
- Don't depend on any STONITH or node fencing in the compute layer (i.e. we will not
|
||||
assume that we can reliably kill and EC2 instance and have it die)
|
||||
- Scoped per-tenant, not per-pageserver; for _seamless tenant migration_, we need
|
||||
per-tenant granularity, and for _failover_, we likely want to spread the workload
|
||||
of the failed pageserver instance to a number of peers, rather than monolithically
|
||||
moving the entire workload to another machine.
|
||||
We do not rule out the latter case, but should not constrain ourselves to it.
|
||||
|
||||
## Design Tenets
|
||||
|
||||
These are not requirements, but are ideas that guide the following design:
|
||||
|
||||
- Avoid implementing another consensus system: we already have a strongly consistent
|
||||
database in the control plane that can do atomic operations where needed, and we also
|
||||
have a Paxos implementation in the safekeeper.
|
||||
- Avoiding locking in to specific models of how failover will work (e.g. do not assume that
|
||||
all the tenants on a pageserver will fail over as a unit).
|
||||
- Be strictly correct when it comes to data integrity. Occasional failures of availability
|
||||
are tolerable, occasional data loss is not.
|
||||
|
||||
## Non Goals
|
||||
|
||||
The changes in this RFC intentionally isolate the design decision of how to define
|
||||
logical generations numbers and object storage format in a way that is somewhat flexible with
|
||||
respect to how actual orchestration of failover works.
|
||||
|
||||
This RFC intentionally does not cover:
|
||||
|
||||
- Failure detection
|
||||
- Orchestration of failover
|
||||
- Standby modes to keep data ready for fast migration
|
||||
- Intentional multi-writer operation on tenants (multi-writer scenarios are assumed to be transient split-brain situations).
|
||||
- Sharding.
|
||||
|
||||
The interaction between this RFC and those features is discussed in [Appendix B](#appendix-b-interoperability-with-other-features)
|
||||
|
||||
## Impacted Components
|
||||
|
||||
pageserver, control plane, safekeeper (optional)
|
||||
|
||||
## Implementation Part 1: Correctness
|
||||
|
||||
### Summary
|
||||
|
||||
- A per-tenant **generation number** is introduced to uniquely identifying tenant attachments to pageserver processes.
|
||||
|
||||
- This generation number increments each time the control plane modifies a tenant (`Project`)'s assigned pageserver, or when the assigned pageserver restarts.
|
||||
- the control plane is the authority for generation numbers: only it may
|
||||
increment a generation number.
|
||||
|
||||
- **Object keys are suffixed** with the generation number
|
||||
- **Safety for multiply-attached tenants** is provided by the
|
||||
generation number in the object key: the competing pageservers will not
|
||||
try to write to the same keys.
|
||||
- **Safety in split brain for multiple nodes running with
|
||||
the same node ID** is provided by the pageserver calling out to the control plane
|
||||
on startup, to re-attach and thereby increment the generations of any attached tenants
|
||||
- **Safety for deletions** is achieved by deferring the DELETE from S3 to a point in time where the deleting node has validated with control plane that no attachment with a higher generation has a reference to the to-be-DELETEd key.
|
||||
- **The control plane is used to issue generation numbers** to avoid the need for
|
||||
a built-in consensus system in the pageserver, although this could in principle
|
||||
be changed without changing the storage format.
|
||||
|
||||
### Generation numbers
|
||||
|
||||
A generation number is associated with each tenant in the control plane,
|
||||
and each time the attachment status of the tenant changes, this is incremented.
|
||||
Changes in attachment status include:
|
||||
|
||||
- Attaching the tenant to a different pageserver
|
||||
- A pageserver restarting, and "re-attaching" its tenants on startup
|
||||
|
||||
These increments of attachment generation provide invariants we need to avoid
|
||||
split-brain issues in storage:
|
||||
|
||||
- If two pageservers have the same tenant attached, the attachments are guaranteed to have different generation numbers, because the generation would increment
|
||||
while attaching the second one.
|
||||
- If there are multiple pageservers running with the same node ID, all the attachments on all pageservers are guaranteed to have different generation numbers, because the generation would increment
|
||||
when the second node started and re-attached its tenants.
|
||||
|
||||
As long as the infrastructure does not transparently replace an underlying
|
||||
physical machine, we are totally safe. See the later [unsafe case](#unsafe-case-on-badly-behaved-infrastructure) section for details.
|
||||
|
||||
### Object Key Changes
|
||||
|
||||
#### Generation suffix
|
||||
|
||||
All object keys (layer objects and index objects) will contain the attachment
|
||||
generation as a [suffix](#why-a-generation-suffix-rather-than-prefix).
|
||||
This suffix is the primary mechanism for protecting against split-brain situations, and
|
||||
enabling safe multi-attachment of tenants:
|
||||
|
||||
- Two pageservers running with the same node ID (e.g. after a failure, where there is
|
||||
some rogue pageserver still running) will not try to write to the same objects, because at startup they will have re-attached tenants and thereby incremented
|
||||
generation numbers.
|
||||
- Multiple attachments (to different pageservers) of the same tenant will not try to write to the same objects, as each attachment would have a distinct generation.
|
||||
|
||||
The generation is appended in hex format (8 byte string representing
|
||||
u32), to all our existing key names. A u32's range limit would permit
|
||||
27 restarts _per second_ over a 5 year system lifetime: orders of magnitude more than
|
||||
is realistic.
|
||||
|
||||
The exact meaning of the generation suffix can evolve over time if necessary, for
|
||||
example if we chose to implement a failover mechanism internally to the pageservers
|
||||
rather than going via the control plane. The storage format just sees it as a number,
|
||||
with the only semantic property being that the highest numbered index is the latest.
|
||||
|
||||
#### Index changes
|
||||
|
||||
Since object keys now include a generation suffix, the index of these keys must also be updated. IndexPart currently stores keys and LSNs sufficient to reconstruct key names: this would be extended to store the generation as well.
|
||||
|
||||
This will increase the size of the file, but only modestly: layers are already encoded as
|
||||
their string-ized form, so the overhead is about 10 bytes per layer. This will be less if/when
|
||||
the index storage format is migrated to a binary format from JSON.
|
||||
|
||||
#### Visibility
|
||||
|
||||
_This section doesn't describe code changes, but extends on the consequences of the
|
||||
object key changes given above_
|
||||
|
||||
##### Visibility of objects to pageservers
|
||||
|
||||
Pageservers can of course list objects in S3 at any time, but in practice their
|
||||
visible set is based on the contents of their LayerMap, which is initialized
|
||||
from the `index_part.json.???` that they load.
|
||||
|
||||
Starting with the `index_part` from the most recent previous generation
|
||||
(see [loading index_part](#finding-the-remote-indices-for-timelines)), a pageserver
|
||||
initially has visibility of all the objects that were referenced in the loaded index.
|
||||
These objects are guaranteed to remain visible until the current generation is
|
||||
superseded, via pageservers in older generations avoiding deletions (see [deletion](#deletion)).
|
||||
|
||||
The "most recent previous generation" is _not_ necessarily the most recent
|
||||
in terms of walltime, it is the one that is readable at the time a new generation
|
||||
starts. Consider the following sequence of a tenant being re-attached to different
|
||||
pageserver nodes:
|
||||
|
||||
- Create + attach on PS1 in generation 1
|
||||
- PS1 Do some work, write out index_part.json-0001
|
||||
- Attach to PS2 in generation 2
|
||||
- Read index_part.json-0001
|
||||
- PS2 starts doing some work...
|
||||
- Attach to PS3 in generation 3
|
||||
- Read index_part.json-0001
|
||||
- **...PS2 finishes its work: now it writes index_part.json-0002**
|
||||
- PS3 writes out index_part.json-0003
|
||||
|
||||
In the above sequence, the ancestry of indices is:
|
||||
|
||||
```
|
||||
0001 -> 0002
|
||||
|
|
||||
-> 0003
|
||||
```
|
||||
|
||||
This is not an issue for safety: if the 0002 references some object that is
|
||||
not in 0001, then 0003 simply does not see it, and will re-do whatever
|
||||
work was required (e.g. ingesting WAL or doing compaction). Objects referenced
|
||||
by only the 0002 index will never be read by future attachment generations, and
|
||||
will eventually be cleaned up by a scrub (see [scrubbing](#cleaning-up-orphan-objects-scrubbing)).
|
||||
|
||||
##### Visibility of LSNs to clients
|
||||
|
||||
Because index_part.json is now written with a generation suffix, which data
|
||||
is visible depends on which generation the reader is operating in:
|
||||
|
||||
- If one was passively reading from S3 from outside of a pageserver, the
|
||||
visibility of data would depend on which index_part.json-<generation> file
|
||||
one had chosen to read from.
|
||||
- If two pageservers have the same tenant attached, they may have different
|
||||
data visible as they're independently replaying the WAL, and maintaining
|
||||
independent LayerMaps that are written to independent index_part.json files.
|
||||
Data does not have to be remotely committed to be visible.
|
||||
- For a pageserver writing with a stale generation, historic LSNs
|
||||
remain readable until another pageserver (with a higher generation suffix)
|
||||
decides to execute GC deletions. At this point, we may think of the stale
|
||||
attachment's generation as having logically ended: during its existence
|
||||
the generation had a consistent view of the world.
|
||||
- For a newly attached pageserver, its highest visible LSN may appears to
|
||||
go backwards with respect to an earlier attachment, if that earlier
|
||||
attachment had not uploaded all data to S3 before the new attachment.
|
||||
|
||||
### Deletion
|
||||
|
||||
#### Generation number validation
|
||||
|
||||
While writes are de-conflicted by writers always using their own generation number in the key,
|
||||
deletions are slightly more challenging: if a pageserver A is isolated, and the true active node is
|
||||
pageserver B, then it is dangerous for A to do any object deletions, even of objects that it wrote
|
||||
itself, because pageserver's B metadata might reference those objects.
|
||||
|
||||
We solve this by inserting a "generation validation" step between the write of a remote index
|
||||
that un-links a particular object from the index, and the actual deletion of the object, such
|
||||
that deletions strictly obey the following ordering:
|
||||
|
||||
1. Write out index_part.json: this guarantees that any subsequent reader of the metadata will
|
||||
not try and read the object we unlinked.
|
||||
2. Call out to control plane to validate that the generation which we use for our attachment is still the latest.
|
||||
3. If step 2 passes, it is safe to delete the object. Why? The check-in with control plane
|
||||
together with our visibility rules guarantees that any later generation
|
||||
will use either the exact `index_part.json` that we uploaded in step 1, or a successor
|
||||
of it; not an earlier one. In both cases, the `index_part.json` doesn't reference the
|
||||
key we are deleting anymore, so, the key is invisible to any later attachment generation.
|
||||
Hence it's safe to delete it.
|
||||
|
||||
Note that at step 2 we are only confirming that deletions of objects _no longer referenced
|
||||
by the specific `index_part.json` written in step 1_ are safe. If we were attempting other deletions concurrently,
|
||||
these would need their own generation validation step.
|
||||
|
||||
If step 2 fails, we may leak the object. This is safe, but has a cost: see [scrubbing](#cleaning-up-orphan-objects-scrubbing). We may avoid this entirely outside of node
|
||||
failures, if we do proper flushing of deletions on clean shutdown and clean migration.
|
||||
|
||||
To avoid doing a huge number of control plane requests to perform generation validation,
|
||||
validation of many tenants will be done in a single request, and deletions will be queued up
|
||||
prior to validation: see [Persistent deletion queue](#persistent-deletion-queue) for more.
|
||||
|
||||
#### `remote_consistent_lsn` updates
|
||||
|
||||
Remote objects are not the only kind of deletion the pageserver does: it also indirectly deletes
|
||||
WAL data, by feeding back remote_consistent_lsn to safekeepers, as a signal to the safekeepers that
|
||||
they may drop data below this LSN.
|
||||
|
||||
For the same reasons that deletion of objects must be guarded by an attachment generation number
|
||||
validation step, updates to `remote_consistent_lsn` are subject to the same rules, using
|
||||
an ordering as follows:
|
||||
|
||||
1. upload the index_part that covers data up to LSN `L0` to S3
|
||||
2. Call out to control plane to validate that the generation which we use for our attachment is still the latest.
|
||||
3. advance the `remote_consistent_lsn` that we advertise to the safekeepers to `L0`
|
||||
|
||||
If step 2 fails, then the `remote_consistent_lsn` advertised
|
||||
to safekeepers will not advance again until a pageserver
|
||||
with the latest generation is ready to do so.
|
||||
|
||||
**Note:** at step 3 we are not advertising the _latest_ remote_consistent_lsn, we are
|
||||
advertising the value in the index_part that we uploaded in step 1. This provides
|
||||
a strong ordering guarantee.
|
||||
|
||||
Internally to the pageserver, each timeline will have two remote_consistent_lsn values: the one that
|
||||
reflects its latest write to remote storage, and the one that reflects the most
|
||||
recent validation of generation number. It is only the latter value that may
|
||||
be advertised to the outside world (i.e. to the safekeeper).
|
||||
|
||||
The control plane remains unaware of `remote_consistent_lsn`: it only has to validate
|
||||
the freshness of generation numbers, thereby granting the pageserver permission to
|
||||
share the information with the safekeeper.
|
||||
|
||||
For convenience, in subsequent sections and RFCs we will use "deletion" to mean both deletion
|
||||
of objects in S3, and updates to the `remote_consistent_lsn`, as updates to the remote consistent
|
||||
LSN are de-facto deletions done via the safekeeper, and both kinds of deletion are subject to
|
||||
the same generation validation requirement.
|
||||
|
||||
### Pageserver attach/startup changes
|
||||
|
||||
#### Attachment
|
||||
|
||||
Calls to `/v1/tenant/{tenant_id}/attach` are augmented with an additional
|
||||
`generation` field in the body.
|
||||
|
||||
The pageserver does not persist this: a generation is only good for the lifetime
|
||||
of a process.
|
||||
|
||||
#### Finding the remote indices for timelines
|
||||
|
||||
Because index files are now suffixed with generation numbers, the pageserver
|
||||
cannot always GET the remote index in one request, because it can't always
|
||||
know a-priori what the latest remote index is.
|
||||
|
||||
Typically, the most recent generation to write an index would be our own
|
||||
generation minus 1. However, this might not be the case: the previous
|
||||
node might have started and acquired a generation number, and then crashed
|
||||
before writing out a remote index.
|
||||
|
||||
In the general case and as a fallback, the pageserver may list all the `index_part.json`
|
||||
files for a timeline, sort them by generation, and pick the highest that is `<=`
|
||||
its current generation for this attachment. The tenant should never load an index
|
||||
with an attachment generation _newer_ than its own.
|
||||
These two rules combined ensure that objects written by later generations are never visible to earlier generations.
|
||||
|
||||
Note that if a given attachment picks an index part from an earlier generation (say n-2), but crashes & restarts before it writes its own generation's index part, next time it tries to pick an index part there may be an index part from generation n-1.
|
||||
It would pick the n-1 index part in that case, because it's sorted higher than the previous one from generation n-2.
|
||||
So, above rules guarantee no determinism in selecting the index part.
|
||||
are allowed to be attached with stale attachment generations during a multiply-attached
|
||||
phase in a migration, and in this instance if the old location's pageserver restarts,
|
||||
it should not try and load the newer generation's index.
|
||||
|
||||
To summarize, on starting a timeline, the pageserver will:
|
||||
|
||||
1. Issue a GET for index_part.json-<my generation - 1>
|
||||
2. If 1 failed, issue a ListObjectsv2 request for index_part.json\* and
|
||||
pick the newest.
|
||||
|
||||
One could optimize this further by using the control plane to record specifically
|
||||
which generation most recently wrote an index_part.json, if necessary, to increase
|
||||
the probability of finding the index_part.json in one GET. One could also improve
|
||||
the chances by having pageservers proactively write out index_part.json after they
|
||||
get a new generation ID.
|
||||
|
||||
#### Re-attachment on startup
|
||||
|
||||
On startup, the pageserver will call out to an new control plane `/re-attach`
|
||||
API (see [Generation API](#generation-api)). This returns a list of
|
||||
tenants that should be attached to the pageserver, and their generation numbers, which
|
||||
the control plane will increment before returning.
|
||||
|
||||
The pageserver should still scan its local disk on startup, but should _delete_
|
||||
any local content for tenants not indicated in the `/re-attach` response: their
|
||||
absence is an implicit detach operation.
|
||||
|
||||
**Note** if a tenant is omitted from the re-attach response, its local disk content
|
||||
will be deleted. This will change in subsequent work, when the control plane gains
|
||||
the concept of a secondary/standby location: a node with local content may revert
|
||||
to this status and retain some local content.
|
||||
|
||||
#### Cleaning up previous generations' remote indices
|
||||
|
||||
Deletion of old indices is not necessary for correctness, although it is necessary
|
||||
to avoid the ListObjects fallback in the previous section becoming ever more expensive.
|
||||
|
||||
Once the new attachment has written out its index_part.json, it may asynchronously clean up historic index_part.json
|
||||
objects that were found.
|
||||
|
||||
We may choose to implement this deletion either as an explicit step after we
|
||||
write out index_part for the first time in a pageserver's lifetime, or for
|
||||
simplicity just do it periodically as part of the background scrub (see [scrubbing](#cleaning-up-orphan-objects-scrubbing));
|
||||
|
||||
### Control Plane Changes
|
||||
|
||||
#### Store generations for attaching tenants
|
||||
|
||||
- The `Project` table must store the generation number for use when
|
||||
attaching the tenant to a new pageserver.
|
||||
- The `/v1/tenant/:tenant_id/attach` pageserver API will require the generation number,
|
||||
which the control plane can supply by simply incrementing the `Project`'s
|
||||
generation number each time the tenant is attached to a different server: the same database
|
||||
transaction that changes the assigned pageserver should also change the generation number.
|
||||
|
||||
#### Generation API
|
||||
|
||||
This section describes an API that could be provided directly by the control plane,
|
||||
or built as a separate microservice. In earlier parts of the RFC, when we
|
||||
discuss the control plane providing generation numbers, we are referring to this API.
|
||||
|
||||
The API endpoints used by the pageserver to acquire and validate generation
|
||||
numbers are quite simple, and only require access to some persistent and
|
||||
linerizable storage (such as a database).
|
||||
|
||||
Building this into the control plane is proposed as a least-effort option to exploit existing infrastructure and implement generation number issuance in the same transaction that mandates it (i.e., the transaction that updates the `Project` assignment to another pageserver).
|
||||
However, this is not mandatory: this "Generation Number Issuer" could
|
||||
be built as a microservice. In practice, we will write such a miniature service
|
||||
anyway, to enable E2E pageserver/compute testing without control plane.
|
||||
|
||||
The endpoints required by pageservers are:
|
||||
|
||||
##### `/re-attach`
|
||||
|
||||
- Request: `{node_id: <u32>}`
|
||||
- Response:
|
||||
- 200 `{tenants: [{id: <TenantId>, gen: <u32>}]}`
|
||||
- 404: unknown node_id
|
||||
- (Future: 429: flapping detected, perhaps nodes are fighting for the same node ID,
|
||||
or perhaps this node was in a retry loop)
|
||||
- (On unknown tenants, omit tenant from `tenants` array)
|
||||
- Server behavior: query database for which tenants should be attached to this pageserver.
|
||||
- for each tenant that should be attached, increment the attachment generation and
|
||||
include the new generation in the response
|
||||
- Client behavior:
|
||||
- for all tenants in the response, activate with the new generation number
|
||||
- for any local disk content _not_ referenced in the response, act as if we
|
||||
had been asked to detach it (i.e. delete local files)
|
||||
|
||||
**Note** the `node_id` in this request will change in future if we move to ephemeral
|
||||
node IDs, to be replaced with some correlation ID that helps the control plane realize
|
||||
if a process is running with the same storage as a previous pageserver process (e.g.
|
||||
we might use EC instance ID, or we might just write some UUID to the disk the first
|
||||
time we use it)
|
||||
|
||||
##### `/validate`
|
||||
|
||||
- Request: `{'tenants': [{tenant: <tenant id>, attach_gen: <gen>}, ...]}'`
|
||||
- Response:
|
||||
- 200 `{'tenants': [{tenant: <tenant id>, status: <bool>}...]}`
|
||||
- (On unknown tenants, omit tenant from `tenants` array)
|
||||
- Purpose: enable the pageserver to discover for the given attachments whether they are still the latest.
|
||||
- Server behavior: this is a read-only operation: simply compare the generations in the request with
|
||||
the generations known to the server, and set status to `true` if they match.
|
||||
- Client behavior: clients must not do deletions within a tenant's remote data until they have
|
||||
received a response indicating the generation they hold for the attachment is current.
|
||||
|
||||
#### Use of `/load` and `/ignore` APIs
|
||||
|
||||
Because the pageserver will be changed to only attach tenants on startup
|
||||
based on the control plane's response to a `/re-attach` request, the load/ignore
|
||||
APIs no longer make sense in their current form.
|
||||
|
||||
The `/load` API becomes functionally equivalent to attach, and will be removed:
|
||||
any location that used `/load` before should just attach instead.
|
||||
|
||||
The `/ignore` API is equivalent to detaching, but without deleting local files.
|
||||
|
||||
### Timeline/Branch creation & deletion
|
||||
|
||||
All of the previous arguments for safety have described operations within
|
||||
a timeline, where we may describe a sequence that includes updates to
|
||||
index_part.json, and where reads and writes are coming from a postgres
|
||||
endpoint (writes via the safekeeper).
|
||||
|
||||
Creating or destroying timeline is a bit different, because writes
|
||||
are coming from the control plane.
|
||||
|
||||
We must be safe against scenarios such as:
|
||||
|
||||
- A tenant is attached to pageserver B while pageserver A is
|
||||
in the middle of servicing an RPC from the control plane to
|
||||
create or delete a tenant.
|
||||
- A pageserver A has been sent a timeline creation request
|
||||
but becomes unresponsive. The tenant is attached to a
|
||||
different pageserver B, and the timeline creation request
|
||||
is sent there too.
|
||||
|
||||
#### Timeline Creation
|
||||
|
||||
If some very slow node tries to do a timeline creation _after_
|
||||
a more recent generation node has already created the timeline
|
||||
and written some data into it, that must not cause harm. This
|
||||
is provided in timeline creations by the way all the objects
|
||||
within the timeline's remote path include a generation suffix:
|
||||
a slow node in an old generation that attempts to "create" a timeline
|
||||
that already exists will just emit an index_part.json with
|
||||
an old generation suffix.
|
||||
|
||||
Timeline IDs are never reused, so we don't have
|
||||
to worry about the case of create/delete/create cycles. If they
|
||||
were re-used during a disaster recovery "un-delete" of a timeline,
|
||||
that special case can be handled by calling out to all available pageservers
|
||||
to check that they return 404 for the timeline, and to flush their
|
||||
deletion queues in case they had any deletions pending from the
|
||||
timeline.
|
||||
|
||||
The above makes it safe for control plane to change the assignment of
|
||||
tenant to pageserver in control plane while a timeline creation is ongoing.
|
||||
The reason is that the creation request against the new assigned pageserver
|
||||
uses a new generation number. However, care must be taken by control plane
|
||||
to ensure that a "timeline creation successul" response from some pageserver
|
||||
is checked for the pageserver's generation for that timeline's tenant still being the latest.
|
||||
If it is not the latest, the response does not constitute a successful timeline creation.
|
||||
It is acceptable to discard such responses, the scrubber will clean up the S3 state.
|
||||
It is better to issue a timelien deletion request to the stale attachment.
|
||||
|
||||
#### Timeline Deletion
|
||||
|
||||
Tenant/timeline deletion operations are exempt from generation validation
|
||||
on deletes, and therefore don't have to go through the same deletion
|
||||
queue as GC/compaction layer deletions. This is because once a
|
||||
delete is issued by the control plane, it is a promise that the
|
||||
control plane will keep trying until the deletion is done, so even stale
|
||||
pageservers are permitted to go ahead and delete the objects.
|
||||
|
||||
The implications of this for control plane are:
|
||||
|
||||
- During timeline/tenant deletion, the control plane must wait for the deletion to
|
||||
be truly complete (status 404) and also handle the case where the pageserver
|
||||
becomes unavailable, either by waiting for a replacement with the same node_id,
|
||||
or by *re-attaching the tenant elsewhere.
|
||||
|
||||
- The control plane must persist its intent to delete
|
||||
a timeline/tenant before issuing any RPCs, and then once it starts, it must
|
||||
keep retrying until the tenant/timeline is gone. This is already handled
|
||||
by using a persistent `Operation` record that is retried indefinitely.
|
||||
|
||||
Timeline deletion may result in a special kind of object leak, where
|
||||
the latest generation attachment completes a deletion (including erasing
|
||||
all objects in the timeline path), but some slow/partitioned node is
|
||||
writing into the timeline path with a stale generation number. This would
|
||||
not be caught by any per-timeline scrubbing (see [scrubbing](#cleaning-up-orphan-objects-scrubbing)), since scrubbing happens on the
|
||||
attached pageserver, and once the timeline is deleted it isn't attached anywhere.
|
||||
This scenario should be pretty rare, and the control plane can make it even
|
||||
rarer by ensuring that if a tenant is in a multi-attached state (e.g. during
|
||||
migration), we wait for that to complete before processing the deletion. Beyond
|
||||
that, we may implement some other top-level scrub of timelines in
|
||||
an external tool, to identify any tenant/timeline paths that are not found
|
||||
in the control plane database.
|
||||
|
||||
#### Examples
|
||||
|
||||
- Deletion, node restarts partway through:
|
||||
- By the time we returned 202, we have written a remote delete marker
|
||||
- Any subsequent incarnation of the same node_id will see the remote
|
||||
delete marker and continue to process the deletion
|
||||
- If the original pageserver is lost permanently and no replacement
|
||||
with the same node_id is available, then the control plane must recover
|
||||
by re-attaching the tenant to a different node.
|
||||
- Creation, node becomes unresponsive partway through.
|
||||
- Control plane will see HTTP request timeout, keep re-issuing
|
||||
request to whoever is the latest attachment point for the tenant
|
||||
until it succeeds.
|
||||
- Stale nodes may be trying to execute timeline creation: they will
|
||||
write out index_part.json files with
|
||||
stale attachment generation: these will be eventually cleaned up
|
||||
by the same mechanism as other old indices.
|
||||
|
||||
### Unsafe case on badly behaved infrastructure
|
||||
|
||||
This section is only relevant if running on a different environment
|
||||
than EC2 machines with ephemeral disks.
|
||||
|
||||
If we ever run pageservers on infrastructure that might transparently restart
|
||||
a pageserver while leaving an old process running (e.g. a VM gets rescheduled
|
||||
without the old one being fenced), then there is a risk of corruption, when
|
||||
the control plane attaches the tenant, as follows:
|
||||
|
||||
- If the control plane sends an `/attach` request to node A, then node A dies
|
||||
and is replaced, and the control plane's retries the request without
|
||||
incrementing that attachment ID, then it could end up with two physical nodes
|
||||
both using the same generation number.
|
||||
- This is not an issue when using EC2 instances with ephemeral storage, as long
|
||||
as the control plane never re-uses a node ID, but it would need re-examining
|
||||
if running on different infrastructure.
|
||||
- To robustly protect against this class of issue, we would either:
|
||||
- add a "node generation" to distinguish between different processes holding the
|
||||
same node_id.
|
||||
- or, dispense with static node_id entirely and issue an ephemeral ID to each
|
||||
pageserver process when it starts.
|
||||
|
||||
## Implementation Part 2: Optimizations
|
||||
|
||||
### Persistent deletion queue
|
||||
|
||||
Between writing our a new index_part.json that doesn't reference an object,
|
||||
and executing the deletion, an object passes through a window where it is
|
||||
only referenced in memory, and could be leaked if the pageserver is stopped
|
||||
uncleanly. That introduces conflicting incentives: on the one hand, we would
|
||||
like to delay and batch deletions to
|
||||
1. minimize the cost of the mandatory validations calls to control plane, and
|
||||
2. minimize cost for DeleteObjects requests.
|
||||
On the other hand we would also like to minimize leakage by executing
|
||||
deletions promptly.
|
||||
|
||||
To resolve this, we may make the deletion queue persistent
|
||||
and then executing these in the background at a later time.
|
||||
|
||||
_Note: The deletion queue's reason for existence is optimization rather than correctness,
|
||||
so there is a lot of flexibility in exactly how the it should work,
|
||||
as long as it obeys the rule to validate generations before executing deletions,
|
||||
so the following details are not essential to the overall RFC._
|
||||
|
||||
#### Scope
|
||||
|
||||
The deletion queue will be global per pageserver, not per-tenant. There
|
||||
are several reasons for this choice:
|
||||
|
||||
- Use the queue as a central point to coalesce validation requests to the
|
||||
control plane: this avoids individual `Timeline` objects ever touching
|
||||
the control plane API, and avoids them having to know the rules about
|
||||
validating deletions. This separation of concerns will avoid burdening
|
||||
the already many-LoC `Timeline` type with even more responsibility.
|
||||
- Decouple the deletion queue from Tenant attachment lifetime: we may
|
||||
"hibernate" an inactive tenant by tearing down its `Tenant`/`Timeline`
|
||||
objects in the pageserver, without having to wait for deletions to be done.
|
||||
- Amortize the cost of I/O for the persistent queue, instead of having many
|
||||
tiny queues.
|
||||
- Coalesce deletions into a smaller number of larger DeleteObjects calls
|
||||
|
||||
Because of the cost of doing I/O for persistence, and the desire to coalesce
|
||||
generation validation requests across tenants, and coalesce deletions into
|
||||
larger DeleteObjects requests, there will be one deletion queue per pageserver
|
||||
rather than one per tenant. This has the added benefit that when deactivating
|
||||
a tenant, we do not have to drain their deletion queue: deletions can proceed
|
||||
for a tenant whose main `Tenant` object has been torn down.
|
||||
|
||||
#### Flow of deletion
|
||||
|
||||
The flow of a deletion is becomes:
|
||||
|
||||
1. Need for deletion of an object (=> layer file) is identified.
|
||||
2. Unlink the object from all the places that reference it (=> `index_part.json`).
|
||||
3. Enqueue the deletion to a persistent queue.
|
||||
Each entry is `tenant_id, attachment_generation, S3 key`.
|
||||
4. Validate & execute in batches:
|
||||
4.1 For a batch of entries, call into control plane.
|
||||
4.2 For the subset of entries that passed validation, execute a `DeleteObjects` S3 DELETE request for their S3 keys.
|
||||
|
||||
As outlined in the Part 1 on correctness, it is critical that deletions are only
|
||||
executed once the key is not referenced anywhere in S3.
|
||||
This property is obviously upheld by the scheme above.
|
||||
|
||||
#### We Accept Object Leakage In Acceptable Circumcstances
|
||||
|
||||
If we crash in the flow above between (2) and (3), we lose track of unreferenced object.
|
||||
Further, enqueuing a single to the persistent queue may not be durable immediately to amortize cost of flush to disk.
|
||||
This is acceptable for now, it can be caught by [the scrubber](#cleaning-up-orphan-objects-scrubbing).
|
||||
|
||||
There are various measures we can take to improve this in the future.
|
||||
1. Cap amount of time until enqueued entry becomes durable (timeout for flush-to-tisk)
|
||||
2. Proactively flush:
|
||||
- On graceful shutdown, as we anticipate that some or
|
||||
all of our attachments may be re-assigned while we are offline.
|
||||
- On tenant detach.
|
||||
3. For each entry, keep track of whether it has passed (2).
|
||||
Only admit entries to (4) one they have passed (2).
|
||||
This requires re-writing / two queue entries (intent, commit) per deletion.
|
||||
|
||||
The important take-away with any of the above is that it's not
|
||||
disastrous to leak objects in exceptional circumstances.
|
||||
|
||||
#### Operations that may skip the queue
|
||||
|
||||
Deletions of an entire timeline are [exempt](#Timeline-Deletion) from generation number validation. Once the
|
||||
control plane sends the deletion request, there is no requirement to retain the readability
|
||||
of any data within the timeline, and all objects within the timeline path may be deleted
|
||||
at any time from the control plane's deletion request onwards.
|
||||
|
||||
Since deletions of smaller timelines won't have enough objects to compose a full sized
|
||||
DeleteObjects request, it is still useful to send these through the last part of the
|
||||
deletion pipeline to coalesce with other executing deletions: to enable this, the
|
||||
deletion queue should expose two input channels: one for deletions that must be
|
||||
processed in a generation-aware way, and a fast path for timeline deletions, where
|
||||
that fast path may skip validation and the persistent queue.
|
||||
|
||||
### Cleaning up orphan objects (scrubbing)
|
||||
|
||||
An orphan object is any object which is no longer referenced by a running node or by metadata.
|
||||
|
||||
Examples of how orphan objects arise:
|
||||
|
||||
- A node PUTs a layer object, then crashes before it writes the
|
||||
index_part.json that references that layer.
|
||||
- A stale node carries on running for some time, and writes out an unbounded number of
|
||||
objects while it believes itself to be the rightful writer for a tenant.
|
||||
- A pageserver crashes between un-linking an object from the index, and persisting
|
||||
the object to its deletion queue.
|
||||
|
||||
Orphan objects are functionally harmless, but have a small cost due to S3 capacity consumed. We
|
||||
may clean them up at some time in the future, but doing a ListObjectsv2 operation and cross
|
||||
referencing with the latest metadata to identify objects which are not referenced.
|
||||
|
||||
Scrubbing will be done only by an attached pageserver (not some third party process), and deletions requested during scrub will go through the same
|
||||
validation as all other deletions: the attachment generation must be
|
||||
fresh. This avoids the possibility of a stale pageserver incorrectly
|
||||
thinking than an object written by a newer generation is stale, and deleting
|
||||
it.
|
||||
|
||||
It is not strictly necessary that scrubbing be done by an attached
|
||||
pageserver: it could also be done externally. However, an external
|
||||
scrubber would still require the same validation procedure that
|
||||
a pageserver's deletion queue performs, before actually erasing
|
||||
objects.
|
||||
|
||||
## Operational impact
|
||||
|
||||
### Availability
|
||||
|
||||
Coordination of generation numbers via the control plane introduce a dependency for certain
|
||||
operations:
|
||||
|
||||
1. Starting new pageservers (or activating pageservers after a restart)
|
||||
2. Executing enqueued deletions
|
||||
3. Advertising updated `remote_consistent_lsn` to enable WAL trimming
|
||||
|
||||
Item 1. would mean that some in-place restarts that previously would have resumed service even if the control plane were
|
||||
unavailable, will now not resume service to users until the control plane is available. We could
|
||||
avoid this by having a timeout on communication with the control plane, and after some timeout,
|
||||
resume service with the previous generation numbers (assuming this was persisted to disk). However,
|
||||
this is unlikely to be needed as the control plane is already an essential & highly available component. Also, having a node re-use an old generation number would complicate
|
||||
reasoning about the system, as it would break the invariant that a generation number uniquely identifies
|
||||
a tenant's attachment to a given pageserver _process_: it would merely identify the tenant's attachment
|
||||
to the pageserver _machine_ or its _on-disk-state_.
|
||||
|
||||
Item 2. is a non-issue operationally: it's harmless to delay deletions, the only impact of objects pending deletion is
|
||||
the S3 capacity cost.
|
||||
|
||||
Item 3. could be an issue if safekeepers are low on disk space and the control plane is unavailable for a long time. If this became an issue,
|
||||
we could adjust the safekeeper to delete segments from local disk sooner, as soon as they're uploaded to S3, rather than waiting for
|
||||
remote_consistent_lsn to advance.
|
||||
|
||||
For a managed service, the general approach should be to make sure we are monitoring & respond fast enough
|
||||
that control plane outages are bounded in time.
|
||||
|
||||
There is also the fact that control plane runs in a single region.
|
||||
The latency for distant regions is not a big concern for us because all request types added by this RFC are either infrequent or not in the way of the data path.
|
||||
However, we lose region isolation for the operations listed above.
|
||||
The ongoing work to split console and control will give us per-region control plane, and all operations in this RFC can be handled by these per-region control planes.
|
||||
With that in mind, we accept the trade-offs outlined in this paragraph.
|
||||
|
||||
We will also implement an "escape hatch" config generation numbers, where in a major disaster outage,
|
||||
we may manually run pageservers with a hand-selected generation number, so that we can bring them online
|
||||
independently of a control plane.
|
||||
|
||||
### Rollout
|
||||
|
||||
Although there is coupling between components, we may deploy most of the new data plane components
|
||||
independently of the control plane: initially they can just use a static generation number.
|
||||
|
||||
#### Phase 1
|
||||
|
||||
The pageserver is deployed with some special config to:
|
||||
|
||||
- Always act like everything is generation 1 and do not wait for a control plane issued generation on attach
|
||||
- Skip the places in deletion and remote_consistent_lsn updates where we would call into control plane
|
||||
|
||||
#### Phase 2
|
||||
|
||||
The control plane changes are deployed: control plane will now track and increment generation numbers.
|
||||
|
||||
#### Phase 3
|
||||
|
||||
The pageserver is deployed with its control-plane-dependent changes enabled: it will now require
|
||||
the control plane to service re-attach requests on startup, and handle generation
|
||||
validation requests.
|
||||
|
||||
### On-disk backward compatibility
|
||||
|
||||
Backward compatibility with existing data is straightforward:
|
||||
|
||||
- When reading the index, we may assume that any layer whose metadata doesn't include
|
||||
generations will have a path without generation suffix.
|
||||
- When locating the index file on attachment, we may use the "fallback" listing path
|
||||
and if there is only an index without generation suffix, that is the one we load.
|
||||
|
||||
It is not necessary to re-write existing layers: even new index files will be able
|
||||
to represent generation-less layers.
|
||||
|
||||
### On-disk forward compatibility
|
||||
|
||||
We will do a two phase rollout, probably over multiple releases because we will naturally
|
||||
have some of the read-side code ready before the overall functionality is ready:
|
||||
|
||||
1. Deploy pageservers which understand the new index format and generation suffixes
|
||||
in keys, but do not write objects with generation numbers in the keys.
|
||||
2. Deploy pageservers that write objects with generation numbers in the keys.
|
||||
|
||||
Old pageservers will be oblivious to generation numbers. That means that they can't
|
||||
read objects with generation numbers in the name. This is why we must
|
||||
first step must deploy the ability to read, before the second step
|
||||
starts writing them.
|
||||
|
||||
# Frequently Asked Questions
|
||||
|
||||
## Why a generation _suffix_ rather than _prefix_?
|
||||
|
||||
The choice is motivated by object listing, since one can list by prefix but not
|
||||
suffix.
|
||||
|
||||
In [finding remote indices](#finding-the-remote-indices-for-timelines), we rely
|
||||
on being able to do a prefix listing for `<tenant>/<timeline>/index_part.json*`.
|
||||
That relies on the prefix listing.
|
||||
|
||||
The converse case of using a generation prefix and listing by generation is
|
||||
not needed: one could imagine listing by generation while scrubbing (so that
|
||||
a particular generation's layers could be scrubbed), but this is not part
|
||||
of normal operations, and the [scrubber](#cleaning-up-orphan-objects-scrubbing) probably won't work that way anyway.
|
||||
|
||||
## Wouldn't it be simpler to have a separate deletion queue per timeline?
|
||||
|
||||
Functionally speaking, we could. That's how RemoteTimelineClient currently works,
|
||||
but this approach does not map well to a long-lived persistent queue with
|
||||
generation validation.
|
||||
|
||||
Anything we do per-timeline generates tiny random I/O, on a pageserver with
|
||||
tens of thousands of timelines operating: to be ready for high scale, we should:
|
||||
|
||||
- A) Amortize costs where we can (e.g. a shared deletion queue)
|
||||
- B) Expect to put tenants into a quiescent state while they're not
|
||||
busy: i.e. we shouldn't keep a tenant alive to service its deletion queue.
|
||||
|
||||
This was discussed in the [scope](#scope) part of the deletion queue section.
|
||||
|
||||
# Appendix A: Examples of use in high availability/failover
|
||||
|
||||
The generation numbers proposed in this RFC are adaptable to a variety of different
|
||||
failover scenarios and models. The sections below sketch how they would work in practice.
|
||||
|
||||
### In-place restart of a pageserver
|
||||
|
||||
"In-place" here means that the restart is done before any other element in the system
|
||||
has taken action in response to the node being down.
|
||||
|
||||
- After restart, the node issues a re-attach request to the control plane, and
|
||||
receives new generation numbers for all its attached tenants.
|
||||
- Tenants may be activated with the generation number in the re-attach response.
|
||||
- If any of its attachments were in fact stale (i.e. had be reassigned to another
|
||||
node while this node was offline), then
|
||||
- the re-attach response will inform the tenant about this by not including
|
||||
the tenant of this by _not_ incrementing the generation for that attachment.
|
||||
- This will implicitly block deletions in the tenant, but as an optimization
|
||||
the pageserver should also proactively stop doing S3 uploads when it notices this stale-generation state.
|
||||
- The control plane is expected to eventually detach this tenant from the
|
||||
pageserver.
|
||||
|
||||
If the control plane does not include a tenant in the re-attach response,
|
||||
but there is still local state for the tenant in the filesystem, the pageserver
|
||||
deletes the local state in response and does not load/active the tenant.
|
||||
See the [earlier section on pageserver startup](#pageserver-attachstartup-changes) for details.
|
||||
Control plane can use this mechanism to clean up a pageserver that has been
|
||||
down for so long that all its tenants were migrated away before it came back
|
||||
up again and asked for re-attach.
|
||||
|
||||
### Failure of a pageserver
|
||||
|
||||
In this context, read "failure" as the most ambiguous possible case, where
|
||||
a pageserver is unavailable to clients and control plane, but may still be executing and talking
|
||||
to S3.
|
||||
|
||||
#### Case A: re-attachment to other nodes
|
||||
|
||||
1. Let's say node 0 becomes unresponsive in a cluster of three nodes 0, 1, 2.
|
||||
2. Some external mechanism notices that the node is unavailable and initiates
|
||||
movement of all tenants attached to that node to a different node according
|
||||
to some distribution rule.
|
||||
In this example, it would mean incrementing the generation
|
||||
of all tenants that were attached to node 0, as each tenant's assigned pageserver changes.
|
||||
3. A tenant which is now attached to node 1 will _also_ still be attached to node
|
||||
0, from the perspective of node 0. Node 0 will still be using its old generation,
|
||||
node 1 will be using a newer generation.
|
||||
4. S3 writes will continue from nodes 0 and 1: there will be an index_part.json-00000001
|
||||
\_and\* an index_part.json-00000002. Objects written under the old suffix
|
||||
after the new attachment was created do not matter from the rest of the system's
|
||||
perspective: the endpoints are reading from the new attachment location. Objects
|
||||
written by node 0 are just garbage that can be cleaned up at leisure. Node 0 will
|
||||
not do any deletions because it can't synchronize with control plane, or if it could,
|
||||
its deletion queue processing would get errors for the validation requests.
|
||||
|
||||
#### Case B: direct node replacement with same node_id and drive
|
||||
|
||||
This is the scenario we would experience if running pageservers in some dynamic
|
||||
VM/container environment that would auto-replace a given node_id when it became
|
||||
unresponsive, with the node's storage supplied by some network block device
|
||||
that is attached to the replacement VM/container.
|
||||
|
||||
1. Let's say node 0 fails, and there may be some other peers but they aren't relevant.
|
||||
2. Some external mechanism notices that the node is unavailable, and creates
|
||||
a "new node 0" (Node 0b) which is a physically separate server. The original node 0
|
||||
(Node 0a) may still be running, because we do not assume the environment fences nodes.
|
||||
3. On startup, node 0b re-attaches and gets higher generation numbers for
|
||||
all tenants.
|
||||
4. S3 writes continue from nodes 0a and 0b, but the writes do not collide due to different
|
||||
generation in the suffix, and the writes from node 0a are not visible to the rest
|
||||
of the system because endpoints are reading only from node 0b.
|
||||
|
||||
# Appendix B: interoperability with other features
|
||||
|
||||
## Sharded Keyspace
|
||||
|
||||
The design in this RFC maps neatly to a sharded keyspace design where subsets of the key space
|
||||
for a tenant are assigned to different pageservers:
|
||||
|
||||
- the "unit of work" for attachments becomes something like a TenantShard rather than a Tenant
|
||||
- TenantShards get generation numbers just as Tenants do.
|
||||
- Write workload (ingest, compaction) for a tenant is spread out across pageservers via
|
||||
TenantShards, but each TenantShard still has exactly one valid writer at a time.
|
||||
|
||||
## Read replicas
|
||||
|
||||
_This section is about a passive reader of S3 pageserver state, not a postgres
|
||||
read replica_
|
||||
|
||||
For historical reads to LSNs below the remote persistent LSN, any node may act as a reader at any
|
||||
time: remote data is logically immutable data, and the use of deferred deletion in this RFC helps
|
||||
mitigate the fact that remote data is not _physically_ immutable (i.e. the actual data for a given
|
||||
page moves around as compaction happens).
|
||||
|
||||
A read replica needs to be aware of generations in remote data in order to read the latest
|
||||
metadata (find the index_part.json with the latest suffix). It may either query this
|
||||
from the control plane, or find it with ListObjectsv2 request
|
||||
|
||||
## Seamless migration
|
||||
|
||||
To make tenant migration totally seamless, we will probably want to intentionally double-attach
|
||||
a tenant briefly, serving reads from the old node while waiting for the new node to be ready.
|
||||
|
||||
This RFC enables that double-attachment: two nodes may be attached at the same time, with the migration destination
|
||||
having a higher generation number. The old node will be able to ingest and serve reads, but not
|
||||
do any deletes. The new node's attachment must also avoid deleting layers that the old node may
|
||||
still use. A new piece of state
|
||||
will be needed for this in the control plane's definition of an attachment.
|
||||
|
||||
## Warm secondary locations
|
||||
|
||||
To enable faster tenant movement after a pageserver is lost, we will probably want to spend some
|
||||
disk capacity on keeping standby locations populated with local disk data.
|
||||
|
||||
There's no conflict between this RFC and that: implementing warm secondary locations on a per-tenant basis
|
||||
would be a separate change to the control plane to store standby location(s) for a tenant. Because
|
||||
the standbys do not write to S3, they do not need to be assigned generation numbers. When a tenant is
|
||||
re-attached to a standby location, that would increment the tenant attachment generation and this
|
||||
would work the same as any other attachment change, but with a warm cache.
|
||||
|
||||
## Ephemeral node IDs
|
||||
|
||||
This RFC intentionally avoids changing anything fundamental about how pageservers are identified
|
||||
and registered with the control plane, to avoid coupling the implementation of pageserver split
|
||||
brain protection with more fundamental changes in the management of the pageservers.
|
||||
|
||||
Moving to ephemeral node IDs would provide an extra layer of
|
||||
resilience in the system, as it would prevent the control plane
|
||||
accidentally attaching to two physical nodes with the same
|
||||
generation, if somehow there were two physical nodes with
|
||||
the same node IDs (currently we rely on EC2 guarantees to
|
||||
eliminate this scenario). With ephemeral node IDs, there would be
|
||||
no possibility of that happening, no matter the behavior of
|
||||
underlying infrastructure.
|
||||
|
||||
Nothing fundamental in the pageserver's handling of generations needs to change to handle ephemeral node IDs, since we hardly use the
|
||||
`node_id` anywhere. The `/re-attach` API would be extended
|
||||
to enable the pageserver to obtain its ephemeral ID, and provide
|
||||
some correlation identifier (e.g. EC instance ID), to help the
|
||||
control plane re-attach tenants to the same physical server that
|
||||
previously had them attached.
|
||||
@@ -31,6 +31,8 @@ fn lsn_invalid() -> Lsn {
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct SkTimelineInfo {
|
||||
/// Term.
|
||||
pub term: Option<u64>,
|
||||
/// Term of the last entry.
|
||||
pub last_log_term: Option<u64>,
|
||||
/// LSN of the last record.
|
||||
@@ -58,4 +60,6 @@ pub struct SkTimelineInfo {
|
||||
/// A connection string to use for WAL receiving.
|
||||
#[serde(default)]
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub http_connstr: Option<String>,
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ url.workspace = true
|
||||
uuid.workspace = true
|
||||
|
||||
pq_proto.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
metrics.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -58,6 +58,8 @@ pub mod serde_regex;
|
||||
|
||||
pub mod pageserver_feedback;
|
||||
|
||||
pub mod postgres_client;
|
||||
|
||||
pub mod tracing_span_assert;
|
||||
|
||||
pub mod rate_limit;
|
||||
|
||||
37
libs/utils/src/postgres_client.rs
Normal file
37
libs/utils/src/postgres_client.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
//! Postgres client connection code common to other crates (safekeeper and
|
||||
//! pageserver) which depends on tenant/timeline ids and thus not fitting into
|
||||
//! postgres_connection crate.
|
||||
|
||||
use anyhow::Context;
|
||||
use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
|
||||
use crate::id::TenantTimelineId;
|
||||
|
||||
/// Create client config for fetching WAL from safekeeper on particular timeline.
|
||||
/// listen_pg_addr_str is in form host:\[port\].
|
||||
pub fn wal_stream_connection_config(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
Ok(connstr)
|
||||
}
|
||||
@@ -16,3 +16,19 @@ in the `neon-postgres` cgroup and set its `memory.{max,high}`.
|
||||
* See also: [`neondatabase/vm-monitor`](https://github.com/neondatabase/vm-monitor/),
|
||||
where initial development of the monitor happened. The repository is no longer
|
||||
maintained but the commit history may be useful for debugging.
|
||||
|
||||
## Structure
|
||||
|
||||
The `vm-monitor` is loosely comprised of a few systems. These are:
|
||||
* the server: this is just a simple `axum` server that accepts requests and
|
||||
upgrades them to websocket connections. The server only allows one connection at
|
||||
a time. This means that upon receiving a new connection, the server will terminate
|
||||
and old one if it exists.
|
||||
* the filecache: a struct that allows communication with the Postgres file cache.
|
||||
On startup, we connect to the filecache and hold on to the connection for the
|
||||
entire monitor lifetime.
|
||||
* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by
|
||||
listening for `memory.high` events and setting its `memory.{high,max}` values.
|
||||
* the runner: the runner marries the filecache and cgroup watcher together,
|
||||
communicating with the agent throught the `Dispatcher`, and then calling filecache
|
||||
and cgroup watcher functions as needed to upscale and downscale
|
||||
|
||||
@@ -634,7 +634,7 @@ impl CgroupWatcher {
|
||||
.context("failed to get memory subsystem")?
|
||||
.set_mem(cgroups_rs::memory::SetMemory {
|
||||
low: None,
|
||||
high: Some(MaxValue::Value(bytes.min(i64::MAX as u64) as i64)),
|
||||
high: Some(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)),
|
||||
min: None,
|
||||
max: None,
|
||||
})
|
||||
@@ -654,8 +654,10 @@ impl CgroupWatcher {
|
||||
.set_mem(cgroups_rs::memory::SetMemory {
|
||||
min: None,
|
||||
low: None,
|
||||
high: Some(MaxValue::Value(limits.high.min(i64::MAX as u64) as i64)),
|
||||
max: Some(MaxValue::Value(limits.max.min(i64::MAX as u64) as i64)),
|
||||
high: Some(MaxValue::Value(
|
||||
u64::min(limits.high, i64::MAX as u64) as i64
|
||||
)),
|
||||
max: Some(MaxValue::Value(u64::min(limits.max, i64::MAX as u64) as i64)),
|
||||
})
|
||||
.context("failed to set memory limits")
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Managing the websocket connection and other signals in the monitor.
|
||||
//!
|
||||
//! Contains types that manage the interaction (not data interchange, see `protocol`)
|
||||
//! between informant and monitor, allowing us to to process and send messages in a
|
||||
//! between agent and monitor, allowing us to to process and send messages in a
|
||||
//! straightforward way. The dispatcher also manages that signals that come from
|
||||
//! the cgroup (requesting upscale), and the signals that go to the cgroup
|
||||
//! (notifying it of upscale).
|
||||
@@ -24,16 +24,16 @@ use crate::protocol::{
|
||||
/// The central handler for all communications in the monitor.
|
||||
///
|
||||
/// The dispatcher has two purposes:
|
||||
/// 1. Manage the connection to the informant, sending and receiving messages.
|
||||
/// 1. Manage the connection to the agent, sending and receiving messages.
|
||||
/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
|
||||
/// and sending a message to the informant when the cgroup manager requests
|
||||
/// and sending a message to the agent when the cgroup manager requests
|
||||
/// upscale.
|
||||
#[derive(Debug)]
|
||||
pub struct Dispatcher {
|
||||
/// We read informant messages of of `source`
|
||||
/// We read agent messages of of `source`
|
||||
pub(crate) source: SplitStream<WebSocket>,
|
||||
|
||||
/// We send messages to the informant through `sink`
|
||||
/// We send messages to the agent through `sink`
|
||||
sink: SplitSink<WebSocket, Message>,
|
||||
|
||||
/// Used to notify the cgroup when we are upscaled.
|
||||
@@ -43,7 +43,7 @@ pub struct Dispatcher {
|
||||
/// we send an `UpscaleRequst` to the agent.
|
||||
pub(crate) request_upscale_events: mpsc::Receiver<()>,
|
||||
|
||||
/// The protocol version we have agreed to use with the informant. This is negotiated
|
||||
/// The protocol version we have agreed to use with the agent. This is negotiated
|
||||
/// during the creation of the dispatcher, and should be the highest shared protocol
|
||||
/// version.
|
||||
///
|
||||
@@ -56,9 +56,9 @@ pub struct Dispatcher {
|
||||
impl Dispatcher {
|
||||
/// Creates a new dispatcher using the passed-in connection.
|
||||
///
|
||||
/// Performs a negotiation with the informant to determine the highest protocol
|
||||
/// Performs a negotiation with the agent to determine the highest protocol
|
||||
/// version that both support. This consists of two steps:
|
||||
/// 1. Wait for the informant to sent the range of protocols it supports.
|
||||
/// 1. Wait for the agent to sent the range of protocols it supports.
|
||||
/// 2. Send a protocol version that works for us as well, or an error if there
|
||||
/// is no compatible version.
|
||||
pub async fn new(
|
||||
@@ -69,7 +69,7 @@ impl Dispatcher {
|
||||
let (mut sink, mut source) = stream.split();
|
||||
|
||||
// Figure out the highest protocol version we both support
|
||||
info!("waiting for informant to send protocol version range");
|
||||
info!("waiting for agent to send protocol version range");
|
||||
let Some(message) = source.next().await else {
|
||||
bail!("websocket connection closed while performing protocol handshake")
|
||||
};
|
||||
@@ -79,7 +79,7 @@ impl Dispatcher {
|
||||
let Message::Text(message_text) = message else {
|
||||
// All messages should be in text form, since we don't do any
|
||||
// pinging/ponging. See nhooyr/websocket's implementation and the
|
||||
// informant/agent for more info
|
||||
// agent for more info
|
||||
bail!("received non-text message during proocol handshake: {message:?}")
|
||||
};
|
||||
|
||||
@@ -88,32 +88,30 @@ impl Dispatcher {
|
||||
max: PROTOCOL_MAX_VERSION,
|
||||
};
|
||||
|
||||
let informant_range: ProtocolRange = serde_json::from_str(&message_text)
|
||||
let agent_range: ProtocolRange = serde_json::from_str(&message_text)
|
||||
.context("failed to deserialize protocol version range")?;
|
||||
|
||||
info!(range = ?informant_range, "received protocol version range");
|
||||
info!(range = ?agent_range, "received protocol version range");
|
||||
|
||||
let highest_shared_version = match monitor_range.highest_shared_version(&informant_range) {
|
||||
let highest_shared_version = match monitor_range.highest_shared_version(&agent_range) {
|
||||
Ok(version) => {
|
||||
sink.send(Message::Text(
|
||||
serde_json::to_string(&ProtocolResponse::Version(version)).unwrap(),
|
||||
))
|
||||
.await
|
||||
.context("failed to notify informant of negotiated protocol version")?;
|
||||
.context("failed to notify agent of negotiated protocol version")?;
|
||||
version
|
||||
}
|
||||
Err(e) => {
|
||||
sink.send(Message::Text(
|
||||
serde_json::to_string(&ProtocolResponse::Error(format!(
|
||||
"Received protocol version range {} which does not overlap with {}",
|
||||
informant_range, monitor_range
|
||||
agent_range, monitor_range
|
||||
)))
|
||||
.unwrap(),
|
||||
))
|
||||
.await
|
||||
.context(
|
||||
"failed to notify informant of no overlap between protocol version ranges",
|
||||
)?;
|
||||
.context("failed to notify agent of no overlap between protocol version ranges")?;
|
||||
Err(e).context("error determining suitable protocol version range")?
|
||||
}
|
||||
};
|
||||
@@ -137,7 +135,7 @@ impl Dispatcher {
|
||||
.context("failed to send resources and oneshot sender across channel")
|
||||
}
|
||||
|
||||
/// Send a message to the informant.
|
||||
/// Send a message to the agent.
|
||||
///
|
||||
/// Although this function is small, it has one major benefit: it is the only
|
||||
/// way to send data accross the connection, and you can only pass in a proper
|
||||
|
||||
@@ -59,8 +59,8 @@ pub struct FileCacheConfig {
|
||||
spread_factor: f64,
|
||||
}
|
||||
|
||||
impl Default for FileCacheConfig {
|
||||
fn default() -> Self {
|
||||
impl FileCacheConfig {
|
||||
pub fn default_in_memory() -> Self {
|
||||
Self {
|
||||
in_memory: true,
|
||||
// 75 %
|
||||
@@ -71,9 +71,19 @@ impl Default for FileCacheConfig {
|
||||
spread_factor: 0.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FileCacheConfig {
|
||||
pub fn default_on_disk() -> Self {
|
||||
Self {
|
||||
in_memory: false,
|
||||
resource_multiplier: 0.75,
|
||||
// 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
|
||||
// memory, the kernel will just evict from its page cache, rather than e.g. killing
|
||||
// everything.
|
||||
min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
|
||||
spread_factor: 0.1,
|
||||
}
|
||||
}
|
||||
|
||||
/// Make sure fields of the config are consistent.
|
||||
pub fn validate(&self) -> anyhow::Result<()> {
|
||||
// Single field validity
|
||||
@@ -132,11 +142,11 @@ impl FileCacheConfig {
|
||||
|
||||
// Conversions to ensure we don't overflow from floating-point ops
|
||||
let size_from_spread =
|
||||
0_i64.max((available as f64 / (1.0 + self.spread_factor)) as i64) as u64;
|
||||
i64::max(0, (available as f64 / (1.0 + self.spread_factor)) as i64) as u64;
|
||||
|
||||
let size_from_normal = (total as f64 * self.resource_multiplier) as u64;
|
||||
|
||||
let byte_size = size_from_spread.min(size_from_normal);
|
||||
let byte_size = u64::min(size_from_spread, size_from_normal);
|
||||
|
||||
// The file cache operates in units of mebibytes, so the sizes we produce should
|
||||
// be rounded to a mebibyte. We round down to be conservative.
|
||||
@@ -268,7 +278,7 @@ impl FileCacheState {
|
||||
.context("failed to extract max file cache size from query result")?;
|
||||
|
||||
let max_mb = max_bytes / MiB;
|
||||
let num_mb = (num_bytes / MiB).max(max_mb);
|
||||
let num_mb = u64::min(num_bytes, max_bytes) / MiB;
|
||||
|
||||
let capped = if num_bytes > max_bytes {
|
||||
" (capped by maximum size)"
|
||||
|
||||
@@ -39,6 +39,16 @@ pub struct Args {
|
||||
#[arg(short, long)]
|
||||
pub pgconnstr: Option<String>,
|
||||
|
||||
/// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
|
||||
/// kernel's page cache), and therefore should not count against available memory.
|
||||
//
|
||||
// NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
|
||||
// than a roundabout way, via whether it's on disk), but in order to be backwards compatible
|
||||
// during the switch away from an in-memory file cache, we had to default to the previous
|
||||
// behavior.
|
||||
#[arg(long)]
|
||||
pub file_cache_on_disk: bool,
|
||||
|
||||
/// The address we should listen on for connection requests. For the
|
||||
/// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
|
||||
#[arg(short, long)]
|
||||
@@ -146,7 +156,7 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res
|
||||
|
||||
/// Handles incoming websocket connections.
|
||||
///
|
||||
/// If we are already to connected to an informant, we kill that old connection
|
||||
/// If we are already to connected to an agent, we kill that old connection
|
||||
/// and accept the new one.
|
||||
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
|
||||
pub async fn ws_handler(
|
||||
@@ -196,10 +206,10 @@ async fn start_monitor(
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!("connected to informant");
|
||||
info!("connected to agent");
|
||||
|
||||
match monitor.run().await {
|
||||
Ok(()) => info!("monitor was killed due to new connection"),
|
||||
Err(e) => error!(error = ?e, "monitor terminated by itself"),
|
||||
Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
//! Types representing protocols and actual informant-monitor messages.
|
||||
//! Types representing protocols and actual agent-monitor messages.
|
||||
//!
|
||||
//! The pervasive use of serde modifiers throughout this module is to ease
|
||||
//! serialization on the go side. Because go does not have enums (which model
|
||||
//! messages well), it is harder to model messages, and we accomodate that with
|
||||
//! serde.
|
||||
//!
|
||||
//! *Note*: the informant sends and receives messages in different ways.
|
||||
//! *Note*: the agent sends and receives messages in different ways.
|
||||
//!
|
||||
//! The informant serializes messages in the form and then sends them. The use
|
||||
//! The agent serializes messages in the form and then sends them. The use
|
||||
//! of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
|
||||
//! to determine how to deserialize `Content`.
|
||||
//! ```ignore
|
||||
@@ -25,9 +25,9 @@
|
||||
//! Id uint64
|
||||
//! }
|
||||
//! ```
|
||||
//! After reading the type field, the informant will decode the entire message
|
||||
//! After reading the type field, the agent will decode the entire message
|
||||
//! again, this time into the correct type using the embedded fields.
|
||||
//! Because the informant cannot just extract the json contained in a certain field
|
||||
//! Because the agent cannot just extract the json contained in a certain field
|
||||
//! (it initially deserializes to `map[string]interface{}`), we keep the fields
|
||||
//! at the top level, so the entire piece of json can be deserialized into a struct,
|
||||
//! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
|
||||
@@ -37,7 +37,7 @@ use std::cmp;
|
||||
|
||||
use serde::{de::Error, Deserialize, Serialize};
|
||||
|
||||
/// A Message we send to the informant.
|
||||
/// A Message we send to the agent.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct OutboundMsg {
|
||||
#[serde(flatten)]
|
||||
@@ -51,31 +51,31 @@ impl OutboundMsg {
|
||||
}
|
||||
}
|
||||
|
||||
/// The different underlying message types we can send to the informant.
|
||||
/// The different underlying message types we can send to the agent.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum OutboundMsgKind {
|
||||
/// Indicates that the informant sent an invalid message, i.e, we couldn't
|
||||
/// Indicates that the agent sent an invalid message, i.e, we couldn't
|
||||
/// properly deserialize it.
|
||||
InvalidMessage { error: String },
|
||||
/// Indicates that we experienced an internal error while processing a message.
|
||||
/// For example, if a cgroup operation fails while trying to handle an upscale,
|
||||
/// we return `InternalError`.
|
||||
InternalError { error: String },
|
||||
/// Returned to the informant once we have finished handling an upscale. If the
|
||||
/// Returned to the agent once we have finished handling an upscale. If the
|
||||
/// handling was unsuccessful, an `InternalError` will get returned instead.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
UpscaleConfirmation {},
|
||||
/// Indicates to the monitor that we are urgently requesting resources.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
UpscaleRequest {},
|
||||
/// Returned to the informant once we have finished attempting to downscale. If
|
||||
/// Returned to the agent once we have finished attempting to downscale. If
|
||||
/// an error occured trying to do so, an `InternalError` will get returned instead.
|
||||
/// However, if we are simply unsuccessful (for example, do to needing the resources),
|
||||
/// that gets included in the `DownscaleResult`.
|
||||
DownscaleResult {
|
||||
// FIXME for the future (once the informant is deprecated)
|
||||
// As of the time of writing, the informant/agent version of this struct is
|
||||
// As of the time of writing, the agent/informant version of this struct is
|
||||
// called api.DownscaleResult. This struct has uppercase fields which are
|
||||
// serialized as such. Thus, we serialize using uppercase names so we don't
|
||||
// have to make a breaking change to the agent<->informant protocol. Once
|
||||
@@ -88,12 +88,12 @@ pub enum OutboundMsgKind {
|
||||
status: String,
|
||||
},
|
||||
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
|
||||
/// informant.
|
||||
/// agent.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
HealthCheck {},
|
||||
}
|
||||
|
||||
/// A message received form the informant.
|
||||
/// A message received form the agent.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct InboundMsg {
|
||||
#[serde(flatten)]
|
||||
@@ -101,7 +101,7 @@ pub struct InboundMsg {
|
||||
pub(crate) id: usize,
|
||||
}
|
||||
|
||||
/// The different underlying message types we can receive from the informant.
|
||||
/// The different underlying message types we can receive from the agent.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(tag = "type", content = "content")]
|
||||
pub enum InboundMsgKind {
|
||||
@@ -120,14 +120,14 @@ pub enum InboundMsgKind {
|
||||
/// when done.
|
||||
DownscaleRequest { target: Resources },
|
||||
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
|
||||
/// informant.
|
||||
/// agent.
|
||||
/// *Note*: this is a struct variant because of the way go serializes struct{}
|
||||
HealthCheck {},
|
||||
}
|
||||
|
||||
/// Represents the resources granted to a VM.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||
// Renamed because the agent/informant has multiple resources types:
|
||||
// Renamed because the agent has multiple resources types:
|
||||
// `Resources` (milliCPU/memory slots)
|
||||
// `Allocation` (vCPU/bytes) <- what we correspond to
|
||||
#[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
|
||||
@@ -151,7 +151,7 @@ pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
|
||||
pub struct ProtocolVersion(u8);
|
||||
|
||||
impl ProtocolVersion {
|
||||
/// Represents v1.0 of the informant<-> monitor protocol - the initial version
|
||||
/// Represents v1.0 of the agent<-> monitor protocol - the initial version
|
||||
///
|
||||
/// Currently the latest version.
|
||||
const V1_0: ProtocolVersion = ProtocolVersion(1);
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//! Exposes the `Runner`, which handles messages received from informant and
|
||||
//! Exposes the `Runner`, which handles messages received from agent and
|
||||
//! sends upscale requests.
|
||||
//!
|
||||
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
|
||||
@@ -21,8 +21,8 @@ use crate::filecache::{FileCacheConfig, FileCacheState};
|
||||
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
|
||||
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
|
||||
|
||||
/// Central struct that interacts with informant, dispatcher, and cgroup to handle
|
||||
/// signals from the informant.
|
||||
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
|
||||
/// signals from the agent.
|
||||
#[derive(Debug)]
|
||||
pub struct Runner {
|
||||
config: Config,
|
||||
@@ -110,10 +110,10 @@ impl Runner {
|
||||
// memory limits.
|
||||
if let Some(connstr) = &args.pgconnstr {
|
||||
info!("initializing file cache");
|
||||
let config: FileCacheConfig = Default::default();
|
||||
if !config.in_memory {
|
||||
panic!("file cache not in-memory implemented")
|
||||
}
|
||||
let config = match args.file_cache_on_disk {
|
||||
true => FileCacheConfig::default_on_disk(),
|
||||
false => FileCacheConfig::default_in_memory(),
|
||||
};
|
||||
|
||||
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
|
||||
.await
|
||||
@@ -140,7 +140,10 @@ impl Runner {
|
||||
if actual_size != new_size {
|
||||
info!("file cache size actually got set to {actual_size}")
|
||||
}
|
||||
file_cache_reserved_bytes = actual_size;
|
||||
// Mark the resources given to the file cache as reserved, but only if it's in memory.
|
||||
if !args.file_cache_on_disk {
|
||||
file_cache_reserved_bytes = actual_size;
|
||||
}
|
||||
|
||||
state.filecache = Some(file_cache);
|
||||
}
|
||||
@@ -227,18 +230,17 @@ impl Runner {
|
||||
let mut status = vec![];
|
||||
let mut file_cache_mem_usage = 0;
|
||||
if let Some(file_cache) = &mut self.filecache {
|
||||
if !file_cache.config.in_memory {
|
||||
panic!("file cache not in-memory unimplemented")
|
||||
}
|
||||
|
||||
let actual_usage = file_cache
|
||||
.set_file_cache_size(expected_file_cache_mem_usage)
|
||||
.await
|
||||
.context("failed to set file cache size")?;
|
||||
file_cache_mem_usage = actual_usage;
|
||||
if file_cache.config.in_memory {
|
||||
file_cache_mem_usage = actual_usage;
|
||||
}
|
||||
let message = format!(
|
||||
"set file cache size to {} MiB",
|
||||
bytes_to_mebibytes(actual_usage)
|
||||
"set file cache size to {} MiB (in memory = {})",
|
||||
bytes_to_mebibytes(actual_usage),
|
||||
file_cache.config.in_memory,
|
||||
);
|
||||
info!("downscale: {message}");
|
||||
status.push(message);
|
||||
@@ -289,10 +291,6 @@ impl Runner {
|
||||
// Get the file cache's expected contribution to the memory usage
|
||||
let mut file_cache_mem_usage = 0;
|
||||
if let Some(file_cache) = &mut self.filecache {
|
||||
if !file_cache.config.in_memory {
|
||||
panic!("file cache not in-memory unimplemented");
|
||||
}
|
||||
|
||||
let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
|
||||
info!(
|
||||
target = bytes_to_mebibytes(expected_usage),
|
||||
@@ -304,6 +302,9 @@ impl Runner {
|
||||
.set_file_cache_size(expected_usage)
|
||||
.await
|
||||
.context("failed to set file cache size")?;
|
||||
if file_cache.config.in_memory {
|
||||
file_cache_mem_usage = actual_usage;
|
||||
}
|
||||
|
||||
if actual_usage != expected_usage {
|
||||
warn!(
|
||||
@@ -312,7 +313,6 @@ impl Runner {
|
||||
bytes_to_mebibytes(actual_usage)
|
||||
)
|
||||
}
|
||||
file_cache_mem_usage = actual_usage;
|
||||
}
|
||||
|
||||
if let Some(cgroup) = &self.cgroup {
|
||||
@@ -371,7 +371,7 @@ impl Runner {
|
||||
Ok(None)
|
||||
}
|
||||
InboundMsgKind::InternalError { error } => {
|
||||
warn!(error, id, "informant experienced an internal error");
|
||||
warn!(error, id, "agent experienced an internal error");
|
||||
Ok(None)
|
||||
}
|
||||
InboundMsgKind::HealthCheck {} => {
|
||||
@@ -405,10 +405,12 @@ impl Runner {
|
||||
.await
|
||||
.context("failed to send message")?;
|
||||
}
|
||||
// there is a message from the informant
|
||||
// there is a message from the agent
|
||||
msg = self.dispatcher.source.next() => {
|
||||
if let Some(msg) = msg {
|
||||
info!(message = ?msg, "received message");
|
||||
// Don't use 'message' as a key as the string also uses
|
||||
// that for its key
|
||||
info!(?msg, "received message");
|
||||
match msg {
|
||||
Ok(msg) => {
|
||||
let message: InboundMsg = match msg {
|
||||
@@ -417,8 +419,10 @@ impl Runner {
|
||||
}
|
||||
other => {
|
||||
warn!(
|
||||
message = ?other,
|
||||
"informant should only send text messages but received different type"
|
||||
// Don't use 'message' as a key as the
|
||||
// string also uses that for its key
|
||||
msg = ?other,
|
||||
"agent should only send text messages but received different type"
|
||||
);
|
||||
continue
|
||||
},
|
||||
@@ -429,7 +433,7 @@ impl Runner {
|
||||
Ok(None) => continue,
|
||||
Err(e) => {
|
||||
let error = e.to_string();
|
||||
warn!(%error, "error handling message");
|
||||
warn!(?error, "error handling message");
|
||||
OutboundMsg::new(
|
||||
OutboundMsgKind::InternalError {
|
||||
error
|
||||
|
||||
@@ -97,7 +97,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
|
||||
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
actual_summary.index_start_blk,
|
||||
|
||||
@@ -48,7 +48,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
actual_summary.index_start_blk,
|
||||
|
||||
@@ -581,6 +581,31 @@ fn start_pageserver(
|
||||
);
|
||||
}
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::BackgroundRuntimeTurnaroundMeasure,
|
||||
None,
|
||||
None,
|
||||
"background runtime turnaround measure",
|
||||
true,
|
||||
async move {
|
||||
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
|
||||
let server = server
|
||||
.serve(hyper::service::make_service_fn(|_| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
|
||||
move |_: hyper::Request<hyper::Body>| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::Response::new(
|
||||
hyper::Body::from(format!("alive")),
|
||||
))
|
||||
},
|
||||
))
|
||||
}))
|
||||
.with_graceful_shutdown(task_mgr::shutdown_watcher());
|
||||
server.await?;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
|
||||
@@ -75,10 +75,7 @@
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
|
||||
},
|
||||
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -162,7 +159,7 @@ struct Version {
|
||||
}
|
||||
|
||||
struct Slot {
|
||||
inner: RwLock<SlotInner>,
|
||||
inner: tokio::sync::RwLock<SlotInner>,
|
||||
usage_count: AtomicU8,
|
||||
}
|
||||
|
||||
@@ -203,6 +200,11 @@ impl Slot {
|
||||
Err(usage_count) => usage_count,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the usage count to a specific value.
|
||||
fn set_usage_count(&self, count: u8) {
|
||||
self.usage_count.store(count, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PageCache {
|
||||
@@ -215,9 +217,9 @@ pub struct PageCache {
|
||||
///
|
||||
/// If you add support for caching different kinds of objects, each object kind
|
||||
/// can have a separate mapping map, next to this field.
|
||||
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
|
||||
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
|
||||
|
||||
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
|
||||
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
|
||||
|
||||
/// The actual buffers with their metadata.
|
||||
slots: Box<[Slot]>,
|
||||
@@ -233,7 +235,7 @@ pub struct PageCache {
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i>(RwLockReadGuard<'i, SlotInner>);
|
||||
pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
|
||||
|
||||
impl std::ops::Deref for PageReadGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
@@ -260,9 +262,10 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
/// to initialize.
|
||||
///
|
||||
pub struct PageWriteGuard<'i> {
|
||||
inner: RwLockWriteGuard<'i, SlotInner>,
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
|
||||
// Are the page contents currently valid?
|
||||
// Used to mark pages as invalid that are assigned but not yet filled with data.
|
||||
valid: bool,
|
||||
}
|
||||
|
||||
@@ -337,7 +340,7 @@ impl PageCache {
|
||||
/// The 'lsn' is an upper bound, this will return the latest version of
|
||||
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
|
||||
/// returned page.
|
||||
pub fn lookup_materialized_page(
|
||||
pub async fn lookup_materialized_page(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -357,7 +360,7 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
|
||||
if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
|
||||
if let CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: available_lsn,
|
||||
@@ -384,7 +387,7 @@ impl PageCache {
|
||||
///
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub fn memorize_materialized_page(
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -401,7 +404,7 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
match self.lock_for_write(&cache_key)? {
|
||||
match self.lock_for_write(&cache_key).await? {
|
||||
WriteBufResult::Found(write_guard) => {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
@@ -419,31 +422,14 @@ impl PageCache {
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key)
|
||||
}
|
||||
|
||||
/// Immediately drop all buffers belonging to given file
|
||||
pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) {
|
||||
for slot_idx in 0..self.slots.len() {
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
let mut inner = slot.inner.write().unwrap();
|
||||
if let Some(key) = &inner.key {
|
||||
match key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno: _ }
|
||||
if *file_id == drop_file_id =>
|
||||
{
|
||||
// remove mapping for old buffer
|
||||
self.remove_mapping(key);
|
||||
inner.key = None;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.lock_for_read(&mut cache_key).await
|
||||
}
|
||||
|
||||
//
|
||||
@@ -463,14 +449,14 @@ impl PageCache {
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
|
||||
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.read().unwrap();
|
||||
let inner = slot.inner.read().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard(inner));
|
||||
@@ -511,7 +497,7 @@ impl PageCache {
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||
async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
@@ -526,7 +512,7 @@ impl PageCache {
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
@@ -556,7 +542,7 @@ impl PageCache {
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
slot.set_usage_count(1);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
@@ -569,13 +555,13 @@ impl PageCache {
|
||||
/// found, returns None.
|
||||
///
|
||||
/// When locking a page for writing, the search criteria is always "exact".
|
||||
fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
|
||||
async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().unwrap();
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageWriteGuard { inner, valid: true });
|
||||
@@ -588,10 +574,10 @@ impl PageCache {
|
||||
///
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key) {
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
|
||||
return Ok(WriteBufResult::Found(write_guard));
|
||||
}
|
||||
|
||||
@@ -617,7 +603,7 @@ impl PageCache {
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
slot.set_usage_count(1);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
@@ -772,7 +758,7 @@ impl PageCache {
|
||||
/// Find a slot to evict.
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard<SlotInner>)> {
|
||||
fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
@@ -784,10 +770,7 @@ impl PageCache {
|
||||
if slot.dec_usage_count() == 0 {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(TryLockError::Poisoned(err)) => {
|
||||
anyhow::bail!("buffer lock was poisoned: {err:?}")
|
||||
}
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
Err(_err) => {
|
||||
// If we have looped through the whole buffer pool 10 times
|
||||
// and still haven't found a victim buffer, something's wrong.
|
||||
// Maybe all the buffers were in locked. That could happen in
|
||||
@@ -816,6 +799,8 @@ impl PageCache {
|
||||
fn new(num_pages: usize) -> Self {
|
||||
assert!(num_pages > 0, "page cache size must be > 0");
|
||||
|
||||
// We use Box::leak here and into_boxed_slice to avoid leaking uninitialized
|
||||
// memory that Vec's might contain.
|
||||
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
@@ -829,7 +814,7 @@ impl PageCache {
|
||||
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
|
||||
|
||||
Slot {
|
||||
inner: RwLock::new(SlotInner { key: None, buf }),
|
||||
inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
})
|
||||
|
||||
@@ -292,6 +292,8 @@ pub enum TaskKind {
|
||||
|
||||
DebugTool,
|
||||
|
||||
BackgroundRuntimeTurnaroundMeasure,
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
}
|
||||
|
||||
@@ -698,10 +698,7 @@ impl Tenant {
|
||||
debug!("successfully downloaded index part for timeline {timeline_id}");
|
||||
match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => {
|
||||
timeline_ancestors.insert(
|
||||
timeline_id,
|
||||
index_part.parse_metadata().context("parse_metadata")?,
|
||||
);
|
||||
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
|
||||
remote_index_and_client.insert(timeline_id, (index_part, client));
|
||||
}
|
||||
MaybeDeletedIndexPart::Deleted(index_part) => {
|
||||
@@ -752,7 +749,7 @@ impl Tenant {
|
||||
DeleteTimelineFlow::resume_deletion(
|
||||
Arc::clone(self),
|
||||
timeline_id,
|
||||
&index_part.parse_metadata().context("parse_metadata")?,
|
||||
&index_part.metadata,
|
||||
Some(remote_timeline_client),
|
||||
None,
|
||||
)
|
||||
@@ -1314,10 +1311,7 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
let remote_metadata = index_part
|
||||
.parse_metadata()
|
||||
.context("parse_metadata")
|
||||
.map_err(LoadLocalTimelineError::Load)?;
|
||||
let remote_metadata = index_part.metadata.clone();
|
||||
(
|
||||
Some(RemoteStartupData {
|
||||
index_part,
|
||||
@@ -4107,7 +4101,7 @@ mod tests {
|
||||
let mut found_error_message = false;
|
||||
let mut err_source = err.source();
|
||||
while let Some(source) = err_source {
|
||||
if source.to_string() == "metadata checksum mismatch" {
|
||||
if source.to_string().contains("metadata checksum mismatch") {
|
||||
found_error_message = true;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ impl<'a> BlockCursor<'a> {
|
||||
let mut blknum = (offset / PAGE_SZ as u64) as u32;
|
||||
let mut off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
let mut buf = self.read_blk(blknum)?;
|
||||
let mut buf = self.read_blk(blknum).await?;
|
||||
|
||||
// peek at the first byte, to determine if it's a 1- or 4-byte length
|
||||
let first_len_byte = buf[off];
|
||||
@@ -49,7 +49,7 @@ impl<'a> BlockCursor<'a> {
|
||||
// it is split across two pages
|
||||
len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
|
||||
blknum += 1;
|
||||
buf = self.read_blk(blknum)?;
|
||||
buf = self.read_blk(blknum).await?;
|
||||
len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
|
||||
off = 4 - thislen;
|
||||
} else {
|
||||
@@ -70,7 +70,7 @@ impl<'a> BlockCursor<'a> {
|
||||
if page_remain == 0 {
|
||||
// continue on next page
|
||||
blknum += 1;
|
||||
buf = self.read_blk(blknum)?;
|
||||
buf = self.read_blk(blknum).await?;
|
||||
off = 0;
|
||||
page_remain = PAGE_SZ;
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ pub enum BlockLease<'a> {
|
||||
PageReadGuard(PageReadGuard<'static>),
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Rc(std::rc::Rc<[u8; PAGE_SZ]>),
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
}
|
||||
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
@@ -49,9 +49,9 @@ impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<'a> From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease<'a> {
|
||||
fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self {
|
||||
BlockLease::Rc(value)
|
||||
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
|
||||
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
|
||||
BlockLease::Arc(value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ impl<'a> Deref for BlockLease<'a> {
|
||||
BlockLease::PageReadGuard(v) => v.deref(),
|
||||
BlockLease::EphemeralFileMutableTail(v) => v,
|
||||
#[cfg(test)]
|
||||
BlockLease::Rc(v) => v.deref(),
|
||||
BlockLease::Arc(v) => v.deref(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -83,13 +83,13 @@ pub(crate) enum BlockReaderRef<'a> {
|
||||
|
||||
impl<'a> BlockReaderRef<'a> {
|
||||
#[inline(always)]
|
||||
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
use BlockReaderRef::*;
|
||||
match self {
|
||||
FileBlockReaderVirtual(r) => r.read_blk(blknum),
|
||||
FileBlockReaderFile(r) => r.read_blk(blknum),
|
||||
EphemeralFile(r) => r.read_blk(blknum),
|
||||
Adapter(r) => r.read_blk(blknum),
|
||||
FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
|
||||
FileBlockReaderFile(r) => r.read_blk(blknum).await,
|
||||
EphemeralFile(r) => r.read_blk(blknum).await,
|
||||
Adapter(r) => r.read_blk(blknum).await,
|
||||
#[cfg(test)]
|
||||
TestDisk(r) => r.read_blk(blknum),
|
||||
}
|
||||
@@ -134,8 +134,8 @@ impl<'a> BlockCursor<'a> {
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
#[inline(always)]
|
||||
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
self.reader.read_blk(blknum)
|
||||
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
self.reader.read_blk(blknum).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,11 +170,12 @@ where
|
||||
/// Returns a "lease" object that can be used to
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
|
||||
@@ -262,7 +262,7 @@ where
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
while let Some((node_blknum, opt_iter)) = stack.pop() {
|
||||
// Locate the node.
|
||||
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum)?;
|
||||
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;
|
||||
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
@@ -357,7 +357,7 @@ where
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
|
||||
while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
|
||||
let blk = block_cursor.read_blk(self.start_blk + blknum)?;
|
||||
let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
|
||||
let buf: &[u8] = blk.as_ref();
|
||||
let node = OnDiskNode::<L>::deparse(buf)?;
|
||||
|
||||
@@ -704,7 +704,7 @@ pub(crate) mod tests {
|
||||
pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
|
||||
let mut buf = [0u8; PAGE_SZ];
|
||||
buf.copy_from_slice(&self.blocks[blknum as usize]);
|
||||
Ok(std::rc::Rc::new(buf).into())
|
||||
Ok(std::sync::Arc::new(buf).into())
|
||||
}
|
||||
}
|
||||
impl BlockReader for TestDisk {
|
||||
|
||||
@@ -61,13 +61,14 @@ impl EphemeralFile {
|
||||
self.len
|
||||
}
|
||||
|
||||
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
|
||||
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
|
||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||
if flushed_blknums.contains(&(blknum as u64)) {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
@@ -135,10 +136,13 @@ impl EphemeralFile {
|
||||
// Pre-warm the page cache with what we just wrote.
|
||||
// This isn't necessary for coherency/correctness, but it's how we've always done it.
|
||||
let cache = page_cache::get();
|
||||
match cache.read_immutable_buf(
|
||||
self.ephemeral_file.page_cache_file_id,
|
||||
self.blknum,
|
||||
) {
|
||||
match cache
|
||||
.read_immutable_buf(
|
||||
self.ephemeral_file.page_cache_file_id,
|
||||
self.blknum,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(page_cache::ReadBufResult::Found(_guard)) => {
|
||||
// This function takes &mut self, so, it shouldn't be possible to reach this point.
|
||||
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
|
||||
@@ -221,9 +225,8 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
|
||||
|
||||
impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// drop all pages from page cache
|
||||
let cache = page_cache::get();
|
||||
cache.drop_buffers_for_immutable(self.page_cache_file_id);
|
||||
// There might still be pages in the [`crate::page_cache`] for this file.
|
||||
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
|
||||
|
||||
// unlink the file
|
||||
let res = std::fs::remove_file(&self.file.path);
|
||||
|
||||
@@ -12,7 +12,7 @@ use std::fs::{File, OpenOptions};
|
||||
use std::io::{self, Write};
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::{de::Error, Deserialize, Serialize, Serializer};
|
||||
use thiserror::Error;
|
||||
use tracing::info_span;
|
||||
use utils::bin_ser::SerializeError;
|
||||
@@ -232,6 +232,28 @@ impl TimelineMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for TimelineMetadata {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let bytes = Vec::<u8>::deserialize(deserializer)?;
|
||||
Self::from_bytes(bytes.as_slice()).map_err(|e| D::Error::custom(format!("{e}")))
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for TimelineMetadata {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let bytes = self
|
||||
.to_bytes()
|
||||
.map_err(|e| serde::ser::Error::custom(format!("{e}")))?;
|
||||
bytes.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
/// Save timeline metadata to file
|
||||
pub fn save_metadata(
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
@@ -541,8 +541,7 @@ impl RemoteTimelineClient {
|
||||
// ahead of what's _actually_ on the remote during index upload.
|
||||
upload_queue.latest_metadata = metadata.clone();
|
||||
|
||||
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
|
||||
self.schedule_index_upload(upload_queue, metadata_bytes);
|
||||
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -562,8 +561,7 @@ impl RemoteTimelineClient {
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
|
||||
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
|
||||
self.schedule_index_upload(upload_queue, metadata_bytes);
|
||||
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -573,7 +571,7 @@ impl RemoteTimelineClient {
|
||||
fn schedule_index_upload(
|
||||
self: &Arc<Self>,
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
metadata_bytes: Vec<u8>,
|
||||
metadata: TimelineMetadata,
|
||||
) {
|
||||
info!(
|
||||
"scheduling metadata upload with {} files ({} changed)",
|
||||
@@ -586,7 +584,7 @@ impl RemoteTimelineClient {
|
||||
let index_part = IndexPart::new(
|
||||
upload_queue.latest_files.clone(),
|
||||
disk_consistent_lsn,
|
||||
metadata_bytes,
|
||||
metadata,
|
||||
);
|
||||
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
|
||||
self.calls_unfinished_metric_begin(&op);
|
||||
@@ -642,7 +640,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
// Deleting layers doesn't affect the values stored in TimelineMetadata,
|
||||
// so we don't need update it. Just serialize it.
|
||||
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
|
||||
let metadata = upload_queue.latest_metadata.clone();
|
||||
|
||||
// Update the remote index file, removing the to-be-deleted files from the index,
|
||||
// before deleting the actual files.
|
||||
@@ -653,12 +651,13 @@ impl RemoteTimelineClient {
|
||||
// to syntactically forbid ? or bail! calls here.
|
||||
let no_bail_here = || {
|
||||
for name in names {
|
||||
upload_queue.latest_files.remove(name);
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
if upload_queue.latest_files.remove(name).is_some() {
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
|
||||
self.schedule_index_upload(upload_queue, metadata_bytes);
|
||||
self.schedule_index_upload(upload_queue, metadata);
|
||||
}
|
||||
|
||||
// schedule the actual deletions
|
||||
@@ -1610,8 +1609,7 @@ mod tests {
|
||||
&layer_file_name_2.file_name(),
|
||||
],
|
||||
);
|
||||
let downloaded_metadata = index_part.parse_metadata().unwrap();
|
||||
assert_eq!(downloaded_metadata, metadata);
|
||||
assert_eq!(index_part.metadata, metadata);
|
||||
|
||||
// Schedule upload and then a deletion. Check that the deletion is queued
|
||||
client
|
||||
|
||||
@@ -77,7 +77,9 @@ pub struct IndexPart {
|
||||
// private because internally we would read from metadata instead.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
disk_consistent_lsn: Lsn,
|
||||
metadata_bytes: Vec<u8>,
|
||||
|
||||
#[serde(rename = "metadata_bytes")]
|
||||
pub metadata: TimelineMetadata,
|
||||
}
|
||||
|
||||
impl IndexPart {
|
||||
@@ -95,7 +97,7 @@ impl IndexPart {
|
||||
pub fn new(
|
||||
layers_and_metadata: HashMap<LayerFileName, LayerFileMetadata>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
metadata_bytes: Vec<u8>,
|
||||
metadata: TimelineMetadata,
|
||||
) -> Self {
|
||||
let mut timeline_layers = HashSet::with_capacity(layers_and_metadata.len());
|
||||
let mut layer_metadata = HashMap::with_capacity(layers_and_metadata.len());
|
||||
@@ -111,14 +113,10 @@ impl IndexPart {
|
||||
timeline_layers,
|
||||
layer_metadata,
|
||||
disk_consistent_lsn,
|
||||
metadata_bytes,
|
||||
metadata,
|
||||
deleted_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_metadata(&self) -> anyhow::Result<TimelineMetadata> {
|
||||
TimelineMetadata::from_bytes(&self.metadata_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&UploadQueueInitialized> for IndexPart {
|
||||
@@ -126,12 +124,12 @@ impl TryFrom<&UploadQueueInitialized> for IndexPart {
|
||||
|
||||
fn try_from(upload_queue: &UploadQueueInitialized) -> Result<Self, Self::Error> {
|
||||
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
|
||||
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
|
||||
let metadata = upload_queue.latest_metadata.clone();
|
||||
|
||||
Ok(Self::new(
|
||||
upload_queue.latest_files.clone(),
|
||||
disk_consistent_lsn,
|
||||
metadata_bytes,
|
||||
metadata,
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -182,7 +180,7 @@ mod tests {
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
};
|
||||
|
||||
@@ -201,7 +199,7 @@ mod tests {
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata_bytes":[112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
|
||||
"metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
@@ -219,7 +217,7 @@ mod tests {
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
};
|
||||
|
||||
@@ -238,7 +236,7 @@ mod tests {
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata_bytes":[112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
|
||||
"metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
|
||||
"deleted_at": "2023-07-31T09:00:00.123"
|
||||
}"#;
|
||||
|
||||
@@ -257,7 +255,7 @@ mod tests {
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
|
||||
};
|
||||
@@ -281,7 +279,7 @@ mod tests {
|
||||
timeline_layers: HashSet::new(),
|
||||
layer_metadata: HashMap::new(),
|
||||
disk_consistent_lsn: "0/2532648".parse::<Lsn>().unwrap(),
|
||||
metadata_bytes: [
|
||||
metadata: TimelineMetadata::from_bytes(&[
|
||||
136, 151, 49, 208, 0, 70, 0, 4, 0, 0, 0, 0, 2, 83, 38, 72, 1, 0, 0, 0, 0, 2, 83,
|
||||
38, 32, 1, 87, 198, 240, 135, 97, 119, 45, 125, 38, 29, 155, 161, 140, 141, 255,
|
||||
210, 0, 0, 0, 0, 2, 83, 38, 72, 0, 0, 0, 0, 1, 73, 240, 192, 0, 0, 0, 0, 1, 73,
|
||||
@@ -302,8 +300,8 @@ mod tests {
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0,
|
||||
]
|
||||
.to_vec(),
|
||||
])
|
||||
.unwrap(),
|
||||
deleted_at: None,
|
||||
};
|
||||
|
||||
|
||||
@@ -467,7 +467,7 @@ impl DeltaLayer {
|
||||
PathOrConf::Path(_) => None,
|
||||
};
|
||||
|
||||
let loaded = DeltaLayerInner::load(&path, summary)?;
|
||||
let loaded = DeltaLayerInner::load(&path, summary).await?;
|
||||
|
||||
if let PathOrConf::Path(ref path) = self.path_or_conf {
|
||||
// not production code
|
||||
@@ -841,12 +841,15 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
pub(super) fn load(path: &std::path::Path, summary: Option<Summary>) -> anyhow::Result<Self> {
|
||||
pub(super) async fn load(
|
||||
path: &std::path::Path,
|
||||
summary: Option<Summary>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
@@ -854,11 +857,11 @@ impl DeltaLayerInner {
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1028,7 +1031,7 @@ impl<'a> ValueRef<'a> {
|
||||
pub(crate) struct Adapter<T>(T);
|
||||
|
||||
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
|
||||
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
self.0.as_ref().file.read_blk(blknum)
|
||||
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
self.0.as_ref().file.read_blk(blknum).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -349,7 +349,8 @@ impl ImageLayer {
|
||||
PathOrConf::Path(_) => None,
|
||||
};
|
||||
|
||||
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
|
||||
let loaded =
|
||||
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary).await?;
|
||||
|
||||
if let PathOrConf::Path(ref path) = self.path_or_conf {
|
||||
// not production code
|
||||
@@ -432,7 +433,7 @@ impl ImageLayer {
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
pub(super) fn load(
|
||||
pub(super) async fn load(
|
||||
path: &std::path::Path,
|
||||
lsn: Lsn,
|
||||
summary: Option<Summary>,
|
||||
@@ -440,7 +441,7 @@ impl ImageLayerInner {
|
||||
let file = VirtualFile::open(path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
@@ -449,11 +450,11 @@ impl ImageLayerInner {
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -465,7 +465,7 @@ impl Timeline {
|
||||
// The cached image can be returned directly if there is no WAL between the cached image
|
||||
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
|
||||
// for redo.
|
||||
let cached_page_img = match self.lookup_cached_page(&key, lsn) {
|
||||
let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
|
||||
Some((cached_lsn, cached_img)) => {
|
||||
match cached_lsn.cmp(&lsn) {
|
||||
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
|
||||
@@ -494,6 +494,7 @@ impl Timeline {
|
||||
|
||||
RECONSTRUCT_TIME
|
||||
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
@@ -630,38 +631,38 @@ impl Timeline {
|
||||
) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
// static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
// once_cell::sync::Lazy::new(|| {
|
||||
// let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
// let permits = usize::max(
|
||||
// 1,
|
||||
// // while a lot of the work is done on spawn_blocking, we still do
|
||||
// // repartitioning in the async context. this should give leave us some workers
|
||||
// // unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// // effects of restarts.
|
||||
// //
|
||||
// // 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// // spawn_blocking.
|
||||
// (total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
// );
|
||||
// assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
// assert!(
|
||||
// permits < total_threads,
|
||||
// "need threads avail for shorter work"
|
||||
// );
|
||||
// tokio::sync::Semaphore::new(permits)
|
||||
// });
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = tokio::select! {
|
||||
permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
permit
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
// // this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// // compaction task goes over it's period (20s) which is quite often in production.
|
||||
// let _permit = tokio::select! {
|
||||
// permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
// permit
|
||||
// },
|
||||
// _ = cancel.cancelled() => {
|
||||
// return Ok(());
|
||||
// }
|
||||
// };
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
@@ -1614,7 +1615,7 @@ impl Timeline {
|
||||
let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
|
||||
let span = tracing::Span::current();
|
||||
|
||||
let (loaded_layers, needs_upload, total_physical_size) = tokio::task::spawn_blocking({
|
||||
let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({
|
||||
move || {
|
||||
let _g = span.entered();
|
||||
let discovered = init::scan_timeline_dir(&timeline_path)?;
|
||||
@@ -1660,6 +1661,7 @@ impl Timeline {
|
||||
|
||||
let mut loaded_layers = Vec::new();
|
||||
let mut needs_upload = Vec::new();
|
||||
let mut needs_cleanup = Vec::new();
|
||||
let mut total_physical_size = 0;
|
||||
|
||||
for (name, decision) in decided {
|
||||
@@ -1675,14 +1677,10 @@ impl Timeline {
|
||||
Err(FutureLayer { local }) => {
|
||||
if local.is_some() {
|
||||
path.push(name.file_name());
|
||||
init::cleanup_future_layer(&path, name, disk_consistent_lsn)?;
|
||||
init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
|
||||
path.pop();
|
||||
} else {
|
||||
// we cannot do anything for remote layers, but not continuing to
|
||||
// process it will leave it out index_part.json as well.
|
||||
}
|
||||
//
|
||||
// we do not currently schedule deletions for these.
|
||||
needs_cleanup.push(name);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -1736,7 +1734,11 @@ impl Timeline {
|
||||
|
||||
loaded_layers.push(layer);
|
||||
}
|
||||
Ok((loaded_layers, needs_upload, total_physical_size))
|
||||
Ok((
|
||||
loaded_layers,
|
||||
(needs_upload, needs_cleanup),
|
||||
total_physical_size,
|
||||
))
|
||||
}
|
||||
})
|
||||
.await
|
||||
@@ -1748,9 +1750,11 @@ impl Timeline {
|
||||
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
|
||||
|
||||
if let Some(rtc) = self.remote_client.as_ref() {
|
||||
let (needs_upload, needs_cleanup) = to_sync;
|
||||
for (layer, m) in needs_upload {
|
||||
rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?;
|
||||
}
|
||||
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
|
||||
rtc.schedule_index_upload_for_file_changes()?;
|
||||
// Tenant::create_timeline will wait for these uploads to happen before returning, or
|
||||
// on retry.
|
||||
@@ -2261,7 +2265,15 @@ impl Timeline {
|
||||
)));
|
||||
}
|
||||
}
|
||||
ancestor.wait_lsn(timeline.ancestor_lsn, ctx).await?;
|
||||
ancestor
|
||||
.wait_lsn(timeline.ancestor_lsn, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"wait for lsn {} on ancestor timeline_id={}",
|
||||
timeline.ancestor_lsn, ancestor.timeline_id
|
||||
)
|
||||
})?;
|
||||
|
||||
timeline_owned = ancestor;
|
||||
timeline = &*timeline_owned;
|
||||
@@ -2440,13 +2452,14 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
|
||||
async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
|
||||
let cache = page_cache::get();
|
||||
|
||||
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
|
||||
// We should look at the key to determine if it's a cacheable object
|
||||
let (lsn, read_guard) =
|
||||
cache.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)?;
|
||||
let (lsn, read_guard) = cache
|
||||
.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
|
||||
.await?;
|
||||
let img = Bytes::from(read_guard.to_vec());
|
||||
Some((lsn, img))
|
||||
}
|
||||
@@ -4128,7 +4141,7 @@ impl Timeline {
|
||||
///
|
||||
/// Reconstruct a value, using the given base image and WAL records in 'data'.
|
||||
///
|
||||
fn reconstruct_value(
|
||||
async fn reconstruct_value(
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
@@ -4197,6 +4210,7 @@ impl Timeline {
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
)
|
||||
.await
|
||||
.context("Materialized page memoization failed")
|
||||
{
|
||||
return Err(PageReconstructError::from(e));
|
||||
|
||||
@@ -183,7 +183,7 @@ pub(super) fn cleanup_local_file_for_remote(
|
||||
|
||||
pub(super) fn cleanup_future_layer(
|
||||
path: &Path,
|
||||
name: LayerFileName,
|
||||
name: &LayerFileName,
|
||||
disk_consistent_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
use LayerFileName::*;
|
||||
|
||||
@@ -31,10 +31,11 @@ use storage_broker::Streaming;
|
||||
use tokio::select;
|
||||
use tracing::*;
|
||||
|
||||
use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use utils::backoff::{
|
||||
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::postgres_client::wal_stream_connection_config;
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -879,33 +880,6 @@ impl ReconnectReason {
|
||||
}
|
||||
}
|
||||
|
||||
fn wal_stream_connection_config(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
Ok(connstr)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -921,6 +895,7 @@ mod tests {
|
||||
timeline: SafekeeperTimelineInfo {
|
||||
safekeeper_id: 0,
|
||||
tenant_timeline_id: None,
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: 0,
|
||||
commit_lsn,
|
||||
@@ -929,6 +904,7 @@ mod tests {
|
||||
peer_horizon_lsn: 0,
|
||||
local_start_lsn: 0,
|
||||
safekeeper_connstr: safekeeper_connstr.to_owned(),
|
||||
http_connstr: safekeeper_connstr.to_owned(),
|
||||
availability_zone: None,
|
||||
},
|
||||
latest_update,
|
||||
|
||||
@@ -148,17 +148,16 @@ impl UploadQueue {
|
||||
);
|
||||
}
|
||||
|
||||
let index_part_metadata = index_part.parse_metadata()?;
|
||||
info!(
|
||||
"initializing upload queue with remote index_part.disk_consistent_lsn: {}",
|
||||
index_part_metadata.disk_consistent_lsn()
|
||||
index_part.metadata.disk_consistent_lsn()
|
||||
);
|
||||
|
||||
let state = UploadQueueInitialized {
|
||||
latest_files: files,
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: index_part_metadata.clone(),
|
||||
last_uploaded_consistent_lsn: index_part_metadata.disk_consistent_lsn(),
|
||||
latest_metadata: index_part.metadata.clone(),
|
||||
last_uploaded_consistent_lsn: index_part.metadata.disk_consistent_lsn(),
|
||||
// what follows are boring default initializations
|
||||
task_counter: 0,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
|
||||
@@ -341,21 +341,35 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
|
||||
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
|
||||
|
||||
// Load all timelines from disk to memory.
|
||||
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
|
||||
|
||||
// Keep handles to main tasks to die if any of them disappears.
|
||||
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
|
||||
FuturesUnordered::new();
|
||||
|
||||
// Start wal backup launcher before loading timelines as we'll notify it
|
||||
// through the channel about timelines which need offloading, not draining
|
||||
// the channel would cause deadlock.
|
||||
let current_thread_rt = conf
|
||||
.current_thread_runtime
|
||||
.then(|| Handle::try_current().expect("no runtime in main"));
|
||||
let conf_ = conf.clone();
|
||||
let wal_backup_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
|
||||
.spawn(wal_backup::wal_backup_launcher_task_main(
|
||||
conf_,
|
||||
wal_backup_launcher_rx,
|
||||
))
|
||||
.map(|res| ("WAL backup launcher".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_backup_handle));
|
||||
|
||||
// Load all timelines from disk to memory.
|
||||
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
|
||||
|
||||
let conf_ = conf.clone();
|
||||
// Run everything in current thread rt, if asked.
|
||||
if conf.current_thread_runtime {
|
||||
info!("running in current thread runtime");
|
||||
}
|
||||
let current_thread_rt = conf
|
||||
.current_thread_runtime
|
||||
.then(|| Handle::try_current().expect("no runtime in main"));
|
||||
|
||||
let wal_service_handle = current_thread_rt
|
||||
.as_ref()
|
||||
@@ -408,17 +422,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
.map(|res| ("WAL remover".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_remover_handle));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let wal_backup_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
|
||||
.spawn(wal_backup::wal_backup_launcher_task_main(
|
||||
conf_,
|
||||
wal_backup_launcher_rx,
|
||||
))
|
||||
.map(|res| ("WAL backup launcher".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_backup_handle));
|
||||
|
||||
set_build_info_metric(GIT_VERSION);
|
||||
|
||||
// TODO: update tokio-stream, convert to real async Stream with
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
//! Code to deal with safekeeper control file upgrades
|
||||
use crate::safekeeper::{
|
||||
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory,
|
||||
TermSwitchEntry,
|
||||
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn,
|
||||
};
|
||||
use anyhow::{bail, Result};
|
||||
use pq_proto::SystemId;
|
||||
@@ -145,7 +144,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
|
||||
let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?;
|
||||
let ac = AcceptorState {
|
||||
term: oldstate.acceptor_state.term,
|
||||
term_history: TermHistory(vec![TermSwitchEntry {
|
||||
term_history: TermHistory(vec![TermLsn {
|
||||
term: oldstate.acceptor_state.epoch,
|
||||
lsn: Lsn(0),
|
||||
}]),
|
||||
|
||||
@@ -19,6 +19,7 @@ use crate::receive_wal::WalReceiverState;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::timeline::PeerInfo;
|
||||
use crate::{debug_dump, pull_timeline};
|
||||
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
@@ -101,6 +102,7 @@ pub struct TimelineStatus {
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
pub peers: Vec<PeerInfo>,
|
||||
pub walsenders: Vec<WalSenderState>,
|
||||
pub walreceivers: Vec<WalReceiverState>,
|
||||
}
|
||||
@@ -140,6 +142,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
term_history,
|
||||
};
|
||||
|
||||
let conf = get_conf(&request);
|
||||
// Note: we report in memory values which can be lost.
|
||||
let status = TimelineStatus {
|
||||
tenant_id: ttid.tenant_id,
|
||||
@@ -153,6 +156,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
backup_lsn: inmem.backup_lsn,
|
||||
peer_horizon_lsn: inmem.peer_horizon_lsn,
|
||||
remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
|
||||
peers: tli.get_peers(conf).await,
|
||||
walsenders: tli.get_walsenders().get_all(),
|
||||
walreceivers: tli.get_walreceivers().get_all(),
|
||||
};
|
||||
@@ -282,12 +286,14 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
tenant_id: ttid.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: ttid.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
term: sk_info.term.unwrap_or(0),
|
||||
last_log_term: sk_info.last_log_term.unwrap_or(0),
|
||||
flush_lsn: sk_info.flush_lsn.0,
|
||||
commit_lsn: sk_info.commit_lsn.0,
|
||||
remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
|
||||
use crate::safekeeper::{
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
|
||||
};
|
||||
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
|
||||
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermLsn};
|
||||
use crate::timeline::Timeline;
|
||||
use crate::GlobalTimelines;
|
||||
use postgres_backend::PostgresBackend;
|
||||
@@ -119,7 +119,7 @@ async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> any
|
||||
let history = tli.get_state().await.1.acceptor_state.term_history;
|
||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||
let mut history_entries = history.0;
|
||||
history_entries.push(TermSwitchEntry { term, lsn });
|
||||
history_entries.push(TermLsn { term, lsn });
|
||||
let history = TermHistory(history_entries);
|
||||
|
||||
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
|
||||
@@ -19,6 +19,7 @@ pub mod json_ctrl;
|
||||
pub mod metrics;
|
||||
pub mod pull_timeline;
|
||||
pub mod receive_wal;
|
||||
pub mod recovery;
|
||||
pub mod remove_wal;
|
||||
pub mod safekeeper;
|
||||
pub mod send_wal;
|
||||
|
||||
@@ -227,7 +227,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
|
||||
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
|
||||
tokio::fs::rename(tli_dir_path, &timeline_path).await?;
|
||||
|
||||
let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
|
||||
let tli = GlobalTimelines::load_timeline(ttid)
|
||||
.await
|
||||
.context("Failed to load timeline after copy")?;
|
||||
|
||||
info!(
|
||||
"Loaded timeline {}, flush_lsn={}",
|
||||
|
||||
40
safekeeper/src/recovery.rs
Normal file
40
safekeeper/src/recovery.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
//! This module implements pulling WAL from peer safekeepers if compute can't
|
||||
//! provide it, i.e. safekeeper lags too much.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::{select, time::sleep, time::Duration};
|
||||
use tracing::{info, instrument};
|
||||
|
||||
use crate::{timeline::Timeline, SafeKeeperConf};
|
||||
|
||||
/// Entrypoint for per timeline task which always runs, checking whether
|
||||
/// recovery for this safekeeper is needed and starting it if so.
|
||||
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
|
||||
info!("started");
|
||||
let mut cancellation_rx = match tli.get_cancellation_rx() {
|
||||
Ok(rx) => rx,
|
||||
Err(_) => {
|
||||
info!("timeline canceled during task start");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
select! {
|
||||
_ = recovery_main_loop(tli) => { unreachable!() }
|
||||
_ = cancellation_rx.changed() => {
|
||||
info!("stopped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const CHECK_INTERVAL_MS: u64 = 2000;
|
||||
|
||||
/// Check regularly whether we need to start recovery.
|
||||
async fn recovery_main_loop(_tli: Arc<Timeline>) {
|
||||
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
|
||||
loop {
|
||||
sleep(check_duration).await;
|
||||
}
|
||||
}
|
||||
@@ -34,22 +34,33 @@ pub const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
/// Consensus logical timestamp.
|
||||
pub type Term = u64;
|
||||
const INVALID_TERM: Term = 0;
|
||||
pub const INVALID_TERM: Term = 0;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct TermSwitchEntry {
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct TermLsn {
|
||||
pub term: Term,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
// Creation from tuple provides less typing (e.g. for unit tests).
|
||||
impl From<(Term, Lsn)> for TermLsn {
|
||||
fn from(pair: (Term, Lsn)) -> TermLsn {
|
||||
TermLsn {
|
||||
term: pair.0,
|
||||
lsn: pair.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct TermHistory(pub Vec<TermSwitchEntry>);
|
||||
pub struct TermHistory(pub Vec<TermLsn>);
|
||||
|
||||
impl TermHistory {
|
||||
pub fn empty() -> TermHistory {
|
||||
TermHistory(Vec::new())
|
||||
}
|
||||
|
||||
// Parse TermHistory as n_entries followed by TermSwitchEntry pairs
|
||||
// Parse TermHistory as n_entries followed by TermLsn pairs
|
||||
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
|
||||
if bytes.remaining() < 4 {
|
||||
bail!("TermHistory misses len");
|
||||
@@ -60,7 +71,7 @@ impl TermHistory {
|
||||
if bytes.remaining() < 16 {
|
||||
bail!("TermHistory is incomplete");
|
||||
}
|
||||
res.push(TermSwitchEntry {
|
||||
res.push(TermLsn {
|
||||
term: bytes.get_u64_le(),
|
||||
lsn: bytes.get_u64_le().into(),
|
||||
})
|
||||
@@ -557,12 +568,17 @@ where
|
||||
.up_to(self.flush_lsn())
|
||||
}
|
||||
|
||||
/// Get current term.
|
||||
pub fn get_term(&self) -> Term {
|
||||
self.state.acceptor_state.term
|
||||
}
|
||||
|
||||
pub fn get_epoch(&self) -> Term {
|
||||
self.state.acceptor_state.get_epoch(self.flush_lsn())
|
||||
}
|
||||
|
||||
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
pub fn flush_lsn(&self) -> Lsn {
|
||||
max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
|
||||
}
|
||||
|
||||
@@ -1138,7 +1154,7 @@ mod tests {
|
||||
let pem = ProposerElected {
|
||||
term: 1,
|
||||
start_streaming_at: Lsn(1),
|
||||
term_history: TermHistory(vec![TermSwitchEntry {
|
||||
term_history: TermHistory(vec![TermLsn {
|
||||
term: 1,
|
||||
lsn: Lsn(3),
|
||||
}]),
|
||||
|
||||
@@ -2,12 +2,12 @@
|
||||
//! with the "START_REPLICATION" message, and registry of walsenders.
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::{Term, TermLsn};
|
||||
use crate::timeline::Timeline;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::Context as AnyhowContext;
|
||||
use anyhow::{bail, Context as AnyhowContext};
|
||||
use bytes::Bytes;
|
||||
use parking_lot::Mutex;
|
||||
use postgres_backend::PostgresBackend;
|
||||
@@ -390,26 +390,25 @@ impl SafekeeperPostgresHandler {
|
||||
self.appname.clone(),
|
||||
));
|
||||
|
||||
let commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx();
|
||||
|
||||
// Walproposer gets special handling: safekeeper must give proposer all
|
||||
// local WAL till the end, whether committed or not (walproposer will
|
||||
// hang otherwise). That's because walproposer runs the consensus and
|
||||
// synchronizes safekeepers on the most advanced one.
|
||||
// Walsender can operate in one of two modes which we select by
|
||||
// application_name: give only committed WAL (used by pageserver) or all
|
||||
// existing WAL (up to flush_lsn, used by walproposer or peer recovery).
|
||||
// The second case is always driven by a consensus leader which term
|
||||
// must generally be also supplied. However we're sloppy to do this in
|
||||
// walproposer recovery which will be removed soon. So TODO is to make
|
||||
// it not Option'al then.
|
||||
//
|
||||
// There is a small risk of this WAL getting concurrently garbaged if
|
||||
// another compute rises which collects majority and starts fixing log
|
||||
// on this safekeeper itself. That's ok as (old) proposer will never be
|
||||
// able to commit such WAL.
|
||||
let stop_pos: Option<Lsn> = if self.is_walproposer_recovery() {
|
||||
let wal_end = tli.get_flush_lsn().await;
|
||||
Some(wal_end)
|
||||
// Fetching WAL without term in recovery creates a small risk of this
|
||||
// WAL getting concurrently garbaged if another compute rises which
|
||||
// collects majority and starts fixing log on this safekeeper itself.
|
||||
// That's ok as (old) proposer will never be able to commit such WAL.
|
||||
let end_watch = if self.is_walproposer_recovery() {
|
||||
EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
|
||||
} else {
|
||||
None
|
||||
EndWatch::Commit(tli.get_commit_lsn_watch_rx())
|
||||
};
|
||||
|
||||
// take the latest commit_lsn if don't have stop_pos
|
||||
let end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow());
|
||||
// we don't check term here; it will be checked on first waiting/WAL reading anyway.
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
if end_pos < start_pos {
|
||||
warn!(
|
||||
@@ -419,8 +418,10 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting streaming from {:?} till {:?}, available WAL ends at {}",
|
||||
start_pos, stop_pos, end_pos
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}",
|
||||
start_pos,
|
||||
end_pos,
|
||||
matches!(end_watch, EndWatch::Flush(_))
|
||||
);
|
||||
|
||||
// switch to copy
|
||||
@@ -445,9 +446,8 @@ impl SafekeeperPostgresHandler {
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
stop_pos,
|
||||
term,
|
||||
commit_lsn_watch_rx,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: [0; MAX_SEND_SIZE],
|
||||
@@ -466,6 +466,32 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
|
||||
/// given term (recovery by walproposer or peer safekeeper).
|
||||
enum EndWatch {
|
||||
Commit(Receiver<Lsn>),
|
||||
Flush(Receiver<TermLsn>),
|
||||
}
|
||||
|
||||
impl EndWatch {
|
||||
/// Get current end of WAL.
|
||||
fn get(&self) -> Lsn {
|
||||
match self {
|
||||
EndWatch::Commit(r) => *r.borrow(),
|
||||
EndWatch::Flush(r) => r.borrow().lsn,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the update.
|
||||
async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
EndWatch::Commit(r) => r.changed().await?,
|
||||
EndWatch::Flush(r) => r.changed().await?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A half driving sending WAL.
|
||||
struct WalSender<'a, IO> {
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
@@ -480,14 +506,12 @@ struct WalSender<'a, IO> {
|
||||
// We send this LSN to the receiver as wal_end, so that it knows how much
|
||||
// WAL this safekeeper has. This LSN should be as fresh as possible.
|
||||
end_pos: Lsn,
|
||||
// If present, terminate after reaching this position; used by walproposer
|
||||
// in recovery.
|
||||
stop_pos: Option<Lsn>,
|
||||
/// When streaming uncommitted part, the term the client acts as the leader
|
||||
/// in. Streaming is stopped if local term changes to a different (higher)
|
||||
/// value.
|
||||
term: Option<Term>,
|
||||
commit_lsn_watch_rx: Receiver<Lsn>,
|
||||
/// Watch channel receiver to learn end of available WAL (and wait for its advancement).
|
||||
end_watch: EndWatch,
|
||||
ws_guard: Arc<WalSenderGuard>,
|
||||
wal_reader: WalReader,
|
||||
// buffer for readling WAL into to send it
|
||||
@@ -497,29 +521,20 @@ struct WalSender<'a, IO> {
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
/// Send WAL until
|
||||
/// - an error occurs
|
||||
/// - if we are streaming to walproposer, we've streamed until stop_pos
|
||||
/// (recovery finished)
|
||||
/// - receiver is caughtup and there is no computes
|
||||
/// - receiver is caughtup and there is no computes (if streaming up to commit_lsn)
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
// If we are streaming to walproposer, check it is time to stop.
|
||||
if let Some(stop_pos) = self.stop_pos {
|
||||
if self.start_pos >= stop_pos {
|
||||
// recovery finished
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to walproposer at {}, recovery finished",
|
||||
self.start_pos
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
// Wait for the next portion if it is not there yet, or just
|
||||
// update our end of WAL available for sending value, we
|
||||
// communicate it to the receiver.
|
||||
self.wait_wal().await?;
|
||||
}
|
||||
// Wait for the next portion if it is not there yet, or just
|
||||
// update our end of WAL available for sending value, we
|
||||
// communicate it to the receiver.
|
||||
self.wait_wal().await?;
|
||||
assert!(
|
||||
self.end_pos > self.start_pos,
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
// try to send as much as available, capped by MAX_SEND_SIZE
|
||||
let mut send_size = self
|
||||
@@ -567,7 +582,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
/// exit in the meanwhile
|
||||
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
self.end_pos = *self.commit_lsn_watch_rx.borrow();
|
||||
self.end_pos = self.end_watch.get();
|
||||
if self.end_pos > self.start_pos {
|
||||
// We have something to send.
|
||||
trace!("got end_pos {:?}, streaming", self.end_pos);
|
||||
@@ -575,27 +590,31 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
}
|
||||
|
||||
// Wait for WAL to appear, now self.end_pos == self.start_pos.
|
||||
if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? {
|
||||
if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
|
||||
self.end_pos = lsn;
|
||||
trace!("got end_pos {:?}, streaming", self.end_pos);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Timed out waiting for WAL, check for termination and send KA
|
||||
if let Some(remote_consistent_lsn) = self
|
||||
.ws_guard
|
||||
.walsenders
|
||||
.get_ws_remote_consistent_lsn(self.ws_guard.id)
|
||||
{
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
// Note that "ending streaming" part of the string is used by
|
||||
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
||||
// do not change this string without updating pageserver.
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
// Timed out waiting for WAL, check for termination and send KA.
|
||||
// Check for termination only if we are streaming up to commit_lsn
|
||||
// (to pageserver).
|
||||
if let EndWatch::Commit(_) = self.end_watch {
|
||||
if let Some(remote_consistent_lsn) = self
|
||||
.ws_guard
|
||||
.walsenders
|
||||
.get_ws_remote_consistent_lsn(self.ws_guard.id)
|
||||
{
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
// Note that "ending streaming" part of the string is used by
|
||||
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
||||
// do not change this string without updating pageserver.
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, self.start_pos,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -663,22 +682,32 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Wait until we have commit_lsn > lsn or timeout expires. Returns
|
||||
/// - Ok(Some(commit_lsn)) if needed lsn is successfully observed;
|
||||
/// Wait until we have available WAL > start_pos or timeout expires. Returns
|
||||
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
|
||||
/// - Ok(None) if timeout expired;
|
||||
/// - Err in case of error (if watch channel is in trouble, shouldn't happen).
|
||||
async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> anyhow::Result<Option<Lsn>> {
|
||||
/// - Err in case of error -- only if 1) term changed while fetching in recovery
|
||||
/// mode 2) watch channel closed, which must never happen.
|
||||
async fn wait_for_lsn(
|
||||
rx: &mut EndWatch,
|
||||
client_term: Option<Term>,
|
||||
start_pos: Lsn,
|
||||
) -> anyhow::Result<Option<Lsn>> {
|
||||
let res = timeout(POLL_STATE_TIMEOUT, async move {
|
||||
let mut commit_lsn;
|
||||
loop {
|
||||
rx.changed().await?;
|
||||
commit_lsn = *rx.borrow();
|
||||
if commit_lsn > lsn {
|
||||
break;
|
||||
let end_pos = rx.get();
|
||||
if end_pos > start_pos {
|
||||
return Ok(end_pos);
|
||||
}
|
||||
if let EndWatch::Flush(rx) = rx {
|
||||
let curr_term = rx.borrow().term;
|
||||
if let Some(client_term) = client_term {
|
||||
if curr_term != client_term {
|
||||
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
||||
}
|
||||
}
|
||||
}
|
||||
rx.changed().await?;
|
||||
}
|
||||
|
||||
Ok(commit_lsn)
|
||||
})
|
||||
.await;
|
||||
|
||||
|
||||
@@ -3,8 +3,11 @@
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use tokio::fs;
|
||||
|
||||
use serde_with::DisplayFromStr;
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -24,9 +27,10 @@ use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::recovery::recovery_main;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||
SafekeeperMemState, ServerInfo, Term,
|
||||
SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
|
||||
};
|
||||
use crate::send_wal::WalSenders;
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
@@ -37,18 +41,25 @@ use crate::SafeKeeperConf;
|
||||
use crate::{debug_dump, wal_storage};
|
||||
|
||||
/// Things safekeeper should know about timeline state on peers.
|
||||
#[derive(Debug, Clone)]
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PeerInfo {
|
||||
pub sk_id: NodeId,
|
||||
/// Term of the last entry.
|
||||
_last_log_term: Term,
|
||||
/// LSN of the last record.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
_flush_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub commit_lsn: Lsn,
|
||||
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
|
||||
/// sk since backup_lsn.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub local_start_lsn: Lsn,
|
||||
/// When info was received.
|
||||
/// When info was received. Serde annotations are not very useful but make
|
||||
/// the code compile -- we don't rely on this field externally.
|
||||
#[serde(skip)]
|
||||
#[serde(default = "Instant::now")]
|
||||
ts: Instant,
|
||||
}
|
||||
|
||||
@@ -237,8 +248,9 @@ impl SharedState {
|
||||
tenant_id: ttid.tenant_id.as_ref().to_owned(),
|
||||
timeline_id: ttid.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
term: self.sk.state.acceptor_state.term,
|
||||
last_log_term: self.sk.get_epoch(),
|
||||
flush_lsn: self.sk.wal_store.flush_lsn().0,
|
||||
flush_lsn: self.sk.flush_lsn().0,
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
commit_lsn: self.sk.inmem.commit_lsn.0,
|
||||
remote_consistent_lsn: remote_consistent_lsn.0,
|
||||
@@ -247,6 +259,7 @@ impl SharedState {
|
||||
.advertise_pg_addr
|
||||
.to_owned()
|
||||
.unwrap_or(conf.listen_pg_addr.clone()),
|
||||
http_connstr: conf.listen_http_addr.to_owned(),
|
||||
backup_lsn: self.sk.inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state.local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
@@ -296,6 +309,13 @@ pub struct Timeline {
|
||||
commit_lsn_watch_tx: watch::Sender<Lsn>,
|
||||
commit_lsn_watch_rx: watch::Receiver<Lsn>,
|
||||
|
||||
/// Broadcasts (current term, flush_lsn) updates, walsender is interested in
|
||||
/// them when sending in recovery mode (to walproposer or peers). Note: this
|
||||
/// is just a notification, WAL reading should always done with lock held as
|
||||
/// term can change otherwise.
|
||||
term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
|
||||
term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
|
||||
|
||||
/// Safekeeper and other state, that should remain consistent and
|
||||
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
|
||||
/// while holding it, ensuring that consensus checks are in order.
|
||||
@@ -317,16 +337,20 @@ pub struct Timeline {
|
||||
impl Timeline {
|
||||
/// Load existing timeline from disk.
|
||||
pub fn load_timeline(
|
||||
conf: SafeKeeperConf,
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
) -> Result<Timeline> {
|
||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||
|
||||
let shared_state = SharedState::restore(&conf, &ttid)?;
|
||||
let shared_state = SharedState::restore(conf, &ttid)?;
|
||||
let rcl = shared_state.sk.state.remote_consistent_lsn;
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
||||
watch::channel(shared_state.sk.state.commit_lsn);
|
||||
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
|
||||
shared_state.sk.get_term(),
|
||||
shared_state.sk.flush_lsn(),
|
||||
)));
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
|
||||
Ok(Timeline {
|
||||
@@ -334,6 +358,8 @@ impl Timeline {
|
||||
wal_backup_launcher_tx,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
mutex: Mutex::new(shared_state),
|
||||
walsenders: WalSenders::new(rcl),
|
||||
walreceivers: WalReceivers::new(),
|
||||
@@ -345,7 +371,7 @@ impl Timeline {
|
||||
|
||||
/// Create a new timeline, which is not yet persisted to disk.
|
||||
pub fn create_empty(
|
||||
conf: SafeKeeperConf,
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
server_info: ServerInfo,
|
||||
@@ -353,6 +379,8 @@ impl Timeline {
|
||||
local_start_lsn: Lsn,
|
||||
) -> Result<Timeline> {
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
|
||||
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
|
||||
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
|
||||
|
||||
@@ -361,7 +389,9 @@ impl Timeline {
|
||||
wal_backup_launcher_tx,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?),
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
|
||||
walsenders: WalSenders::new(Lsn(0)),
|
||||
walreceivers: WalReceivers::new(),
|
||||
cancellation_rx,
|
||||
@@ -370,12 +400,16 @@ impl Timeline {
|
||||
})
|
||||
}
|
||||
|
||||
/// Initialize fresh timeline on disk and start background tasks. If bootstrap
|
||||
/// Initialize fresh timeline on disk and start background tasks. If init
|
||||
/// fails, timeline is cancelled and cannot be used anymore.
|
||||
///
|
||||
/// Bootstrap is transactional, so if it fails, created files will be deleted,
|
||||
/// Init is transactional, so if it fails, created files will be deleted,
|
||||
/// and state on disk should remain unchanged.
|
||||
pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
|
||||
pub async fn init_new(
|
||||
self: &Arc<Timeline>,
|
||||
shared_state: &mut MutexGuard<'_, SharedState>,
|
||||
conf: &SafeKeeperConf,
|
||||
) -> Result<()> {
|
||||
match fs::metadata(&self.timeline_dir).await {
|
||||
Ok(_) => {
|
||||
// Timeline directory exists on disk, we should leave state unchanged
|
||||
@@ -391,7 +425,7 @@ impl Timeline {
|
||||
// Create timeline directory.
|
||||
fs::create_dir_all(&self.timeline_dir).await?;
|
||||
|
||||
// Write timeline to disk and TODO: start background tasks.
|
||||
// Write timeline to disk and start background tasks.
|
||||
if let Err(e) = shared_state.sk.persist().await {
|
||||
// Bootstrap failed, cancel timeline and remove timeline directory.
|
||||
self.cancel(shared_state);
|
||||
@@ -405,12 +439,16 @@ impl Timeline {
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
// TODO: add more initialization steps here
|
||||
self.update_status(shared_state);
|
||||
self.bootstrap(conf);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Bootstrap new or existing timeline starting background stasks.
|
||||
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
|
||||
// Start recovery task which always runs on the timeline.
|
||||
tokio::spawn(recovery_main(self.clone(), conf.clone()));
|
||||
}
|
||||
|
||||
/// Delete timeline from disk completely, by removing timeline directory. Background
|
||||
/// timeline activities will stop eventually.
|
||||
pub async fn delete_from_disk(
|
||||
@@ -444,6 +482,16 @@ impl Timeline {
|
||||
*self.cancellation_rx.borrow()
|
||||
}
|
||||
|
||||
/// Returns watch channel which gets value when timeline is cancelled. It is
|
||||
/// guaranteed to have not cancelled value observed (errors otherwise).
|
||||
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
|
||||
let rx = self.cancellation_rx.clone();
|
||||
if *rx.borrow() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
|
||||
self.mutex.lock().await
|
||||
@@ -520,6 +568,11 @@ impl Timeline {
|
||||
self.commit_lsn_watch_rx.clone()
|
||||
}
|
||||
|
||||
/// Returns term_flush_lsn watch channel.
|
||||
pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
|
||||
self.term_flush_lsn_watch_rx.clone()
|
||||
}
|
||||
|
||||
/// Pass arrived message to the safekeeper.
|
||||
pub async fn process_msg(
|
||||
&self,
|
||||
@@ -531,6 +584,7 @@ impl Timeline {
|
||||
|
||||
let mut rmsg: Option<AcceptorProposerMessage>;
|
||||
let commit_lsn: Lsn;
|
||||
let term_flush_lsn: TermLsn;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
rmsg = shared_state.sk.process_msg(msg).await?;
|
||||
@@ -544,8 +598,11 @@ impl Timeline {
|
||||
}
|
||||
|
||||
commit_lsn = shared_state.sk.inmem.commit_lsn;
|
||||
term_flush_lsn =
|
||||
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
|
||||
}
|
||||
self.commit_lsn_watch_tx.send(commit_lsn)?;
|
||||
self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
|
||||
Ok(rmsg)
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::*;
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
@@ -71,19 +71,23 @@ pub struct GlobalTimelines;
|
||||
|
||||
impl GlobalTimelines {
|
||||
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
|
||||
pub fn init(
|
||||
pub async fn init(
|
||||
conf: SafeKeeperConf,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
) -> Result<()> {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(state.wal_backup_launcher_tx.is_none());
|
||||
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
|
||||
state.conf = Some(conf);
|
||||
// clippy isn't smart enough to understand that drop(state) releases the
|
||||
// lock, so use explicit block
|
||||
let tenants_dir = {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(state.wal_backup_launcher_tx.is_none());
|
||||
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
|
||||
state.conf = Some(conf);
|
||||
|
||||
// Iterate through all directories and load tenants for all directories
|
||||
// named as a valid tenant_id.
|
||||
// Iterate through all directories and load tenants for all directories
|
||||
// named as a valid tenant_id.
|
||||
state.get_conf().workdir.clone()
|
||||
};
|
||||
let mut tenant_count = 0;
|
||||
let tenants_dir = state.get_conf().workdir.clone();
|
||||
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
|
||||
.with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
|
||||
{
|
||||
@@ -93,7 +97,7 @@ impl GlobalTimelines {
|
||||
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
|
||||
{
|
||||
tenant_count += 1;
|
||||
GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?;
|
||||
GlobalTimelines::load_tenant_timelines(tenant_id).await?;
|
||||
}
|
||||
}
|
||||
Err(e) => error!(
|
||||
@@ -108,7 +112,7 @@ impl GlobalTimelines {
|
||||
info!(
|
||||
"found {} tenants directories, successfully loaded {} timelines",
|
||||
tenant_count,
|
||||
state.timelines.len()
|
||||
TIMELINES_STATE.lock().unwrap().timelines.len()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -116,17 +120,21 @@ impl GlobalTimelines {
|
||||
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
|
||||
/// errors if any.
|
||||
///
|
||||
/// Note: This function (and all reading/loading below) is sync because
|
||||
/// timelines are loaded while holding GlobalTimelinesState lock. Which is
|
||||
/// fine as this is called only from single threaded main runtime on boot,
|
||||
/// but clippy complains anyway, and suppressing that isn't trivial as async
|
||||
/// is the keyword, ha. That only other user is pull_timeline.rs for which
|
||||
/// being blocked is not that bad, and we can do spawn_blocking.
|
||||
fn load_tenant_timelines(
|
||||
state: &mut MutexGuard<'_, GlobalTimelinesState>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<()> {
|
||||
let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
|
||||
/// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
|
||||
/// sync and there is no important reason to make it async (it is always
|
||||
/// held for a short while) we just lock and unlock it for each timeline --
|
||||
/// this function is called during init when nothing else is running, so
|
||||
/// this is fine.
|
||||
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
|
||||
let (conf, wal_backup_launcher_tx) = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
(
|
||||
state.get_conf().clone(),
|
||||
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
||||
)
|
||||
};
|
||||
|
||||
let timelines_dir = conf.tenant_dir(&tenant_id);
|
||||
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
|
||||
.with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
|
||||
{
|
||||
@@ -136,13 +144,16 @@ impl GlobalTimelines {
|
||||
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
|
||||
{
|
||||
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
match Timeline::load_timeline(
|
||||
state.get_conf().clone(),
|
||||
ttid,
|
||||
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
||||
) {
|
||||
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
|
||||
Ok(timeline) => {
|
||||
state.timelines.insert(ttid, Arc::new(timeline));
|
||||
let tli = Arc::new(timeline);
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
tli.bootstrap(&conf);
|
||||
tli.update_status_notify().await.unwrap();
|
||||
}
|
||||
// If we can't load a timeline, it's most likely because of a corrupted
|
||||
// directory. We will log an error and won't allow to delete/recreate
|
||||
@@ -168,18 +179,22 @@ impl GlobalTimelines {
|
||||
}
|
||||
|
||||
/// Load timeline from disk to the memory.
|
||||
pub fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
|
||||
|
||||
match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) {
|
||||
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
|
||||
// TODO: prevent concurrent timeline creation/loading
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
|
||||
tli.bootstrap(&conf);
|
||||
|
||||
Ok(tli)
|
||||
}
|
||||
// If we can't load a timeline, it's bad. Caller will figure it out.
|
||||
@@ -217,7 +232,7 @@ impl GlobalTimelines {
|
||||
info!("creating new timeline {}", ttid);
|
||||
|
||||
let timeline = Arc::new(Timeline::create_empty(
|
||||
conf,
|
||||
&conf,
|
||||
ttid,
|
||||
wal_backup_launcher_tx,
|
||||
server_info,
|
||||
@@ -240,23 +255,24 @@ impl GlobalTimelines {
|
||||
// Write the new timeline to the disk and start background workers.
|
||||
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
|
||||
// and the state on disk should remain unchanged.
|
||||
if let Err(e) = timeline.bootstrap(&mut shared_state).await {
|
||||
// Note: the most likely reason for bootstrap failure is that the timeline
|
||||
if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
|
||||
// Note: the most likely reason for init failure is that the timeline
|
||||
// directory already exists on disk. This happens when timeline is corrupted
|
||||
// and wasn't loaded from disk on startup because of that. We want to preserve
|
||||
// the timeline directory in this case, for further inspection.
|
||||
|
||||
// TODO: this is an unusual error, perhaps we should send it to sentry
|
||||
// TODO: compute will try to create timeline every second, we should add backoff
|
||||
error!("failed to bootstrap timeline {}: {}", ttid, e);
|
||||
error!("failed to init new timeline {}: {}", ttid, e);
|
||||
|
||||
// Timeline failed to bootstrap, it cannot be used. Remove it from the map.
|
||||
// Timeline failed to init, it cannot be used. Remove it from the map.
|
||||
TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
|
||||
return Err(e);
|
||||
}
|
||||
// We are done with bootstrap, release the lock, return the timeline.
|
||||
// {} block forces release before .await
|
||||
}
|
||||
timeline.update_status_notify().await?;
|
||||
timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -12,25 +12,26 @@ import psycopg2.extras
|
||||
# We call the test "flaky" if it failed at least once on the main branch in the last N=10 days.
|
||||
FLAKY_TESTS_QUERY = """
|
||||
SELECT
|
||||
DISTINCT parent_suite, suite, test
|
||||
DISTINCT parent_suite, suite, REGEXP_REPLACE(test, '(release|debug)-pg(\\d+)-?', '') as deparametrized_test
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
revision,
|
||||
jsonb_array_elements(data -> 'children') -> 'name' as parent_suite,
|
||||
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'name' as suite,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'name' as test,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'status' as status,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'retriesStatusChange' as retries_status_change,
|
||||
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' -> 'start')::bigint / 1000)::date as timestamp
|
||||
reference,
|
||||
jsonb_array_elements(data -> 'children') ->> 'name' as parent_suite,
|
||||
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') ->> 'name' as suite,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'name' as test,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'status' as status,
|
||||
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'retriesStatusChange' as retries_status_change,
|
||||
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' ->> 'start')::bigint / 1000)::date as timestamp
|
||||
FROM
|
||||
regress_test_results
|
||||
WHERE
|
||||
reference = 'refs/heads/main'
|
||||
) data
|
||||
WHERE
|
||||
timestamp > CURRENT_DATE - INTERVAL '%s' day
|
||||
AND (status::text IN ('"failed"', '"broken"') OR retries_status_change::boolean)
|
||||
AND (
|
||||
(status IN ('failed', 'broken') AND reference = 'refs/heads/main')
|
||||
OR retries_status_change::boolean
|
||||
)
|
||||
;
|
||||
"""
|
||||
|
||||
@@ -40,6 +41,9 @@ def main(args: argparse.Namespace):
|
||||
interval_days = args.days
|
||||
output = args.output
|
||||
|
||||
build_type = args.build_type
|
||||
pg_version = args.pg_version
|
||||
|
||||
res: DefaultDict[str, DefaultDict[str, Dict[str, bool]]]
|
||||
res = defaultdict(lambda: defaultdict(dict))
|
||||
|
||||
@@ -55,8 +59,21 @@ def main(args: argparse.Namespace):
|
||||
rows = []
|
||||
|
||||
for row in rows:
|
||||
logging.info(f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{row['test']}")
|
||||
res[row["parent_suite"]][row["suite"]][row["test"]] = True
|
||||
# We don't want to automatically rerun tests in a performance suite
|
||||
if row["parent_suite"] != "test_runner.regress":
|
||||
continue
|
||||
|
||||
deparametrized_test = row["deparametrized_test"]
|
||||
dash_if_needed = "" if deparametrized_test.endswith("[]") else "-"
|
||||
parametrized_test = deparametrized_test.replace(
|
||||
"[",
|
||||
f"[{build_type}-pg{pg_version}{dash_if_needed}",
|
||||
)
|
||||
res[row["parent_suite"]][row["suite"]][parametrized_test] = True
|
||||
|
||||
logging.info(
|
||||
f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{parametrized_test}"
|
||||
)
|
||||
|
||||
logging.info(f"saving results to {output.name}")
|
||||
json.dump(res, output, indent=2)
|
||||
@@ -77,6 +94,18 @@ if __name__ == "__main__":
|
||||
type=int,
|
||||
help="how many days to look back for flaky tests (default: 10)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--build-type",
|
||||
required=True,
|
||||
type=str,
|
||||
help="for which build type to create list of flaky tests (debug or release)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pg-version",
|
||||
required=True,
|
||||
type=int,
|
||||
help="for which Postgres version to create list of flaky tests (14, 15, etc.)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"connstr",
|
||||
help="connection string to the test results database",
|
||||
|
||||
@@ -125,6 +125,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
tenant_id: vec![0xFF; 16],
|
||||
timeline_id: tli_from_u64(counter % n_keys),
|
||||
}),
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: counter,
|
||||
commit_lsn: 2,
|
||||
@@ -132,6 +133,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
};
|
||||
|
||||
@@ -22,6 +22,8 @@ message SubscribeSafekeeperInfoRequest {
|
||||
message SafekeeperTimelineInfo {
|
||||
uint64 safekeeper_id = 1;
|
||||
TenantTimelineId tenant_timeline_id = 2;
|
||||
// Safekeeper term
|
||||
uint64 term = 12;
|
||||
// Term of the last entry.
|
||||
uint64 last_log_term = 3;
|
||||
// LSN of the last record.
|
||||
@@ -36,6 +38,8 @@ message SafekeeperTimelineInfo {
|
||||
uint64 local_start_lsn = 9;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string
|
||||
string http_connstr = 13;
|
||||
// Availability zone of a safekeeper.
|
||||
optional string availability_zone = 11;
|
||||
}
|
||||
|
||||
@@ -519,6 +519,7 @@ mod tests {
|
||||
tenant_id: vec![0x00; 16],
|
||||
timeline_id,
|
||||
}),
|
||||
term: 0,
|
||||
last_log_term: 0,
|
||||
flush_lsn: 1,
|
||||
commit_lsn: 2,
|
||||
@@ -526,6 +527,7 @@ mod tests {
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
}
|
||||
|
||||
@@ -233,10 +233,19 @@ if TYPE_CHECKING:
|
||||
|
||||
def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
|
||||
response = list_prefix(neon_env_builder, prefix)
|
||||
objects = response.get("Contents")
|
||||
assert (
|
||||
response["KeyCount"] == 0
|
||||
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
keys = response["KeyCount"]
|
||||
objects = response.get("Contents", [])
|
||||
|
||||
if keys != 0 and len(objects) == 0:
|
||||
# this has been seen in one case with mock_s3:
|
||||
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
|
||||
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
|
||||
common_prefixes = response.get("CommonPrefixes", [])
|
||||
log.warn(
|
||||
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
|
||||
)
|
||||
|
||||
assert keys == 0, f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
|
||||
|
||||
def assert_prefix_not_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
import queue
|
||||
import threading
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
|
||||
from fixtures.types import TenantId
|
||||
|
||||
|
||||
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# below doesn't work because summaries contain tenant and timeline ids and we check for them
|
||||
|
||||
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
|
||||
pshttp = env.pageserver.http_client()
|
||||
ep = env.endpoints.create_start("main")
|
||||
ep.safe_psql("create table foo(b text)")
|
||||
for i in range(0, 8):
|
||||
ep.safe_psql("insert into foo(b) values ('some text')")
|
||||
# pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
|
||||
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
|
||||
pshttp.timeline_checkpoint(tenant_id, timeline_id)
|
||||
ep.stop_and_destroy()
|
||||
|
||||
env.pageserver.stop()
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
tenant_dir = env.repo_dir / "tenants" / str(env.initial_tenant)
|
||||
|
||||
for i in range(0, 20_000):
|
||||
import shutil
|
||||
|
||||
shutil.copytree(tenant_dir, tenant_dir.parent / str(TenantId.generate()))
|
||||
@@ -0,0 +1,24 @@
|
||||
{
|
||||
"public_extensions": [
|
||||
"anon",
|
||||
"pg_buffercache"
|
||||
],
|
||||
"library_index": {
|
||||
"anon": "anon",
|
||||
"pg_buffercache": "pg_buffercache"
|
||||
},
|
||||
"extension_data": {
|
||||
"pg_buffercache": {
|
||||
"control_data": {
|
||||
"pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
|
||||
},
|
||||
"archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
|
||||
},
|
||||
"anon": {
|
||||
"control_data": {
|
||||
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
|
||||
},
|
||||
"archive_path": "5670669815/v14/extensions/anon.tar.zst"
|
||||
}
|
||||
}
|
||||
}
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"public_extensions": [
|
||||
"anon"
|
||||
],
|
||||
"library_index": {
|
||||
"anon": "anon"
|
||||
},
|
||||
"extension_data": {
|
||||
"anon": {
|
||||
"control_data": {
|
||||
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
|
||||
},
|
||||
"archive_path": "5670669815/v15/extensions/anon.tar.zst"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Binary file not shown.
327
test_runner/regress/test_download_extensions.py
Normal file
327
test_runner/regress/test_download_extensions.py
Normal file
@@ -0,0 +1,327 @@
|
||||
import os
|
||||
import shutil
|
||||
import threading
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import RemoteStorageKind, available_s3_storages
|
||||
|
||||
|
||||
# Cleaning up downloaded files is important for local tests
|
||||
# or else one test could reuse the files from another test or another test run
|
||||
def cleanup(pg_version):
|
||||
PGDIR = Path(f"pg_install/v{pg_version}")
|
||||
|
||||
LIB_DIR = PGDIR / Path("lib/postgresql")
|
||||
cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"]
|
||||
cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs]
|
||||
|
||||
SHARE_DIR = PGDIR / Path("share/postgresql/extension")
|
||||
cleanup_ext_globs = [
|
||||
"anon*",
|
||||
"address_standardizer*",
|
||||
"postgis*",
|
||||
"pageinspect*",
|
||||
"pg_buffercache*",
|
||||
"pgrouting*",
|
||||
]
|
||||
cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs]
|
||||
|
||||
all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths
|
||||
all_cleanup_files = []
|
||||
for file_glob in all_glob_paths:
|
||||
for file in file_glob:
|
||||
all_cleanup_files.append(file)
|
||||
|
||||
for file in all_cleanup_files:
|
||||
try:
|
||||
os.remove(file)
|
||||
log.info(f"removed file {file}")
|
||||
except Exception as err:
|
||||
log.info(
|
||||
f"skipping remove of file {file} because it doesn't exist.\
|
||||
this may be expected or unexpected depending on the test {err}"
|
||||
)
|
||||
|
||||
cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")]
|
||||
for folder in cleanup_folders:
|
||||
try:
|
||||
shutil.rmtree(folder)
|
||||
log.info(f"removed folder {folder}")
|
||||
except Exception as err:
|
||||
log.info(
|
||||
f"skipping remove of folder {folder} because it doesn't exist.\
|
||||
this may be expected or unexpected depending on the test {err}"
|
||||
)
|
||||
|
||||
|
||||
def upload_files(env):
|
||||
log.info("Uploading test files to mock bucket")
|
||||
os.chdir("test_runner/regress/data/extension_test")
|
||||
for path in os.walk("."):
|
||||
prefix, _, files = path
|
||||
for file in files:
|
||||
# the [2:] is to remove the leading "./"
|
||||
full_path = os.path.join(prefix, file)[2:]
|
||||
|
||||
with open(full_path, "rb") as f:
|
||||
log.info(f"UPLOAD {full_path} to ext/{full_path}")
|
||||
env.remote_storage_client.upload_fileobj(
|
||||
f,
|
||||
env.ext_remote_storage.bucket_name,
|
||||
f"ext/{full_path}",
|
||||
)
|
||||
os.chdir("../../../..")
|
||||
|
||||
|
||||
# Test downloading remote extension.
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
|
||||
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
|
||||
def test_remote_extensions(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_extensions",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
# For REAL_S3 we use the files already in the bucket
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
upload_files(env)
|
||||
|
||||
# Start a compute node and check that it can download the extensions
|
||||
# and use them to CREATE EXTENSION and LOAD
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_extensions",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
try:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Check that appropriate control files were downloaded
|
||||
cur.execute("SELECT * FROM pg_available_extensions")
|
||||
all_extensions = [x[0] for x in cur.fetchall()]
|
||||
log.info(all_extensions)
|
||||
assert "anon" in all_extensions
|
||||
|
||||
# postgis is on real s3 but not mock s3.
|
||||
# it's kind of a big file, would rather not upload to github
|
||||
if remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
assert "postgis" in all_extensions
|
||||
# this may fail locally if dependency is missing
|
||||
# we don't really care about the error,
|
||||
# we just want to make sure it downloaded
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION postgis")
|
||||
except Exception as err:
|
||||
log.info(f"(expected) error creating postgis extension: {err}")
|
||||
# we do not check the error, so this is basically a NO-OP
|
||||
# however checking the log you can make sure that it worked
|
||||
# and also get valuable information about how long loading the extension took
|
||||
|
||||
# this is expected to fail on my computer because I don't have the pgcrypto extension
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION anon")
|
||||
except Exception as err:
|
||||
log.info("error creating anon extension")
|
||||
assert "pgcrypto" in str(err), "unexpected error creating anon extension"
|
||||
finally:
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
# Test downloading remote library.
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
|
||||
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
|
||||
def test_remote_library(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_library",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
# For REAL_S3 we use the files already in the bucket
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
upload_files(env)
|
||||
|
||||
# and use them to run LOAD library
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_library",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
try:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# try to load library
|
||||
try:
|
||||
cur.execute("LOAD 'anon'")
|
||||
except Exception as err:
|
||||
log.info(f"error loading anon library: {err}")
|
||||
raise AssertionError("unexpected error loading anon library") from err
|
||||
|
||||
# test library which name is different from extension name
|
||||
# this may fail locally if dependency is missing
|
||||
# however, it does successfully download the postgis archive
|
||||
if remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
try:
|
||||
cur.execute("LOAD 'postgis_topology-3'")
|
||||
except Exception as err:
|
||||
log.info("error loading postgis_topology-3")
|
||||
assert "No such file or directory" in str(
|
||||
err
|
||||
), "unexpected error loading postgis_topology-3"
|
||||
finally:
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
# Here we test a complex extension
|
||||
# which has multiple extensions in one archive
|
||||
# using postgis as an example
|
||||
# @pytest.mark.skipif(
|
||||
# RemoteStorageKind.REAL_S3 not in available_s3_storages(),
|
||||
# reason="skipping test because real s3 not enabled",
|
||||
# )
|
||||
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
|
||||
def test_multiple_extensions_one_archive(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.REAL_S3,
|
||||
test_name="test_multiple_extensions_one_archive",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_multiple_extensions_one_archive",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION address_standardizer;")
|
||||
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
|
||||
# execute query to ensure that it works
|
||||
cur.execute(
|
||||
"SELECT house_num, name, suftype, city, country, state, unit \
|
||||
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
|
||||
'One Rust Place, Boston, MA 02109');"
|
||||
)
|
||||
res = cur.fetchall()
|
||||
log.info(res)
|
||||
assert len(res) > 0
|
||||
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
# Test that extension is downloaded after endpoint restart,
|
||||
# when the library is used in the query.
|
||||
#
|
||||
# Run the test with mutliple simultaneous connections to an endpoint.
|
||||
# to ensure that the extension is downloaded only once.
|
||||
#
|
||||
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
|
||||
def test_extension_download_after_restart(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
if "15" in pg_version: # SKIP v15 for now because test set only has extension built for v14
|
||||
return None
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_extension_download_after_restart",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
upload_files(env)
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_extension_download_after_restart",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE extension pg_buffercache;")
|
||||
cur.execute("SELECT * from pg_buffercache;")
|
||||
res = cur.fetchall()
|
||||
assert len(res) > 0
|
||||
log.info(res)
|
||||
|
||||
# shutdown compute node
|
||||
endpoint.stop()
|
||||
# remove extension files locally
|
||||
cleanup(pg_version)
|
||||
|
||||
# spin up compute node again (there are no extension files available, because compute is stateless)
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_extension_download_after_restart",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
|
||||
# connect to compute node and run the query
|
||||
# that will trigger the download of the extension
|
||||
def run_query(endpoint, thread_id: int):
|
||||
log.info("thread_id {%d} starting", thread_id)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT * from pg_buffercache;")
|
||||
res = cur.fetchall()
|
||||
assert len(res) > 0
|
||||
log.info("thread_id {%d}, res = %s", thread_id, res)
|
||||
|
||||
threads = [threading.Thread(target=run_query, args=(endpoint, i)) for i in range(2)]
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
cleanup(pg_version)
|
||||
@@ -1,12 +0,0 @@
|
||||
# Configuration for cgroups in VM compute nodes
|
||||
group neon-postgres {
|
||||
perm {
|
||||
admin {
|
||||
uid = vm-informant;
|
||||
}
|
||||
task {
|
||||
gid = users;
|
||||
}
|
||||
}
|
||||
memory {}
|
||||
}
|
||||
Reference in New Issue
Block a user